Glinteco事例研究:ロック機構を用いたCeleryタスクにおけるレート制限の実装
By hientd, at: 2023年12月30日15:43
Estimated Reading Time: __READING_TIME__ minutes


1. はじめに
タスクの同時実行管理に関する以前の議論に基づいて、今度はCeleryタスクにおけるもう一つの重要な側面であるレート制限について詳しく見ていきます。タスクの同時実行管理について理解したい場合は、レート制限について調べる前に、以前の記事 を参照してください。
2. ロックメカニズムを使用したレート制限
この議論では、Celeryタスクにおけるレート制限戦略に焦点を移します。以前は同時実行性について説明しましたが、今日の調査はロックベースのメカニズムを使用したレート制限の実装を中心に展開します。
3. レート制限タスクの実装
RateLimitTask
の実装は、以前に説明したUnique Taskの実装と非常によく似ていますが、__call__
メソッド内のカスタムロジックが重要な違いです。以下は、レート制限の適用のための独自のロジックを示すコードスニペットです。
class RateLimitTask(Task):
LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
RATELIMIT = {
"tasks": 4,
"time": 60, # 秒
}
def __call__(self, *args, **kwargs):
rate_limit = self.RATELIMIT
custom_rate_limit = getattr(self, "custom_rate_limit", None)
if custom_rate_limit:
rate_limit = custom_rate_limit
base_task_id = self.generate_lock_id(*args, **kwargs)
first_task_at_lock_id = f"{base_task_id}-first_task_at"
count_tasks_lock_id = f"{base_task_id}-count_tasks"
first_task_at = cache.get(first_task_at_lock_id)
count_tasks = cache.get(count_tasks_lock_id, 0)
now = datetime.now()
if not first_task_at:
first_task_at = now
count_tasks = 0
cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])
time_from_first_task = (now - first_task_at).total_seconds()
valid_to_run = (
time_from_first_task < rate_limit["time"]
and count_tasks < rate_limit["tasks"]
)
if valid_to_run:
cache.incr(count_tasks_lock_id)
return self.run(*args, **kwargs)
logger.warning(
f"レート制限に達しました: {count_tasks}/{rate_limit['tasks']} "
f"{rate_limit['time']}秒あたり、task_id: {base_task_id}"
)
raise Ignore()
# ... (その他のメソッドはUnique Taskの実装と同一のままです)
4. レート制限タスクの実際的な使用方法
レート制限の適用が重要なシナリオを考えてみましょう。RateLimitTask
は、以下に示すように、Celeryタスク内でレート制限を効率的に管理します。
from celery import Celery
# Celeryアプリケーションの初期化
app = Celery("example")
app.config_from_object("celeryconfig")
# Glintecoの
RateLimitTask
を使用したサンプルタスクの定義
@app.task(base=RateLimitTask
)
def process_data(data_id):
"""
GlintecoのRateLimitTask
を使用する例
"""
# data_idに基づいてデータ処理を実行します
pass
このコードスニペットは、Celeryアプリケーション内でのGlintecoのRateLimitTask
の統合を示しています。Celeryタスクprocess_data
を定義し、そのベースをRateLimitTask
として設定することで、タスクの実行はRateLimitTask
クラス内で指定された事前定義されたレート制限に自動的に準拠します。
この例は、GlintecoのRateLimitTask
がCeleryベースの環境内でタスク実行頻度をシームレスに制御し、タスクデータを効率的に処理しながら定義されたレート制限を確実に遵守する方法を示しています。
5. 完全な実装リファレンス
コミット履歴内でGlintecoのRateLimitTask
クラスの完全な実装を調べたい開発者は、Glintecoのサンプルリポジトリで確認できます。 Glintecoのサンプルコミット
この特定のコミットには、RateLimitTask
クラスの包括的な実装が含まれており、Celeryベースのワークフロー内での機能と使用方法を示しています。
さらに、Glintecoのサンプルリポジトリには、Python、Django、Flask、FastAPI、スクレイパーなどを網羅したさまざまな便利なスニペットとデモがホストされています。さまざまな実践的なコードサンプルとデモを探求してください。
6. まとめ
要約すると、RateLimitTask
はCeleryフレームワーク内で堅牢なソリューションとして機能し、タスク実行頻度を効果的に管理し、指定された時間枠内で同一のパラメータを持つタスクの同時実行を防止します。ロックベースのメカニズムを採用し、キャッシュシステムに格納されたカウンターを使用することで、このカスタムCeleryタスクは、事前定義されたレート制限を適用することにより、システムの安定性を確保します。RateLimitTask
は制御されたタスク実行を容易にし、最大限度に達すると後続のトリガーを無視し、タスク完了時にリソースの適切な解放を保証します。
最終的に、CeleryベースのアプリケーションにRateLimitTask
を統合することで、タスク実行をインテリジェントに調整することにより、システムのパフォーマンスを維持し、過負荷を防ぐための信頼できる手段が提供されます。