Problem
Running background tasks reliably in a distributed environment without duplicate execution or missed jobs.
Solution
A task scheduler inspired by Celery and Sidekiq, with distributed locking to ensure only one worker picks up each task.
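from contextlib import contextmanager

import redis
from redis.exceptions import LockError

# Assumes a Redis server reachable with the default connection settings.
redis_client = redis.Redis()

@contextmanager
def distributed_lock(lock_name, timeout=10):
    """Try to take a Redis lock without blocking; yield whether it was acquired."""
    # The lock auto-expires after `timeout` seconds so a crashed worker cannot hold it forever.
    lock = redis_client.lock(lock_name, timeout=timeout)
    acquired = lock.acquire(blocking=False)
    try:
        yield acquired
    finally:
        if acquired:
            try:
                lock.release()
            except LockError:
                pass  # Lock already expired (task outlived the timeout); nothing to release

def execute_task(task_id):
    with distributed_lock(f"task:{task_id}") as acquired:
        if not acquired:
            return  # Another worker is handling this
        # process_task is the application-specific handler, defined elsewhere
        process_task(task_id)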
Features
- Cron Expressions: Standard cron syntax for scheduling
- Retries: Exponential backoff with max attempts
- Priorities: High/medium/low priority queues
- Dead Letter Queue: Failed tasks for manual review
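
The retry and dead letter behavior listed above could look roughly like the following. This is a minimal in-memory sketch, not the project's actual implementation: `backoff_delay`, `run_with_retries`, `dead_letter_queue`, and the constants are hypothetical names, and the base delay, delay cap, and attempt limit are assumed values chosen for illustration.

```python
import time

MAX_ATTEMPTS = 5   # assumed cap on attempts per task
BASE_DELAY = 1.0   # assumed base delay in seconds
MAX_DELAY = 60.0   # assumed upper bound on a single delay

# Hypothetical stand-in for a persistent dead letter queue.
dead_letter_queue = []


def backoff_delay(attempt, base=BASE_DELAY, cap=MAX_DELAY):
    """Delay after failed attempt number `attempt` (1-based): base * 2**(attempt - 1), capped."""
    return min(cap, base * 2 ** (attempt - 1))


def run_with_retries(task_id, handler, max_attempts=MAX_ATTEMPTS, sleep=time.sleep):
    """Call handler(task_id), retrying on failure; dead-letter the task when attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(task_id)
        except Exception as exc:
            if attempt == max_attempts:
                # Out of attempts: park the task for manual review.
                dead_letter_queue.append(
                    {"task_id": task_id, "error": repr(exc), "attempts": attempt}
                )
                return None
            sleep(backoff_delay(attempt))
```

Passing `sleep` as a parameter keeps the function easy to test without real waiting. A production version would likely add random jitter to the delay so that many failing tasks do not retry in lockstep.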
Reliability
- Exactly-once execution guarantee, which depends on each task finishing before its lock timeout expires
- Persistent queue in PostgreSQL
- Worker health checks and automatic restart
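
One way the worker health checks could work is heartbeat tracking: each worker reports in periodically, and a monitor restarts any worker that has been silent too long. The sketch below is an assumption about the mechanism, not the project's actual code; `WorkerMonitor`, `HEARTBEAT_TIMEOUT`, and the `restart` callback are hypothetical names.

```python
import time

HEARTBEAT_TIMEOUT = 30.0  # assumed: seconds of silence before a worker counts as dead


class WorkerMonitor:
    """Tracks the last heartbeat per worker and reports the ones that went silent."""

    def __init__(self, timeout=HEARTBEAT_TIMEOUT, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock      # injectable so tests can control time
        self.last_seen = {}     # worker_id -> time of last heartbeat

    def heartbeat(self, worker_id):
        """Record that worker_id is alive right now."""
        self.last_seen[worker_id] = self.clock()

    def stale_workers(self):
        """Return the sorted ids of workers whose last heartbeat is older than the timeout."""
        now = self.clock()
        return sorted(w for w, seen in self.last_seen.items() if now - seen > self.timeout)

    def restart_stale(self, restart):
        """Call restart(worker_id) for each stale worker and reset its timer."""
        restarted = self.stale_workers()
        for worker_id in restarted:
            restart(worker_id)
            self.last_seen[worker_id] = self.clock()
        return restarted
```

In a real deployment the heartbeat timestamps would need to live in shared storage (for example Redis or the PostgreSQL database) rather than in one process's memory, and `restart` would hand off to whatever process supervisor is in use.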