888                                         888b     d888  .d88888b.  
888                                         8888b   d8888 d88P" "Y88b 
888                                         88888b.d88888 888     888 
888      .d88b.  88888b.  88888b.  888  888 888Y88888P888 888     888 
888     d88""88b 888 "88b 888 "88b 888  888 888 Y888P 888 888     888 
888     888  888 888  888 888  888 888  888 888  Y8P  888 888 Y8b 888 
888     Y88..88P 888  888 888  888 Y88b 888 888   "   888 Y88b.Y8b88P 
88888888 "Y88P"  888  888 888  888  "Y88888 888       888  "Y888888"  
                                        888                      Y8b  
                                   Y8b d88P                           
                                    "Y88P"

Building LonnyMQ

PostgreSQL is well known for its ability to function as a performant message queue. Indeed, there is a wealth of articles online describing how this can be achieved using SELECT FOR UPDATE SKIP LOCKED.

I would like to opine about my own experiences trying to write a message queue using PostgreSQL - going beyond a simple FIFO implementation and exploring more advanced features such as:

All whilst keeping our solution minimal and elegant, and preserving performance as best I can: ideally relying solely on logarithmic-time index probes to dequeue messages rather than any sort of linear scan.

Why PostgreSQL?

Before we dive in, let's quickly re-answer the question "Why would you use PostgreSQL as a message queue driver over something like Redis or RabbitMQ?" Selecting PostgreSQL affords several benefits:

  1. Assuming you are already using PostgreSQL (not unlikely), it keeps your infrastructure simple and cheap.
  2. Queue actions can theoretically piggy-back on existing DB transactions, allowing them to happen in lock-step with your business logic - eliminating a huge number of different race conditions that you would experience using something like Redis - even with AOF/WAL mode enabled.
  3. Raw message throughput can easily reach north of 1_000 messages per second, which I would posit is plenty fast for a majority of situations.
  4. If queue state lives in the database, then it stands to reason that it can be inspected and monitored using SQL queries and familiar DB admin tooling.

Our implementation

We will implement our message queue by creating plpgsql functions that drive queue actions (such as message enqueue/dequeue). The benefits of doing it like this, vs. pulling the logic into your application are:

  1. Writing language-specific implementations of your message queue becomes trivial. The core business logic lives as plpgsql functions that are installed to your database and all you must do is write lightweight bindings to marshal data back and forth for each language you wish to support.
  2. Any action that can't be "one-shotted" in a single query would take much longer to run if intermediate data had to be shuttled back and forth over the network.
  3. The "requirements" for the underlying database driver are lessened as explicit support for transactions isn't required for multi-step actions. If everything runs within a single plpgsql function call, then everything inside that function runs inside a single transaction context anyway.

A simple FIFO queue (quick refresher)

Let's begin by building a simple, performant FIFO queue that uses FOR UPDATE SKIP LOCKED.

We can begin by creating a simple table to store our messages:

CREATE TABLE message (
	id BIGSERIAL PRIMARY KEY,
	content BYTEA NOT NULL
);

Enqueuing a message is as simple as performing an insert (wrapping this in a plpgsql function may feel like overkill right now, but we will persist for consistency):

CREATE FUNCTION message_create (
	p_content BYTEA
) RETURNS VOID AS $$
BEGIN
	INSERT INTO "message" (content)
	VALUES (p_content);
END;
$$ LANGUAGE plpgsql;

A dequeue captures a message using a SELECT with FOR UPDATE, which prevents concurrent dequeues from returning the same message, and SKIP LOCKED, which prevents concurrent dequeues from being "head-of-line" blocked by each other. The ORDER BY ensures FIFO delivery, as the IDs assigned at message creation are monotonically increasing. This ordering is performant because PostgreSQL automatically creates a B-tree index for the id primary key, which lets it find the lowest-id message in logarithmic time.

CREATE FUNCTION message_dequeue ()
RETURNS BYTEA
AS $$
DECLARE
	v_message RECORD;
BEGIN
	-- Capture a message
	SELECT "id", "content" FROM "message"
	ORDER BY "id" ASC
	LIMIT 1
	FOR UPDATE SKIP LOCKED
	INTO v_message;

	-- If there is nothing to capture return a NULL
	IF v_message."id" IS NULL THEN
		RETURN NULL::BYTEA;
	END IF;

	-- If we've captured a message, delete it and return its content
	DELETE FROM "message"
	WHERE "id" = v_message."id";
	
	RETURN v_message."content";
END;
$$ LANGUAGE plpgsql;
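A quick usage sketch, assuming the message table and the two functions above are installed and the queue is otherwise empty. Everything runs inside a transaction that is rolled back, so it leaves no trace. In a real deployment each worker would simply call SELECT message_dequeue() from its own session, and SKIP LOCKED would ensure that concurrent workers claim different rows.

```sql
-- Assumes: the "message" table plus message_create/message_dequeue above,
-- and an otherwise empty queue. Everything is rolled back at the end.
BEGIN;

SELECT message_create(convert_to('first', 'UTF8'));
SELECT message_create(convert_to('second', 'UTF8'));

-- convert_from turns the returned BYTEA back into readable text.
SELECT convert_from(message_dequeue(), 'UTF8');  -- first
SELECT convert_from(message_dequeue(), 'UTF8');  -- second
SELECT message_dequeue();                        -- NULL: nothing left to dequeue

ROLLBACK;
```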

N.B. Using BIGSERIAL is superior to using a timestamp field for FIFO ordering. If we were to use a created_at timestamp set to CURRENT_TIMESTAMP, all messages created within the same transaction would have the same time, and thus their relative order would be undefined. Using CLOCK_TIMESTAMP() somewhat alleviates the issue, but it still leaves us vulnerable to the clock changing unpredictably within a transaction (perhaps an NTP update moves the clock backwards).
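The difference between the two clocks is easy to observe. The self-contained DO block below runs as a single transaction. NOW() and CURRENT_TIMESTAMP stay pinned to the transaction's start time, while CLOCK_TIMESTAMP() keeps advancing. The short pg_sleep merely makes the gap visible. The block assumes plpgsql.check_asserts is left at its default of on.

```sql
DO $$
DECLARE
	v_txn_start TIMESTAMPTZ := NOW();
BEGIN
	-- Pause briefly so the wall clock visibly moves on.
	PERFORM pg_sleep(0.01);

	-- NOW() and CURRENT_TIMESTAMP are frozen for the whole transaction...
	ASSERT NOW() = v_txn_start;
	ASSERT CURRENT_TIMESTAMP = v_txn_start;

	-- ...whereas CLOCK_TIMESTAMP() reads the wall clock on every call.
	ASSERT CLOCK_TIMESTAMP() > v_txn_start;

	RAISE NOTICE 'NOW(): %, CLOCK_TIMESTAMP(): %', NOW(), CLOCK_TIMESTAMP();
END;
$$;
```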

Scheduling

We now have a performant, albeit bare-bones, PostgreSQL message queue, and I believe this is the point at which many blog posts would leave you. We will endeavour to go further. Our first improvement will be to add simple scheduling: the ability to specify a timestamp after which a message is allowed to be dequeued.

To do this, we make a relatively simple addition to our message table's schema: a dequeue_after field. It will be a BIGINT holding the target time as a Unix timestamp with millisecond precision (i.e. milliseconds since the epoch).

CREATE TABLE message (
	id BIGSERIAL PRIMARY KEY,
	content BYTEA NOT NULL,
	dequeue_after BIGINT NOT NULL
);

Our message creation function will require this to be passed in as an input. If it is not provided (NULL), we default to the current transaction timestamp, NOW() (equivalent to CURRENT_TIMESTAMP). We will also define a helper function called to_epoch that converts a TIMESTAMPTZ to a millisecond-precision BIGINT.

CREATE FUNCTION to_epoch (
	p_timestamp TIMESTAMPTZ
) RETURNS BIGINT AS $$
BEGIN
	-- EXTRACT(EPOCH ...) includes fractional seconds, so scaling by 1000
	-- and flooring yields whole milliseconds since the Unix epoch.
	RETURN FLOOR(EXTRACT(EPOCH FROM p_timestamp) * 1000)::BIGINT;
END;
$$ LANGUAGE plpgsql;
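Millisecond conversion is easy to get subtly wrong. EXTRACT(MILLISECOND ...) returns the seconds field multiplied by 1000, whole seconds included, rather than just the sub-second part. The self-contained query below makes this concrete for 5.25 seconds past midnight UTC on 2024-01-01. Scaling the epoch by 1000 gives the expected value. Adding the MILLISECOND field to the (rounded) whole epoch seconds comes out five seconds too high.

```sql
SELECT
	-- Scale the fractional epoch seconds to ms: 1704067205250 (correct).
	FLOOR(EXTRACT(EPOCH FROM ts) * 1000)::BIGINT AS epoch_ms,
	-- MILLISECOND includes the 5 whole seconds: 5250, not 250.
	EXTRACT(MILLISECOND FROM ts) AS millisecond_field,
	-- Adding the two double-counts the seconds: 1704067210250.
	EXTRACT(EPOCH FROM ts)::BIGINT * 1000
		+ EXTRACT(MILLISECOND FROM ts)::BIGINT AS double_counted
FROM (SELECT TIMESTAMPTZ '2024-01-01 00:00:05.250+00' AS ts) AS t;
```

On PostgreSQL 14 and later, EXTRACT returns numeric, so millisecond_field may display as 5250.000.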

Message creation then becomes:

CREATE FUNCTION message_create (
	p_content BYTEA,
	p_dequeue_after TIMESTAMPTZ DEFAULT NULL
) RETURNS VOID AS $$
DECLARE
	v_dequeue_after BIGINT;
BEGIN
	-- Fall back to the transaction timestamp when no schedule is given, and
	-- convert to epoch milliseconds in both cases (the column is a BIGINT).
	v_dequeue_after := to_epoch(COALESCE(p_dequeue_after, NOW()));

	INSERT INTO "message" (content, dequeue_after)
	VALUES (p_content, v_dequeue_after);
END;
$$ LANGUAGE plpgsql;

Dequeue is now changed to only return messages that have elapsed their scheduled waiting time. We will now also perform a lexicographical sort by dequeue_after and then id to ensure the oldest messages are processed first.

CREATE FUNCTION message_dequeue ()
RETURNS BYTEA
AS $$
DECLARE
	v_now BIGINT;
	v_message RECORD;
BEGIN
	v_now := to_epoch(NOW());
	
	-- Capture a message
	SELECT "id", "content" FROM "message"
	WHERE "dequeue_after" <= v_now
	ORDER BY "dequeue_after" ASC, "id" ASC
	LIMIT 1
	FOR UPDATE SKIP LOCKED
	INTO v_message;

	-- If there is nothing to capture return a NULL
	IF v_message."id" IS NULL THEN
		RETURN NULL::BYTEA;
	END IF;

	-- If we've captured a message, delete it and return its content
	DELETE FROM "message"
	WHERE "id" = v_message."id";
	
	RETURN v_message."content";
END;
$$ LANGUAGE plpgsql;
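A usage sketch of scheduling, assuming the scheduling versions of the table, to_epoch, message_create and message_dequeue above are installed and the queue is otherwise empty. A message scheduled five minutes into the future is not returned, while one enqueued without a schedule is. Because NOW() is fixed within the transaction, the unscheduled message's dequeue_after equals the dequeue's v_now and passes the <= check.

```sql
-- Assumes the scheduling-aware schema and functions above and an empty queue.
BEGIN;

-- Not dequeueable for another five minutes.
SELECT message_create(convert_to('later', 'UTF8'), NOW() + INTERVAL '5 minutes');
-- Ready immediately: a NULL schedule falls back to the transaction's NOW().
SELECT message_create(convert_to('now', 'UTF8'), NULL);

SELECT convert_from(message_dequeue(), 'UTF8');  -- now
SELECT message_dequeue();                        -- NULL: 'later' is not due yet

ROLLBACK;
```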

Keen-eyed observers may already be groaning. I previously pointed out the pitfalls of using NOW() to order messages, and it appears that this is exactly what I'm now doing. However, note that within a transaction the value of NOW() does not change. All messages enqueued within a single transaction (without an explicit schedule) therefore share the same dequeue_after, and their order is entirely determined by their id. FIFO ordering within a single transaction is still preserved.

One thing is still missing - an explicit index. Right now our dequeue would be non-performant for large queues as we would have to sequentially scan through every row to find the one with the lowest value of dequeue_after to return. Thus we must now add:

CREATE INDEX message_dequeue_ix
ON "message" ("dequeue_after" ASC, "id" ASC);
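To check that the planner is actually using the index, you can EXPLAIN the capture query that message_dequeue runs. This is a sketch assuming the scheduling schema and index above. The literal is a hypothetical stand-in for v_now. Calling to_epoch(NOW()) directly in the WHERE clause would not be equivalent, because to_epoch is VOLATILE by default and the planner cannot use a volatile expression as an index condition. On a tiny table the planner may legitimately prefer a sequential scan, and plan output differs between versions, so no expected plan is shown.

```sql
-- Refresh planner statistics so the plan reflects the table's real contents.
ANALYZE "message";

-- The capture query from message_dequeue, with a fixed "now" in epoch ms.
EXPLAIN
SELECT "id", "content" FROM "message"
WHERE "dequeue_after" <= 1704067200000  -- hypothetical stand-in for v_now
ORDER BY "dequeue_after" ASC, "id" ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
```

On a realistically sized table, you would hope to see an index scan using message_dequeue_ix feeding the Limit node.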

Prioritisation

Let's now consider how to add a "priority" field to messages, ensuring that under backlogged conditions the most important messages are processed first...

Although this seems simple, there are significant performance pitfalls you can fall into unless your solution is well considered.

We can imagine a naive solution would involve adding a priority INTEGER field to the message table:

CREATE TABLE message (
	id BIGSERIAL PRIMARY KEY,
	content BYTEA NOT NULL,
	dequeue_after BIGINT NOT NULL,
	priority INTEGER NOT NULL
);

The dequeue method would then be changed to instead order by (priority, id):

CREATE FUNCTION message_dequeue ()
RETURNS BYTEA
AS $$
DECLARE
	v_now BIGINT;
	v_message RECORD;
BEGIN
	v_now := to_epoch(NOW());
	
	-- Capture a message
	SELECT "id", "content" FROM "message"
	WHERE "dequeue_after" <= v_now
	ORDER BY "priority" ASC, "id" ASC
	LIMIT 1
	FOR UPDATE SKIP LOCKED
	INTO v_message;

	-- If there is nothing to capture return a NULL
	IF v_message."id" IS NULL THEN
		RETURN NULL::BYTEA;
	END IF;

	-- If we've captured a message, delete it and return its content
	DELETE FROM "message"
	WHERE "id" = v_message."id";
	
	RETURN v_message."content";
END;
$$ LANGUAGE plpgsql;

We could do this, and indeed it gives us the behaviour we want. Messages are only dequeued once their dequeue_after has elapsed, and if there is a backlog, the highest priority messages (which by convention correspond to the lowest priority values) are dequeued first.

The problem is that there is no way to index this query that is both performant and immune from pathological cases. Let's consider three different indexing strategies and note the pitfalls of each:

Separate indices

Suppose we define two separate indices: one over dequeue_after to quickly filter out messages that aren't ready for dequeue, and another to help with priority-based ordering:

CREATE INDEX message_dequeue_after_ix
ON "message" ("dequeue_after" ASC);

CREATE INDEX message_priority_id_ix
ON "message" ("priority" ASC, "id" ASC);

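With these indices, the dequeue can use message_dequeue_after_ix to efficiently find the set of messages available for dequeue, or message_priority_id_ix to walk messages in (priority, id) order. No single index gives it both. In practice, the planner can walk message_priority_id_ix highest priority first, discarding every row whose dequeue_after is still in the future, until it finds one that is ready. The alternative is fetching every ready message via message_dequeue_after_ix and then sorting them, which is linear in the number of ready messages instead.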

The pathological case is clear - if we have a large number of future-dated, high priority messages, we will need to scan a significant portion of our message table until we find a lower priority message that is actually available for dequeue. Thus in the worst case, a dequeue can take linear time.
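To see this worst case for yourself, the sketch below assumes a scratch copy of the naive priority table, the two separate indices above, and to_epoch. It seeds the table with many future-dated, high-priority messages and a single ready, low-priority one, then times the capture query with EXPLAIN ANALYZE. The row count is arbitrary, and the plan PostgreSQL picks depends on its version and statistics.

```sql
-- 100,000 high-priority (priority 0) messages scheduled an hour ahead.
INSERT INTO "message" (content, dequeue_after, priority)
SELECT '\x00'::bytea, to_epoch(NOW() + INTERVAL '1 hour'), 0
FROM generate_series(1, 100000);

-- One low-priority (priority 10) message that is ready right now.
INSERT INTO "message" (content, dequeue_after, priority)
VALUES ('\x01'::bytea, to_epoch(NOW()), 10);

ANALYZE "message";

-- The uncorrelated subquery is evaluated once, much like v_now.
EXPLAIN ANALYZE
SELECT "id", "content" FROM "message"
WHERE "dequeue_after" <= (SELECT to_epoch(NOW()))
ORDER BY "priority" ASC, "id" ASC
LIMIT 1;
```

If the planner walks message_priority_id_ix, the output should report a large "Rows Removed by Filter" count: every future-dated priority 0 row is stepped over before the ready row is found.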

A compound index

So two separate indices have a pathological worst case - can we fix this by instead using a compound index as we've been doing previously?

If we use the index:

CREATE INDEX message_dequeue_ix
ON "message" ("dequeue_after" ASC, "priority" ASC, "id" ASC);

We might feel like we've solved the issue (and your LLM du jour will probably agree). The index does let the dequeue efficiently find every row that is ready for dequeue, via a range scan on dequeue_after. However, those rows come back ordered by (dequeue_after, priority, id), whereas we need them ordered by (priority, id). Every ready row must therefore be read and sorted before the first one can be returned.

Thus a pathological case emerges again, this time when there is a large backlog of available messages. This feels quite scary. Degrading dequeue performance could worsen the backlog, which in turn degrades performance further still: a positive feedback loop of doom.
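The opposite workload triggers this case. The sketch below assumes a scratch copy of the priority table with only the (dequeue_after, priority, id) compound index, plus to_epoch. It seeds a large backlog of ready, low-priority messages and one ready, high-priority message. Whichever plan is chosen, the ready rows have to be fetched and sorted before the high-priority one can be returned. The row count is arbitrary.

```sql
-- 100,000 low-priority (priority 10) messages, all already due.
INSERT INTO "message" (content, dequeue_after, priority)
SELECT '\x00'::bytea, to_epoch(NOW()) - g, 10
FROM generate_series(1, 100000) AS g;

-- A single high-priority (priority 0) message, also due.
INSERT INTO "message" (content, dequeue_after, priority)
VALUES ('\x01'::bytea, to_epoch(NOW()), 0);

ANALYZE "message";

-- Expect the plan to touch every ready row and sort them (exact shape varies).
EXPLAIN ANALYZE
SELECT "id", "content" FROM "message"
WHERE "dequeue_after" <= (SELECT to_epoch(NOW()))
ORDER BY "priority" ASC, "id" ASC
LIMIT 1;
```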

A better compound index?

Let's not give up on our compound index approach just yet. What if we use:

CREATE INDEX message_dequeue_ix
ON "message" ("priority" ASC, "dequeue_after" ASC, "id" ASC);

This is actually a much better solution, and in situations where you can tightly bound the number of distinct priorities, probably sufficient. The dequeue now visits each priority group in turn (highest priority first) until it finds one that contains a message ready for dequeue. Provided each lookup orders by (dequeue_after, id) within its priority, matching the index, it is thankfully just a simple index probe (albeit one that must be repeated for each distinct priority).

A pathological case is obvious: a large number of future-dated messages with high-yet-distinct priorities, forcing a scan through many priority buckets before we reach a lower priority message that is ready for dequeue.

However, as touched upon above, the number of distinct priorities is something we can easily control thus the impact here can be bounded.
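The per-priority probing described above has to be driven by the dequeue function itself. Here is a sketch of one way to do it, under the hypothetical name message_dequeue_bucketed. It assumes the message table with a priority column, the (priority, dequeue_after, id) index and to_epoch. It jumps from one distinct priority to the next with MIN(...) lookups, which PostgreSQL can answer from the leading column of the index, and probes each bucket for its oldest ready message.

```sql
-- Hypothetical sketch: lower priority values are served first.
CREATE FUNCTION message_dequeue_bucketed ()
RETURNS BYTEA
AS $$
DECLARE
	v_now BIGINT;
	v_priority INTEGER;
	v_message RECORD;
BEGIN
	v_now := to_epoch(NOW());

	-- Start at the smallest (i.e. most important) priority value present.
	SELECT MIN("priority") FROM "message" INTO v_priority;

	WHILE v_priority IS NOT NULL LOOP
		-- Index probe: the oldest ready message within this priority bucket.
		SELECT "id", "content" FROM "message"
		WHERE "priority" = v_priority
		AND "dequeue_after" <= v_now
		ORDER BY "dequeue_after" ASC, "id" ASC
		LIMIT 1
		FOR UPDATE SKIP LOCKED
		INTO v_message;

		IF FOUND THEN
			DELETE FROM "message" WHERE "id" = v_message."id";
			RETURN v_message."content";
		END IF;

		-- Nothing ready here: jump to the next distinct priority.
		SELECT MIN("priority") FROM "message"
		WHERE "priority" > v_priority
		INTO v_priority;
	END LOOP;

	-- No bucket had a ready (and unlocked) message.
	RETURN NULL::BYTEA;
END;
$$ LANGUAGE plpgsql;
```

The loop runs once per distinct priority in the worst case, which is exactly the bounded cost described above. If every ready message in a bucket is locked by other workers, SKIP LOCKED makes the loop fall through to lower priorities rather than wait.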

Explicit marking

An alternative strategy, which I won't explore deeply, is to have a background process that continually marks messages as "ready" for dequeue via an explicit column (say, is_ready) on the message table.

If such a process existed, we could create a partial index of all messages that are not yet ready, ordered by dequeue_after, so that messages due to be marked can be efficiently found and updated (via an index probe):

CREATE INDEX message_mark_ix
ON "message" ("dequeue_after" ASC)
WHERE NOT is_ready;

Correspondingly, we would need another partial index for efficient message dequeue - this one of all messages that are ready for dequeue and ordered by priority.

CREATE INDEX message_dequeue_ix
ON "message" ("priority" ASC, "id" ASC)
WHERE is_ready;
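For completeness, the marking step and the matching capture query might look roughly like the sketch below. It assumes a hypothetical is_ready BOOLEAN NOT NULL DEFAULT FALSE column alongside the priority column, the two partial indices above, and to_epoch. The batch size of 1000 is arbitrary. The marker would need to be run repeatedly by some scheduler, with its own error handling.

```sql
-- Marker: flip a batch of due messages to ready. The WHERE NOT "is_ready"
-- predicate matches the partial index message_mark_ix.
UPDATE "message" SET "is_ready" = TRUE
WHERE "id" IN (
	SELECT "id" FROM "message"
	WHERE NOT "is_ready"
	AND "dequeue_after" <= (SELECT to_epoch(NOW()))
	ORDER BY "dequeue_after" ASC
	LIMIT 1000
	-- Lets several marker runs overlap without blocking each other.
	FOR UPDATE SKIP LOCKED
);

-- Capture query for dequeue: WHERE "is_ready" matches the partial
-- index message_dequeue_ix, which is already in (priority, id) order.
SELECT "id", "content" FROM "message"
WHERE "is_ready"
ORDER BY "priority" ASC, "id" ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
```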

I believe this solution to be quite hairy, and it goes against our initial design ethos of keeping the solution minimal and elegant. We have to manage a separate background process, and we now also have to worry about that process becoming a bottleneck and starving the message processors that depend on it. That is all extra complexity that simply isn't worth it!

No priority!

The final solution I will propose (and the one we will use going forward) is to implement priority - by not implementing priority at all!

No, I haven't had a brain injury - thanks for asking. What I am actually talking about is overloading the dequeue_after field so it is used both to schedule messages and to prioritise them (this is why I wanted to use BIGINT for dequeue_after instead of the more natural choice, TIMESTAMPTZ).

Remember that before we even considered adding the priority field, dequeues returned available messages in ascending order of dequeue_after. Messages with a normal (base) priority can therefore be created as before, with a dequeue_after equivalent to NOW() or some time in the future. Messages that need high priority can be created with historical or even negative dequeue_after values.

Although there are no pathological cases with respect to performance, the caveat is that we cannot have messages that are both high priority and scheduled to run at some point in the future. All prioritised messages have the semantics of needing to run immediately. Generally I'm happy with this trade-off as the concept of a high priority message needing to run in the future seems slightly contradictory...
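One way to expose this to callers is a small wrapper. The sketch below is hypothetical: neither message_create_prioritised nor its p_level parameter appear elsewhere in this post. It assumes the scheduling version of the message table (no priority column). Each urgency level maps to a small negative dequeue_after, so every prioritised message sorts ahead of any real epoch timestamp, with higher levels first. Messages within a level still come out in id (FIFO) order.

```sql
-- Hypothetical helper: higher p_level means more urgent.
CREATE FUNCTION message_create_prioritised (
	p_content BYTEA,
	p_level INTEGER
) RETURNS VOID AS $$
BEGIN
	IF p_level < 1 THEN
		RAISE EXCEPTION 'priority level must be >= 1, got %', p_level;
	END IF;

	-- Level 3 -> -3, level 1 -> -1: both sort before any positive
	-- epoch-ms value, and -3 sorts before -1.
	INSERT INTO "message" (content, dequeue_after)
	VALUES (p_content, -(p_level::BIGINT));
END;
$$ LANGUAGE plpgsql;

-- Example: enqueue an urgent message at level 2.
-- SELECT message_create_prioritised(convert_to('urgent', 'UTF8'), 2);
```

A relative alternative would be to subtract a fixed head start from to_epoch(NOW()). That lets a message overtake anything enqueued within the head-start window, but not an older backlog. Which mapping fits better depends on whether priorities should be absolute.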

WORK IN PROGRESS...