Sunday, December 19, 2010

Queues in SQL

This is a post I should have written a long time ago but never got around to. It covers implementing a queue in SQL. There are a lot of pitfalls, and some of them aren't immediately obvious, so it's not surprising that this is a frequently asked question on the IRC channel. Let's use the following schema:

CREATE TABLE items
(
    id serial PRIMARY KEY,
    available boolean DEFAULT TRUE NOT NULL,

    -- application-specific data here
);

-- fast lookup of available items
CREATE INDEX items_lookup_index ON
    items (id) WHERE available;

You have one or more "consumer" processes grabbing items from the queue, processing them and then deleting them. When a consumer starts processing an item, it marks it "not available". After the consumer is done, it removes the row from the table. You also have zero or more "producer" processes inserting items into the queue.
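As a rough sketch of the two easy ends of that lifecycle, a producer only needs an INSERT and a consumer finishes with a DELETE. The id 42 below is a made-up placeholder for whatever id the consumer popped, and the INSERT assumes you have not added application-specific columns that lack defaults:

```sql
-- producer: enqueue a new item; id and available take their defaults
INSERT INTO items DEFAULT VALUES;

-- consumer: finished processing item 42 (hypothetical id),
-- so remove it from the queue
DELETE FROM items WHERE id = 42;
```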



Inserting items is quite straightforward, but how would you pop an item off the queue? There is no UPDATE .. LIMIT in PostgreSQL (yet), so that won't work. Instead, you might be tempted to do something like this:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            min(id)
        FROM items
        WHERE available
    )
RETURNING *
;

But there is a small problem with correctness: two concurrently running consumers might both get the same row. The reason for this is that the subquery is evaluated using a snapshot taken when the query started. If the "availability" of the row changes after that (i.e. another concurrently running process grabs the item), we still UPDATE it and both clients think they got the item. Fortunately, there's a way around this; add the check to the WHERE clause of the UPDATE:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            min(id)
        FROM items
        WHERE available
    )

-- important
AND available

RETURNING *
;

This works because, under the default READ COMMITTED isolation level, the WHERE clause in an UPDATE is always evaluated against the latest version of the row (assuming we're dealing with a table and not a view). However, the subquery picks exactly one candidate row, and if that row no longer matches our criteria when we get to the UPDATE, we end up updating no rows even though other items may be available! So either the application needs to be prepared to redo the UPDATE or (what I think is a lot better) this code has to be in a server-side function which retries it automatically.
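Such a retrying function could look roughly like the sketch below. The name pop_item() is my own invention, and the sketch assumes READ COMMITTED, where every statement inside a volatile PL/pgSQL function gets a fresh snapshot, so each retry sees what other consumers have committed in the meantime:

```sql
-- pop_item() is a hypothetical name; returns the popped row,
-- or no rows if the queue is empty
CREATE OR REPLACE FUNCTION pop_item() RETURNS SETOF items AS $$
DECLARE
    popped items%ROWTYPE;
BEGIN
    LOOP
        UPDATE items
        SET available = FALSE
        WHERE id = (SELECT min(id) FROM items WHERE available)
          AND available
        RETURNING * INTO popped;

        -- FOUND is set when the UPDATE affected a row
        IF FOUND THEN
            RETURN NEXT popped;
            RETURN;
        END IF;

        -- nothing updated: either the queue is empty (give up)
        -- or another consumer won the race (try again)
        IF NOT EXISTS (SELECT 1 FROM items WHERE available) THEN
            RETURN;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql VOLATILE;
```

A consumer would then call it with SELECT * FROM pop_item(); and treat an empty result as "nothing to do right now".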


Now that correctness is not an issue and you have deployed the code to your production server, you might notice that your clients spend a lot of time just popping items off the queue. Because all consumers try to lock the same row, they end up waiting for the "fastest" transaction to finish, only to notice that the row was already taken. So effectively, only one consumer transaction can be running at a time, and the rest do a lot of unnecessary work. In the worst case, your consumers process items very quickly and almost immediately rejoin the other processes contending for the next row. However, if you only have a small number of consumers and/or processing an item takes a significant amount of time, you might not experience any problems with this approach. But if you do, there is a way to make the queue faster: advisory locks.

What we need is a way to say "if this item is not immediately available, try the next one". There is no way to do that using only SQL, but fortunately we can mix function calls into our SQL. Viz:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            min(id)
        FROM items
        WHERE
            available AND
            pg_try_advisory_lock(id)
    )
RETURNING *
;

pg_try_advisory_lock() tries to lock an application-defined resource identified by a bigint and returns TRUE if the object was successfully locked, or FALSE if it could not be locked because another session is already holding a lock on that resource. Postgres does not use advisory locks internally, and it is up to the application to define the meaning of the bigint parameter. In this case, an id works well. By doing this, we tell PG to skip the row if it is not available immediately, which is exactly what we wanted. Great! Well, not exactly. There are still three problems with this:

  1. The snapshot problem mentioned earlier is still there.
  2. The planner might prefer a sequential scan over an index scan for the subquery.
  3. Session-level advisory locks are not released when the transaction ends; they are held until we release them manually or the session disconnects.
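To get a feel for the locking behaviour behind the third point, you can try something like the following in two psql sessions. The key 42 is an arbitrary example value:

```sql
-- session A
SELECT pg_try_advisory_lock(42);   -- true: nobody else holds key 42

-- session B, while A still holds the lock
SELECT pg_try_advisory_lock(42);   -- false, returned without waiting

-- session A: the lock survives COMMIT and ROLLBACK, so release it explicitly
SELECT pg_advisory_unlock(42);     -- true if this session held the lock

-- any session: list the advisory locks currently held
SELECT pid, classid, objid, granted
FROM pg_locks
WHERE locktype = 'advisory';
```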

The first one is easy to fix: again, add the check to the WHERE clause of the UPDATE. To fix the second one, we need to do a bit more work. The usual solution looks as follows (just the subquery part):

SELECT
    id
FROM
(
    SELECT
        id
    FROM items
    WHERE available
    ORDER BY id
    LIMIT $n
) ss
WHERE
    pg_try_advisory_lock(id)
ORDER BY id
LIMIT 1
;

where $n is the maximum number of consumer processes you expect to be running at any time. What happens here is the subquery tells the planner exactly how many rows we end up processing in the worst case, and then on the outer level we only take the rows we need (and can get). This way the planner doesn't have to try to guess for which rows pg_try_advisory_lock() returns TRUE and it can usually choose the best plan. Here's our final query:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            id
        FROM
        (
            -- try to convince the planner to choose an
            -- index scan over a seq scan
            SELECT
                id
            FROM items
            WHERE available
            ORDER BY id
            LIMIT $n
        ) ss
        WHERE
            pg_try_advisory_lock(id)
        ORDER BY id
        LIMIT 1
    )

-- important
AND available

RETURNING *
;

Unfortunately, even if we use advisory locks, it is possible for this query to return 0 rows. This happens when our snapshot predates another consumer's commit but we try the advisory lock after that consumer has already released it: the subquery still sees the row as available and locks it, and then the check in the UPDATE rejects it. The later we release the advisory lock, the smaller the probability of that happening becomes. In practice, if you release the advisory lock after COMMIT, the chance of that happening is very close to 0, but releasing advisory locks after COMMIT puts the burden on the client because you can't force a COMMIT in a server-side function (anyone feel like implementing stored procedures? :-) ). Even if you release the lock before committing, you should be fine as long as you have the retry logic in place. Test it out and see how it works!
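From the client's side, the "release after COMMIT" variant might look something like this sketch. The value 8 stands in for $n and is just an example. Using pg_advisory_unlock_all() assumes the session holds advisory locks for nothing except this queue; it also cleans up the lock on a row we locked but then failed to update:

```sql
BEGIN;

-- the final query from above, with $n = 8 (example value)
UPDATE items
SET available = FALSE
WHERE id =
    (
        SELECT id
        FROM
        (
            SELECT id
            FROM items
            WHERE available
            ORDER BY id
            LIMIT 8
        ) ss
        WHERE pg_try_advisory_lock(id)
        ORDER BY id
        LIMIT 1
    )
AND available
RETURNING *;

COMMIT;

-- only now, with the change visible to other sessions, drop every
-- advisory lock this session holds (assumes the session uses
-- advisory locks for this queue only)
SELECT pg_advisory_unlock_all();

-- if the UPDATE returned no rows, start over; otherwise process the item
```

If the session does use advisory locks for other purposes, pg_advisory_unlock(id) with the returned id is the narrower alternative, at the cost of possibly leaving a stray lock behind when the UPDATE comes back empty.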

Keep in mind that this is not a complete solution. What if a consumer dies while processing an item? What if the server crashes? The best way to deal with this is very application-specific, so I decided not to cover it, at least not in this post.

You might also have multiple queues in a single database. The bigint version of the advisory lock functions uses a single key space, so equal ids from different tables would contend for the same lock. In that case, as long as your ids fit in an integer, you can use the two-integer version of the function like this: pg_try_advisory_lock('tablename'::regclass::int, id). For information about regclass, see the docs.
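A small illustration of the two-key form, where "jobs" is a hypothetical second queue table and 42 an example id:

```sql
-- lock item 42 of the "items" queue
SELECT pg_try_advisory_lock('items'::regclass::int, 42);

-- the same id in another queue table has a different first key,
-- so it does not conflict with the lock above
SELECT pg_try_advisory_lock('jobs'::regclass::int, 42);

-- release with the same pair of keys
SELECT pg_advisory_unlock('items'::regclass::int, 42);
SELECT pg_advisory_unlock('jobs'::regclass::int, 42);
```

In pg_locks, two-key advisory locks show the first key in classid and the second in objid, which makes it easy to see which queue a lock belongs to.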