AMQP Broker

8 minute read

This was an exercise of the Network Programming subject that I attended during the first semester of my master’s course. I was really happy with the result, so I thought it is worth to be mentioned here.

The idea was to create a server written in C that should interpret and process AMQP messages, used by message brokers such as RabbitMQ. Encryption and fail tolerance wasn’t required, so, I didn’t implemented that. This means that this code is a toy, and it isn’t suitable for real-life purposes.

As technical requirements, it had to:

  • be compatible with AMQP 0.9.1;
  • be able to accept connections and disconnections of clients;
  • declare queues;
  • accept connections of several clients at the same time;
  • allow clients to subscribe a queue;
  • allow clients to push a message to a queue;
  • only use ASCII characters;
  • work on GNU/Linux;

The code is available on GitHub. You can find instructions on its README.

Development

This code was written based on the AMQP specification. In addition, I observed the message exchange between the clients and the server using Wireshark, so, this was partially developed by reverse-engineering RabbitMQ.

I used as clients the command line tools suit amqp-tools, available on the repositories of most Linux distributions and on Homebrew.

Detailed design

Data structures

Queues

The queues were defined as linked lists. These are their nodes:

typedef struct q_node {
    struct q_node *parent;
    int length;
    char body[1];
} q_node;

Note that the last field (the body of message in a queue) is a char array with only one element. This is because this linked list node is also a flexible array. This is a technique to dynamically allocate an array in C and other fields (in this case, the pointer to the next node and the length of the body) in a single call, like this:

q_node *n;
int length = strlen(body);

n = malloc(sizeof(*n) + length * sizeof(char));
n->parent = NULL;
n->length = length;
strcpy(n->body, body);

Here, malloc allocates enough size to the struct fields and the number of characters of the string body. The byte used for the null terminator is already in the struct (the 1 in body[1]).

Tries

Tries are a data structure that I always found very elegant but I have never had a use case that I need them. So, this was the first time that I implemented one.

Oh, and if you don’t know what a trie is, you can read about it on Wikipedia. They are trees used to build symbol tables where the keys are strings. Each character of the string is a node, and the node of the last character references the value.

I used tries to index the queues by their names. In the following picture, I have two queues named CAFE and CASA (the Portuguese words for “coffee” and “house”). The trie nodes are in blue, the fixed portion of the queue nodes are in green and the flexible portion of the queue nodes are in red:

Trie and queues

These two data structure are the only ones that I’m dynamically allocating. I’m avoiding mallocs in this code, so I don’t need to care too much about memory leaks.

Messages

Writing a code in C has its advantages, mainly because of it being in a lower level compared to most of the other high-level languages. One thing that it helped a lot here is that I didn’t need write a complex parser to read AMQP messages. The AMQP protocol defines fixed-length fields, that can be represented as fields in a struct in C. This means that, for most cases, it was enough to declare the structure, as I did here:

/* Message header. The header of an amqp message (except protocol header). */
typedef struct {
    uint8_t msg_type;
    uint16_t channel;
    uint32_t length;
} __attribute__((packed)) amqp_message_header;

And after that, the raw data could be directly copied to the memory address of that struct, and converting the endianness of the fields:

static int parse_message_header(char *s, size_t n, amqp_message_header *header) {
    size_t header_size = sizeof(amqp_message_header);

    if (n < header_size) return 1;
    memcpy(header, s, header_size);

    header->channel = ntohs(header->channel);
    header->length = ntohl(header->length);
    return 0;
}

If you are curious about what __attribute__((packed)) means, it is needed to avoid padding.

State machine

This was mostly copied from the README. Sorry for the ASCII diagrams, but the README was written to be displayed in a terminal, and would take too much time remaking them in SVG, so I kept them as they were.

AMQP is a stateful protocol, so it’s natural that we keep the connection control using a state machine.

Each state machine is created after the beginning of the connection with the client, starting in the state WAIT. In that state, it waits for the protocol header.

The last state of the connection is FINISHED, when the connection finishes gracefully. There is also a FAIL state, if something goes wrong.

State machine diagram

The FAIL stated is hidden here in order to simplify the diagram, as any state may have a event that leads to it.

The connection establishment states are the following:

        *--------*                      *------------*           *------------*
        |  WAIT  | -------------------> |   HEADER   | --------> |    WAIT    |
        |        | C: Protocol Header   |  RECEIVED  | S: Start  |  START OK  |
        *--------*                      *------------*           *------------*
                                                                          |
                                                              C: Start OK |
                                                                          |
*-----------------*            *-----------*            *------------*    |
|       WAIT      |<---------- |   WAIT    | <--------  |  START OK  | <--
| OPEN CONNECTION | C: Tune Ok |  TUNE OK  |  S: Tune   |  RECEIVED  |
*-----------------*            *-----------*            *------------*
  |
  | C: Open Connection
  |
  |    *-----------------*                         *-----------------*
   --> | OPEN CONNECTION | ----------------------> | OPEN CONNECTION |
       |     RECEIVED    | S: Open Connection OK   |     RECEIVED    |
       *-----------------*                         *-----------------*
                                                                   |
                                                  C: Open Channel  |
                                                                   |
       *-------------*                        *--------------*     |
       |     WAIT    | <--------------------- | OPEN CHANNEL | <---
       |  FUNCTIONAL |  S: Open Channel OK    |   RECEIVED   |
       *-------------*                        *--------------*
         |
         | C: Method
         |
         |-> Queue declare states
         |
         |-> Publish states
         |
          -> Consume states

After the state WAIT FUNCTIONAL, it can close the connection, or take one of these different paths:

  1. queue declare
  2. publish
  3. consume

Each one of them can begin the connection closing in its states. This will also be hidden here in order to simplify the diagram.

The queue declaration states are the following:

*------------*                      *---------------*
|    WAIT    | -------------------> | QUEUE DECLARE |
| FUNCTIONAL | C: Queue Declare     |    RECEIVED   |
|            |                      |               |
|            | <------------------  |               |
|            | S: Queue Declare OK  |               |
*------------*                      *---------------*

The publish states are the following:

*------------*                      *---------------*    *----------------*
|    WAIT    | -------------------> | BASIC PUBLISH | -> |  WAIT PUBLISH  |
| FUNCTIONAL | C: Basic Publish     |    RECEIVED   |    | CONTENT HEADER |
*------------*                      *---------------*    *----------------*
                                          ^                              |
                         C: Basic Publish |            C: Content Header |
                                          |                              |
                                          |         *----------------*   |
                                          |-------- |  WAIT PUBLISH  | <-
                                          |         |     CONTENT    |
                                           -------> |                |
                                           C: Body  |                |
                                                    *----------------*

The consume states are the following:

*------------*                   *---------------*
|    WAIT    | ----------------> | BASIC CONSUME |
| FUNCTIONAL | C: Basic Consume  |    RECEIVED   |
*------------*                   *---------------*
                                         |
                                         | S: Basic Consume OK   *------------*
                                          ---------------------> | WAIT VALUE |
                      -----------------------------------------> |  DEQUEUE   |
                     |   C: Consume Ack                          *------------*
                     |                                                  |
                     |                                                  |
                     |                                       Q: Dequeue |
                     |                                                  |
             *--------------*                      *---------------*    |
             | WAIT CONSUME | <------------------- | VALUE DEQUEUE | <--
             |     ACK      |   S: Basic Deliver   |   RECEIVED    |
             |              |   S: Content Header  |               |
             |              |   S: Body            |               |
             *--------------*                      *---------------*

The connection close can be started on several states when they receive close channel or close connection messages:

                  *---------------*                      *-----------*
----------------> | CLOSE CHANNEL | -------------------> | WAIT OPEN | 
 C: Close Channel |   RECEIVED    | S: Close Channel OK  |  CHANNEL  | 
                  *---------------*                      *-----------*

                     *------------------*                         *----------*
-------------------> | CLOSE CONNECTION | ----------------------> | FINISHED | 
 C: Close Connection |     RECEIVED     | S: Close Connection OK  |          | 
                     *------------------*                         *--------- *

Experiments

Performance analysis was part of the exercise, and I think it’s cool to show it here only as a matter of curiosity. I don’t want to be too much scientific here.

Setup

This setup was composed by old single-board computers (two Beaglebones and one Raspberry Pi) named Emerson, Lake and Palmer (I consider yourself my friend if you know who are them), my old laptop (name Wall-e), and an old Linksys WRT54G router.

My new laptop (named Eve, yeah, just because the older one is Wall-e) was not part of the experiment because it is too powerful and I wanted to see it running on weak machines, however, I used it to remote access them.

Here’s their specs and roles:

Machine CPU Number of Cores RAM OS Connection Type Role
Wall-e Intel core i5 2 8GB Manjaro Linux Wired Server
Eve Apple M1 8 8GB Mac OS Monterey WiFi Remote access
Emerson ARM Cortex A8 1 230MB Debian Buster Wired Publisher
Lake ARM Cortex A8 1 484MB Debian Buster Wired Publisher
Palmer BCM2835 1 432MB Debian Bullseye Wired Consumer

And about the network, iperf show us that the network throughput between Emerson and Wall-e and Lake and Wall-e was about 94 Mb/s and between Palmer and Wall-e was about 53Mb/s.

Observations

CPU

Comparing the CPU usage of this AMQP server and RabbitMQ, we can see that this performed slightly better for 0, 2 and 100 clients connected (the server developed here denoted by “ep1” in all the graphics).

CPU usage

Network

Comparing the upload rate of a publisher machine between both servers, they are basically the same:

Upload rate

Comparing the download rate of the consumer machine, we can see that RabbitMQ provided more data per second:

Download rate

RabbitMQ is the winner here!

Conclusion

This was a really hard exercise, and it demanded a lot of time to understand the protocol, to develop and to experiment. But sincerely, I like this kind of thing that reinvents the wheel because it makes me learn more deeply about how things work. I have never used RabbitMQ directly before, and everything is now clear to understand since I have an idea of what’s under the hood.

If you are thinking that the performance was too good to a toy project developed by just one person, keep in mind that this is extremely simple compared to the real RabbitMQ. A lot of features of RabbitMQ (for example, channels, exchanges, encryption, authentication) are missing.

Also remember that RabbitMQ runs on the Erlang virtual machine while this is compiled to a native binary, so, it’s expected to use less resources when there’s low demand. However, it doesn’t mean that it is performing better, as you could see RabbitMQ could deliver more data per second compared to this one.

Updated: