A Streaming Message Writer & Reader in C# & Json.NET

Here is a C# implementation of a high-performance message reader and writer that can read and write messages on any stream using Json.NET. We are using a similar implementation here at BancVue as our message store, and it is performing quite well.

I settled on Json.NET after trying several other serializers and reading a few posts on serialization performance. Objects serialized as Json are much smaller than the same objects serialized as Xml, and the Json.NET project seems to have the fastest serializer as well as pretty wide support in the developer community.

Overview

What I set out to create was something that can serialize millions of messages to a temporary holding place, then deserialize them for processing later. This serializer simply writes the Json to a file, but it could be used to write to any stream. There is a MessageWriter and a MessageReader. Together they can be used to form a “Message Store” as mentioned in some of Greg Young’s posts. Below is a unit test modeling how I want to use these objects.

[ DataContract ]
internal class TestMessage
{
	[ DataMember ]
	public string Text { get; set; }
}

public class Given_a_message_writer_and_reader_tied_to_the_same_stream : ContextSpecification
{
	protected MemoryStream _stream;
	protected IMessageWriter _writer;
	protected IMessageReader _reader;

	protected override void SharedContext()
	{
		_stream = new MemoryStream();
		_writer = new MessageWriter( _stream );
		_reader = new MessageReader( _stream );
	}
}

[ Concern( typeof ( MessageReader ) ) ]
public class When_a_message_is_written_to_the_stream : Given_a_message_writer_and_reader_tied_to_the_same_stream
{
	private TestMessage _message;

	protected override void Context()
	{
		_message = new TestMessage {Text = "TestValue"};
	}

	protected override void Because()
	{
		_writer.WriteMessage( _message );
		_writer.Flush();

		// Need to rewind to beginning of stream so we can read.
		_stream.Seek( 0, SeekOrigin.Begin );
	}

	[ Observation ]
	public void Should_be_able_to_retrieve_that_message_from_the_reader()
	{
		var actualMessage = _reader.ReadMessage< TestMessage >();
		Assert.That( actualMessage.Text, Is.EqualTo( _message.Text ) );
	}
}

Calling _writer.WriteMessage() writes the message to the stream, and calling _reader.ReadMessage() is all that is needed to read a message back. I added Flush() to the writer so that I can use a buffer when writing to files. Flush() simply flushes the writer’s internal StreamWriter, which pushes any buffered text down to the underlying stream.
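To make the buffering concrete, here is a minimal sketch using only System.IO. The FlushSketch class and its LengthsAroundFlush method are hypothetical names made up for this illustration, and it assumes a line short enough to fit in the StreamWriter's internal buffer.

```csharp
using System.IO;

public static class FlushSketch
{
    // Hypothetical helper: returns the underlying stream's length
    // just before and just after the writer is flushed.
    public static long[] LengthsAroundFlush( string line )
    {
        var stream = new MemoryStream();

        // AutoFlush is false by default, so text is held in the writer's buffer.
        var writer = new StreamWriter( stream );

        writer.WriteLine( line );
        long before = stream.Length;   // the short line is still buffered

        writer.Flush();
        long after = stream.Length;    // the buffered text has reached the stream

        return new[] { before, after };
    }
}
```

This is also why the tests call Flush() before rewinding the stream: without it, the reader could find the stream empty.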

The Interfaces

Here are the interfaces for the writer and reader.

public interface IMessageWriter : IDisposable
{
	void WriteMessage( object message );
	void Close();
	void Flush();
}

public interface IMessageReader : IDisposable
{
	T ReadMessage< T >();
	bool Eof { get; }
	void Close();
}

The Implementation

Now let’s look at the implementations. First, the writer…

public class MessageWriter : IMessageWriter
{
	private readonly StreamWriter _writer;
	private readonly JsonSerializerSettings _jsonSerializerSettings;

	public MessageWriter( Stream stream )
	{
		_writer = new StreamWriter( stream );

		_jsonSerializerSettings =
			new JsonSerializerSettings
				{
					DefaultValueHandling = DefaultValueHandling.Ignore,
					NullValueHandling = NullValueHandling.Ignore,
					MissingMemberHandling = MissingMemberHandling.Ignore,
					TypeNameHandling = TypeNameHandling.Objects
				};
	}

	public void WriteMessage( object message )
	{
		_writer.WriteLine( JsonConvert.SerializeObject( message,
		                                                Formatting.None,
		                                                _jsonSerializerSettings ) );
	}

	public void Close()
	{
		_writer.Close();
	}

	public void Flush()
	{
		_writer.Flush();
	}

	public void Dispose()
	{
		Close();
	}
}

Something to note: I am using WriteLine() to write each object, which separates the messages with a line break (CRLF on Windows). This is safe because, with Formatting.None, the serializer emits no line breaks of its own and escapes any newline characters inside string values, so each message occupies exactly one line. I had to do this because Json.NET currently requires a single root element per document, unless you want to deserialize all the root elements as an array. Since I have millions of messages, I can’t create an array of that size in memory without running out of RAM. Until this is fixed in Json.NET, I will just use the WriteLine mechanism. It has worked well so far.
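For readers on a later Json.NET release, the reader gained a SupportMultipleContent flag that lets a single JsonTextReader walk several root elements in a row. The following is only a sketch under the assumption that your Json.NET version has that property; MultipleContentSketch and Fragment are hypothetical names, and this is not what the store described here uses.

```csharp
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;

// Hypothetical message type for the illustration.
public class Fragment
{
    public string Text { get; set; }
}

public static class MultipleContentSketch
{
    // Lazily yields each root element, so the whole set never sits in memory.
    public static IEnumerable<T> ReadAll<T>( TextReader text )
    {
        var serializer = new JsonSerializer();

        using ( var reader = new JsonTextReader( text ) )
        {
            // Assumption: this property exists in the installed Json.NET version.
            reader.SupportMultipleContent = true;

            // Each Read() moves to the start of the next root element.
            while ( reader.Read() )
            {
                yield return serializer.Deserialize<T>( reader );
            }
        }
    }
}
```

The line-per-message format still has the advantage that the file can be inspected or split with ordinary text tools.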

As you may have guessed, I am using ReadLine() in the reader to retrieve each message. Here is the reader’s implementation…

public class MessageReader : IMessageReader
{
	private readonly StreamReader _reader;
	private readonly JsonSerializer _serializer;

	public MessageReader( Stream stream )
	{
		_reader = new StreamReader( stream );

		_serializer = new JsonSerializer
				{
					DefaultValueHandling = DefaultValueHandling.Ignore,
					NullValueHandling = NullValueHandling.Ignore,
					MissingMemberHandling = MissingMemberHandling.Ignore,
					TypeNameHandling = TypeNameHandling.Objects
				};
	}

	public T ReadMessage< T >()
	{
		return (T)ReadMessage();
	}

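	public object ReadMessage()
	{
		string line = _reader.ReadLine();

		// ReadLine() returns null at the end of the stream, so check Eof before calling.
		if ( line == null )
			throw new EndOfStreamException( "No more messages in the stream." );

		using ( var jsonReader = new JsonTextReader( new StringReader( line ) ) )
		{
			return _serializer.Deserialize( jsonReader );
		}
	}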

	public bool Eof
	{
		get { return _reader.EndOfStream; }
	}

	public void Close()
	{
		_reader.Close();
	}

	public void Dispose()
	{
		Close();
	}
}
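The Eof property is what lets a consumer drain the store one message at a time. Here is a standalone sketch of that loop built on the same primitives (StreamReader.EndOfStream and ReadLine). DrainSketch and StoredNote are hypothetical names, and for brevity it deserializes to a known type rather than relying on embedded type names.

```csharp
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;

// Hypothetical message type for the illustration.
public class StoredNote
{
    public string Text { get; set; }
}

public static class DrainSketch
{
    // Lazily yields one message per line until the stream is exhausted.
    // The stream is left open so the caller controls its lifetime.
    public static IEnumerable<T> Drain<T>( Stream stream )
    {
        var reader = new StreamReader( stream );

        while ( !reader.EndOfStream )
        {
            string line = reader.ReadLine();

            // Tolerate stray blank lines between messages.
            if ( line.Trim().Length == 0 )
                continue;

            yield return JsonConvert.DeserializeObject<T>( line );
        }
    }
}
```

Because the method is an iterator, only one message is materialized at a time, which is the point when the store holds millions of them.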

Getting Json.NET to deserialize your types

Notice the serializer settings, which are the same on the writer and the reader. Most of them shrink the Json by omitting default and null values, but notice the last setting:

TypeNameHandling = TypeNameHandling.Objects

This setting tells the serializer to insert the type name of each object into the Json itself, as a “$type” property. The deserializer then uses that name to decide which type to create. It does increase the size of your Json, but it makes the messages much easier to work with, since you don’t have to know exactly what type of message you are deserializing.
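A small round trip shows the effect. This is a sketch, not the production code: Ping and TypeNameSketch are hypothetical names, and it uses the JsonConvert helpers with a JsonSerializerSettings instance rather than the reader class above.

```csharp
using Newtonsoft.Json;

// Hypothetical message type for the illustration.
public class Ping
{
    public string Text { get; set; }
}

public static class TypeNameSketch
{
    private static readonly JsonSerializerSettings Settings =
        new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects };

    // The output carries a "$type" property naming the object's type.
    public static string ToJson( object message )
    {
        return JsonConvert.SerializeObject( message, Formatting.None, Settings );
    }

    // No target type is given; the "$type" property selects it.
    public static object FromJson( string json )
    {
        return JsonConvert.DeserializeObject( json, Settings );
    }
}
```

Calling TypeNameSketch.FromJson( TypeNameSketch.ToJson( new Ping { Text = "hi" } ) ) should hand back a Ping instance even though the caller never names the type. Since the type name in the payload decides what gets instantiated, this setting is best kept to streams you wrote yourself.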

Closing

All in all, this serializer/deserializer set is very fast. We are using a similar implementation here at BancVue in our production environment and are very pleased with the results.

FYI: Multiple message test

Here is a test that shows a bit more about how we use the serializer when working with multiple messages.

[ Concern( typeof ( MessageReader ) ) ]
public class When_multiple_messages_are_written_to_the_stream : Given_a_message_writer_and_reader_tied_to_the_same_stream
{
	private TestMessage _message1;
	private TestMessage _message2;

	protected override void Context()
	{
		_message1 = new TestMessage {Text = "TestValue1"};
		_message2 = new TestMessage {Text = "TestValue2"};
	}

	protected override void Because()
	{
		_writer.WriteMessage( _message1 );
		_writer.WriteMessage( _message2 );
		_writer.Flush();

		// Need to rewind to beginning of stream to read.
		_stream.Seek( 0, SeekOrigin.Begin );
	}

	[ Observation ]
	public void Should_be_able_to_retrieve_both_messages_from_the_reader()
	{
		var actualMessage1 = _reader.ReadMessage< TestMessage >();
		Assert.That( actualMessage1.Text, Is.EqualTo( _message1.Text ) );

		var actualMessage2 = _reader.ReadMessage< TestMessage >();
		Assert.That( actualMessage2.Text, Is.EqualTo( _message2.Text ) );
	}
}

-Chris
