Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism
By hientd, at: Dec. 30, 2023, 3:43 p.m.
1. Introduction
Building upon our previous discussion on managing task concurrency, we now delve into another critical aspect: rate limiting within Celery tasks. If you're interested in understanding task concurrency management, refer to our previous article before exploring rate limiting.
2. Rate Limiting with Lock Mechanism
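Where the earlier article addressed concurrency, this one focuses on rate limiting with a lock-based mechanism. For each lock key, the task keeps two cache entries: the time of the first run in the current window and the number of runs so far. Once that count reaches the configured limit, further runs are refused until the entries expire.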
3. Rate Limit Task Implementation
The implementation of the RateLimitTask largely resembles that of the previously discussed Unique Task, with a key distinction lying in the custom logic within the __call__ method. Below is the code snippet showcasing the unique logic for rate limit enforcement:
# Imports the snippet relies on. The cache calls match Django's cache API,
# so django.core.cache is assumed here; the logger setup is also an assumption.
from datetime import datetime

from celery import Task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from django.core.cache import cache

logger = get_task_logger(__name__)


class RateLimitTask(Task):
    LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
    RATELIMIT = {
        "tasks": 4,
        "time": 60,  # in seconds
    }

    def __call__(self, *args, **kwargs):
        # A task-level custom_rate_limit, if set, overrides the default.
        rate_limit = self.RATELIMIT
        custom_rate_limit = getattr(self, "custom_rate_limit", None)
        if custom_rate_limit:
            rate_limit = custom_rate_limit

        base_task_id = self.generate_lock_id(*args, **kwargs)
        first_task_at_lock_id = f"{base_task_id}-first_task_at"
        count_tasks_lock_id = f"{base_task_id}-count_tasks"
        first_task_at = cache.get(first_task_at_lock_id)
        count_tasks = cache.get(count_tasks_lock_id, 0)
        now = datetime.now()

        if not first_task_at:
            # No live window: start one. Both keys expire after rate_limit["time"].
            first_task_at = now
            count_tasks = 0
            cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
            cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])

        time_from_first_task = (now - first_task_at).total_seconds()
        valid_to_run = (
            time_from_first_task < rate_limit["time"]
            and count_tasks < rate_limit["tasks"]
        )
        if valid_to_run:
            cache.incr(count_tasks_lock_id)
            return self.run(*args, **kwargs)

        # Limit reached: log it and tell Celery to ignore this task.
        logger.warning(
            f"Reached rate limit: {count_tasks}/{rate_limit['tasks']} "
            f"per {rate_limit['time']}s with task_id: {base_task_id}"
        )
        raise Ignore()

    # ... (other methods remain identical to the Unique Task implementation)
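To make the window behaviour easier to see without a broker or a cache backend, here is a minimal in-memory sketch of the same fixed-window check. It is not part of Glinteco's implementation: the FixedWindowLimiter class, the explicit now argument, and the key "process_data-42" are hypothetical, and the two cache entries are modelled as a single record per key.

```python
class FixedWindowLimiter:
    """In-memory model of the fixed-window check (illustrative only)."""

    def __init__(self, max_tasks=4, window=60):
        self.max_tasks = max_tasks
        self.window = window  # window length in seconds
        self._state = {}  # key -> [window_start, count]

    def allow(self, key, now):
        """Return True if a call at time `now` (seconds) may run under `key`."""
        record = self._state.get(key)
        if record is None or now - record[0] >= self.window:
            # No live window: this call opens a new one, much like the cache
            # entries being recreated after their timeout has expired.
            record = [now, 0]
            self._state[key] = record
        if record[1] < self.max_tasks:
            record[1] += 1
            return True
        return False  # corresponds to `raise Ignore()` in the task


limiter = FixedWindowLimiter(max_tasks=4, window=60)
first_minute = [
    limiter.allow("process_data-42", now=t) for t in (0, 10, 20, 30, 40, 59)
]
next_window = limiter.allow("process_data-42", now=60)
other_key = limiter.allow("process_data-7", now=40)

print(first_minute)            # [True, True, True, True, False, False]
print(next_window, other_key)  # True True
```

The first four calls run, the fifth and sixth are refused, and the call at 60 seconds opens a fresh window. Each key is counted separately. Note that this sketch is single-threaded; in the task above, cache.get and cache.incr are separate steps, so two workers checking at the same moment could both pass the count check.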
4. Practical Usage of Rate Limit Task
Consider a scenario where enforcing rate limits is critical. The RateLimitTask efficiently manages rate limits within Celery tasks, as demonstrated below:
from celery import Celery

# Initialize Celery application
app = Celery("example")
app.config_from_object("celeryconfig")


# Define a sample task using Glinteco's RateLimitTask
@app.task(base=RateLimitTask)
def process_data(data_id):
    """
    Example task utilizing Glinteco's RateLimitTask
    """
    # Perform data processing here based on data_id
    pass
This snippet integrates Glinteco's RateLimitTask into a Celery application. Because process_data is defined with RateLimitTask as its base, its execution automatically adheres to the rate limits specified in the RateLimitTask class (by default, 4 runs per 60 seconds).
This example demonstrates how Glinteco's RateLimitTask controls how often a task runs in a Celery-based environment: calls within the limit run normally, while calls beyond it are logged and ignored.
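The defaults can also be overridden per task, since __call__ looks for a custom_rate_limit attribute before falling back to RATELIMIT. The sketch below isolates that fallback in plain Python so it runs without Celery. The RateLimitDefaults and ImportTask classes and the effective_rate_limit method are hypothetical stand-ins, not part of the original code.

```python
class RateLimitDefaults:
    """Stand-in for the class-level defaults on RateLimitTask (not a Celery class)."""

    RATELIMIT = {"tasks": 4, "time": 60}

    def effective_rate_limit(self):
        # Same fallback as in __call__: a truthy `custom_rate_limit`
        # attribute replaces the class-level default entirely.
        return getattr(self, "custom_rate_limit", None) or self.RATELIMIT


class ImportTask(RateLimitDefaults):
    # Hypothetical task-specific limit: 10 runs per 30 seconds.
    custom_rate_limit = {"tasks": 10, "time": 30}


default_limit = RateLimitDefaults().effective_rate_limit()
custom_limit = ImportTask().effective_rate_limit()

print(default_limit)  # {'tasks': 4, 'time': 60}
print(custom_limit)   # {'tasks': 10, 'time': 30}
```

In a real application, Celery sets extra keyword arguments of the task decorator as attributes on the resulting task class, so passing custom_rate_limit={"tasks": 10, "time": 30} alongside base=RateLimitTask should have the same effect. Treat that as an assumption to verify against your Celery version.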
5. Full Implementation Reference
Developers interested in the complete implementation of Glinteco's RateLimitTask class can find it in the commit history of the Glinteco Samples repository: Glinteco's Sample commit
This specific commit contains the comprehensive implementation of the RateLimitTask class, showcasing its functionalities and usage within Celery-based workflows.
Additionally, Glinteco's Samples repository hosts a variety of useful snippets and demos covering Python, Django, Flask, FastAPI, Scraper, and more. Explore the repository for an array of practical code samples and demonstrations.
6. Conclusion
In summary, the RateLimitTask limits how many times tasks sharing the same lock key can run within a given time window. It does so with a lock-based mechanism: a start timestamp and a counter stored in the cache, both set with a timeout equal to the window length. Calls within the limit run normally; once the maximum is reached, subsequent triggers are logged and ignored until the cache entries expire and a new window begins. In the code shown, no explicit cleanup is needed, because the cache timeout resets the state. Integrating the RateLimitTask into Celery-based applications offers a straightforward way to protect system performance and prevent overload by capping how often a task executes.