miércoles, 10 de agosto de 2016

Programación reactiva.

Pues estoy aquí, enredando con Reactive Extensions (Rx) en .NET y la verdad es que me gusta bastante. La inmutabilidad de los Observables y la programación funcional son conceptos encantadores para arquitecturas orientadas a eventos.

Antes de poner algo de código tonto para satisfacer mi ego me gustaría comentar ciertos detalles que voy descubriendo a medida que me pego con esto.


-Con estos tutoriales se puede empezar decentemente a hacer algo real.

-No te preocupes mucho con entender muy a fondo el concepto de Observables al principio. Para poder empezar basta con que pienses en ellos como streams a los que llegan (o ya tienen) uno o más datos. Estos streams pueden ser finitos o infinitos; una secuencia de 10 números enteros o la información que te llega de vez en cuando por un socket TCP.

-Cuando hayas hecho algo funcional, entendiéndolo así muy por encima, puedes empezar a analizar como funcionan los Observables jugueteando con sus métodos para ver que hace cada uno.

-Una vez que empiezas con los streams, todo tiene que ser un stream. Aquí no puedes juntar churras con merinas e incluso una simple variable de retorno tiene que ser un stream. Con deciros que existe hasta una manera en la API para esto, ya os debería bastar para haceros una idea de que no hay escapatoria: Observable.Return<string>("Value");.

-Los streams son, por naturaleza, inmutables. El truco para trabajar con ellos está en crear, mezclar y filtrar streams.

-Es muy útil dibujarse los streams para hacerse una idea: "----elto3----elto2----elto1--->".

Voy a poner un ejemplo muy sencillo que se basa en crear una paginación de datos consultados a través de una API REST y mostrarlos en pantalla. Se cargarán 100 items en una rejilla y cuando el scroll esté a más de la mitad de los últimos 100 elementos se realizará asíncronamente -esto es, sin bloquear el hilo de ejecución de la interfaz de usuario- otro request para obtener los siguiente 100 items.

Lo primero es crearse un Observable del evento del scroll de la rejilla
contentStream = Observable.FromEventPattern<ScrollEventArgs>(dataGridView1"Scroll"
----event-----event-----event---->

Filtrar sus elementos y que sólo contenga eventos de scroll cuando esté en los últimos 50 eltos de la rejilla
.Where(e => (dataGridView1.Rows.Count - e.EventArgs.NewValue < 50))

Transformamos cada evento en un entero con el número de items a obtener
.Select(_ => return 100; )
-----100----100----100---->

Sumamos acumulatívamente los elementos
.Scan((xy=> x + y)
----100----200----300---->

Añadimos a mano un primer elemento para que realice la primera paginación desde el elto 0
.StartWith(0)
----0----100----200----300---->

Creamos streams a partir del resultado de una función asíncrona y los retornamos todos juntos en un solo Observable
.SelectMany(i => getContent(i).ToObservable());
---content(0)----content(100)----content(200)---content(300)---->

Y ya tenemos un stream de contenidos de 100 items que podemos observar y utilizarlo para llenar la rejilla
contentStream.Subscribe(c => updateGrid(c));


El código final queda de esta guisa:

public partial class Form1 : Form
{
 
  protected IObservable<ContentcontentStream;
  protected static volatile bool isRunning = false;
 
  public Form1()
  {
 
    InitializeComponent();
    // create scroll event observable
    contentStream = Observable.FromEventPattern<ScrollEventArgs>(dataGridView1"Scroll")
        //discart event if scroll is not down enough
        //or we are already retrieving items (isRunning)  
        .Where(e => (dataGridView1.Rows.Count - e.EventArgs.NewValue < 50 && !isRunning))
        //transform to 100--100--100--> stream, locks next events until we finish 
        .Select(e => { isRunning = truereturn 100; })
        //get item index by accumulating stream items
        .Scan((xy=> x + y)
        //start with 0 before event gets triggered 
        .StartWith(0)
        //create a stream with the result of an async function 
        //and merge them into just one stream
        .SelectMany(i => getContent(i).ToObservable());
 
    //just update the control whit a item of contentStream
    contentStream.ObserveOn(dataGridView1).Subscribe(c => updateGrid(c)); 
 
  }
 
  async private Task<Content> getContent(int index)
  {
 
    await Task.Delay(1000);//request to a web api that last 1 second...
    return new Content(index);//mock the response
  }
 
  private void updateGrid(Content c)
  {
    foreach (var item in c.pageContent)
    {
      dataGridView1.Rows.Add(item);
    }
    isRunning = false//unlocks event filter
  }
 
}
 
public class Content
{
  public List<stringpageContent = new List<string>();
  public const string content_template = "This is the item {0}.";
  public Content()
  {
  }
  public Content(int index)
  {
 
    for (int i = indexi < index + 100i++)
    {
      pageContent.Add(string.Format(content_templatei));
    }
 
  }
}

El campo isRunning es para que, mientra se cargan los nuevo items, ignoremos los eventos del scroll de la rejilla.

ObserveOn es para que el callback de la suscripción se ejecute en el mismo hilo de ejecución en el que se encuentra la rejilla. Si el código se ejecutase en otro hilo nos daría un error al querer modificar el control.

Esto es un poco chapucero y fue mi primera implementación de algo funcional con Rx. En la siguiente entrada hablaré de los posibles problemas que presenta y mostraré una implementación más profesional que elimina la mayoría de estos.

2 comentarios:

  1. Reactive power!! La verdad es que me encantaria que estas cosas tuvieran una representación visual seria como ver trabajar hormigas de un lado para otro.

    ResponderEliminar
    Respuestas
    1. Jejeje seria un punto. Los mas parecido que he encontrado de eso es http://rxmarbles.com/ y la verdad es que es muy util para hacerte una idea de como quedara el stream cuando le hagas perrerias.

      Eliminar