Informers – the next LINQ abstraction

The introduction of LINQ-based APIs in .NET revolutionized how developers process collections of objects. Traditional approaches involved nested, difficult-to-read loops, often with complex state machines. LINQ introduced high-level functional fold operators that can be applied to some “source repository” to establish a logical pipeline: it filters, groups, shapes, and recombines multiple sources to derive a new representation, iterates over it, and often “collects” the result into a new target “repository” such as an in-memory List. LINQ operations are implemented as extension methods, and the standard .NET BCL actually ships two flavors of them. They look largely the same on the surface, but one targets sources that implement IEnumerable<T> and the other IQueryable<T>. The main difference is that sources implementing IQueryable<T> capture the LINQ operations not as compiled delegates but as Expressions. Expressions form “expression trees”, which describe code as an object structure (a tree). This lets us capture the developer’s intent for how the source sequence should be transformed, while altering how it is applied at execution time. This is how Entity Framework is able to take what looks like C# code (LINQ statements) and rewrite it into SQL queries for a specific database. Regardless, core LINQ in both implementations targets sources that provide data-at-rest. When we execute a LINQ query, we are executing a pipeline that copies data from source to target and then completes. This is why most LINQ queries end with something like .ToList() or .ToDictionary(), which combines creating a “collector” that the results of the pipeline are channeled into with executing the pipeline, in one step.
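For example, a classic data-at-rest query pulls everything through the pipeline once and collects it into a new container. (The people list and the Person shape with Name and Address.City here are purely illustrative stand-ins.)

var torontoResidents = people
    .Where(p => p.Address.City == "Toronto")   // filter the source
    .Select(p => p.Name)                       // reshape each element
    .ToList();                                 // create the target "collector" and run the pipeline

Once .ToList() returns, the query is done; nothing keeps the result up to date if the source changes later.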

There’s another pattern, very similar to LINQ, that focuses on data-in-motion. It does not ship with the .NET BCL but comes from an extension library called Reactive Extensions (Rx), distributed as the System.Reactive NuGet package. The basic idea of reactive programming is that we have a source that emits events of some kind over time. Rx provides a similar set of functional fold abstractions that establish a transformation pipeline applied to the events emitted from the source. While similar to LINQ, Rx is different in that it does not complete after it is executed. Instead, it establishes a subscription to the source that accepts events on one end and applies the transformation functional folds (the pipeline) to them. This is why every Rx query ends with .Subscribe, which returns an IDisposable. That disposable is your subscription handle – when you dispose of it, you disconnect your pipeline. While many functional folds are similar to those found in LINQ, such as .Where, others are Rx-specific, because we’re dealing with the movement of data over time. For example, .Throttle will drop events that exceed a frequency threshold over a given time window. While Rx has existed for a long time, it keeps gaining popularity. Rx is not .NET specific and has implementations in most modern languages – see http://reactivex.io/languages.html. As developers increasingly embrace event-based programming models, Rx offers a powerful high-level abstraction that simplifies many tasks. However, the primary focus of Rx is being notified of generic events from a source.
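As a rough illustration (the searchTextChanged source of type IObservable<string> is just an assumed stand-in, not something from the library above), an Rx pipeline looks like this:

IDisposable subscription = searchTextChanged
    .Throttle(TimeSpan.FromMilliseconds(300))   // ignore values that are followed by another value within 300 ms
    .Where(text => text.Length > 2)             // familiar LINQ-style filter
    .Subscribe(text => Console.WriteLine($"searching for {text}"));

// later: dispose the handle to disconnect the pipeline
subscription.Dispose();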

Need for a third abstraction model

As programming evolves from batch-based processing towards real-time, event-driven models, there’s a growing need for repositories that allow the execution of “live queries”. The idea is that, as a developer, I want my program to execute a query against some resource provider, be notified when anything in the query selection changes, and have the local in-memory state of the query results updated accordingly. This in effect combines features of classic LINQ with Rx. Proof that this pattern works can be seen in Kubernetes. The Kubernetes API works as a form of database exposed over a RESTful API. It allows defining collections of resources of a given type (similar to tables), querying them, but also watching them. When you create a watch query, Kubernetes will first dump the current state of the resources that match your query and then notify you as values in the query change over time. This model is embraced by the concept of “controllers”, which power most of Kubernetes and are nothing more than simple programs that execute these “live queries” and take action in response to them (usually to reconcile the desired state of the architecture with the actual state).

Informers

Before we look at how to actually solve this, let’s define some high-level requirements for the problem.

I, as a developer, would like to define a query against a data source using C# functional folds in the familiar LINQ syntax style, reflect the results of that query in an in-memory representation, keep it up to date, and be alerted when the query results change.

The idea of informers (and the term itself) is inspired by the Kubernetes SDK library. They allow our code to be “informed” about the state of a given set of resources. If we were to model this using familiar abstractions found in .NET, some things we would want out of such an API are:

  • a close approximation to standard LINQ operators
  • control over the lifetime of query subscription
  • ability to communicate the state or change-in-state of a given resource
  • synchronize a local in-memory cache with results of a query
  • join resources from two different sources
  • any transient interruptions in the query should be transparent to the consumer and manifest themselves only as delays in delivery/synchronization

With that in mind, let’s take a look at an example of what an API like this could look like:

var cache = new SimpleCache<string, Person>();
IDisposable subscriptionHandle = personInformer
    .Where(x => x.Address.City == "Toronto")
    .Into(cache)
    .Do(x => _informerLogger.LogInformation(x.EventFlags.ToString()))
    .Subscribe();

Let’s break this down. First, we establish a target where results will be collected. We use a SimpleCache object, which is basically a thread-safe Dictionary<string, Person>. (There are a few extra quirks involved that make ConcurrentDictionary unsuitable.) The .Where clause is self-explanatory. Because we need to control both the target of collection and the lifetime of the subscription, we can’t use the standard .ToList – it’s instead split into an .Into operator, and we establish the connection to the informer with the call to .Subscribe. The .Do is the callback we can use to be notified when something in the query changes and take action – it’s a standard method in the Rx arsenal. One thing to note is that SimpleCache and the .Into operator work with a dictionary structure instead of a simple list. Because we are dealing with events related to specific objects, we need a unique way of identifying them, hence the need for a key.
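SimpleCache itself isn’t shown here, but conceptually it is just a keyed, thread-safe collector. A minimal sketch could look like the following; the real implementation carries extra behavior (which is exactly why ConcurrentDictionary alone doesn’t cut it):

// Minimal sketch only - not the actual SimpleCache implementation.
public class SimpleCache<TKey, TValue>
{
    private readonly object _lock = new object();
    private readonly Dictionary<TKey, TValue> _items = new Dictionary<TKey, TValue>();

    public void Set(TKey key, TValue value) { lock (_lock) _items[key] = value; }
    public bool Remove(TKey key)            { lock (_lock) return _items.Remove(key); }

    // Point-in-time copy that consumers can safely enumerate.
    public IReadOnlyDictionary<TKey, TValue> Snapshot()
    {
        lock (_lock) return new Dictionary<TKey, TValue>(_items);
    }
}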

Architecture

Fundamentally we’re dealing with the state of resources in a repository, so this style is closer to LINQ than it is to Rx. However, given the requirement to receive updates to our live query, we need support for push events originating from the source. IEnumerable<T> and IQueryable<T> are not well suited for this task, as you need to “pull” data by requesting the next element from an underlying enumerator. This is much better suited to the Rx paradigm, but the events we’re receiving are very specific to communicating the state or change-in-state of a given resource. It’s also not enough to carry just the payload of the resource, as IEnumerable does. We need to know HOW the object is changing, meaning we need metadata attached to our payload. An informer is based on IInformable, which looks like this:

public interface IInformable<TKey, TResource> : IObservable<ResourceEvent<TKey, TResource>>

This gives us an observable stream of events that lets us communicate a change in the state of a given resource. Let’s take a look at ResourceEvent:

public struct ResourceEvent<TKey, TResource>
{
    public EventTypeFlags EventFlags { get; }
    public TKey Key { get; set; }
    public TResource Value { get; }
    public TResource OldValue { get; }
    public string Version { get; set; }
}
  • Key – allows us to uniquely identify a resource. It’s also possible to receive a message regarding a resource with just a key (without an actual body), such as a Delete event
  • Value – the actual resource we’re informing about
  • OldValue – if the event is an incremental update (such as modify), this may carry the previous known state of the resource before it was changed
  • Version – the unique version of the resource. This is useful when we need to understand if two instances of the resource are the same without trying to compare them field by field
  • EventTypeFlags – informs how the event should be interpreted, i.e. what kind of change to the resource we’re communicating. Let’s take a closer look:
[Flags]
public enum EventTypeFlags
{
    Add = 1,
    Delete = 2,
    Modify = 4,
    Current = 8,
    Sync = 16,
    Reset = 32,
    ResetStart = 64,
    ResetEnd = 128,
    ResetEmpty = 256,
    Computed = 512
}

The first four should be self-explanatory. Ignore Sync and Computed for now; they are special edge-case flags. Let’s discuss the need for the Reset flags.

One of the primary use cases is synchronizing a local in-memory cache with the source of the informer. This in-memory cache will be used in our code base to make some kind of decisions and act on them. When our code first starts, this collection is empty. When we establish the subscription, it starts filling up as we receive updates from the server. The fundamental question becomes: when is it synchronized (caught up with all current state) so that I can start making decisions in my code about what’s in it? To answer that, we need a way of marking that all current results of the query have been sent, and that anything going forward is an incremental update on top of the original state. The reset events allow you to do that.

The boundaries of the reset window are marked by the ResetStart and ResetEnd flags, but every item in the reset window also has the Reset flag set. There’s also a special case where the underlying resource is completely empty (no data). For that, we send a single event with no payload but with the ResetEmpty flag set. This marks the boundary transition from listing the current state to watching for changes.

A reset window can also be sent if there’s a disconnect from the server and there’s no way to detect which events have been missed. In this case it tells the subscriber that the existing cache should be invalidated and replaced with a new version. Higher-level operators may attempt to reconcile it with the existing known state in memory if they choose to do so.
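As a rough sketch of how a consumer might interpret these flags (using the raw IObservable subscription rather than the higher-level operators, so the variable names here are illustrative), the “am I synchronized?” question boils down to watching for ResetEnd or ResetEmpty:

bool isSynchronized = false;

IDisposable handle = personInformer.Subscribe(evt =>
{
    if (evt.EventFlags.HasFlag(EventTypeFlags.ResetStart))
        isSynchronized = false;                  // a fresh listing is starting; the cache is being rebuilt

    // ...apply the Add/Modify/Delete event to the local cache here...

    if (evt.EventFlags.HasFlag(EventTypeFlags.ResetEnd) ||
        evt.EventFlags.HasFlag(EventTypeFlags.ResetEmpty))
        isSynchronized = true;                   // current state fully delivered; everything after is incremental
});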

Consumer API – LINQ flavor

Since IInformable is essentially an IObservable<ResourceEvent<TKey,TResource>>, any LINQ operations applied against it will be those defined in the Rx space. So when we try a .Where, we’re actually working against ResourceEvent<TKey,TResource>. That’s not very intuitive, especially when there can be special events in the stream where the Value is empty (such as ResetEmpty, or Delete). The difference can be seen in this example:

personInformer.Where(x => x.EventFlags.HasFlag(EventTypeFlags.ResetEmpty) || x.EventFlags.HasFlag(EventTypeFlags.Delete) || x.Value.Address.City == "Toronto")

This quickly exposes the unnecessary complexity of how events in the informer stream should be interpreted to user code. I, as a developer, obviously don’t want to deal with this. What I want to write is

personInformer.Where(x => x.Address.City == "Toronto")

As such, we need a new set of LINQ extension methods that wrap this complexity by hiding the underlying ResourceEvent, allowing the developer to operate on TResource directly and access the ResourceEvent wrapper only when needed. This can be done by following the same pattern that provides extension methods for IEnumerable<T> in the System.Linq.Enumerable class and for IQueryable<T> in the System.Linq.Queryable class.

There will be a lot of overlap with the other implementations, such as the standard .Where, .Select, .GroupBy, etc. But there will also be a need for operators specific to this particular paradigm, such as .Into.
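A rough sketch of what one of these wrapper operators could look like (the pass-through rules and return type here are illustrative assumptions, not the actual library code) – a .Where that filters on the resource itself while letting payload-less events such as Delete or ResetEmpty flow through:

public static class InformableExtensions
{
    // Sketch only: real operators would also wrap the result back into an
    // IInformable so that further informer-specific operators can be chained.
    public static IObservable<ResourceEvent<TKey, TResource>> Where<TKey, TResource>(
        this IInformable<TKey, TResource> source,
        Func<TResource, bool> predicate)
        where TResource : class
    {
        return Observable.Where(source, evt =>
            evt.Value == null || predicate(evt.Value));   // pass key-only events through, filter the rest
    }
}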

Extensibility

At a high level we want to provide a base API ruleset for the paradigm and build high-level, LINQ-like operations that work against the underlying abstractions. This would be part of the core Informers abstraction package. Implementors of the abstraction would be responsible for exposing some underlying API through it for a specific service type, with a relationship very similar to the one between Entity Framework and its pluggable database drivers. They would hide any implementation-specific quirks, seamlessly handle connection management and transient connection interruptions, and be responsible for keeping the stream “in sync”.
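A loose skeleton of what such an implementor could look like (purely illustrative – the class name and structure are assumptions, not the actual Steeltoe implementation):

// Illustrative skeleton only, not actual library code.
public class MyServiceInformer<TKey, TResource> : IInformable<TKey, TResource>
{
    public IDisposable Subscribe(IObserver<ResourceEvent<TKey, TResource>> observer)
    {
        // A real provider would:
        //  1. list the current state of the backing service and emit it as a reset
        //     window (ResetStart ... Reset items ... ResetEnd, or a lone ResetEmpty),
        //  2. open a watch/subscription and forward Add/Modify/Delete events,
        //  3. on transient disconnects, reconnect and emit a fresh reset window
        //     instead of surfacing the error to the consumer.
        throw new NotImplementedException();
    }
}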

Roadmap

This work has been implemented mostly as part of adding support for Kubernetes informers in the .NET space, and many features are directly driven by the need to interact with the Kubernetes API server. Currently, the implementation can be found under the Steeltoe.io project, in the incubator. The API is still evolving and being validated, but this is a new and exciting way of interacting with live data.

It would be interesting to explore whether this can be made to integrate with Entity Framework as a pluggable abstraction.
