Running group select

We're currently building a somewhat large XML file: we read records from our database, perform some calculations on small subsets of the records that share a key, and then write everything to a blob.

With services such as Azure Functions, which are priced by memory consumption, calling .ToList() on the whole dataset is more than a triviality.

Right now we have excluded a lot of content, so we can safely read the whole set into memory. But we know that our dataset will grow, and if someone decides to include our archives, we will run out of memory.

Enumerators to the rescue

Since we only need a small subset of the data in memory at any one time, we can work with an enumerator instead: read the records in batches, perform our calculations, and write each batch to disk before reading the next. That is fairly easy, but we needed to do it in more than one place, so I figured I might as well write it as a general Linq-ish extension method.
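A hand-written version of that loop might look like the sketch below. It is an illustration, not our production code: `Record`, its `Key`/`Value` properties, the per-group sum and the `groups`/`group`/`item` element names are all hypothetical, and the input is assumed to be sorted by `Key`. Only the records sharing the current key are held in memory, and each group is written with an `XmlWriter` before the next one is read.

```csharp
using System;
using System.Collections.Generic;
using System.Xml;

// Hypothetical record type; the real records come from the database.
public sealed record Record(string Key, int Value);

public static class ManualBatching
{
    // Streams records (assumed sorted by Key) to XML, holding only the
    // records that share the current key in memory.
    public static void WriteGrouped(IEnumerable<Record> records, XmlWriter writer)
    {
        var buffer = new List<Record>();
        string currentKey = null;

        writer.WriteStartElement("groups");
        foreach (var rec in records)
        {
            // Key changed: flush the finished group before starting a new one.
            if (buffer.Count > 0 && rec.Key != currentKey)
            {
                WriteGroup(writer, currentKey, buffer);
                buffer.Clear();
            }
            currentKey = rec.Key;
            buffer.Add(rec);
        }

        // The last group is never followed by a key change, so flush it here.
        if (buffer.Count > 0)
            WriteGroup(writer, currentKey, buffer);
        writer.WriteEndElement();
    }

    private static void WriteGroup(XmlWriter writer, string key, List<Record> group)
    {
        // Hypothetical per-group calculation: the sum of the values.
        var total = 0;
        foreach (var r in group)
            total += r.Value;

        writer.WriteStartElement("group");
        writer.WriteAttributeString("key", key);
        writer.WriteAttributeString("total", XmlConvert.ToString(total));
        foreach (var r in group)
        {
            writer.WriteStartElement("item");
            writer.WriteAttributeString("value", XmlConvert.ToString(r.Value));
            writer.WriteEndElement();
        }
        writer.WriteEndElement();
    }
}
```

The buffering and flushing logic is the part that gets repeated every time, which is what makes it worth pulling out into a reusable method.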

NOTE: This only makes sense when you do not want to load the whole dataset into memory, and it requires the data to be sorted by the key selector so that all elements with the same key are adjacent.
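To see why the ordering matters, the sketch below contrasts LINQ's `GroupBy`, which buffers the whole source and therefore merges equal keys wherever they occur, with grouping only consecutive equal keys, which is all a streaming approach can do. `OrderingDemo`, `ConsecutiveKeyRuns` and `GroupByKeys` are hypothetical helpers written for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrderingDemo
{
    // Returns the key of each run of consecutive elements with equal keys.
    public static IEnumerable<TKey> ConsecutiveKeyRuns<T, TKey>(IEnumerable<T> source, Func<T, TKey> keySelector)
    {
        var comparer = EqualityComparer<TKey>.Default;
        var first = true;
        TKey previous = default;

        foreach (var item in source)
        {
            var key = keySelector(item);
            if (first || !comparer.Equals(key, previous))
                yield return key;   // a new run starts here
            previous = key;
            first = false;
        }
    }

    // GroupBy sees the whole source, so equal keys always end up in one group.
    public static IEnumerable<TKey> GroupByKeys<T, TKey>(IEnumerable<T> source, Func<T, TKey> keySelector) =>
        source.GroupBy(keySelector).Select(g => g.Key);
}
```

For the unsorted input `s1e1, s1e2, s2e1, s1e3` keyed by season, `GroupBy` yields two groups (`s1`, `s2`), while consecutive grouping yields three (`s1`, `s2`, `s1`), so season 1 would be processed twice with partial data. Sorting on the database side (an `ORDER BY` on the key) keeps the pipeline streaming; sorting in memory with `OrderBy` would buffer the whole source again.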

IEnumerable<TResult> RunningGroupSelect<TSource, TKey, TResult>(this IEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TKey, IEnumerable<TSource>, IEnumerable<TResult>> resultSelector)

The method takes a regular key selector returning a TKey, much like GroupBy. While the source is enumerated, elements are buffered until the result of the key selector changes. When it changes, the key and the buffered elements are passed to the result selector, and its results are returned one at a time, so only one group is held in memory at a time. The calling code simply enumerates the result without noticing the buffering.
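The contract described above can also be sketched compactly with a C# iterator method (`yield return`). This is an alternative sketch, not the implementation in the listing below: the compiler generates the state machine, execution is deferred until enumeration starts, and each enumeration begins a fresh pass over `source`. One difference is that it compares keys with `EqualityComparer<TKey>.Default`, so consecutive null keys form one group, whereas the listing puts every null-keyed element in a group of its own. The names `RunningGroupExtensions` and `RunningGroupSelectSketch` are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class RunningGroupExtensions
{
    public static IEnumerable<TResult> RunningGroupSelectSketch<TSource, TKey, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        Func<TKey, IEnumerable<TSource>, IEnumerable<TResult>> resultSelector)
    {
        // Validate eagerly; the iterator below only runs when enumerated.
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
        if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector));
        return Iterate(source, keySelector, resultSelector);
    }

    private static IEnumerable<TResult> Iterate<TSource, TKey, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        Func<TKey, IEnumerable<TSource>, IEnumerable<TResult>> resultSelector)
    {
        var comparer = EqualityComparer<TKey>.Default;
        var buffer = new List<TSource>();
        TKey currentKey = default;

        foreach (var item in source)
        {
            var key = keySelector(item);
            if (buffer.Count > 0 && !comparer.Equals(key, currentKey))
            {
                // Key changed: hand the finished group to the result selector.
                foreach (var result in resultSelector(currentKey, buffer))
                    yield return result;
                buffer = new List<TSource>();   // fresh list in case the selector kept a reference
            }
            currentKey = key;
            buffer.Add(item);
        }

        // The last group is never followed by a key change.
        if (buffer.Count > 0)
            foreach (var result in resultSelector(currentKey, buffer))
                yield return result;
    }
}
```

A group for which the result selector returns an empty sequence simply contributes nothing to the output, which matches the behaviour of the listing.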

A use case is calculating availability for a season based on the rights for each episode.
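A result selector for that use case could look like the sketch below. The `EpisodeRight` and `SeasonAvailability` types and the rule itself (a season is available in the window where every one of its episodes is available, i.e. from the latest episode start to the earliest episode end) are assumptions made for illustration, not our actual business rules. Because the selector only ever receives one season's episodes, materializing them with `ToList()` is cheap.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shapes of the data; the real model will differ.
public sealed record EpisodeRight(int SeasonId, int EpisodeId, DateTime From, DateTime To);
public sealed record SeasonAvailability(int SeasonId, DateTime From, DateTime To);

public static class SeasonRules
{
    // Result selector: receives one season's episodes and yields zero or one
    // availability window for the season as a whole.
    public static IEnumerable<SeasonAvailability> ForSeason(int seasonId, IEnumerable<EpisodeRight> episodes)
    {
        var list = episodes.ToList();   // a single season, so this stays small
        if (list.Count == 0)
            yield break;

        var from = list.Max(e => e.From);   // latest episode start
        var to = list.Min(e => e.To);       // earliest episode end

        // No overlap means there is no window where the whole season is available.
        if (from < to)
            yield return new SeasonAvailability(seasonId, from, to);
    }
}
```

Wired up, this would read `episodeRights.RunningGroupSelect(e => e.SeasonId, SeasonRules.ForSeason)`, where `episodeRights` is a hypothetical sequence sorted by `SeasonId`.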


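    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Linq;

    // The declaration of the enclosing static class was lost from this listing;
    // "EnumerableExtensions" is a placeholder name.
    public static class EnumerableExtensions
    {
        // Note: source.GetEnumerator() is called right away, so the source query starts
        // when RunningGroupSelect is called, and the result can be enumerated only once.
        public static IEnumerable<TResult> RunningGroupSelect<TSource, TKey, TResult>(this IEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TKey, IEnumerable<TSource>, IEnumerable<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));

            return new GroupedResultIterator<TSource, TKey, TResult>(source.GetEnumerator(), keySelector, resultSelector);
        }
    }

    // Buffers consecutive source elements that share a key, passes each group to the
    // result selector, and yields the selector's results one at a time.
    // The class is its own enumerator, so it supports a single enumeration.
    internal sealed class GroupedResultIterator<TSource, TKey, TResult> : IEnumerable<TResult>, IEnumerator<TResult>
    {
        private enum State
        {
            New,
            Open,
            SourceEOF,
            Disposed
        }
        private State currentState;
        private readonly IEnumerator<TSource> _source;
        private readonly Func<TSource, TKey> _keySelector;
        private readonly Func<TKey, IEnumerable<TSource>, IEnumerable<TResult>> _resultSelector;
        private TResult[] groupResult;   // results of the group currently being yielded

        private int position;            // index of Current within groupResult

        public GroupedResultIterator(IEnumerator<TSource> source, Func<TSource, TKey> keySelector, Func<TKey, IEnumerable<TSource>, IEnumerable<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));

            if (keySelector == null)
                throw new ArgumentNullException(nameof(keySelector));

            if (resultSelector == null)
                throw new ArgumentNullException(nameof(resultSelector));

            currentState = State.New;

            position = 0;

            _source = source;
            _keySelector = keySelector;
            _resultSelector = resultSelector;
        }

        public TResult Current
        {
            get
            {
                return groupResult[position];
            }
        }

        object IEnumerator.Current
        {
            get
            {
                return Current;
            }
        }

        public IEnumerator<TResult> GetEnumerator()
        {
            return this;
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return this;
        }

        public void Dispose()
        {
            if (currentState == State.Disposed)
                return;

            _source.Dispose();   // release the underlying source (e.g. a data reader)
            groupResult = null;
            currentState = State.Disposed;
        }

        public bool MoveNext()
        {
            if (currentState == State.Disposed)
                return false;

            // First call: position the source on its first element.
            if (currentState == State.New)
            {
                currentState = State.Open;
                if (!_source.MoveNext())
                    currentState = State.SourceEOF;
            }

            // No group yet, or the current one is used up: fetch groups until one produces results.
            if (groupResult == null || (position == groupResult.Length - 1 && currentState != State.SourceEOF))
            {
                var keyResult = Array.Empty<TResult>();

                while (keyResult.Length == 0 && currentState != State.SourceEOF)
                    keyResult = GetResultForKey(_keySelector(_source.Current));

                groupResult = keyResult;
                position = -1;
            }

            if ((currentState == State.SourceEOF && groupResult.Length == 0) || position == groupResult.Length - 1)
                return false;

            position++;
            return true;
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }

        // Buffers elements while the key stays the same. On return, _source.Current is the
        // first element of the next group, or the state is SourceEOF.
        private TResult[] GetResultForKey(TKey key)
        {
            var buffer = new List<TSource>();

            if (key == null)
            {
                // A null key never matches, so each null-keyed element forms its own group.
                buffer.Add(_source.Current);
                if (!_source.MoveNext())
                {
                    currentState = State.SourceEOF;
                }
            }
            else
            {
                var comparer = EqualityComparer<TKey>.Default;
                while (comparer.Equals(key, _keySelector(_source.Current)))
                {
                    buffer.Add(_source.Current);
                    if (!_source.MoveNext())
                    {
                        currentState = State.SourceEOF;
                        break;
                    }
                }
            }
            return _resultSelector(key, buffer).ToArray();
        }
    }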

I have a hunch that this can just as well be solved with TPL Dataflow or Rx, but for now it seems sufficient.