Nghiên cứu trường hợp của Glinteco: Triển khai giới hạn tốc độ trong tác vụ Celery bằng cơ chế Khóa
By hientd, at: 15:43 Ngày 30 tháng 12 năm 2023
Thời gian đọc ước tính: __READING_TIME__ minutes


1. Giới thiệu
Tiếp nối bài thảo luận trước đó về quản lý đồng thời nhiệm vụ, chúng ta giờ đây sẽ đi sâu vào một khía cạnh quan trọng khác: giới hạn tốc độ trong các tác vụ Celery. Nếu bạn quan tâm đến việc hiểu về quản lý đồng thời nhiệm vụ, hãy tham khảo bài viết trước của chúng tôi trước khi tìm hiểu về giới hạn tốc độ.
2. Giới hạn tốc độ với cơ chế khóa
Trong bài thảo luận này, chúng ta sẽ chuyển trọng tâm sang các chiến lược giới hạn tốc độ trong các tác vụ Celery. Trong khi trước đây chúng ta đã đề cập đến đồng thời, thì việc tìm hiểu ngày hôm nay tập trung vào việc triển khai giới hạn tốc độ bằng cơ chế dựa trên khóa.
3. Triển khai nhiệm vụ giới hạn tốc độ
Việc triển khai RateLimitTask
phần lớn giống với Nhiệm vụ Duy nhất được thảo luận trước đó, với sự khác biệt chính nằm ở logic tùy chỉnh bên trong phương thức __call__
. Dưới đây là đoạn mã cho thấy logic duy nhất để thực thi giới hạn tốc độ:
class RateLimitTask(Task):
LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
RATELIMIT = {
"tasks": 4,
"time": 60, # tính bằng giây
}
def __call__(self, *args, **kwargs):
rate_limit = self.RATELIMIT
custom_rate_limit = getattr(self, "custom_rate_limit", None)
if custom_rate_limit:
rate_limit = custom_rate_limit
base_task_id = self.generate_lock_id(*args, **kwargs)
first_task_at_lock_id = f"{base_task_id}-first_task_at"
count_tasks_lock_id = f"{base_task_id}-count_tasks"
first_task_at = cache.get(first_task_at_lock_id)
count_tasks = cache.get(count_tasks_lock_id, 0)
now = datetime.now()
if not first_task_at:
first_task_at = now
count_tasks = 0
cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])
time_from_first_task = (now - first_task_at).total_seconds()
valid_to_run = (
time_from_first_task < rate_limit["time"]
and count_tasks < rate_limit["tasks"]
)
if valid_to_run:
cache.incr(count_tasks_lock_id)
return self.run(*args, **kwargs)
logger.warning(
f"Đạt giới hạn tốc độ: {count_tasks}/{rate_limit['tasks']} "
f"trong {rate_limit['time']}s với task_id: {base_task_id}"
)
raise Ignore()
# ... (các phương thức khác vẫn giống như triển khai Nhiệm vụ Duy nhất)
4. Sử dụng thực tế của Nhiệm vụ Giới hạn Tốc độ
Hãy xem xét một kịch bản mà việc thực thi giới hạn tốc độ là rất quan trọng. RateLimitTask
quản lý hiệu quả giới hạn tốc độ trong các tác vụ Celery, như được chứng minh dưới đây:
from celery import Celery
# Khởi tạo ứng dụng Celery
app = Celery("example")
app.config_from_object("celeryconfig")
# Định nghĩa một tác vụ mẫu sử dụng
RateLimitTask
của Glinteco
@app.task(base=RateLimitTask
)
def process_data(data_id):
"""
Ví dụ về tác vụ sử dụng RateLimitTask
của Glinteco
"""
# Thực hiện xử lý dữ liệu ở đây dựa trên data_id
pass
Đoạn mã này minh họa việc tích hợp RateLimitTask
của Glinteco vào một ứng dụng Celery. Bằng cách định nghĩa một tác vụ Celery process_data
và đặt cơ sở của nó là RateLimitTask
, việc thực thi tác vụ tự động tuân thủ các giới hạn tốc độ được xác định trước trong lớp RateLimitTask
.
Ví dụ này chứng minh cách RateLimitTask
của Glinteco có thể liền mạch kiểm soát tần suất thực thi tác vụ trong môi trường dựa trên Celery, đảm bảo tuân thủ các giới hạn tốc độ được xác định trong khi xử lý dữ liệu tác vụ hiệu quả.
5. Tài liệu tham khảo triển khai đầy đủ
Đối với các nhà phát triển quan tâm đến việc khám phá việc triển khai đầy đủ lớp RateLimitTask
của Glinteco trong lịch sử commit, bạn có thể tìm thấy nó trong kho lưu trữ Samples của Glinteco: Commit mẫu của Glinteco
Commit cụ thể này chứa việc triển khai toàn diện lớp RateLimitTask
, thể hiện các chức năng và cách sử dụng của nó trong các luồng công việc dựa trên Celery.
Ngoài ra, kho lưu trữ Samples của Glinteco lưu trữ nhiều đoạn mã và bản demo hữu ích bao gồm Python, Django, Flask, FastAPI, Scraper, và hơn thế nữa. Khám phá kho lưu trữ để tìm hiểu một loạt các mẫu mã thực tế và các bản trình diễn.
6. Kết luận
Tóm lại, RateLimitTask
đóng vai trò là một giải pháp mạnh mẽ trong framework Celery, quản lý hiệu quả tần suất thực thi tác vụ và ngăn chặn việc thực thi đồng thời các tác vụ có tham số giống hệt nhau trong khoảng thời gian được chỉ định. Bằng cách sử dụng cơ chế dựa trên khóa và sử dụng bộ đếm được lưu trữ trong hệ thống cache, Tác vụ Celery tùy chỉnh này đảm bảo sự ổn định của hệ thống bằng cách thực thi các giới hạn tốc độ được xác định trước. RateLimitTask
tạo điều kiện cho việc thực thi tác vụ được kiểm soát, bỏ qua các kích hoạt tiếp theo khi đạt đến giới hạn tối đa và đảm bảo việc giải phóng tài nguyên đúng cách khi tác vụ hoàn tất.
Cuối cùng, việc tích hợp RateLimitTask
vào các ứng dụng dựa trên Celery cung cấp một phương tiện đáng tin cậy để duy trì hiệu suất hệ thống và ngăn ngừa quá tải bằng cách điều chỉnh thông minh việc thực thi tác vụ.