jueves, 11 de agosto de 2016

Programación reactiva II.

En la entrada anterior puse un ejemplo de programación reactiva. Ese ejemplo no es de muy buena calidad pero creo que ayuda mucho a obtener la mentalidad que hay que adoptar para la programación orientada a eventos.

Los problemas principales del ejemplo anterior son dos:
  1. La variable estática isRunning para filtrar el stream de eventos de scroll hace que me sangren los ojos cada vez que la veo. Es chapucero como sincronizador de hilos de ejecución y agrega una dependencia sin contexto en el método updateGrid; y el contexto importa; y mucho; si se quiere evitar tener una asquerosa bola de lodo como código fuente.
  2. No tenemos control sobre el índice de elementos a recuperar (---0---100---200---300--->) y eso no es bueno puesto que en un entorno real es necesario un máximo (aunque sólo sea para evitar un overflow) y probablemente más cosas, como decrementarlo, saltar directamente a una página no secuencial, etc.
En esta entrada toca hacer lo mismo pero un poco más pro. Al fin y al cabo necesitamos seguir ignorando eventos mientras cargamos la rejilla y necesitamos un stream con el índice de los elementos a recuperar para que se integre con los demás streams.


Si queremos tener un control imperativo de un stream para manejar el índice podemos usar Sujetos. Hay varios tipos, pero en este caso nos vale este:
var index = new BehaviorSubject<int>(0);
---0--->
Por otro lado necesitamos el stream de eventos y en este caso lo convertimos a un stream de "pulsos" cuyos elementos son siempre el mismo.
var source = Observable.FromEventPattern<ScrollEventArgs>(dataGridView2"Scroll")
     .Where(e => dataGridView2.Rows.Count - e.EventArgs.NewValue < 50)
    .Select(_ => Unit.Default);

Usamos un filtro:

Un mapeo a un valor que es siempre igual:

Ya tenemos una secuencia de "pulsos" iguales independientes del índice.

Para ignorar los elementos del stream mientras carga la rejilla podemos mezclar el stream de eventos y el stream de índice y filtrar para que sólo contenga elementos diferentes:

var fetchStream = source
    .WithLatestFrom(index, (_i=> i)
    .DistinctUntilChanged()
    .StartWith(0)
    .SelectMany(i => getContent(i).ToObservable());

Mezclamos con el stream de índice:

Seleccionamos los eltos diferentes del stream mientras la secuencia no cambie:

Empezamos con índice 0:

Creamos streams a partir del resultado de una función asíncrona y los retornamos todos juntos en un solo stream:

No hay marble disponible. :(

Ya tenemos una secuencia que nunca tendrá 2 o más veces seguidas el mismo valor en el índice aunque salten los eventos de scroll y por lo tanto los contenidos obtenidos de la función asíncrona no se repiten.

Ahora sólo nos queda reaccionar al stream de contenidos, pero como queremos actualizar la rejilla e incrementar el índice controlando un máximo lo hacemos de la siguiente forma:
var contentStream = fetchStream.WithLatestFrom(index, (ci=> new { Content = cIndex = i })
         .ObserveOn(dataGridView2)
         .Subscribe(a =>
         {
           updateGrid(a.Content);
           if (a.Index < 1000index.OnNext(a.Index + 100);
         });

Mezclamos con el stream de índice y creamos un objeto anónimo con la información que necesitamos en el callback:

Y ya nos hemos quitado de encima el feo campo isRunning y además podemos controlar imperativamente el índice para que no se nos vaya de madre.

El código final queda así:

public partial class Form2 : Form
{
 
  public Form2()
  {
 
    InitializeComponent();
 
    var index = new BehaviorSubject<int>(0);
 
    var source = Observable.FromEventPattern<ScrollEventArgs>(dataGridView2"Scroll")
        .Where(e => dataGridView2.Rows.Count - e.EventArgs.NewValue < 50)
        .Select(_ => Unit.Default);
       // .StartWith(Unit.Default);
 
    var fetchStream = source
        .WithLatestFrom(index, (_i=> i)
        .DistinctUntilChanged()
        .StartWith(0)
        .SelectMany(i => getContent(i).ToObservable());
 
     var contentStream = fetchStream.WithLatestFrom(index, (ci=> new { Content = cIndex = i })
        .ObserveOn(dataGridView2)
        .Subscribe(a =>
        {
          updateGrid(a.Content);
          if (a.Index < 1000index.OnNext(a.Index + 100);
        });
  }
 
  async private Task<Content> getContent(int index)
  {
 
    await Task.Delay(100); //request to a web api...
    return new Content(index);//mock the response
  }
 
  private void updateGrid(Content c)
  {
    foreach (var item in c.pageContent)
    {
      dataGridView2.Rows.Add(item);
    }
  }
 
}



2 comentarios:

  1. Olé. Te recomiendo un libro llamado "Concurrency in C# Cookbook" que lo peta. Te enseña recetas mezclando TPL Dataflows (MOLAN UN WEBO), Rx y PLinq. Además, si esto te mola... mirate Akka.net!!

    ResponderEliminar
  2. Ya lo he conseguido! Pinta bien. Ahora es cuestion de sacar tiempo para leerlo y trastear un rato.

    ResponderEliminar