Job queue as SQL table with multiple consumers (PostgreSQL)

I use postgres for a FIFO queue as well. I originally used ACCESS EXCLUSIVE, which yields correct results in high concurrency, but has the unfortunate effect of being mutually exclusive with pg_dump, which acquires a ACCESS SHARE lock during its execution. This causes my next() function to lock for a very long time (the duration of the pg_dump). This was not acceptable since we are a 24×7 shop and customers didn’t like the dead time on the queue in the middle of the night.

I figured there must be a less-restrictive lock which would still be concurrent-safe and not lock while pg_dump is running. My search led me to this SO post.

Then I did some research.

The following modes are sufficient for a FIFO queue NEXT() function which will update the status of a job from queued to running without any concurrency fail, and also not block against pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Query:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status="running"
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status="queued"
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Result looks like:

UPDATE 1
 job_id
--------
     98
(1 row)

Here is a shell script which tests all of the different lock mode at high concurrency (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status="running{}" where job_id in (select job_id from tx_test_queue where status="queued" order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Code is here as well if you want to edit: https://gist.github.com/1083936

I am updating my application to use the EXCLUSIVE mode since it’s the most restrictive mode that a) is correct and b) doesn’t conflict with pg_dump. I chose the most restrictive since it seems the least risky in terms of changing the app from ACCESS EXCLUSIVE without being an uber-expert in postgres locking.

I feel pretty comfortable with my test rig and with the general ideas behind the answer. I hope that sharing this helps solve this problem for others.

Leave a Comment

Hata!: SQLSTATE[HY000] [1045] Access denied for user 'divattrend_liink'@'localhost' (using password: YES)