Sunday, December 19, 2010

Queues in SQL

This is a post I should've written a long time ago, but never got around to. The problem it discusses is implementing a queue in SQL. There are quite a few pitfalls, and some of them aren't immediately obvious, so it's not surprising that this is a frequently asked question on the IRC channel. Let's use the following schema:

CREATE TABLE items
(
    id serial PRIMARY KEY,
    available boolean DEFAULT TRUE NOT NULL,

    -- application-specific data here
);

-- fast lookup of available items
CREATE INDEX items_lookup_index ON
    items (id) WHERE available;

Suppose you have one or more "consumer" processes grabbing items from the queue, processing them and then deleting them. When a consumer starts processing an item, it marks the item "not available". After the consumer is done, it removes the row from the table. You also have zero or more "producer" processes inserting items into the queue.
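
As a rough sketch of that life cycle, the producer and consumer ends boil down to the statements below. This assumes the application-specific columns are nullable or have defaults; the id 42 is only a placeholder for whatever id the consumer grabbed.

```sql
-- producer: enqueue a new item; id and available get their defaults
INSERT INTO items DEFAULT VALUES;

-- consumer, after it has finished processing the item it grabbed
-- (42 stands in for the id returned when the item was popped)
DELETE FROM items WHERE id = 42;
```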



Inserting items is quite straightforward, but how would you pop an item off the queue? There is no UPDATE .. LIMIT in PostgreSQL (yet), so that won't work. Instead, you might be tempted to do something like this:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            min(id)
        FROM items
        WHERE available
    )
RETURNING *
;

But there is a small problem with correctness: two concurrently running consumers might both get the same row. The reason for this is that the subquery is evaluated using a snapshot taken when the query started. If the "availability" of the row changes after that (i.e. another concurrently running process grabs the item), we still UPDATE it and both clients think they got the item. Fortunately, there's a way around this; add the check to the WHERE clause of the UPDATE:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            min(id)
        FROM items
        WHERE available
    )

-- important
AND available

RETURNING *
;

This works, because the WHERE clause in an UPDATE is always evaluated against the latest version of the row (assuming we're dealing with a table and not a view). However, the subquery always returns exactly one row, and if that row does not match our criteria when we get to the UPDATE, we might end up updating no rows! So either the application needs to be prepared to redo the UPDATE or (what I think is a lot better) this code has to be in a server-side function which retries it automatically.
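
A minimal sketch of such a server-side function follows. The name pop_item() is made up for illustration, and the sketch assumes the default READ COMMITTED isolation level, where every statement inside the function sees a fresh snapshot.

```sql
CREATE OR REPLACE FUNCTION pop_item() RETURNS items AS
$$
DECLARE
    popped items;
BEGIN
    LOOP
        UPDATE items
        SET available = FALSE
        WHERE id = (SELECT min(id) FROM items WHERE available)
          AND available
        RETURNING * INTO popped;

        -- FOUND is set by the UPDATE: true if a row was updated
        IF FOUND THEN
            RETURN popped;
        END IF;

        -- Nothing was updated: either the queue is empty, or another
        -- consumer grabbed the row first.  Only retry in the latter case.
        IF NOT EXISTS (SELECT 1 FROM items WHERE available) THEN
            RETURN NULL;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
```

A consumer would then call SELECT * FROM pop_item(); and treat a NULL id as "the queue is empty".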


Now that correctness is not an issue and you have deployed the code to your production server, you might notice that your clients spend a lot of time just popping items off the queue. Because all consumers try to lock the same row, they end up waiting for the "fastest" transaction to finish, only to notice that the row has already been taken. So effectively, only one consumer transaction can be running at a time, and the rest do a lot of unnecessary work. In the absolute worst case, your consumers process the items very quickly and almost immediately come back to compete with the other processes for the next row. However, if you only have a small number of consumers and/or processing an item takes a significant amount of time, you might not experience any problems with this approach. But if you do, there is a way to make the queue faster: advisory locks.

What we need is a way to say "if this item is not immediately available, try the next one". There is no way to do that using only SQL, but fortunately we can mix function calls into our SQL. Viz:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            min(id)
        FROM items
        WHERE
            available AND
            pg_try_advisory_lock(id)
    )
RETURNING *
;

pg_try_advisory_lock() tries to lock an application-defined resource identified by a bigint and returns TRUE if the object was successfully locked, or FALSE if it could not be locked because another session is already holding a lock on that resource. Postgres does not use advisory locks internally, and it is up to the application to define the meaning of the bigint parameter. In this case, an id works well. By doing this, we tell PG to skip the row if it is not available immediately, which is exactly what we wanted. Great! Well, not exactly. There are still three problems with this:

  1. The snapshot problem mentioned earlier is still there.
  2. The planner might prefer a sequential scan over an index scan for the subquery.
  3. Advisory locks are not released automatically (not even when the transaction ends), so we need to manually release them.
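
To illustrate the third point, here is a sketch of how a session-level advisory lock behaves across two sessions. The key 42 is arbitrary.

```sql
-- session A
BEGIN;
SELECT pg_try_advisory_lock(42);   -- returns true
COMMIT;                            -- the lock is still held after this

-- session B
SELECT pg_try_advisory_lock(42);   -- returns false, without waiting

-- session A
SELECT pg_advisory_unlock(42);     -- returns true; the lock is now free
```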

The first one is easy to fix: again, add the check to the WHERE clause of the UPDATE. To fix the second one, we need to do a bit more work. The usual solution looks as follows (just the subquery part):

SELECT
    id
FROM
(
    SELECT
        id
    FROM items
    WHERE available
    ORDER BY id
    LIMIT $n
) ss
WHERE
    pg_try_advisory_lock(id)
ORDER BY id
LIMIT 1
;

where $n is the maximum number of consumer processes you expect to be running at any time. What happens here is the subquery tells the planner exactly how many rows we end up processing in the worst case, and then on the outer level we only take the rows we need (and can get). This way the planner doesn't have to try to guess for which rows pg_try_advisory_lock() returns TRUE and it can usually choose the best plan. Here's our final query:

UPDATE
    items
SET available = FALSE
WHERE id =
    (
        SELECT
            id
        FROM
        (
            -- try to convince the planner to choose an
            -- index scan over a seq scan
            SELECT
                id
            FROM items
            WHERE available
            ORDER BY id
            LIMIT $n
        ) ss
        WHERE
            pg_try_advisory_lock(id)
        ORDER BY id
        LIMIT 1
    )

-- important
AND available

RETURNING *
;

Unfortunately, even if we use advisory locks, it is possible for this query to return 0 rows. The later we release the advisory lock, the smaller the probability of that happening becomes. In practice, if you release the advisory lock after COMMIT, the chance of that happening is very close to 0, but releasing advisory locks after COMMIT puts the burden on the client, because you can't force a COMMIT in a server-side function (anyone feel like implementing stored procedures? :-) ). Even if you release the lock before committing, you should be fine as long as you have the retry logic in place. Test it out and see how it works!
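
A minimal sketch of that client-side flow, assuming the pop query is the final UPDATE .. RETURNING from above. Here :id is a placeholder for the id it returned; the actual parameter syntax depends on your client library.

```sql
BEGIN;
-- run the final UPDATE .. RETURNING query here
-- and remember the id it returned
COMMIT;

-- release the advisory lock only after the COMMIT, so that other
-- consumers already see available = FALSE when the lock goes away
SELECT pg_advisory_unlock(:id);

-- ... process the item ...

-- finally, remove the item from the queue
DELETE FROM items WHERE id = :id;
```

If the UPDATE returned no rows, the session may still be holding an advisory lock on some id it never got to use; SELECT pg_advisory_unlock_all(); releases every session-level advisory lock held by the current session before you retry.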

Keep in mind that this is not a complete solution. What if a consumer dies while processing an item? What if the server crashes? The best way to deal with this is very application specific, so I decided not to cover it, at least not in this post.

You might also have multiple queues in a single database, so using the bigint version of advisory locks might result in problems. In that case, you can sometimes use the integer version of the function like this: pg_try_advisory_lock('tablename'::regclass::int, id). For information about regclass, see the docs.
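
As a sketch, the subquery from earlier would look like this with the two-argument form. The table name items is the one from this post, and $id is a placeholder for the id that was locked.

```sql
-- same subquery as before, but with a two-part key: the first part
-- identifies the queue table, the second part the row
SELECT
    id
FROM
(
    SELECT
        id
    FROM items
    WHERE available
    ORDER BY id
    LIMIT $n
) ss
WHERE
    pg_try_advisory_lock('items'::regclass::int, id)
ORDER BY id
LIMIT 1
;

-- the lock has to be released with the matching two-argument form
SELECT pg_advisory_unlock('items'::regclass::int, $id);
```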

15 comments:

  1. What about SELECT ... FOR UPDATE; Isn't that exactly what you want?

  2. @Anonymous:

    SELECT .. FOR UPDATE has the same problems as UPDATE in versions before 9.0 so it's not really better in any way (unless I'm missing something, of course).

    The change introduced in 9.0 might help if you're constantly hitting the "updated 0 rows" problem but it still has the same locking problem so you need advisory locking to make the queue work better in concurrent scenarios.

  3. Marko, I think you're wrong: you can use FOR UPDATE NOWAIT. I do so and I get good results.

  4. SELECT .. FOR UPDATE NOWAIT doesn't help either. All transactions still try to lock the *same row*, and all but one session get: ERROR: could not obtain lock on row in relation "foo". You could then retry with OFFSET 1, if that fails, retry with OFFSET 2 etc, but that's a lot slower and uglier than the advisory lock method - not to mention it requires a subtransaction for every query to avoid aborting the main transaction if the row is already locked.

    In short, I think you should go with the advisory lock method if you experience problems with the simpler method (i.e. you constantly end up waiting and then updating 0 rows).

  5. SQL has its limitations. That is why we have stored procedures.

    create table items (
        id serial not null primary key,
        time_stamp timestamp not null default current_timestamp
    );

    create or replace function lock_next_unlocked_id() returns integer as
    $$
    declare
        p_id integer;
        rtnval integer := null;
    begin
        for p_id in select id
                    from items
                    order by time_stamp
        loop
            declare
                next_open_id integer;
            begin
                for next_open_id in select id
                                    from items
                                    where id = p_id
                                    for update nowait
                loop
                    rtnval := next_open_id;
                end loop;
            exception
                when lock_not_available then
                    rtnval = null;
            end;
            exit when rtnval is not null;
        end loop;
        return rtnval;
    end;
    $$
    language plpgsql;

  6. @Anonymous:

    That's just an implementation of exactly what I described in my previous comment, but doing lookups on the primary key instead of using OFFSET. Please re-read what it said to get an idea of why you should use the advisory lock method instead.

  7. Hey..I also tried SELECT .. FOR UPDATE and SELECT .. FOR UPDATE NOWAIT as well but my problem has not been sorted out..

    Let me know if there is any other alternate solution for this query.....

  8. I don't understand why 'FOR UPDATE' won't work. It's actually quite simple. As you've described, you have producers that put stuff in the queue. Consumers take stuff from the queue using this function:


    CREATE OR REPLACE FUNCTION consume_begin()
    RETURNS integer AS
    $BODY$
    DECLARE
        resourceId integer;
    BEGIN
        -- lock the first available item, waiting if another consumer holds it
        SELECT INTO resourceId id FROM items WHERE available = true ORDER BY id LIMIT 1 FOR UPDATE;

        IF FOUND THEN
            UPDATE items SET available = false WHERE id = resourceId;
            RETURN resourceId;
        ELSE
            -- nothing in the queue
            RETURN -1;
        END IF;
    END;
    $BODY$ LANGUAGE plpgsql;


    So, when a consumer wants a new resource, it connects to the database and starts a transaction asking for a resource. If it gets one, fine: it commits/disconnects and starts consuming. When it's done, it calls consume_done with the resource id as a parameter, a function that deletes the row from the items table and thereby empties the queue. If consume_begin returns -1, it means there is nothing in the queue. The consumer can nap for a while and then check again, or whatever you want it to do.

    Or am I missing something here?

  9. Neat article, but I do not agree with the final result :-)
    First off, there is a (harmless) race condition with the inner query that does the LIMIT $n. I think the author mentions that the query can return 0 rows, but I am not sure he is referring to this race condition. What can happen, in theory, is that some consumer quickly processes several of the items returned by the inner query of another consumer, so that query doesn't return any items.

    Second, I wouldn't do the 'order by'. If you order, then postgres has no choice but to sort before applying the limit $n. You can imagine the implication of this on large queues and high processing rates. If items have processing priorities, you can consider a priority column and an outer dequeue function that iterates through all possible priorities and calls an inner dequeue function with the priority number. The inner dequeue can use that priority with an equality search argument in the where clause. The outer dequeue can return when it gets a sufficient number of items from calls to the inner one to satisfy the caller's batch size. Seems crude, but it'd be fast with the proper indices and avoids sorts.

    Anyway, to my proposed solution:

    select * from queue;

     qid |   status
    -----+------------
       1 |
       2 |
       3 | processing
       4 | register
       5 | register
       6 | processing
       7 | processing

    -- Open a session and do:
    -----------------------------------------------------------
    begin;

    select *
    from queue q
    where
        q.status = 'processing'
        and pg_try_advisory_xact_lock(q.qid, q.tableoid::int) = true
    limit 2 -- specify batch size here
    for update;
    -----------------------------------------------------------

    -- Do the same in another session,
    -- before commit/rollback in the first session

    You don't have to worry about releasing the advisory lock; it will be released automatically when the transaction is committed or rolled back.

    The only danger is that SQL has no left to right short-circuit evaluation guarantee, so you are at the hands of the optimizer to choose to scan by status (or risk locking a lot of rows until you find two that match q.status = 'processing'.) In reality, it most certainly will do that, because it has no clue about the cost of scanning by the pg_try_advisory_xact_lock function.

    Kiriakos

  10. I have to take back the above. I'm beginning to like your approach more (except I wouldn't do any order by's.) The solution I provided is theoretically dangerous (confirmed by Tom Lane on the pg mailing list lol.) I still don't think the optimizer would choose to evaluate pg_try_advisory_xact_lock() first and scan based on that, but you never know. I don't like to depend on the mood of the optimizer :-)

    Kiriakos

  11. @Kiriakos:

    The purpose of the ORDER BY+LIMIT is precisely to force PG to use an index scan. Sorting (and thus sequential scans) should never happen.

    Also, the transaction-scoped versions of advisory locks were added in 9.1, which was released after I wrote this post. I think all of the points I mentioned in the post should still apply, other than the fact that you don't have to manually release the locks (or wait until the backend dies).

  12. Also, I'd like to add that I had a discussion with Mike and he is indeed correct that using FOR UPDATE the way he demonstrated is safe, but only on PostgreSQL 9.0 or later (handling of FOR UPDATE changed slightly in that version).

    However, I would still argue that it is slower than the advisory lock version if the queue gets a lot of traffic, but I haven't tested this on 9.0.

  13. I have met some really neat people through the comments, I certainly want to encourage more me to comment. Thanks for the tips!
