Nghiên cứu trường hợp của Glinteco: Triển khai giới hạn tốc độ trong tác vụ Celery bằng cơ chế Khóa

By hientd, at: 15:43 Ngày 30 tháng 12 năm 2023

Thời gian đọc ước tính: __READING_TIME__ minutes

Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism
Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism

1. Giới thiệu

 

Tiếp nối bài thảo luận trước đó về quản lý đồng thời nhiệm vụ, chúng ta giờ đây sẽ đi sâu vào một khía cạnh quan trọng khác: giới hạn tốc độ trong các tác vụ Celery. Nếu bạn quan tâm đến việc hiểu về quản lý đồng thời nhiệm vụ, hãy tham khảo bài viết trước của chúng tôi trước khi tìm hiểu về giới hạn tốc độ.

 

2. Giới hạn tốc độ với cơ chế khóa

 

Trong bài thảo luận này, chúng ta sẽ chuyển trọng tâm sang các chiến lược giới hạn tốc độ trong các tác vụ Celery. Trong khi trước đây chúng ta đã đề cập đến đồng thời, thì việc tìm hiểu ngày hôm nay tập trung vào việc triển khai giới hạn tốc độ bằng cơ chế dựa trên khóa.

 

3. Triển khai nhiệm vụ giới hạn tốc độ

 

Việc triển khai RateLimitTask phần lớn giống với Nhiệm vụ Duy nhất được thảo luận trước đó, với sự khác biệt chính nằm ở logic tùy chỉnh bên trong phương thức __call__. Dưới đây là đoạn mã cho thấy logic duy nhất để thực thi giới hạn tốc độ:

class RateLimitTask(Task):
    LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
    RATELIMIT = {
        "tasks": 4,
        "time": 60,  # tính bằng giây
    }

    def __call__(self, *args, **kwargs):
        rate_limit = self.RATELIMIT
        custom_rate_limit = getattr(self, "custom_rate_limit", None)
        if custom_rate_limit:
            rate_limit = custom_rate_limit

        base_task_id = self.generate_lock_id(*args, **kwargs)
        first_task_at_lock_id = f"{base_task_id}-first_task_at"
        count_tasks_lock_id = f"{base_task_id}-count_tasks"

        first_task_at = cache.get(first_task_at_lock_id)
        count_tasks = cache.get(count_tasks_lock_id, 0)

        now = datetime.now()

        if not first_task_at:
            first_task_at = now
            count_tasks = 0
            cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
            cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])

        time_from_first_task = (now - first_task_at).total_seconds()
        valid_to_run = (
            time_from_first_task < rate_limit["time"]
            and count_tasks < rate_limit["tasks"]
        )
        if valid_to_run:
            cache.incr(count_tasks_lock_id)
            return self.run(*args, **kwargs)

        logger.warning(
            f"Đạt giới hạn tốc độ: {count_tasks}/{rate_limit['tasks']} "
            f"trong {rate_limit['time']}s với task_id: {base_task_id}"
        )
        raise Ignore()

    # ... (các phương thức khác vẫn giống như triển khai Nhiệm vụ Duy nhất)

 

4. Sử dụng thực tế của Nhiệm vụ Giới hạn Tốc độ

 

Hãy xem xét một kịch bản mà việc thực thi giới hạn tốc độ là rất quan trọng. RateLimitTask quản lý hiệu quả giới hạn tốc độ trong các tác vụ Celery, như được chứng minh dưới đây:

 

from celery import Celery

# Khởi tạo ứng dụng Celery
app = Celery("example")
app.config_from_object("celeryconfig")

# Định nghĩa một tác vụ mẫu sử dụng RateLimitTask của Glinteco
@app.task(base=
RateLimitTask)
def process_data(data_id):
    """
    Ví dụ về tác vụ sử dụng
RateLimitTask của Glinteco
    """
    # Thực hiện xử lý dữ liệu ở đây dựa trên data_id
    pass


Đoạn mã này minh họa việc tích hợp RateLimitTask của Glinteco vào một ứng dụng Celery. Bằng cách định nghĩa một tác vụ Celery process_data và đặt cơ sở của nó là RateLimitTask, việc thực thi tác vụ tự động tuân thủ các giới hạn tốc độ được xác định trước trong lớp RateLimitTask.

Ví dụ này chứng minh cách RateLimitTask của Glinteco có thể liền mạch kiểm soát tần suất thực thi tác vụ trong môi trường dựa trên Celery, đảm bảo tuân thủ các giới hạn tốc độ được xác định trong khi xử lý dữ liệu tác vụ hiệu quả.

 

5. Tài liệu tham khảo triển khai đầy đủ

 

Đối với các nhà phát triển quan tâm đến việc khám phá việc triển khai đầy đủ lớp RateLimitTask của Glinteco trong lịch sử commit, bạn có thể tìm thấy nó trong kho lưu trữ Samples của Glinteco: Commit mẫu của Glinteco

Commit cụ thể này chứa việc triển khai toàn diện lớp RateLimitTask, thể hiện các chức năng và cách sử dụng của nó trong các luồng công việc dựa trên Celery.

Ngoài ra, kho lưu trữ Samples của Glinteco lưu trữ nhiều đoạn mã và bản demo hữu ích bao gồm Python, Django, Flask, FastAPI, Scraper, và hơn thế nữa. Khám phá kho lưu trữ để tìm hiểu một loạt các mẫu mã thực tế và các bản trình diễn.

 

6. Kết luận

 

Tóm lại, RateLimitTask đóng vai trò là một giải pháp mạnh mẽ trong framework Celery, quản lý hiệu quả tần suất thực thi tác vụ và ngăn chặn việc thực thi đồng thời các tác vụ có tham số giống hệt nhau trong khoảng thời gian được chỉ định. Bằng cách sử dụng cơ chế dựa trên khóa và sử dụng bộ đếm được lưu trữ trong hệ thống cache, Tác vụ Celery tùy chỉnh này đảm bảo sự ổn định của hệ thống bằng cách thực thi các giới hạn tốc độ được xác định trước. RateLimitTask tạo điều kiện cho việc thực thi tác vụ được kiểm soát, bỏ qua các kích hoạt tiếp theo khi đạt đến giới hạn tối đa và đảm bảo việc giải phóng tài nguyên đúng cách khi tác vụ hoàn tất.

Cuối cùng, việc tích hợp RateLimitTask vào các ứng dụng dựa trên Celery cung cấp một phương tiện đáng tin cậy để duy trì hiệu suất hệ thống và ngăn ngừa quá tải bằng cách điều chỉnh thông minh việc thực thi tác vụ.

Tag list:
- python
- django
- concurrency
- rate limit
- celery
- message queue
- celery configuration

Liên quan

Experience Django

Đọc thêm
Django Python

Đọc thêm
Python Experience

ChatGPT: Tương lai của Giáo dục

Đọc thêm

Theo dõi

Theo dõi bản tin của chúng tôi và không bao giờ bỏ lỡ những tin tức mới nhất.