Streaming and buffering in C# and Talend

This could also be entitled: How to make your code over 3x faster*.

I love LINQ in C#, and I doubt I’m the only C# programmer who does.  However, there are hidden problems to watch out for, and I’ll describe one now.  It is based on the difference between streaming and buffering, a distinction that applies to each stage of a LINQ expression.

One way to think of a LINQ expression is that it’s a data processing pipeline.  Data flows in one end, e.g. from an array, gets processed in some way, and some result emerges from the other end.  The various stages that make up the pipeline can each be described as streaming or buffering.

Streaming operations can process one lump of data completely, release the output for that lump to the next stage in the pipeline, and then move on to the next lump.  In contrast, buffering operations need to see all of the data before they can complete their processing and release anything.

An example of streaming is filtering – each lump of data can be separately tested against a filter, without needing to know about the other lumps of data.  This means that the filter can look at a lump, release it to the next stage if the lump meets the conditions of the filter, and then move on to the next lump.
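
To make that concrete, here is a minimal sketch of a hand-written streaming filter.  The names (StreamingFilterSketch, Filter and the log list) are made up for illustration, and this is not a copy of how LINQ’s Where is implemented, just something that behaves in the same lump-at-a-time way.

```csharp
using System;
using System.Collections.Generic;

public static class StreamingFilterSketch
{
    // Hypothetical filter: tests one item at a time and hands it on
    // straight away if it passes, without looking at any other item.
    public static IEnumerable<int> Filter(
        IEnumerable<int> source, Func<int, bool> keep, List<string> log)
    {
        foreach (int item in source)
        {
            log.Add($"filter sees {item}");
            if (keep(item))
            {
                yield return item;
            }
        }
    }

    // Runs the filter over 1, 2, 3 and records who did what, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        foreach (int n in Filter(new[] { 1, 2, 3 }, x => x % 2 == 1, log))
        {
            log.Add($"consumer gets {n}");
        }
        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The log shows the consumer receiving 1 before the filter has looked at 2 or 3, which is what streaming means here.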

An example of buffering is grouping – you can’t release any groups on to the next stage until you’ve seen the last lump of data, because you don’t know which group that last lump will be added to.
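
Here is a small sketch of the same idea with LINQ’s GroupBy.  The Source method and the log are hypothetical helpers that I’ve added only to make the order of events visible.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BufferingGroupSketch
{
    // Hypothetical source that records each item as it is pulled.
    private static IEnumerable<int> Source(List<string> log)
    {
        foreach (int n in new[] { 1, 2, 3, 4 })
        {
            log.Add($"source yields {n}");
            yield return n;
        }
    }

    // Groups the source into odd and even and records the order of events.
    public static List<string> Run()
    {
        var log = new List<string>();
        var groups = Source(log).GroupBy(n => n % 2 == 0 ? "even" : "odd");

        foreach (var g in groups)
        {
            string members = string.Join(",", g);
            log.Add($"consumer gets {g.Key}: {members}");
        }
        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

All four "source yields" lines appear in the log before the first group reaches the consumer, because GroupBy has to read its whole input before it can release anything.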

Another example of buffering that might take you by surprise is ToList().  If you don’t need to turn things into a list, then I suggest that you don’t.  Or, at least, be very careful where you do, and try to do it as late as possible in the pipeline and as rarely as possible.  This is particularly true if you assemble your LINQ pipeline in stages via more than one method – you could have more than one method doing a ToList().
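
As a rough illustration of the "more than one method" case, here is a cut-down sketch with two stages that can each optionally call ToList.  It uses plain ints from Enumerable.Range rather than anything from the real test code, and the class name, Trace method and log are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StagedPipelineSketch
{
    // Stage one: keep the odd numbers from 1 to 6, optionally as a list.
    private static IEnumerable<int> Odds(List<string> log, bool asList)
    {
        IEnumerable<int> odds = Enumerable.Range(1, 6)
            .Where(n => { log.Add($"odd? {n}"); return n % 2 == 1; });
        return asList ? odds.ToList() : odds;
    }

    // Stage two: keep the multiples of 3 from stage one, optionally as a list.
    private static IEnumerable<int> OddThrees(List<string> log, bool asList)
    {
        IEnumerable<int> oddThrees = Odds(log, asList)
            .Where(n => { log.Add($"three? {n}"); return n % 3 == 0; });
        return asList ? oddThrees.ToList() : oddThrees;
    }

    // Runs the pipeline and returns the order in which the tests happened.
    public static string Trace(bool asList)
    {
        var log = new List<string>();
        foreach (int n in OddThrees(log, asList))
        {
            log.Add($"got {n}");
        }
        return string.Join(", ", log);
    }

    public static void Main()
    {
        Console.WriteLine(Trace(false));
        Console.WriteLine(Trace(true));
    }
}
```

Without the lists, the two tests interleave item by item.  With them, every "odd?" test runs before the first "three?" test, and each stage holds a complete list of its output in memory along the way.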

Here’s a small bit of code to try to highlight that.  There are 4 code files, and I’ll miss out the first two as they’re trivial:

  1. A class called Thing that is the err… thing in an enumerator.  It has just one reason to exist, which is the int property called Num.
  2. An enumerator over a lot of Things called ThingEnum.  Its constructor takes the first and last int values to return as Things.
  3. Two methods that use LINQ to build a pipeline that processes a lot of Things (1 to 100,000,000).  The first method creates the ThingEnum and uses a Where to pass on only the Things whose Num is odd.  The second method takes the output of the first and further filters the Things so that the Num has to also be a multiple of 3.  The important part is that both methods can return their results as is, or can turn them into a list first and then return them.  As the return type of the methods is IEnumerable<Thing>, the output can be used in the same way in both cases.
  4. Code that calls the LINQ pipeline, and runs through the output with a foreach, just assigning an int variable to the value of Thing.Num.  It also times how long everything takes.
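
For anyone who wants to run the code that follows, here is a minimal sketch of how the two omitted classes could look.  It is a guess at their shape based on the descriptions above rather than the original code: the constructor parameters, the private field names and the ThingDemo class are all my assumptions.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Assumed shape: all that is known is that Thing has an int property called Num.
public class Thing
{
    public int Num { get; }

    public Thing(int num)
    {
        Num = num;
    }
}

// Assumed shape: yields one new Thing for each int from first to last inclusive.
public class ThingEnum : IEnumerable<Thing>
{
    private readonly int _first;
    private readonly int _last;

    public ThingEnum(int first, int last)
    {
        _first = first;
        _last = last;
    }

    // Creates each Thing only when the caller asks for it, so nothing is buffered.
    public IEnumerator<Thing> GetEnumerator()
    {
        for (int i = _first; i <= _last; i++)
        {
            yield return new Thing(i);
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class ThingDemo
{
    public static void Main()
    {
        foreach (Thing t in new ThingEnum(3, 6))
        {
            Console.WriteLine(t.Num);
        }
    }
}
```

Implementing IEnumerable<Thing> is what lets the pipeline call Where directly on a ThingEnum.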

The code that builds the LINQ pipeline:

static public IEnumerable<Thing> OddThrees(Stopwatch sw, bool asList)
{
   Console.WriteLine($" >> {sw.ElapsedMilliseconds} OddThrees");

   IEnumerable<Thing> odds = Odds(sw, asList);

   Console.WriteLine($" {sw.ElapsedMilliseconds} OddThrees: Got odds");

   IEnumerable<Thing> oddThrees = odds.Where(x => x.Num % 3 == 0);

   Console.WriteLine($" {sw.ElapsedMilliseconds} OddThrees: About to prepare results");

   IEnumerable<Thing> result;

   if (asList)
   {
      result = oddThrees.ToList();
   }
   else
   {
      result = oddThrees;
   }

   Console.WriteLine($" << {sw.ElapsedMilliseconds} OddThrees");

   return result;
}

static private IEnumerable<Thing> Odds(Stopwatch sw, bool asList)
{
   Console.WriteLine($" >> {sw.ElapsedMilliseconds} Odds");

   ThingEnum things = new ThingEnum(1, 100000000);

   Console.WriteLine($" {sw.ElapsedMilliseconds} Odds: Created enum");

   IEnumerable<Thing> odds = things.Where(x => x.Num % 2 == 1);

   Console.WriteLine($" {sw.ElapsedMilliseconds} Odds: About to prepare results");

   IEnumerable<Thing> result;

   if (asList)
   {
      result = odds.ToList();
   }
   else
   {
      result = odds;
   }

   Console.WriteLine($" << {sw.ElapsedMilliseconds} Odds");

   return result;
}

Here is the code that calls the LINQ pipeline and iterates through the results (which is called twice – once with asList = true, and once with asList = false):

static void thingTest(bool asList)
{
   Stopwatch sw = Stopwatch.StartNew();

   Console.WriteLine($"{sw.ElapsedMilliseconds} Main: About to get odd threes, as list = {asList}");

   IEnumerable<Thing> things = Iterators.OddThrees(sw, asList);

   Console.WriteLine($"{sw.ElapsedMilliseconds} Main: got odd threes");

   int ignored;

   foreach (Thing t in things)
   {
      ignored = t.Num;
   }

   sw.Stop();
   Console.WriteLine($"{sw.ElapsedMilliseconds} Main: finished looping through results");
}

These are the results (the numbers at the start of the lines are elapsed time in milliseconds, and the >> and << show entering and exiting a method):

0 Main: About to get odd threes, as list = False
   >> 6 OddThrees
     >> 7 Odds
     7 Odds: Created enum
     8 Odds: About to prepare results
     << 8 Odds
   9 OddThrees: Got odds
   9 OddThrees: About to prepare results
   << 10 OddThrees
10 Main: got odd threes
4759 Main: finished looping through results

0 Main: About to get odd threes, as list = True
   >> 0 OddThrees
     >> 1 Odds
     2 Odds: Created enum
     2 Odds: About to prepare results
     << 16344 Odds
   16344 OddThrees: Got odds
   16344 OddThrees: About to prepare results
   << 18490 OddThrees
18490 Main: got odd threes
18831 Main: finished looping through results

There are a few things to notice in the times:

  1. Using ToList makes things over 3 times slower (18,831 ms in total against 4,759 ms), so don’t use it unless you have to.
  2. The first run (without ToList) finishes all the LINQ stuff suspiciously quickly.  That’s because it doesn’t actually do any processing until the foreach loop in the top-level code needs a value.  Up until then it was merely setting up its ability to do that later (a bit like creating a delegate rather than executing it).
  3. The second run (with ToList) also appears to spend very little time doing the actual LINQ parts (in Odds, the part between creating the Enum and preparing the result).  That’s because it too is deferring the work until it’s needed, but in this case it’s needed before the end of the method, when ToList has to pull every result through the pipeline to put it into a list.  That is where the jump from 2 ms to 16,344 ms inside Odds comes from, and the further 2,146 ms inside OddThrees.
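
If you want to see the deferral without a stopwatch, one option is to count how many times the filter’s test has run at different points.  This is only a sketch under my own assumptions: it uses Enumerable.Range(1, 10) in place of the Things, and CountTests is a name invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredExecutionSketch
{
    // Returns how many times the Where test had run before the foreach
    // loop started, and how many times it had run once the loop ended.
    public static (int beforeLoop, int afterLoop) CountTests(bool asList)
    {
        int tests = 0;

        // Building the query runs nothing yet; it only describes the work.
        IEnumerable<int> odds = Enumerable.Range(1, 10)
            .Where(n => { tests++; return n % 2 == 1; });

        // ToList forces all of the work to happen here and now.
        IEnumerable<int> result = asList ? odds.ToList() : odds;

        int beforeLoop = tests;
        foreach (int n in result)
        {
            // Just consume the results.
        }
        return (beforeLoop, tests);
    }

    public static void Main()
    {
        Console.WriteLine(CountTests(false));
        Console.WriteLine(CountTests(true));
    }
}
```

Without ToList the count is 0 before the loop and 10 after it.  With ToList it is already 10 before the loop starts and stays there, because the loop is only reading from the list.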

In case you’re still baffled by data processing pipelines made up of streaming and buffering elements, here is a more direct example from an ETL tool called Talend.  In this you really do make an explicit pipeline, with pipeline elements linked together on screen.  When you run it, data flows from element to element along the links.  (Behind the scenes this is implemented in Java, but the user doesn’t need to worry about that most of the time and can instead concentrate on the GUI.)

In this example, which is a bit contrived to make a point but hopefully still helpful, the pipeline has two branches that come together to produce a single output.  It reads from table A, filters its rows, joins the results of the filter with rows from another table (table B), groups the joined data, filters the groups and then writes the output.

As far as streaming / buffering are concerned:

  • Streaming = read, filter, join, write
  • Buffering = group
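
For comparison, the same shape of pipeline could be sketched in LINQ along the following lines.  This is not the Talend job itself: the table contents, column names and thresholds are invented stand-ins, and the final loop in Main stands in for the write step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TalendShapeSketch
{
    public static IEnumerable<string> Run()
    {
        // Hypothetical stand-ins for table A and table B.
        var tableA = new[]
        {
            (Id: 1, Amount: 5), (Id: 1, Amount: 40), (Id: 2, Amount: 50),
            (Id: 1, Amount: 70), (Id: 3, Amount: 60)
        };
        var tableB = new[] { (Id: 1, Region: "North"), (Id: 3, Region: "South") };

        return tableA
            .Where(a => a.Amount >= 10)                          // filter the rows of A
            .Join(tableB, a => a.Id, b => b.Id,
                  (a, b) => (Region: b.Region, Amount: a.Amount)) // join with B
            .GroupBy(row => row.Region, row => row.Amount)       // group (buffers)
            .Where(g => g.Sum() >= 80)                           // filter the groups
            .Select(g => $"{g.Key}: {g.Sum()}");                 // shape the output rows
    }

    public static void Main()
    {
        foreach (string line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

One difference worth knowing about is that, in LINQ to Objects, Join reads the whole of its second input (tableB here) before it yields its first result, while still streaming the first input.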

When the code starts, a link’s text shows the data flowing along that link: how many rows so far and at what speed.  The text starts off coloured blue, and then turns green when that part of the processing has finished.  Each element will start processing as soon as it gets data along its inputs.

[Screenshot: the Talend job part-way through a run]

You can see that the read from table A has started but not finished.  As soon as some data was available, the filter started.  In parallel, the read from table B started and almost immediately finished because there was so little data to read.  This meant that the join of A and B data could also start as soon as there was some filtered A data.  The joined data could start flowing into the group element.  Remember, there is still more data to read from table A, and flow through the filter and the join.

Note that there is no activity downstream of the group.  Nothing will be produced by the group element until it has received its last input row.  Only then will it produce things, at which point it can send out all of its groups.

When everything has finished things look like this:

[Screenshot: the Talend job after the run has finished]

Like I said before, this is an extreme example implemented perversely to illustrate a point.  The point is: pay attention to streaming and buffering in your data pipelines, and remember that LINQ can be thought of as a data pipeline.

 
