Writing a Concurrent .NET Stream

Streams. They’re everywhere in .NET, and whilst they’re powerful they can be a headache until the penny drops. Here we’ll have a look at how to work with them. The great thing is that once you’ve learned how to work with some, you’ve pretty much learned how to work with them all.
In this post we’ll look at the Stream class first and have a play with the MemoryStream class to see how things fit together. Once we’ve done this we’ll create some custom implementations and finally (armed with this newfound wealth of knowledge) we’ll get to know some of the implementations that come with .NET.

The Stream class

Have a look at the MSDN documentation on the Stream class and you’ll see how many implementations of it there are in the System.IO namespace alone. It’s an abstract type with lots of implementations.

The main members for the Stream class are:

Properties

  • bool CanRead
  • bool CanSeek
  • bool CanTimeout
  • bool CanWrite
  • long Length
  • long Position
  • int ReadTimeout
  • int WriteTimeout

Methods

  • void Close()
  • void Dispose()
  • void Flush()
  • long Seek(long offset, SeekOrigin origin)
  • void SetLength(long value)
  • int Read(byte[] buffer, int offset, int count)
    • int ReadByte()
    • IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    • int EndRead(IAsyncResult asyncResult)
  • void Write(byte[] buffer, int offset, int count)
    • void WriteByte(byte value)
    • IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    • void EndWrite(IAsyncResult asyncResult)

These members are there to support the key operations that we need streams to do:

  • Reading and/or writing.

Everything else is there to aid this.

We can determine what a stream supports with the CanRead, CanWrite and CanSeek properties. It’s fair to say that a stream that you can neither read from nor write to would be pretty pointless, even though you could, technically, write one.

The MemoryStream class

The MemoryStream is probably the simplest implementation of the Stream class, as its backing store is just a block of memory, so we’ll be using it for demonstrations. If we create a MemoryStream then we can see what actions it supports:

var memoryStream = new MemoryStream();

Console.WriteLine("CanRead = " + memoryStream.CanRead);
Console.WriteLine("CanWrite = " + memoryStream.CanWrite);
Console.WriteLine("CanSeek = " + memoryStream.CanSeek);

Output:

CanRead = True
CanWrite = True
CanSeek = True

It supports everything, so we can read from it and write to it and seek in it as well!

Important Thing No. 1: The verbs “read” and “write” describe what you do to the stream, not what happens to your buffer. Write(buffer, ...) copies bytes from your buffer into the stream, and Read(buffer, ...) copies bytes out of the stream into your buffer. Obvious in hindsight, but it tripped me up nevertheless.

var bufferInput = new byte[] {1, 2, 3};
var bufferOutput = new byte[bufferInput.Length];

Console.WriteLine("bufferInput  = {0,-2}", string.Join(", ", bufferInput));
Console.WriteLine("bufferOutput = {0,-2}", string.Join(", ", bufferOutput));

var memoryStream = new MemoryStream();
memoryStream.Write(bufferInput, 0, bufferInput.Length);
memoryStream.Read(bufferOutput, 0, bufferInput.Length);

Console.WriteLine("bufferInput  = {0}", string.Join(", ", bufferInput));
Console.WriteLine("bufferOutput = {0}", string.Join(", ", bufferOutput));

This doesn’t actually work: when we read from the stream nothing is copied, and the output buffer is still all zeros:

bufferInput  = 1, 2, 3
bufferOutput = 0, 0, 0
bufferInput  = 1, 2, 3
bufferOutput = 0, 0, 0

The reason for this is that the stream has an internal pointer (or cursor, if you prefer). Each byte written moves the pointer forward by one, and each byte read moves it forward by one too. After writing three bytes the pointer sits at the end of the data, so the Read call finds nothing left to read and returns zero bytes. In the short term we can fix this by explicitly setting the pointer back to zero before reading, i.e.:

var bufferInput = new byte[] {1, 2, 3};
var bufferOutput = new byte[bufferInput.Length];

Console.WriteLine("bufferInput  = {0,-2}", string.Join(", ", bufferInput));
Console.WriteLine("bufferOutput = {0,-2}", string.Join(", ", bufferOutput));

var memoryStream = new MemoryStream();
memoryStream.Write(bufferInput, 0, bufferInput.Length);
memoryStream.Position = 0;
memoryStream.Read(bufferOutput, 0, bufferInput.Length);

Console.WriteLine("bufferInput  = {0}", string.Join(", ", bufferInput));
Console.WriteLine("bufferOutput = {0}", string.Join(", ", bufferOutput));

This gives us the output we want:

bufferInput  = 1, 2, 3
bufferOutput = 0, 0, 0
bufferInput  = 1, 2, 3
bufferOutput = 1, 2, 3
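
To make the pointer’s movement visible, here is a small sketch that records Position after each step. The CursorDemo class and TracePositions method are names I’ve made up for illustration; Seek(0, SeekOrigin.Begin) is used in place of Position = 0 only to show that the two are interchangeable here.

```csharp
using System;
using System.IO;

public static class CursorDemo
{
    // Returns the stream position observed after each step.
    public static long[] TracePositions()
    {
        var input = new byte[] { 1, 2, 3 };
        var output = new byte[input.Length];
        var positions = new long[4];

        using var stream = new MemoryStream();
        positions[0] = stream.Position;          // nothing written yet

        stream.Write(input, 0, input.Length);
        positions[1] = stream.Position;          // cursor now sits after the written bytes

        stream.Seek(0, SeekOrigin.Begin);        // same effect as stream.Position = 0
        positions[2] = stream.Position;          // back at the start

        stream.Read(output, 0, output.Length);
        positions[3] = stream.Position;          // reading advanced the cursor too

        return positions;
    }
}
```

Calling Console.WriteLine(string.Join(", ", CursorDemo.TracePositions())) prints 0, 3, 0, 3.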

It’s not really like a stream

Why not?

As I said in this Stack Exchange question, “stream” is, to me, a really bad name for the group of classes that claim to be streams (i.e. everything inheriting from the Stream class). Yes, this is the analogy that’s always made. Everyone seems to agree that these things are like streams but, personally, I found the name confusing. To me, a stream needs to have three key functions:

  1. Put things in one end and receive them from the other end in the same order.
  2. Do these things at the same time. i.e. continuously be adding to the top of the stream and reading from the bottom.
  3. Be forward-only.

Like a conveyor belt (or a stream), as illustrated in my diagram below.

Figure: Adding and removing bytes from a conveyor belt

Why does it behave so non-intuitively?

You can see from the example above that there is only one pointer, used for both reading and writing. If I want to read then I need to:

  1. Stop writing.
  2. Move the pointer to where I want to read from.
  3. Do the reading.

If I want to write then I need to:

  1. Stop reading.
  2. Move the pointer to the end of the internal buffer.
  3. Do the writing.

This means that I either need some external coordination between the writers and readers, or I need to write everything to the stream before reading from it. If a stream supports seeking then it also has to hold on to all of its data (or be able to re-read it from the source) so that we can seek back to it.
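
The juggling described above can be sketched with a MemoryStream. The SingleCursorDemo class and the readPosition variable are my own invention for this example; the point is that the caller, not the stream, has to remember where the reader got to.

```csharp
using System;
using System.IO;

public static class SingleCursorDemo
{
    public static byte[] InterleaveWritesAndReads()
    {
        using var stream = new MemoryStream();
        long readPosition = 0;      // tracked by us; the stream only has one cursor
        var received = new byte[4];
        var receivedCount = 0;

        // Write two bytes; the cursor ends up after them.
        stream.Write(new byte[] { 1, 2 }, 0, 2);

        // To read we stop writing and move the cursor back.
        stream.Position = readPosition;
        receivedCount += stream.Read(received, receivedCount, 2);
        readPosition = stream.Position;

        // To write again we move the cursor to the end first.
        stream.Seek(0, SeekOrigin.End);
        stream.Write(new byte[] { 3, 4 }, 0, 2);

        // And back again for the next read.
        stream.Position = readPosition;
        receivedCount += stream.Read(received, receivedCount, 2);

        return received;
    }
}
```

The returned array contains 1, 2, 3, 4, but only because every switch between writing and reading was choreographed by hand.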

Given all this one has to ask why you’d bother at all. We have plenty of options for writing data to memory and then reading.

Building a more stream-like stream

Streams (in my opinion) should let you input and output at the same time. We have a type that already supports this. The Queue type is a FIFO collection. If we use this as the backing store for a stream then we have exactly what we’re looking for.

NB: As we’re going to be reading and writing at the same time, we’ll need to use the ConcurrentQueue.
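
As a rough illustration of why the queue fits, here is a sketch with one task adding bytes while the calling thread removes them. QueueDemo and ProduceAndConsume are hypothetical names, and the spinning consumer is only there to keep the example short; it isn’t how the finished stream should wait for data.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class QueueDemo
{
    public static List<byte> ProduceAndConsume(int itemCount)
    {
        var queue = new ConcurrentQueue<byte>();
        var received = new List<byte>();

        // The producer adds to the back of the queue on another thread.
        var producer = Task.Run(() =>
        {
            for (var i = 0; i < itemCount; i++)
            {
                queue.Enqueue((byte)i);
            }
        });

        // The consumer takes from the front while the producer is still running.
        while (received.Count < itemCount)
        {
            if (queue.TryDequeue(out var b))
            {
                received.Add(b);
            }
            else
            {
                Thread.Yield();   // nothing available yet; let the producer run
            }
        }

        producer.Wait();
        return received;
    }
}
```

Whatever the timing between the two threads, the bytes come out in the order they went in, which is the first-in, first-out behaviour we want from a stream.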

What’s (not) supported

  • Supported:
    • Reading
    • Writing
  • Not supported:
    • Seeking
    • Timeout

As such all the following throw a NotSupportedException:

  • Seek(long offset, SeekOrigin origin)
  • SetLength(long value)
  • Length {get;}
  • ReadTimeout {get; set;}
  • WriteTimeout {get; set;}
  • Position {get; set;}

Finally, Flush() shouldn’t throw an exception. If there’s nothing to flush it should just return without doing anything.
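
The list above could translate into overrides along these lines. This is only a sketch: splitting them into a base class, and the names ForwardOnlyStream and EmptyForwardOnlyStream, are my assumptions for the example rather than how the final class is organised. The tiny subclass exists purely so that the sketch can be instantiated.

```csharp
using System;
using System.IO;

// Hypothetical base class holding the capability flags and unsupported members.
public abstract class ForwardOnlyStream : Stream
{
    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override bool CanTimeout => false;

    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int ReadTimeout
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int WriteTimeout
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

    // Nothing is buffered outside the backing store, so there is nothing to flush.
    public override void Flush() { }
}

// Placeholder subclass so the sketch can be exercised; a real stream supplies Read and Write.
public sealed class EmptyForwardOnlyStream : ForwardOnlyStream
{
    public override int Read(byte[] buffer, int offset, int count) => 0;   // always "end of stream"

    public override void Write(byte[] buffer, int offset, int count) { }   // discards the bytes
}
```

Callers that check CanSeek or CanTimeout before touching Position or the timeouts will never hit the exceptions.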

(Incorrect) Implementation

The two methods we need to implement are Read(buffer, offset, count) and Write(buffer, offset, count). The base implementations of ReadByte() and WriteByte(byte) call Read(buffer, offset, count) and Write(buffer, offset, count) with a single-element byte array, so we don’t need to implement them.

using System;
using System.Collections.Concurrent;
using System.IO;

public class ContinuousStream : Stream
{
    private readonly IProducerConsumerCollection<byte> _buffer;

    public ContinuousStream() => _buffer = new ConcurrentQueue<byte>();

    public override int Read(byte[] buffer, int offset, int count)
    {
        // One past the last index we may fill, clamped to the end of the caller's array.
        var end = Math.Min(offset + count, buffer.Length);
        var actualBytesRead = 0;

        // Stop when the requested range is full or the queue runs dry.
        for (var i = offset; i < end && _buffer.TryTake(out var b); i++)
        {
            buffer[i] = b;
            actualBytesRead++;
        }

        return actualBytesRead;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // One past the last index we may copy from, clamped to the end of the caller's array.
        var end = Math.Min(offset + count, buffer.Length);
        for (var i = offset; i < end; i++)
        {
            _buffer.TryAdd(buffer[i]);
        }
    }

    // CanRead, CanWrite, CanSeek, Flush() and the unsupported members are omitted for brevity.
}

The internal queue handles all of the pain of writing to the top and reading from the bottom.

Implementing correct Read(buffer, offset, count) behaviour

We have one problem to solve. We should only return zero bytes if the end of the stream is reached. As we might be reading before data is written we can’t just return the number of bytes as above. Practically speaking, if the internal buffer is empty then we can’t return at all unless the writing process either writes more or signals that the writing is finished.

There’s no way to do this using just the Stream type’s interface so we have no choice but to add a new method that the writer can call to signal that it is finished.

I’ve added a new method and a new property. The property has a private setter, as we can’t have a writer changing its mind after the reader has stopped reading: those bytes would never be read.

public bool WritingFinished { get; private set; }

public void CloseWrite()
{
    WritingFinished = true;
}

The read will now check whether the internal buffer is empty and, if it is, whether the writer has finished. If both are true then it’ll just return zero to signal to the reader that it has everything and should move on.

NB: A naive implementation might involve a loop and some Thread.Sleep() type thing polling something. This is bad for reasons that we won’t go into here. A much cleaner solution is to make use of the AutoResetEvent.

The AutoResetEvent is a private field in our new stream class.

private readonly AutoResetEvent _readWait = new AutoResetEvent(false);

It’s initialised to false (i.e. blocking reading) and is then released and locked again as reads and writes happen.
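
If you haven’t used AutoResetEvent before, this small sketch shows the behaviour we’re relying on. SignalDemo and WaitForSignal are made-up names for the example; the event itself is the real System.Threading type.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class SignalDemo
{
    public static string WaitForSignal()
    {
        // false: the event starts unsignalled, so WaitOne() blocks until Set() is called.
        using var signal = new AutoResetEvent(false);
        var message = "nothing yet";

        var producer = Task.Run(() =>
        {
            message = "data ready";
            signal.Set();            // release the waiting thread
        });

        signal.WaitOne();            // blocks here without any polling
        producer.Wait();

        // The event reset itself when it released us, so a zero-timeout wait fails.
        var signalledAgain = signal.WaitOne(0);
        return message + ", signalled again: " + signalledAgain;
    }
}
```

WaitForSignal() returns "data ready, signalled again: False": one Set() releases exactly one WaitOne(), after which the event is blocking again.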

Finally, we need to throw an InvalidOperationException if a write happens after the stream has been closed for writing.

The reading, writing and close members look like this now:

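// Volatile so that the reader thread always sees the writer's update.
private volatile bool _writingFinished;

public bool WritingFinished => _writingFinished;

public void CloseWrite()
{
    _writingFinished = true;
    _readWait.Set();   // wake a reader that is waiting for more data
}

public override int Read(byte[] buffer, int offset, int count)
{
    // One past the last index we may fill, clamped to the end of the caller's array.
    var end = Math.Min(offset + count, buffer.Length);

    while (true)
    {
        // Read the flag before checking the queue: if it is already set then
        // every write has completed and the count below is final.
        var finished = _writingFinished;
        if (_buffer.Count > 0) break;
        if (finished) return 0;      // empty and no more writes: end of stream
        _readWait.WaitOne();         // block until a write or CloseWrite()
    }

    var actualBytesRead = 0;

    for (var i = offset; i < end && _buffer.TryTake(out var b); i++)
    {
        buffer[i] = b;
        actualBytesRead++;
    }

    return actualBytesRead;
}

public override void Write(byte[] buffer, int offset, int count)
{
    if (_writingFinished)
    {
        throw new InvalidOperationException("Attempt to write after CloseWrite() has been called.");
    }

    var end = Math.Min(offset + count, buffer.Length);
    for (var i = offset; i < end; i++)
    {
        _buffer.TryAdd(buffer[i]);
    }
    _readWait.Set();   // wake a reader that is waiting for more data
}

Notice where we are calling Set() and WaitOne() on the AutoResetEvent and how this coordinates the end of the stream. There is no explicit Reset(): the event resets itself when WaitOne() releases the reader, and a manual Reset() could discard a signal that the writer raised in the meantime, leaving the reader blocked forever. This version assumes a single reader and a single writer.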
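
To see the whole thing running, here is a condensed, self-contained sketch with a writer task and a reader working at the same time. It is a stand-in rather than the code from my repo: QueueBackedStream and ContinuousDemo are names invented for this example, it assumes one reader and one writer, and the Thread.Sleep only simulates data arriving slowly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Condensed stand-in for the stream described above (hypothetical name).
public sealed class QueueBackedStream : Stream
{
    private readonly ConcurrentQueue<byte> _buffer = new ConcurrentQueue<byte>();
    private readonly AutoResetEvent _readWait = new AutoResetEvent(false);
    private volatile bool _writingFinished;

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Flush() { }

    public void CloseWrite()
    {
        _writingFinished = true;
        _readWait.Set();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        while (true)
        {
            var finished = _writingFinished;   // read the flag before checking the queue
            if (!_buffer.IsEmpty) break;
            if (finished) return 0;            // empty and no more writes: end of stream
            _readWait.WaitOne();               // block until a write or CloseWrite()
        }

        var bytesRead = 0;
        while (bytesRead < count && _buffer.TryDequeue(out var b))
        {
            buffer[offset + bytesRead++] = b;
        }
        return bytesRead;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (_writingFinished) throw new InvalidOperationException("Write after CloseWrite().");
        for (var i = offset; i < offset + count; i++) _buffer.Enqueue(buffer[i]);
        _readWait.Set();
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) _readWait.Dispose();
        base.Dispose(disposing);
    }
}

public static class ContinuousDemo
{
    public static byte[] Run()
    {
        using var stream = new QueueBackedStream();

        // The writer trickles bytes in on another thread, then signals that it is done.
        var writer = Task.Run(() =>
        {
            for (byte i = 1; i <= 5; i++)
            {
                stream.Write(new[] { i }, 0, 1);
                Thread.Sleep(10);
            }
            stream.CloseWrite();
        });

        // The reader blocks while the stream is empty and stops when Read returns zero.
        var received = new List<byte>();
        var chunk = new byte[2];
        int n;
        while ((n = stream.Read(chunk, 0, chunk.Length)) > 0)
        {
            for (var i = 0; i < n; i++) received.Add(chunk[i]);
        }

        writer.Wait();
        return received.ToArray();
    }
}
```

ContinuousDemo.Run() returns 1, 2, 3, 4, 5. The reader never sees a zero-length read until CloseWrite() has been called and the queue has been drained.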

The source code and unit tests for this are available on my Bitbucket repo.

Thanks for reading.

Please feel free to get in contact @BanksySan.