Database.Advanced.How would you implement a priority queue in a relational database?

Implementing a priority queue in a relational database involves modeling and querying data in a way that allows you to enqueue items with priorities and dequeue the highest-priority item efficiently.


✅ Step-by-Step Guide

🧱 1. Schema Design

Here’s a basic schema for a priority queue:

CREATE TABLE priority_queue (
    id SERIAL PRIMARY KEY,
    payload TEXT,            -- or JSON, etc.
    priority INT,            -- lower number = higher priority
    created_at TIMESTAMP DEFAULT now(),
    processed BOOLEAN DEFAULT FALSE
);

➕ 2. Enqueue Items

To enqueue (insert) a new item with a priority:

INSERT INTO priority_queue (payload, priority) VALUES ('task A', 2);
INSERT INTO priority_queue (payload, priority) VALUES ('task B', 1);

🔽 3. Dequeue Items (Highest Priority First)

To select and mark the highest-priority unprocessed item:

-- Select without race conditions (for one worker):
WITH next_task AS (
  SELECT id FROM priority_queue
  WHERE processed = FALSE
  ORDER BY priority ASC, created_at ASC
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
UPDATE priority_queue
SET processed = TRUE
WHERE id IN (SELECT id FROM next_task)
RETURNING *;

🧠 FOR UPDATE SKIP LOCKED ensures safe concurrent access by skipping rows already locked by other transactions.

🔄 4. Alternative: Mark as “in progress”

Instead of marking it as processed immediately, you can use a status column (pending, in_progress, done) to allow retries or timeouts.


🔧 Optional Enhancements

🕒 Add available_at column

For delayed jobs or time-based release.

available_at TIMESTAMP DEFAULT now()

And update the dequeue logic:

WHERE processed = FALSE AND available_at <= now()

📦 Use Indexes

Create an index to speed up dequeueing:

CREATE INDEX idx_queue_priority ON priority_queue (processed, priority, created_at);

🛠️ Use Triggers or Procedures (optional)

For periodic processing, consider using:

  • PostgreSQL: LISTEN/NOTIFY + background workers
  • MySQL: polling in application logic
  • Scheduled job runners (e.g., cron, pg_cron)

🔁 Summary

OperationSQL Logic
EnqueueINSERT with priority
DequeueSELECT ... FOR UPDATE SKIP LOCKED ORDER BY priority
ProcessMark processed = TRUE or update status
ConcurrentUse SKIP LOCKED for safe multi-consumer processing
This entry was posted in Без рубрики. Bookmark the permalink.