Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism

By hientd, at: December 30, 2023, 15:43

1. Introduction

Building upon our previous discussion on managing task concurrency, we now delve into another critical aspect: rate limiting within Celery tasks. If you're interested in understanding task concurrency management, refer to our previous article before exploring rate limiting.

 

2. Rate Limiting with Lock Mechanism

Where the earlier article dealt with concurrency, this one implements rate limits with a lock-based mechanism: a counter and a start timestamp kept in the cache track how many times a task has run within the current time window.

 

3. Rate Limit Task Implementation

The implementation of the RateLimitTask largely resembles that of the previously discussed Unique Task, with a key distinction lying in the custom logic within the __call__ method. Below is the code snippet showcasing the unique logic for rate limit enforcement:

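import logging
from datetime import datetime

from celery import Task
from celery.exceptions import Ignore
# Assumption: `cache` is Django's cache framework (get/set/incr with a timeout).
from django.core.cache import cache

logger = logging.getLogger(__name__)


class RateLimitTask(Task):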
    LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
    RATELIMIT = {
        "tasks": 4,
        "time": 60,  # in seconds
    }

    def __call__(self, *args, **kwargs):
        rate_limit = self.RATELIMIT
        custom_rate_limit = getattr(self, "custom_rate_limit", None)
        if custom_rate_limit:
            rate_limit = custom_rate_limit

        base_task_id = self.generate_lock_id(*args, **kwargs)
        first_task_at_lock_id = f"{base_task_id}-first_task_at"
        count_tasks_lock_id = f"{base_task_id}-count_tasks"

        first_task_at = cache.get(first_task_at_lock_id)
        count_tasks = cache.get(count_tasks_lock_id, 0)

        now = datetime.now()

        if not first_task_at:
            first_task_at = now
            count_tasks = 0
            cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
            cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])

        time_from_first_task = (now - first_task_at).total_seconds()
        valid_to_run = (
            time_from_first_task < rate_limit["time"]
            and count_tasks < rate_limit["tasks"]
        )
        if valid_to_run:
            cache.incr(count_tasks_lock_id)
            return self.run(*args, **kwargs)

        logger.warning(
            f"Reach rate limit: {count_tasks}/{rate_limit['tasks']} "
            f"per {rate_limit['time']}s with task_id: {base_task_id}"
        )
        raise Ignore()

    # ... (other methods remain identical to the Unique Task implementation)
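To make the window behaviour easier to follow, here is a minimal sketch that mirrors the decision logic of __call__ outside Celery. `FakeCache` and `allow` are hypothetical names introduced only for this illustration; `FakeCache` is an in-memory stand-in with a simulated clock, not the cache backend used in the article.

```python
from datetime import datetime, timedelta


class FakeCache:
    """In-memory stand-in for a cache with per-key expiry (illustration only)."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at)
        self.now = datetime(2023, 12, 30, 15, 0, 0)  # simulated clock

    def get(self, key, default=None):
        value, expires_at = self._data.get(key, (default, None))
        if expires_at is not None and self.now >= expires_at:
            del self._data[key]  # entry has timed out
            return default
        return value

    def set(self, key, value, timeout):
        self._data[key] = (value, self.now + timedelta(seconds=timeout))

    def incr(self, key):
        value, expires_at = self._data[key]
        self._data[key] = (value + 1, expires_at)
        return value + 1


def allow(cache, key, limit, window):
    """Return True if a call may run, following the same steps as __call__."""
    first_at_key = f"{key}-first_task_at"
    count_key = f"{key}-count_tasks"

    first_task_at = cache.get(first_at_key)
    count_tasks = cache.get(count_key, 0)
    now = cache.now

    if not first_task_at:
        # No window is open: start one and reset the counter.
        first_task_at = now
        count_tasks = 0
        cache.set(first_at_key, first_task_at, window)
        cache.set(count_key, count_tasks, window)

    elapsed = (now - first_task_at).total_seconds()
    if elapsed < window and count_tasks < limit:
        cache.incr(count_key)
        return True
    return False


cache = FakeCache()
results = []
for _ in range(6):
    # Six calls, five seconds apart, with a limit of 4 per 60 seconds.
    results.append(allow(cache, "demo", limit=4, window=60))
    cache.now += timedelta(seconds=5)

# Move past the end of the window: the cached entries expire and counting restarts.
cache.now += timedelta(seconds=60)
after_reset = allow(cache, "demo", limit=4, window=60)
```

In this sketch the first four calls are allowed, the next two are rejected, and a call made after the window has expired is allowed again. Note that reading the counter and incrementing it are separate steps, so two workers checking at the same moment could both pass the limit check; whether that matters depends on how strict the limit needs to be.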

 

4. Practical Usage of Rate Limit Task

Consider a scenario where enforcing rate limits is critical. The RateLimitTask efficiently manages rate limits within Celery tasks, as demonstrated below:

from celery import Celery

# Initialize Celery application
app = Celery("example")
app.config_from_object("celeryconfig")

# Define a sample task using Glinteco's RateLimitTask
@app.task(base=RateLimitTask)
def process_data(data_id):
    """Example task utilizing Glinteco's RateLimitTask."""
    # Perform data processing here based on data_id
    pass


This snippet integrates Glinteco's RateLimitTask into a Celery application. Because process_data is defined with RateLimitTask as its base, every execution goes through the rate-limit check and adheres to the limits defined on the RateLimitTask class (4 tasks per 60 seconds by default). Calls that exceed the limit are logged and ignored rather than executed.
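The __call__ method also checks for a custom_rate_limit attribute before falling back to RATELIMIT, which suggests that individual tasks can carry their own limits. The sketch below shows only that lookup, using plain classes as stand-ins for Celery tasks; `RateLimitDefaults`, `ProcessData`, `SendReport`, and `effective_rate_limit` are hypothetical names introduced for illustration.

```python
class RateLimitDefaults:
    """Plain-Python stand-in for RateLimitTask (illustration only, not a Celery task)."""

    RATELIMIT = {"tasks": 4, "time": 60}  # default: 4 tasks per 60 seconds

    def effective_rate_limit(self):
        # Same lookup as in __call__: prefer custom_rate_limit when it is set.
        custom_rate_limit = getattr(self, "custom_rate_limit", None)
        return custom_rate_limit or self.RATELIMIT


class ProcessData(RateLimitDefaults):
    """No override: falls back to the class-level default."""


class SendReport(RateLimitDefaults):
    """Hypothetical task with its own limit: 10 tasks per 300 seconds."""

    custom_rate_limit = {"tasks": 10, "time": 300}


default_limit = ProcessData().effective_rate_limit()
custom_limit = SendReport().effective_rate_limit()
```

In a real application, Celery sets extra keyword arguments passed to the task decorator as attributes on the generated task class, so something like `@app.task(base=RateLimitTask, custom_rate_limit={"tasks": 10, "time": 300})` should have the same effect. Treat this as an assumption to verify against the full implementation.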

 

5. Full Implementation Reference

The complete implementation of Glinteco's RateLimitTask class is available in the Glinteco Samples repository: Glinteco's Sample commit

This specific commit contains the comprehensive implementation of the RateLimitTask class, showcasing its functionalities and usage within Celery-based workflows.

Additionally, Glinteco's Samples repository hosts a variety of useful snippets and demos covering Python, Django, Flask, FastAPI, Scraper, and more. Explore the repository for an array of practical code samples and demonstrations.

 

6. Conclusion

In summary, RateLimitTask gives Celery applications a way to control how often a task runs. It limits executions of tasks with identical parameters within a specified time window by keeping a counter and a start timestamp in the cache. Once the maximum is reached, further triggers are logged and ignored until the cached entries expire and a new window begins. Integrating RateLimitTask into Celery-based applications offers a reliable means to maintain system performance and prevent overload by regulating task executions.

Tag list:
- python
- django
- concurrency
- rate limit
- celery
- message queue
- celery configuration
