Glinteco's Case Study: Mitigating Duplicate Task Execution with a Custom Celery Solution
By hientd, at: Jan. 2, 2024, 2:19 p.m.
1. Introduction
Among Python asynchronous task queues, Celery is a cornerstone tool for managing distributed workloads. At Glinteco, our team kept running into the same challenge: making sure that certain tasks never run more than once at a time for the same input.
Consider critical tasks that must not overlap, or tasks that cause problems when two copies manipulate the same resource with identical parameters at the same time. Our goal was to prevent concurrent executions of a Celery task whenever those executions share the same parameters.
2. Exploring Potential Solutions
When addressing the challenge of preventing concurrent task executions with identical parameters, several approaches exist, each with its merits and limitations:
a. Database Locks or Flags
Pros:
- Reliable Locking Mechanism: Database locks or flags provide a reliable way to control access to resources, ensuring exclusive execution.
- Persistence: Locks stored in the database offer persistence, retaining their state across system restarts.
Cons:
- Performance Overhead: Implementing and managing database locks may introduce additional performance overhead due to database interactions.
- Complexity: Handling concurrent access via database locks often involves intricate logic and potential contention issues.
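To make the database option concrete, here is a minimal sketch using Python's built-in `sqlite3` module. It assumes a hypothetical `task_locks` table whose primary key is the lock ID. Inserting a row acquires the lock, and a primary-key violation means another worker already holds it. A real deployment would use the application's own shared database rather than an in-memory SQLite file.

```python
import sqlite3


def create_lock_table(conn):
    # Hypothetical schema: one row per held lock, keyed by lock ID.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_locks ("
        "lock_id TEXT PRIMARY KEY, owner TEXT NOT NULL)"
    )
    conn.commit()


def acquire_db_lock(conn, lock_id, owner):
    """Insert the lock row; return False if it already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO task_locks (lock_id, owner) VALUES (?, ?)",
                (lock_id, owner),
            )
        return True
    except sqlite3.IntegrityError:
        # PRIMARY KEY violation: another worker already holds this lock.
        return False


def release_db_lock(conn, lock_id, owner):
    """Delete the lock row only if `owner` holds it; return True if deleted."""
    with conn:
        cur = conn.execute(
            "DELETE FROM task_locks WHERE lock_id = ? AND owner = ?",
            (lock_id, owner),
        )
    return cur.rowcount == 1


# An in-memory database keeps the sketch self-contained.
conn = sqlite3.connect(":memory:")
create_lock_table(conn)
```

The lock row survives restarts, so the persistence listed as a pro has a downside. A worker that crashes mid-task leaves a stale lock behind. That is part of the extra complexity this approach brings, usually handled with an expiry column or transaction-scoped locks.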
b. ETA-based Scheduling with Celery
Pros:
- Scheduled Execution: ETA-based scheduling queues tasks for execution at specific times, which can stagger runs and reduce immediate overlap.
- Celery Integration: Leverages Celery's capabilities without the need for external locking mechanisms.
Cons:
- Limited Immediate Execution: Not suitable for scenarios requiring immediate task execution while preventing concurrency with identical parameters.
- Complex Task Management: Handling various ETA schedules might complicate task management in the long term.
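For comparison, ETA-based scheduling can be sketched as a small helper that hands out staggered ETAs per parameter key. `EtaStaggerer` and its `gap` parameter are hypothetical. Only the final handoff to Celery, `apply_async(eta=...)`, is real Celery API, and `apply_async` also accepts `countdown` in seconds. This spaces runs out but does not stop them from overlapping when a run lasts longer than `gap`.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


class EtaStaggerer:
    """Hypothetical helper that spaces out tasks sharing the same parameters.

    Each key remembers the last ETA handed out; the next request for the
    same key is scheduled at least `gap` later. The returned datetime could
    be passed to Celery as some_task.apply_async(args=[...], eta=eta).
    """

    def __init__(self, gap: timedelta):
        self.gap = gap
        # In-process state only: separate producers would not share it.
        self._last_eta: Dict[str, datetime] = {}

    def next_eta(self, key: str, now: Optional[datetime] = None) -> datetime:
        if now is None:
            now = datetime.now(timezone.utc)
        last = self._last_eta.get(key)
        # Run at `now`, or `gap` after this key's previous ETA if that is later.
        eta = now if last is None else max(now, last + self.gap)
        self._last_eta[key] = eta
        return eta
```

The per-key bookkeeping, which would have to be shared across every producer in practice, is the kind of long-term task-management cost noted above.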
c. Glinteco's Custom Celery Task
Pros:
- Immediate Execution: Offers immediate task execution while intelligently preventing concurrent runs with identical parameters.
- Simplified Implementation: Provides a straightforward and specific solution without intricate database interactions.
- Efficient Task Handling: Leverages Celery's features and cache system for efficient task management.
Cons:
- Specific Use Case: Tailored to preventing concurrency within Celery tasks; it may not cover broader concurrency challenges.
- Cache Dependency: Relies on the cache system for lock management, which might require careful configuration and monitoring.
3. Glinteco's Custom Celery Task: Step-by-Step Implementation
To prevent concurrent executions of tasks with identical parameters, Glinteco developed a custom Celery Task that combines lock-ID generation, the shared cache system, and lock management. The implementation breaks down into three steps:
Step 1: Lock Creation and Storage
The first step creates locks that expire after a defined duration, using the `memcache_lock` context manager. It relies on `cache.add`, which stores a key only if it does not already exist, so checking for an existing lock and creating a new one happen in a single call.
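```python
import time
from contextlib import contextmanager

# Assumed import: the snippet uses a Django-style shared cache
# (e.g. a memcached or Redis backend).
from django.core.cache import cache


@contextmanager
def memcache_lock(lock_id, oid, lock_expire):
    timeout_at = time.monotonic() + lock_expire
    # cache.add stores the key only if it does not exist yet,
    # so status is True only for the caller that acquired the lock
    status = cache.add(lock_id, oid, lock_expire)
    try:
        yield status
    finally:
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)
```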
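The Step 1 snippet needs a configured shared cache. The same logic can be exercised with a hypothetical single-process `InMemoryCache` that mimics the add-if-absent and expiry behavior of `cache.add`. It demonstrates the semantics only and gives no protection across worker processes.

```python
import threading
import time
from contextlib import contextmanager


class InMemoryCache:
    """Hypothetical stand-in for a shared cache with add-if-absent semantics."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at)
        self._mutex = threading.Lock()

    def add(self, key, value, timeout):
        """Store key only if absent or expired; return True if stored."""
        with self._mutex:
            now = time.monotonic()
            entry = self._data.get(key)
            if entry is not None and entry[1] > now:
                return False
            self._data[key] = (value, now + timeout)
            return True

    def get(self, key):
        with self._mutex:
            entry = self._data.get(key)
            if entry is None or entry[1] <= time.monotonic():
                return None
            return entry[0]

    def delete(self, key):
        with self._mutex:
            self._data.pop(key, None)


cache = InMemoryCache()


@contextmanager
def memcache_lock(lock_id, oid, lock_expire):
    # Same logic as Step 1, running against the in-memory cache above.
    timeout_at = time.monotonic() + lock_expire
    status = cache.add(lock_id, oid, lock_expire)
    try:
        yield status
    finally:
        if time.monotonic() < timeout_at and status:
            cache.delete(lock_id)
```

Nesting two `with memcache_lock(...)` blocks on the same ID shows the point of the `status` check in `finally`. A caller that failed to acquire the lock leaves it untouched on exit, and only the owner releases it.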
Step 2: Preventing Duplicate Triggers
The implemented Celery Task intelligently ignores subsequent triggers of the same task with identical parameters within the defined lock duration, preventing overlapping executions.
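```python
from celery import Task
from celery.exceptions import Ignore


class UniqueCeleryTask(Task):
    # ... (LOCK_EXPIRE and generate_lock_id are defined here; elided)

    def __call__(self, *args, **kwargs):
        """The body of the task executed by workers."""
        # A per-task custom_lock_expire overrides the default LOCK_EXPIRE
        lock_expire = self.LOCK_EXPIRE
        custom_lock_expire = getattr(self, "custom_lock_expire", None)
        if custom_lock_expire:
            lock_expire = custom_lock_expire

        # The lock ID is derived from the task's arguments
        lock_id = self.generate_lock_id(*args, **kwargs)
        with memcache_lock(lock_id, self.app.oid, lock_expire) as acquired:
            if acquired:
                return self.run(*args, **kwargs)
            # Another run with the same arguments holds the lock: skip this one
            raise Ignore()
```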
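The `__call__` method of `UniqueCeleryTask` wraps the task body. It builds a lock ID from the task's arguments and tries to acquire the lock. If the lock is acquired, the task runs. Otherwise the method raises Celery's `Ignore` exception, so the duplicate is skipped instead of running concurrently.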
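The class above relies on `generate_lock_id`, whose body is elided. Below is one possible implementation, offered as a hypothetical sketch rather than Glinteco's actual code. It hashes the JSON-serialized arguments and prefixes the task name, so identical calls map to the same short cache key.

```python
import hashlib
import json


def generate_lock_id(task_name, *args, **kwargs):
    """Hypothetical lock-ID builder: same task + same arguments -> same ID."""
    # sort_keys makes keyword order irrelevant; default=str covers values
    # such as UUIDs or datetimes that json cannot serialize natively.
    payload = json.dumps(
        {"args": args, "kwargs": kwargs}, sort_keys=True, default=str
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    # Hashing keeps the key short and free of spaces, which matters for
    # memcached (keys are limited to 250 bytes).
    return f"lock:{task_name}:{digest}"
```

Inside `UniqueCeleryTask`, the method could delegate with `return generate_lock_id(self.name, *args, **kwargs)`. One design caveat: `process_data(42)` and `process_data(data_id=42)` produce different IDs here. Binding the arguments first with `inspect.signature(self.run).bind(*args, **kwargs)` would treat them as the same call.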
Step 3: Lock Release upon Task Completion
Upon completion of the task, successful or otherwise, the associated lock is released, allowing subsequent executions to proceed unhindered.
```python
class UniqueCeleryTask(Task):
    # ...

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """
        Release the lock associated with the task upon completion.
        """
        # Rebuild the same lock ID that __call__ acquired
        lock_id = self.generate_lock_id(*args, **kwargs)
        self.release_lock(lock_id)
        return super().after_return(
            status, retval, task_id, args, kwargs, einfo
        )

    def release_lock(self, lock_id):
        """
        Release the lock associated with the given lock ID.
        """
        cache.delete(lock_id)
```
Celery calls `after_return` once the task body has returned or raised, so the lock is released whether the run succeeded or failed. The next trigger with the same arguments can then proceed.
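One point is worth noticing in Step 3. `memcache_lock` deliberately skips the delete when a run outlives `lock_expire`, because by then the key may belong to another worker. `after_return`, however, deletes the key unconditionally. It also runs after `memcache_lock` has already released the key, so it can delete a lock that a new run acquired in between. A hedged alternative is to release only while the cache still holds this run's owner value, meaning the `oid` passed to `cache.add`. The sketch below reproduces the overrun case with a hypothetical `ExpiringCache` and a hypothetical `release_lock_if_owner` helper.

```python
import time


class ExpiringCache:
    """Hypothetical minimal cache: add/get/delete with per-key expiry."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at)

    def add(self, key, value, timeout):
        if self.get(key) is not None:
            return False
        self._data[key] = (value, time.monotonic() + timeout)
        return True

    def get(self, key):
        entry = self._data.get(key)
        if entry is None or entry[1] <= time.monotonic():
            return None
        return entry[0]

    def delete(self, key):
        self._data.pop(key, None)


def release_lock_if_owner(cache, lock_id, oid):
    """Delete lock_id only if it is still held by `oid`.

    Not atomic: another worker could acquire the key between get() and
    delete(). Backends with server-side scripting (e.g. a Redis Lua
    script) can turn this check-and-delete into one atomic step.
    """
    if cache.get(lock_id) == oid:
        cache.delete(lock_id)
        return True
    return False
```

With this helper, `release_lock` could call `release_lock_if_owner(cache, lock_id, self.app.oid)` instead of `cache.delete(lock_id)`.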
Together, these three steps give each task exclusive execution per set of arguments. The solution is built entirely on Celery's task hooks and a shared cache.
4. Example Usage
To illustrate the practical application of Glinteco's custom Celery Task designed to prevent concurrent executions, let's consider a scenario where multiple user actions trigger tasks that require exclusive execution based on specific parameters.
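```python
from celery import Celery

# UniqueCeleryTask is the class built in section 3; import it from
# wherever it lives in your project.

# Initialize Celery application
app = Celery("example")
app.config_from_object("celeryconfig")

# Define a sample task using Glinteco's UniqueCeleryTask
@app.task(base=UniqueCeleryTask)
def process_data(data_id):
    """
    Example task utilizing Glinteco's UniqueCeleryTask
    """
    # Perform data processing here based on data_id
    pass

# While the first process_data(42) is still running, a second trigger with
# the same argument is ignored instead of running concurrently.
process_data.delay(42)
process_data.delay(42)
```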
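To see the scenario end to end without a broker or workers, the sketch below imitates it in a single process. A hypothetical `skip_if_running` decorator applies the same acquire, run, and release pattern as `UniqueCeleryTask.__call__` and `after_return`. A Python set guarded by a `threading.Lock` stands in for the shared cache. It only protects threads within one process; across Celery workers, the shared cache is what provides exclusivity.

```python
import threading
from collections import Counter
from functools import wraps

# Lock IDs currently held; stands in for the shared cache.
_held = set()
_held_mutex = threading.Lock()


def skip_if_running(func):
    """Hypothetical decorator mirroring UniqueCeleryTask.__call__:
    run only if no call with the same arguments is in progress."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Arguments must be hashable for this simple in-process key.
        lock_id = (func.__name__, args, tuple(sorted(kwargs.items())))
        with _held_mutex:  # check-and-add as one atomic step
            if lock_id in _held:
                return "ignored"  # plays the role of raise Ignore()
            _held.add(lock_id)
        try:
            return func(*args, **kwargs)
        finally:
            with _held_mutex:  # release, like after_return
                _held.discard(lock_id)

    return wrapper


executions = Counter()
started = threading.Event()
gate = threading.Event()


@skip_if_running
def process_data(data_id):
    executions[data_id] += 1
    started.set()
    # Keep this call "running" until the gate is opened.
    gate.wait(timeout=5)
    return "done"
```

Each duplicate trigger that arrives while the first call holds the lock returns immediately without touching `executions`. Once the first call finishes, the same argument can run again.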
5. Full Version
The complete implementation of the `UniqueCeleryTask` class is available in the commit history of the Glinteco Samples repository: Glinteco's Sample commit
That commit contains the full `UniqueCeleryTask` class and shows how it is used within Celery-based workflows.
The same repository also hosts practical snippets and demos covering Python, Django, Flask, FastAPI, Scraper, and more.
6. Conclusion
Tackling concurrent Celery task execution led Glinteco to a tailored solution. By combining Celery's task hooks with cache-based locks keyed on task arguments, it lets a task run immediately while skipping duplicate runs with identical parameters.
This case study reflects Glinteco's commitment to refining and innovating within Python development. Our custom Celery Task streamlines the handling of critical tasks and guards them against duplicate concurrent runs, helping developers handle complex asynchronous scenarios with confidence.