    Stream Parsers

    This article introduces the concept of stream parsers. The goal of a stream parser is to process a continuous stream of input data into one or more parsed messages.

    Stream parsers are observable: to receive messages, an observer must subscribe to the parser. Once a parsed message is available, the parser pushes it to every subscribed observer.
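
    The push model described above follows the standard .NET IObservable<T> / IObserver<T> contract. The sketch below illustrates that contract using only base class library types. MessageSource<T> and CollectingObserver<T> are hypothetical names invented for this illustration and are not part of the library; the parsers' own implementation may differ.

```csharp
using System;
using System.Collections.Generic;

// Subscribe, push two messages, then dispose the subscription
var source = new MessageSource<string>();
var received = new List<string>();
using (source.Subscribe(new CollectingObserver<string>(received)))
{
    source.Publish("first");
    source.Publish("second");
}
source.Publish("third"); // not delivered: the subscription was disposed

Console.WriteLine(string.Join(", ", received)); // prints "first, second"

// Hypothetical minimal observable that pushes each message to its observers
sealed class MessageSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Publish(T message)
    {
        // Iterate over a copy so observers may unsubscribe while being notified
        foreach (var observer in _observers.ToArray())
            observer.OnNext(message);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _onDispose;
        public Unsubscriber(Action onDispose) => _onDispose = onDispose;
        public void Dispose() => _onDispose();
    }
}

// Hypothetical observer that stores every message it receives
sealed class CollectingObserver<T> : IObserver<T>
{
    private readonly List<T> _sink;
    public CollectingObserver(List<T> sink) => _sink = sink;
    public void OnNext(T value) => _sink.Add(value);
    public void OnError(Exception error) => Console.WriteLine($"Error: {error.Message}");
    public void OnCompleted() => Console.WriteLine("Completed");
}
```

    Disposing the value returned by Subscribe is the conventional way to stop receiving messages in this pattern.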

    There are two types of stream parsers available in the library: the StreamParser and the PipeParser. Both use System.IO.Pipelines in their implementation. The two parsers are used in very similar ways, so this article introduces them through the PipeParser.

    Note that a stream parser is designed to return messages of a single type.

    PipeParser

    The public interface of a PipeParser is simple. Its constructor takes a Pipe or a PipeReader, an IMessageParser<T, byte> that parses bytes into messages of type T, and a SupportedFixVersion indicating the FIX version that each message must support.

    Optionally, a Func<ReadOnlyMemory<byte>, T> function can be passed to let the user create the target object given the input.
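
    To make the shape of such a factory function concrete, here is a minimal sketch of a delegate with the Func<ReadOnlyMemory<byte>, T> signature. The Order type and its RawLength property are hypothetical stand-ins for illustration only. How the delegate is handed to the PipeParser is not shown, because the exact constructor overload is not spelled out here.

```csharp
using System;
using System.Text;

// Factory with the Func<ReadOnlyMemory<byte>, T> shape: it receives the raw
// input bytes and returns the target object.
Func<ReadOnlyMemory<byte>, Order> createOrder =
    raw => new Order { RawLength = raw.Length };

// Invoke the factory directly with placeholder bytes (not a valid FIX message)
ReadOnlyMemory<byte> input = Encoding.ASCII.GetBytes("35=D");
Order order = createOrder(input);
Console.WriteLine(order.RawLength); // prints 4

// Hypothetical target type, for illustration only
sealed class Order
{
    public int RawLength { get; set; }
}
```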

    PipeParser provides a Subscribe method for its observers, and a ListenAsync method to start listening to the input stream of data. ListenAsync completes once the input has ended or cancellation has been triggered.
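
    With a Pipe, "the input has ended" usually means that the writing side has completed its PipeWriter. The sketch below uses only System.IO.Pipelines and is independent of the parser. The read loop merely stands in for a consumer such as the PipeParser and is not the library's implementation; the sample bytes are placeholders rather than a valid FIX message.

```csharp
using System;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();

// Producer side: write placeholder bytes, then signal the end of the input
byte[] sample = Encoding.ASCII.GetBytes("35=D|55=ABC|");
await pipe.Writer.WriteAsync(sample);
await pipe.Writer.CompleteAsync();

// Consumer side: drain the reader until the writer reports completion
long total = 0;
while (true)
{
    ReadResult result = await pipe.Reader.ReadAsync();
    total += result.Buffer.Length;

    // Mark everything in the buffer as consumed
    pipe.Reader.AdvanceTo(result.Buffer.End);

    if (result.IsCompleted)
        break;
}
await pipe.Reader.CompleteAsync();

Console.WriteLine($"Read {total} bytes"); // prints "Read 12 bytes"
```

    If the writer is never completed, a reader loop like this keeps waiting for more data, which is why completing the writer (or cancelling) matters for a listening call to finish.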

    The example below shows how to process data with the PipeParser:

    // Construct the pipe
    var pipe = new Pipe();
    
    // Create a message parser that can parse bytes into Order
    var messageParser = new ParserBuilder<Order>().Build<byte>(new MessageParserOptions() { ThrowIfInvalidMessage = false });
    
    // Create the pipe parser, providing the pipe reader, an IMessageParser and the supported FIX version
    var parser = new PipeParser<Order>(pipe.Reader, messageParser, SupportedFixVersion.Fix44);
    
    // We subscribe to the observable to print the parsed messages
    parser.Subscribe(
      order => Console.WriteLine($"Order {order.Symbol} - Px {order.Price}, Qty {order.Quantity}"),
      ex => Console.WriteLine($"Error: {ex.Message}"));
    
    // Start listening and wait until all messages have been observed
    await parser.ListenAsync();
    
    DanubeDev.RapideFix - Documentation