Saturday, August 1, 2015

Pipes and Filters Pattern

 
Requirements: To follow this article, you need a basic knowledge of OOP concepts, C# programming, and Visual Studio.

I have tried to present this article in a way that simulates a typical software project environment. The program developed here does not actually publish messages to any queue. To simulate a message being encrypted and having its priority set, I added a few properties to the Message class. In a real project, this work might be done differently.

Existing system:
Company ABC wants to build a new application called “Messenger” that publishes messages to MSMQ. Two consumers are expected to use the application as soon as it is developed. One consumer will publish sensitive data, so it expects the message to be encrypted before it is published to the queue, and it also wants to set the message priority. The other consumer will publish simple text data that does not need encryption; it only sets the message priority.
New requests from other consumers are expected soon, so the application should be built in a way that can accommodate future changes to the architecture.

So the application manager approaches the technical team with the following problem statement:
I want to publish a message to a queue, and the message should undergo certain transformations or operations before it is published.
I want my code design to support adding new operations and removing existing ones.

Then the architect/technical lead steps in, proposes the “Pipes and Filters Pattern” as the solution to the problem, and gives the development team a design with the high-level details below:



Figure 1 – Publishing message to the Queue

Colors used in the diagram: the incoming message is shown in green, turns blue after the message priority is set, and turns orange after it is encrypted and sent as output.

High Level Details:
- The Pipes and Filters architectural pattern is one of the messaging patterns. It splits a large set of operations on a message into separate processes so that each process can work on the message independently and complete its part of the transformation.
- Each process is called a “Filter”, and the filters are connected through channels or connectors called “Pipes”.
- All the filters implement a common interface so that they conform to the same contract.
- The message from the source is the input to the first process; the output of each process becomes the input of the next, and so on until the message reaches the sink or destination. (Note: the terms “Process” and “Filter” are used interchangeably here.)
- In the above diagram, the first process takes the incoming message through a pipe and sets the message priority, then sends its output to another process that encrypts the message.
- The outgoing message is then published to the queue.
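
The flow described in the points above can be sketched in a few lines before looking at the full design. This is only a minimal illustration and not the implementation used later in the article: it models each filter as a Func<string, string> and the pipe as the hand-off of one filter's output to the next. The names PipeSketch, SetPriority, Encrypt and Run are hypothetical.

```csharp
using System;
using System.Collections.Generic;

namespace PipesAndFiltersSketch
{
    public static class PipeSketch
    {
        // Hypothetical filter: tags the message with a priority marker.
        public static string SetPriority(string message)
        {
            return "[priority=1] " + message;
        }

        // Hypothetical filter: stands in for real encryption by wrapping the text.
        public static string Encrypt(string message)
        {
            return "encrypted(" + message + ")";
        }

        // The "pipe": passes the output of each filter as the input of the next.
        public static string Run(string source, IEnumerable<Func<string, string>> filters)
        {
            string current = source;
            foreach (Func<string, string> filter in filters)
            {
                current = filter(current);
            }
            return current;
        }
    }

    public static class Program
    {
        public static void Main()
        {
            var filters = new List<Func<string, string>>
            {
                PipeSketch.SetPriority,
                PipeSketch.Encrypt
            };
            // Prints: encrypted([priority=1] hello)
            Console.WriteLine(PipeSketch.Run("hello", filters));
        }
    }
}
```

Removing a filter from the list, or adding another one, changes the result without touching the remaining filters.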

Advantages and disadvantages of this architectural pattern:
- Advantage: All the filters (operations) are independent, and each can be plugged in or unplugged without affecting the others. This makes it possible to tailor the set of operations for each consumer.
- Disadvantage: Because the filters are independent of each other, they cannot communicate with one another directly, and the message can only be processed sequentially, i.e. in a linear fashion.

Then the architect provides the low-level design for implementing the solution as below:

Class diagram:




Figure 2 – Class diagram

Class Diagram in Detail:

All operations implement one common interface so that they follow the same contract. In our example, setting the message priority and encrypting the message are the two operations (filters) that need to be applied to the source message before publishing.

So we created a generic interface, “IOperation<T>”, with a single method, “Execute(T input)”, which takes a value of type T, performs the operation, and returns a value of the same type. Because the interface is generic, each implementing class declares the type it supports, and the same interface can be implemented for different types.

Note: Since we are working with messages, a message is always the input and the output of a filter, and the Execute method reflects that.
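
To illustrate why the interface is generic, here is a small sketch in which the same contract is implemented for string instead of a message type. The interface is repeated so that the snippet compiles on its own; the classes TrimOperation and UpperCaseOperation are hypothetical and are not part of the Messenger design.

```csharp
using System;

namespace GenericOperationSketch
{
    // Same shape as the article's interface, repeated here for self-containment.
    public interface IOperation<T>
    {
        T Execute(T input);
    }

    // Hypothetical filter over plain strings: removes surrounding whitespace.
    public class TrimOperation : IOperation<string>
    {
        public string Execute(string input)
        {
            return input.Trim();
        }
    }

    // Hypothetical filter over plain strings: converts the text to upper case.
    public class UpperCaseOperation : IOperation<string>
    {
        public string Execute(string input)
        {
            return input.ToUpperInvariant();
        }
    }

    public static class Program
    {
        public static void Main()
        {
            IOperation<string> trim = new TrimOperation();
            IOperation<string> upper = new UpperCaseOperation();
            // Both filters use T = string, so the output of one is a valid input for the other.
            Console.WriteLine(upper.Execute(trim.Execute("  hello  "))); // prints HELLO
        }
    }
}
```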

Now coming to the operations: we need one class per operation, each taking and returning the same type. As per the requirements, we need two classes, “EncryptMessage” and “MessagePriority”, which encrypt the message and set its priority respectively.

Now there must be some way to specify which operations are registered for an incoming message. This is the job of the pipeline classes. So we created a generic abstract base class, “Pipelinebase”, with methods for registering operations and for performing them. Our requirement calls for one pipeline that sends a message from the client to the messaging queue, so we named our pipeline class “SendPipeline”; it inherits from the base class “Pipelinebase”. We also created an interface, “IMessage”, that defines our message, and the SendPipeline class works with classes that implement IMessage.

The SendPipeline class also has a constructor that takes one Boolean parameter per message operation. Since we have only two operations, the constructor takes two Boolean parameters. If you have more operations, it is advisable to use an enum with the Flags attribute or a settings type that carries all the options.
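
As a rough sketch of the Flags alternative mentioned above, the constructor could accept a single options value instead of one Boolean per operation. Everything in this snippet (SendOptions, SimpleMessage, FlagsSendPipeline) is a hypothetical, simplified stand-in and not the article's SendPipeline; the operations are reduced to lambdas to keep the example short.

```csharp
using System;
using System.Collections.Generic;

namespace FlagsPipelineSketch
{
    // Hypothetical options type: one bit per available operation.
    [Flags]
    public enum SendOptions
    {
        None = 0,
        SetPriority = 1,
        Encrypt = 2
    }

    // Simplified stand-in for the article's message type.
    public class SimpleMessage
    {
        public int Priority { get; set; }
        public bool IsEncrypted { get; set; }
    }

    public class FlagsSendPipeline
    {
        private readonly List<Func<SimpleMessage, SimpleMessage>> operations =
            new List<Func<SimpleMessage, SimpleMessage>>();

        // A single options value replaces one Boolean parameter per operation.
        public FlagsSendPipeline(SendOptions options)
        {
            if (options.HasFlag(SendOptions.SetPriority))
            {
                operations.Add(m => { m.Priority = 1; return m; });
            }
            if (options.HasFlag(SendOptions.Encrypt))
            {
                operations.Add(m => { m.IsEncrypted = true; return m; });
            }
        }

        // Runs the selected operations in the order they were added.
        public SimpleMessage PerformOperation(SimpleMessage input)
        {
            SimpleMessage current = input;
            foreach (Func<SimpleMessage, SimpleMessage> operation in operations)
            {
                current = operation(current);
            }
            return current;
        }
    }

    public static class Program
    {
        public static void Main()
        {
            var pipeline = new FlagsSendPipeline(SendOptions.SetPriority | SendOptions.Encrypt);
            SimpleMessage result = pipeline.PerformOperation(new SimpleMessage());
            Console.WriteLine("Priority: {0}, Encrypted: {1}", result.Priority, result.IsEncrypted);
        }
    }
}
```

With this shape, adding a third operation means adding one enum member and one registration, and existing callers keep compiling.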

Then the development team comes up with the implementation below, following the class diagram and specifications provided by the architect:

Coding in Action:

IOperation interface: This interface supports generic type and all the classes that implement this interface will be implementing the Execute method that takes and returns the same type.
namespace PipesAndFiltersExample
{
    /// <summary>
    /// Operation Interface
    /// All operations must implement the interface
    /// </summary>
    /// <typeparam name="T">Data type supported in the operation</typeparam>
    public interface IOperation<T>
    {
        /// <summary>
        /// Executes the operation
        /// </summary>
        /// <param name="input">The Input Parameter</param>
        /// <returns>Type defined for the operation</returns>
        T Execute(T input);
    }
}

EncryptMessage class: This class encrypts the incoming message and implements the interface IOperation<IMessage>.
namespace PipesAndFiltersExample
{
    /// <summary>
    /// Encrypts the message
    /// </summary>
    class EncryptMessage : IOperation<IMessage>
    {
        /// <summary>
        /// Executes the operation on the message
        /// </summary>
        /// <param name="input">The Input Message</param>
        /// <returns>Type of IMessage</returns>
        public IMessage Execute(IMessage input)
        {
            return Encrypt(input);
        }
        /// <summary>
        /// Encrypts the message
        /// </summary>
        /// <param name="input">The Input Message</param>
        /// <returns>Type of IMessage</returns>
        private IMessage Encrypt(IMessage input)
        {
            input.IsEncrypted = true;
            //Encryption code
            return input;
        }
    }
}
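
The “Encryption code” placeholder above is deliberately left empty in this article. For readers who want to see what such a step could look like, here is a hedged sketch that encrypts and decrypts a string with AES from System.Security.Cryptography. It is an assumption about one possible approach, not part of the Messenger design: the AesSketch class is hypothetical, key management is left out, and since IMessage.Body is read-only, the message type would need a way to carry the ciphertext.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Text;

namespace EncryptionSketch
{
    // Hypothetical helper, shown only to illustrate what an encryption filter might call.
    public static class AesSketch
    {
        public static byte[] Encrypt(string plainText, byte[] key, byte[] iv)
        {
            using (Aes aes = Aes.Create())
            {
                aes.Key = key;
                aes.IV = iv;
                using (ICryptoTransform encryptor = aes.CreateEncryptor())
                using (var buffer = new MemoryStream())
                {
                    using (var crypto = new CryptoStream(buffer, encryptor, CryptoStreamMode.Write))
                    {
                        byte[] bytes = Encoding.UTF8.GetBytes(plainText);
                        crypto.Write(bytes, 0, bytes.Length);
                    }
                    // Disposing the CryptoStream flushes the final block into the buffer.
                    return buffer.ToArray();
                }
            }
        }

        public static string Decrypt(byte[] cipher, byte[] key, byte[] iv)
        {
            using (Aes aes = Aes.Create())
            {
                aes.Key = key;
                aes.IV = iv;
                using (ICryptoTransform decryptor = aes.CreateDecryptor())
                using (var input = new MemoryStream(cipher))
                using (var crypto = new CryptoStream(input, decryptor, CryptoStreamMode.Read))
                using (var reader = new StreamReader(crypto, Encoding.UTF8))
                {
                    return reader.ReadToEnd();
                }
            }
        }
    }

    public static class Program
    {
        public static void Main()
        {
            using (Aes aes = Aes.Create())
            {
                // A fresh random key and IV, for demonstration only.
                byte[] key = aes.Key;
                byte[] iv = aes.IV;
                byte[] cipher = AesSketch.Encrypt("This is the message body", key, iv);
                Console.WriteLine(AesSketch.Decrypt(cipher, key, iv));
            }
        }
    }
}
```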

MessagePriority class: This class sets the message priority and implements the interface IOperation<IMessage>.
namespace PipesAndFiltersExample
{
    /// <summary>
    /// Sets Message priority to the incoming message
    /// </summary>
    class MessagePriority : IOperation<IMessage>
    {
        /// <summary>
        /// Executes the operations
        /// </summary>
        /// <param name="input">The input message</param>
        /// <returns>Message with the priority set</returns>
        public IMessage Execute(IMessage input)
        {
            input.Priority = 1;
            return input;
        }
    }
}

IMessage interface:
namespace PipesAndFiltersExample
{
    /// <summary>
    /// Message Interface
    /// </summary>
    public interface IMessage
    {
        /// <summary>
        /// Priority of the message
        /// </summary>
        int Priority { get; set; }
        /// <summary>
        /// Flag that records whether the message has been encrypted;
        /// set by the encryption operation
        /// </summary>
        bool IsEncrypted { get; set; }
        /// <summary>
        /// Tells if the message is encrypted
        /// </summary>
        /// <returns></returns>
        bool IsMessageEncrypted();
        /// <summary>
        /// Tells if the priority is set
        /// </summary>
        /// <returns></returns>
        bool IsMessagePrioritySet();
        /// <summary>
        /// Message Id of the message
        /// </summary>
        string MessageId { get; }
        /// <summary>
        /// Body of the message
        /// </summary>
        string Body { get; }
        /// <summary>
        /// Header of the message
        /// </summary>
        string Header { get; }
        /// <summary>
        /// Subject of the message
        /// </summary>
        string Subject { get; }
    }
}


Message class: This class implements the interface IMessage.
namespace PipesAndFiltersExample
{
    /// <summary>
    /// Message class
    /// </summary>
    public class Message : IMessage
    {
        /// <summary>
        /// Backing fields for the read-only properties
        /// </summary>
        private string messageId;
        private string body;
        private string header;
        private string subject;
        /// <summary>
        /// Properties to be set
        /// </summary>
        public int Priority { get; set; }
        public bool IsEncrypted { get; set; }
        /// <summary>
        /// Property that returns MessageId
        /// </summary>
        public string MessageId
        {
            get { return messageId; }
        }
        /// <summary>
        /// Property that returns body of the message
        /// </summary>
        public string Body
        {
            get { return body; }
        }
        /// <summary>
        /// Property that returns the header of the message
        /// </summary>
        public string Header
        {
            get { return header; }
        }
        /// <summary>
        /// Property that returns subject of the message
        /// </summary>
        public string Subject
        {
            get { return subject; }
        }
        /// <summary>
        /// Constructor for the class Message
        /// </summary>
        /// <param name="messageId">The MessageId</param>
        /// <param name="body">The Body</param>
        /// <param name="header">The Header</param>
        /// <param name="subject">The Subject</param>
        public Message(string messageId, string body, string header, string subject)
        {
            this.messageId = messageId;
            this.body = body;
            this.header = header;
            this.subject = subject;
        }
        /// <summary>
        /// Informs if the message is encrypted
        /// </summary>
        /// <returns>The value true/false</returns>
        public bool IsMessageEncrypted()
        {
            return IsEncrypted;
        }
        /// <summary>
        /// Informs if the message priority is set
        /// </summary>
        /// <returns>The value true/false</returns>
        public bool IsMessagePrioritySet()
        {
            return (Priority != 0);
        }
    }
}


Pipelinebase abstract class: This class has two methods, Register and PerformOperation. Register takes an IOperation<T> and adds it to the class's list of operations. Based on the client's choices, all the required operations are added to the list through Register; PerformOperation then goes through the operations in registration order and calls the Execute method of each one. Each operation takes the message as input, performs its work, and passes the resulting message to the next operation, until all the operations are completed.
using System.Collections.Generic;
using System.Linq;
namespace PipesAndFiltersExample
{
    /// <summary>
    /// Base class for the pipeline classes
    /// </summary>
    /// <typeparam name="T">Type supported by the Pipeline base</typeparam>
    public abstract class Pipelinebase<T>
    {
        private readonly List<IOperation<T>> operations = new List<IOperation<T>>();
        /// <summary>
        /// Registers the operation
        /// </summary>
        /// <param name="operation">The operation</param>
        /// <returns>The instance of the class</returns>
        public Pipelinebase<T> Register(IOperation<T> operation)
        {
            this.operations.Add(operation);
            return this;
        }
        /// <summary>
        /// Perform the operation
        /// </summary>
        /// <param name="input">The input message</param>
        /// <returns>The message after all registered operations have run</returns>
        public T PerformOperation(T input)
        {
            return this.operations.Aggregate(input, (current, operation) => operation.Execute(current));
        }
    }
}
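
PerformOperation relies on the LINQ Aggregate method, which can be hard to read at first. The sketch below, using hypothetical names (FoldDemo, WithAggregate, WithLoop) and integers instead of messages, shows that the Aggregate call is equivalent to a plain loop in which every step receives the result of the previous one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

namespace AggregateSketch
{
    public static class FoldDemo
    {
        // Same folding style as PerformOperation: each step receives the previous result.
        public static int WithAggregate(int input, List<Func<int, int>> steps)
        {
            return steps.Aggregate(input, (current, step) => step(current));
        }

        // Equivalent explicit loop.
        public static int WithLoop(int input, List<Func<int, int>> steps)
        {
            int current = input;
            foreach (Func<int, int> step in steps)
            {
                current = step(current);
            }
            return current;
        }
    }

    public static class Program
    {
        public static void Main()
        {
            var steps = new List<Func<int, int>> { x => x + 1, x => x * 10 };
            Console.WriteLine(FoldDemo.WithAggregate(2, steps)); // prints 30
            Console.WriteLine(FoldDemo.WithLoop(2, steps));      // prints 30
        }
    }
}
```

The steps run in list order, so registering the same operations in a different order can give a different result (here, multiplying first and then adding would give 21).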


SendPipeline class: This class has a constructor that takes two parameters, bool setPriority and bool encryptMessage, and registers the corresponding operation classes based on the parameter values.
namespace PipesAndFiltersExample
{
    /// <summary>
    /// Pipeline that sends message to the destination by performing operations
    /// on the incoming message
    /// </summary>
    public class SendPipeline : Pipelinebase<IMessage>
    {
        /// <summary>
        /// Constructor for the class where the operations are registered
        /// for the messages
        /// </summary>
        /// <param name="setPriority">The Set Priority parameter</param>
        /// <param name="encryptMessage">The Encrypt Message parameter</param>
        public SendPipeline(bool setPriority, bool encryptMessage)
        {
            if (setPriority)
            {
                Register(new MessagePriority());
            }
            if (encryptMessage)
            {
                Register(new EncryptMessage());
            }
        }
    }
}
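
The original problem statement asked for a design that supports adding new operations. As a sketch of how that might look, the snippet below adds a hypothetical CompressMessage filter and a hypothetical CompressingSendPipeline without changing any existing filter. The interface and base class are repeated in compact form so that the snippet compiles on its own, and Envelope is a simplified stand-in for the article's message type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

namespace ExtensionSketch
{
    public interface IOperation<T>
    {
        T Execute(T input);
    }

    // Compact copy of the article's base class.
    public abstract class Pipelinebase<T>
    {
        private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

        public Pipelinebase<T> Register(IOperation<T> operation)
        {
            operations.Add(operation);
            return this;
        }

        public T PerformOperation(T input)
        {
            return operations.Aggregate(input, (current, operation) => operation.Execute(current));
        }
    }

    // Simplified stand-in for the article's message type.
    public class Envelope
    {
        public int Priority { get; set; }
        public bool IsCompressed { get; set; }
    }

    public class SetPriority : IOperation<Envelope>
    {
        public Envelope Execute(Envelope input)
        {
            input.Priority = 1;
            return input;
        }
    }

    // Hypothetical new filter requested by a future consumer.
    public class CompressMessage : IOperation<Envelope>
    {
        public Envelope Execute(Envelope input)
        {
            input.IsCompressed = true;
            // Compression code would go here.
            return input;
        }
    }

    // Hypothetical pipeline that plugs the new filter in next to an existing one.
    public class CompressingSendPipeline : Pipelinebase<Envelope>
    {
        public CompressingSendPipeline()
        {
            // Register returns the pipeline, so registrations can be chained.
            Register(new SetPriority()).Register(new CompressMessage());
        }
    }

    public static class Program
    {
        public static void Main()
        {
            Envelope result = new CompressingSendPipeline().PerformOperation(new Envelope());
            Console.WriteLine("Priority set? {0}, Compressed? {1}", result.Priority != 0, result.IsCompressed);
        }
    }
}
```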

Two console applications were created to demonstrate the two clients. One client wants its message encrypted and prioritized; the other only wants the message priority to be set.

Client 1 program:

using PipesAndFiltersExample;
using System;
namespace Client1
{
    class Program
    {
        static void Main(string[] args)
        {
            IMessage message = new Message(messageId:"1",
                body:"This is the message body",
                header:"Header Information",
                subject:"Please set priority and encrypt the message");
            //send the message through the pipeline: the priority is set and the message is encrypted
            SendPipeline sendPipeline = new SendPipeline(true,true);
            var publishedMessage = sendPipeline.PerformOperation(message);
            Console.WriteLine("I am client1 and my messages should be prioritized and encrypted!!!");
            Console.WriteLine("My message Priority Set? {0}", publishedMessage.IsMessagePrioritySet());
            Console.WriteLine("My message Encrypted? {0}", publishedMessage.IsMessageEncrypted());
            Console.WriteLine();
            Console.Read();
        }
    }
}

Client 2 program:

using PipesAndFiltersExample;
using System;
namespace Client2
{
    class Program
    {
        static void Main(string[] args)
        {
            IMessage message = new Message(messageId: "1",
                body: "This is the message body",
                header: "Header Information",
                subject: "Please set the priority only");
            //send the message through the pipeline: only the priority is set, the message is not encrypted
            SendPipeline sendPipeline = new SendPipeline(true, false);
            var publishedMessage = sendPipeline.PerformOperation(message);
            Console.WriteLine("I am client2 and my messages should only be prioritized and not encrypted!!!");
            Console.WriteLine("My message Priority Set? {0}", publishedMessage.IsMessagePrioritySet());
            Console.WriteLine("My message Encrypted? {0}", publishedMessage.IsMessageEncrypted());
            Console.WriteLine();
            Console.Read();
        }
    }
}

When you run the code, you should see the following:

Client 1 program:

Figure 3 – Client 1 Program output shows that the message is encrypted and the priority is set

Client 2 program:

Figure 4 – Client 2 Program output shows that the message priority is set but not encrypted


You can start both projects together by right-clicking the solution, selecting “Set StartUp Projects…”, choosing “Multiple startup projects”, and setting “Start” in the “Action” column for the two projects. Both console applications will then run at once, as shown below:

Figure 5 – Starting two projects at one shot