Glinteco事例研究:ロック機構を用いたCeleryタスクにおけるレート制限の実装
By hientd, at: 2023年12月30日15:43
Estimated Reading Time: __READING_TIME__ minutes


1. はじめに
タスクの同時実行管理に関する以前の議論に基づいて、今度はCeleryタスクにおけるもう一つの重要な側面であるレート制限について詳しく見ていきます。タスクの同時実行管理について理解したい場合は、レート制限について調べる前に、以前の記事を参照してください。
2. ロックメカニズムによるレート制限
この議論では、Celeryタスクにおけるレート制限戦略に焦点を移します。以前は同時実行性について説明しましたが、今日の説明では、ロックベースのメカニズムを使用したレート制限の実装を中心に説明します。
3. レート制限タスクの実装
RateLimitTask
の実装は、以前説明した一意のタスクと非常によく似ていますが、__call__
メソッド内のカスタムロジックが異なります。以下は、レート制限の適用に関する一意のロジックを示すコードスニペットです。
class RateLimitTask(Task):
LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
RATELIMIT = {
"tasks": 4,
"time": 60, # 秒
}
def __call__(self, *args, **kwargs):
rate_limit = self.RATELIMIT
custom_rate_limit = getattr(self, "custom_rate_limit", None)
if custom_rate_limit:
rate_limit = custom_rate_limit
base_task_id = self.generate_lock_id(*args, **kwargs)
first_task_at_lock_id = f"{base_task_id}-first_task_at"
count_tasks_lock_id = f"{base_task_id}-count_tasks"
first_task_at = cache.get(first_task_at_lock_id)
count_tasks = cache.get(count_tasks_lock_id, 0)
now = datetime.now()
if not first_task_at:
first_task_at = now
count_tasks = 0
cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])
time_from_first_task = (now - first_task_at).total_seconds()
valid_to_run = (
time_from_first_task < rate_limit["time"]
and count_tasks < rate_limit["tasks"]
)
if valid_to_run:
cache.incr(count_tasks_lock_id)
return self.run(*args, **kwargs)
logger.warning(
f"レート制限に達しました: {count_tasks}/{rate_limit['tasks']} "
f"{rate_limit['time']}秒あたり task_id: {base_task_id}"
)
raise Ignore()
# ...(他のメソッドは一意のタスクの実装と同一のままです)
4. レート制限タスクの実用的な使用方法
レート制限が不可欠なシナリオを考えてみましょう。RateLimitTask
は、以下に示すように、Celeryタスク内でレート制限を効率的に管理します。
from celery import Celery
# Celeryアプリケーションの初期化
app = Celery("example")
app.config_from_object("celeryconfig")
# Glintecoの
RateLimitTask
を使用するサンプルタスクの定義
@app.task(base=RateLimitTask
)
def process_data(data_id):
"""
GlintecoのRateLimitTask
を使用するサンプルタスク
"""
# data_idに基づいてデータ処理を実行します
pass
このコードスニペットは、Celeryアプリケーション内でのGlintecoのRateLimitTask
の統合を示しています。Celeryタスクprocess_data
を定義し、そのベースをRateLimitTask
として設定することで、タスクの実行はRateLimitTask
クラス内で指定された事前に定義されたレート制限を自動的に遵守します。
この例は、GlintecoのRateLimitTask
がどのようにCeleryベースの環境内でタスク実行頻度をシームレスに制御し、タスクデータを効率的に処理しながら定義されたレート制限を遵守できるかを示しています。
5. 完全な実装リファレンス
コミット履歴内でGlintecoのRateLimitTask
クラスの完全な実装を調べたい開発者のために、GlintecoのSamplesリポジトリで見つけることができます:Glintecoのサンプルコミット
この特定のコミットには、RateLimitTask
クラスの包括的な実装が含まれており、Celeryベースのワークフロー内での機能と使用方法を示しています。
さらに、GlintecoのSamplesリポジトリには、Python、Django、Flask、FastAPI、Scraperなどを網羅したさまざまな便利なスニペットとデモが掲載されています。さまざまな実用的なコードサンプルとデモを探索してください。
6. まとめ
要約すると、RateLimitTask
はCeleryフレームワーク内で堅牢なソリューションとして機能し、タスクの実行頻度を効果的に管理し、指定された時間枠内で同一のパラメータを持つタスクの同時実行を防止します。ロックベースのメカニズムを採用し、キャッシュシステムに保存されたカウンターを利用することで、このカスタムCeleryタスクは、事前に定義されたレート制限を適用することでシステムの安定性を確保します。RateLimitTask
は制御されたタスク実行を容易にし、最大限度に達すると後続のトリガーを無視し、タスクの完了時にリソースが適切に解放されるようにします。
最終的に、CeleryベースのアプリケーションにRateLimitTask
を統合することで、タスク実行をインテリジェントに調整することにより、システムのパフォーマンスを維持し、過負荷を防ぐための信頼性の高い手段が提供されます。