How to Organize Event-Flow in Microservice Architecture

Many enterprise solutions built on a microservice architecture struggle to unify the event flow coming from different sources. Many of them also rely on several messaging providers at once, for example:

  • Azure Service Bus
  • Apache Kafka
  • RabbitMQ

What we need is a component that connects event publishers with event subscribers.

Solution

Before we dive into the implementation, let’s look at Martin Fowler’s Event Aggregator pattern.

Figure 1. Event Aggregator

“An Event Aggregator is a simple element of indirection. In its simplest form, you have it register with all the source objects you are interested in and register all target objects with the Event Aggregator. The Event Aggregator responds to any event from a source object by propagating that event to the target objects.”
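
To make the quoted description concrete, here is a minimal in-memory sketch of the pattern. It is not Fowler's code and not the implementation used later in this article; the SimpleEventAggregator class, its Subscribe and Publish methods, and the string payload are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Minimal in-memory Event Aggregator: sources publish, targets subscribe.
// All names here are illustrative and not taken from the article's code.
public class SimpleEventAggregator
{
    private readonly Dictionary<string, List<Action<string>>> handlers =
        new Dictionary<string, List<Action<string>>>();

    // Register a target handler for events with the given label.
    public void Subscribe(string label, Action<string> handler)
    {
        if (!handlers.TryGetValue(label, out var list))
        {
            list = new List<Action<string>>();
            handlers[label] = list;
        }
        list.Add(handler);
    }

    // Propagate an event from any source to every handler registered for its label.
    // Returns the number of handlers that were invoked.
    public int Publish(string label, string payload)
    {
        if (!handlers.TryGetValue(label, out var list))
        {
            return 0;
        }
        foreach (var handler in list)
        {
            handler(payload);
        }
        return list.Count;
    }
}
```

With this sketch, a publisher only calls Publish with a label and a payload and never references the subscribers directly, which is the indirection the pattern is about.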

Another example that follows this principle is Azure Event Grid. With Event Grid, you can connect cloud resources that produce events (publishers) with resources that handle them (subscribers). You can see this in the image below.

[Image: event-aggregator]

Let’s see how we can reuse this pattern in a real example. I wrote the components and services of the solution in C#, and I used Azure Service Bus as the primary event bus.

I designed the solution to be easy to extend, so you can create providers for Apache Kafka and RabbitMQ. You can see it in the image below:

[Image: event-aggregator]
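
One way to keep the solution open for other brokers is to hide each of them behind a common contract. The sketch below is an assumption about how such a contract could look, not the article's actual code: IEventBusProvider and InMemoryEventBusProvider are hypothetical names, and the in-memory class only stands in for a real Kafka or RabbitMQ client.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical provider contract: each message broker gets its own implementation.
public interface IEventBusProvider
{
    Task SendMessage(string label, string messageContent);
    void Subscribe(string label, Func<string, bool> handler);
}

// In-memory stand-in used only to show the contract; a Kafka or RabbitMQ
// provider would implement the same interface on top of its client library.
public class InMemoryEventBusProvider : IEventBusProvider
{
    private readonly Dictionary<string, Func<string, bool>> handlers =
        new Dictionary<string, Func<string, bool>>();

    // Register (or replace) the handler for a label.
    public void Subscribe(string label, Func<string, bool> handler)
    {
        handlers[label] = handler;
    }

    // Deliver the message to the handler for its label, if one is registered.
    public Task SendMessage(string label, string messageContent)
    {
        if (handlers.TryGetValue(label, out var handler))
        {
            handler(messageContent);
        }
        return Task.CompletedTask;
    }
}
```

Code that depends only on IEventBusProvider can then switch brokers by swapping the implementation, without changing publishers or subscribers.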

The first component is ServiceBusManager. It contains a method that subscribes to messages. You can see the code below.

public async Task RegisterOnReceiveMessages(string subscription, Dictionary<string, Func<Message, bool>> subscriptionToLabelHandler, CancellationToken cancellationToken)
{
    var taskCompletionSource = new TaskCompletionSource<bool>();
    SubscriptionClient subscriptionClient = GetSubscriptionClient(subscription);

    RegisterCancellationToken(cancellationToken, subscriptionClient, taskCompletionSource);

    var messageHandlerOptions = MessageHandlerOptions;

    // Register the function that will process messages
    subscriptionClient.RegisterMessageHandler(async (message, token) =>
    {
        // Process the message
        Console.WriteLine($"Received message: Label:{message.Label} | SequenceNumber:{message.SystemProperties.SequenceNumber} | Body:{Encoding.UTF8.GetString(message.Body)}");

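        // Dispatch to the handler registered for this label; unknown labels are only logged
        if (message.Label != null && subscriptionToLabelHandler.TryGetValue(message.Label, out var handler))
        {
            handler(message);
        }
        else
        {
            Console.WriteLine($"No handler registered for label: {message.Label}");
        }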

        // Complete the message so that it is not received again.
        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);

    }, messageHandlerOptions);

    await taskCompletionSource.Task;
}

The manager also exposes a method that lets other components send messages.

public async Task SendMessage(string label, string messageContent)
{
    TopicClient topicClient = null;
    try
    {
        topicClient = new TopicClient(serviceBusSettings.ConnectionString, serviceBusSettings.TopicName);

        var messageData = GetMessageContent(label, messageContent);

        var message = new Message
        {
            Body = messageData,
            Label = label,
        };

        // Send the message to the topic
        await topicClient.SendAsync(message);
    }
    catch (Exception exception)
    {
        Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
    }
    finally
    {
        // Close the client even if sending failed
        if (topicClient != null)
        {
            await topicClient.CloseAsync();
        }
    }
}

Here you can find a complete implementation of the Service Bus Manager.

Event Aggregator

The second component of our system is the EventAggregator class. It contains:

  • A configuration that maps each event label to its handler function:
private Dictionary<string, Func<Message, bool>> SubscriptionToLabelFuncs => new Dictionary<string, Func<Message, bool>>
{
    { "First", DoFirstHandler },
    { "Second", DoSecondHandler }
};

....
public bool DoFirstHandler(Message message)
{
    // Get message body example
    var data = GetMessageBody(message);

    return true;
}

/// <summary>
/// Second message handler example.
/// </summary>
/// <param name="message">The message.</param>
/// <returns></returns>
public bool DoSecondHandler(Message message)
{
    // Get message body example
    var data = GetMessageBody(message);

    return true;
}
  • A method that runs the aggregation process in the background:
/// <summary>
/// Starts aggregating.
/// </summary>
/// <returns></returns>
public async Task StartAgregating()
{
    this.cancellationToken = new CancellationTokenSource().Token;
    await serviceBusManager.RegisterOnReceiveMessages(Subscription, SubscriptionToLabelFuncs, cancellationToken);
}
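
Note that the snippet above takes the Token from a CancellationTokenSource that is not stored anywhere, so nothing can request cancellation later. A possible way to make the background process stoppable is to keep the source itself. The sketch below shows the idea in isolation; BackgroundAggregator, Start, and Stop are hypothetical names, and the class does not talk to Service Bus.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Sketch of a stoppable background process; BackgroundAggregator is a
// hypothetical stand-in for the article's EventAggregator.
public class BackgroundAggregator
{
    // Keep the source, not just the token, so Stop() can request cancellation later.
    private readonly CancellationTokenSource cancellationTokenSource =
        new CancellationTokenSource();

    // Returns a task that completes only after cancellation is requested.
    public Task Start()
    {
        var completion = new TaskCompletionSource<bool>();
        cancellationTokenSource.Token.Register(() => completion.TrySetResult(true));
        return completion.Task;
    }

    // Request cancellation, which completes the task returned by Start().
    public void Stop()
    {
        cancellationTokenSource.Cancel();
    }
}
```

A host could call Start() when the service boots and Stop() during shutdown, then await the returned task before exiting.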

Here you can find the complete implementation of the Event Aggregator.

Conclusion

In this article, I’ve explained one way to organize event communication in a microservice solution. If you want to build more advanced skills in event-driven architecture, you can subscribe to the Architecture Digest or enroll in the course Building Event-Driven and Microservices Architecture in Azure.