In this post I’ll discuss using RabbitMQ as a centralized service bus. RabbitMQ allows users to create a communication pipeline that enables loosely coupled processes to communicate asynchronously, scale with load, initiate system state changes, and report back on those changes as they occur. It’s a flexible system that, when used appropriately, can greatly enhance the speed and resiliency of complex systems. Message queuing systems are not, however, a magic bullet that should be applied to every situation. If you need a system that only performs synchronous actions, or you have a requirement to scale up (adding resources to the existing server) instead of scaling out (adding more servers to share processing load), then a message queue will just add a layer of complication without adding any benefit. With all that out of the way, let’s jump right in.
RabbitMQ is an open source implementation of the Advanced Message Queuing Protocol (AMQP). The server is written in Erlang and supports clustering and failover through the Open Telecom Platform. What that all means is that it’s fast, lightweight and can be built with a high level of fault tolerance. There are clients and tutorials for numerous languages, and many common patterns are well documented.
There are also install packages for many operating systems and plenty of documentation for each. Whichever implementation you choose, I highly recommend enabling the management plugin. It enables a web frontend that can be used to view queues, post messages, and sample messages from queues. The management plugin also enables the HTTP API, which allows you to send messages regardless of the language you are using. I’ll be using C# and Python for our examples, but the patterns are applicable to any language.
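As a taste of that HTTP API, here is a sketch of publishing a message through the management plugin using only the Python standard library. The host, port, and `guest` credentials are RabbitMQ's defaults, and the `tasks` exchange name is a placeholder of mine:

```python
import base64
import json
import urllib.request

def build_publish_request(exchange, routing_key, payload,
                          host="localhost", port=15672, vhost="%2F",
                          user="guest", password="guest"):
    # Build a POST against the management plugin's publish endpoint.
    url = f"http://{host}:{port}/api/exchanges/{vhost}/{exchange}/publish"
    body = json.dumps({
        "properties": {},
        "routing_key": routing_key,
        "payload": payload,
        "payload_encoding": "string",
    }).encode()
    req = urllib.request.Request(url, data=body, method="POST")
    creds = base64.b64encode(f"{user}:{password}".encode()).decode()
    req.add_header("Authorization", f"Basic {creds}")
    req.add_header("Content-Type", "application/json")
    return req

# With the management plugin running locally:
# urllib.request.urlopen(build_publish_request("tasks", "prod.web.foo", "hello"))
```

Any language that can make an HTTP request can publish this way, which is handy for scripts and one-off tools.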
Here’s our scenario: we have various service buckets, including a web front end, database, RabbitMQ, and worker services. I use the term “service bucket” because these patterns apply whether you’re dealing with one server or a multi-node cluster. Actions are initiated by users from the web frontend or from scheduled processes within the system. Action requests come in the form of messages dropped on the queue. Actions are performed by worker services listening for specific messages or message patterns (more on this in a bit).
In the RabbitMQ world, if you are just passing messages from one service to another, all you need is a queue. However, if that's all you need to do, a socket-based approach such as MSMQ or ZeroMQ is probably more appropriate, since those assume a direct connection between sender and receiver. RabbitMQ starts to be much more helpful when we bring in exchanges and message routing. At a high level, messages are dropped on an exchange, and queues attach to that exchange to receive some subset of the messages. There are several exchange types (direct, fanout, topic, and headers), each suited to different routing needs. For our purposes we will use a topic exchange.
Topic exchanges allow us to tag messages with a routing key and are commonly used in publish/subscribe situations. Queues bind to the topic exchange with a key, and messages that have a matching key are routed to them. Additionally, we will use named queues so that multiple copies of the same service can attach to the same queue. When there are multiple listeners on the same queue, Rabbit round-robins through them, balancing the load across all listeners. This last piece is key. Instead of each service creating an anonymous queue, giving a one-to-one relationship between the queue and the service, we can have one or more instances of the service listening, and we get load balancing for free. The queue lives in the Rabbit service bucket, so listeners can be added and removed based on need. If an instance disconnects, any messages it hasn't acknowledged (ACKd) will be redelivered to the next available instance.
With workers connecting to specific named queues, we can spread the workload for a queue across multiple services, hosts, even operating systems. As long as the service that receives the message does the job the system expects, the results will be the same. The only technical limit on the number of listeners per queue is the number of connections the operating system can handle. If you hit that level, you would be better off creating a cluster and load balancing the connections across it.
The routing key in a topic exchange is a set of dot-delimited words; it functions similarly to an IP address. Bindings can match against exact keys, but, more powerfully, `*` and `#` can be used to match partial patterns: `*` substitutes for exactly one word, while `#` matches zero or more words. For example, if routing keys in the system followed a convention like `environment.service.jobtype` (a hypothetical convention for illustration), a binding of `*.*.foo` would match foo jobs for any environment and service combination, while `prod.#` would match any message concerning prod.
Note that a queue can be bound with multiple routing key patterns to receive different types of messages.
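The matching rules are easy to reason about if you sketch them out. This is not RabbitMQ's code, just a small Python illustration of the topic-matching semantics described above:

```python
def topic_match(pattern, routing_key):
    """Return True if a binding pattern matches a routing key.

    `*` matches exactly one word; `#` matches zero or more words.
    Illustrative only -- RabbitMQ implements this internally.
    """
    def match(p, k):
        if not p:
            return not k          # pattern exhausted: key must be too
        if p[0] == "#":
            # '#' consumes zero words, or one word and tries again.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))
```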
How about some code?
Here we have a Python service that declares an exchange and publishes a message to it:
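A minimal sketch of such a producer using the `pika` client; the `tasks` exchange name and the payload shape are placeholders I've assumed:

```python
import json

def build_message(obj_id, action):
    # Pure helper: build the JSON payload (placeholder shape for illustration).
    return json.dumps({"id": obj_id, "action": action})

def publish(routing_key, body, exchange="tasks", host="localhost"):
    # pika is imported lazily so the payload helper above works without it.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    # Idempotent: safe to declare on every publish; created only once.
    channel.exchange_declare(exchange=exchange, exchange_type="topic")
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
    connection.close()
```

With a broker running locally, `publish("prod.web.foo", build_message(42, "start"))` drops a message on the topic exchange.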
And that’s all there is to it. Each message producer in the system will look similar to this. Declaring exchanges is idempotent, so it’s appropriate to declare the exchange each time you use it. It will only be created once. However, if you try to declare an existing exchange with a different type, you will get an error.
On the listener side we can take advantage of the topic exchange’s ability to deliver the same message to multiple endpoints. We will create a Python service that declares a queue listening with “#” to get a copy of every message that passes through the queue, as well as a C# service in our worker service bucket that will do the actual work.
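Here is a sketch of that catchall listener using pika 1.x; the queue and exchange names are assumptions of mine:

```python
import json

def parse(body):
    # Pure helper: decode the message body (assumed to be JSON).
    return json.loads(body)

def handle(ch, method, properties, body):
    message = parse(body)
    print(f"{method.routing_key}: {message}")  # e.g. record state changes
    # ACK only after the work is done; unACKd messages get redelivered.
    ch.basic_ack(delivery_tag=method.delivery_tag)

def listen(queue="catchall", exchange="tasks", host="localhost"):
    import pika  # imported lazily so the helpers above work without it

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type="topic")
    # Named, non-exclusive queue: several instances can share it.
    channel.queue_declare(queue=queue, durable=True, exclusive=False)
    channel.queue_bind(exchange=exchange, queue=queue, routing_key="#")
    channel.basic_consume(queue=queue, on_message_callback=handle)
    channel.start_consuming()
```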
We declared the queue with `exclusive=False` so that more than one listener can attach, letting us run multiple instances of the service to balance load. We bound the queue to the exchange with the `#` routing key so we get every message. Also notice that `basic_consume` takes a callback function, which will be invoked every time a message is received.
In this example we get the JSON message from the `body` parameter and the routing key from the `method` parameter. We also send an ACK back to RabbitMQ to acknowledge that we received the message and performed the work that we intended to do. This service can be used to keep track of state changes to objects and task start and stop times.
In the worker service bucket, we'll use a Windows service to start a listener and wait for specific routing keys. Keep in mind that these workers need not be on the same machine, or even the same operating system: the catchall Python service above could run on Linux while the C# service below runs on Windows. Likewise, multiple copies of the same service can run on different machines.
RabbitMQ has an official .NET client library available on NuGet.
Here we declare the exchange just as a message producer would, but we also declare a durable, non-exclusive, non-autodelete queue and bind it to the exchange with a specific routing key. We then start a while loop and pull messages off the queue as they arrive. Again, since the queue is non-exclusive, we can have multiple workers connected to it.
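The post's worker is a C# Windows service; for consistency with the earlier listings, the same loop is sketched here in Python. The queue name, binding key, and `do_work` body are placeholders of mine:

```python
import json
import time

def do_work(message):
    # Placeholder for the actual task; returns the action it would perform.
    return message.get("action")

def work_loop(queue="foo_workers", exchange="tasks",
              binding="*.*.foo", host="localhost"):
    import pika  # imported lazily so do_work can be used without it

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type="topic")
    # Durable, non-exclusive, non-autodelete queue, bound with a specific key.
    channel.queue_declare(queue=queue, durable=True,
                          exclusive=False, auto_delete=False)
    channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding)
    while True:
        method, properties, body = channel.basic_get(queue=queue)
        if method is None:
            time.sleep(1)  # queue empty; poll again
            continue
        do_work(json.loads(body))
        channel.basic_ack(delivery_tag=method.delivery_tag)
```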
So what we have at this point is a task that is creating messages and dropping them on the queue in response to some action request further up the chain. We have a catchall service that grabs messages as they come across and updates object state based on the contents of the message. We also have a worker service that listens for specific messages and performs tasks based on those messages. What we lack, however, is a way to notify the system that a task is complete and return whatever results need to be returned. Something like this is really all we need:
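A sketch of that completion publisher; the `task.complete` routing key and the fields in the msg dictionary are conventions I've assumed:

```python
import json

def build_completion(task_id, status="complete", **results):
    # The msg dictionary; any return data can be merged in as keyword args.
    msg = {"task_id": task_id, "status": status}
    msg.update(results)
    return msg

def report_complete(channel, task_id, exchange="tasks", **results):
    # Reuses the worker's already-open pika channel.
    channel.basic_publish(
        exchange=exchange,
        routing_key="task.complete",
        body=json.dumps(build_completion(task_id, **results)),
    )
```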
This can be added to the end of the worker's workflow after the triggering message has been ACKd. Any data that needs to be returned to the system can be added to the `msg` dictionary.
The following image illustrates the system we’ve mapped out:
With these few pieces we have created a communication pipeline that allows for loosely coupled processes to scale with load, initiate system state changes, and report back on those changes as they occur. It’s a flexible system that is applicable to numerous situations.