Glinteco事例研究:ロック機構を用いたCeleryタスクにおけるレート制限の実装

By hientd, at: 2023年12月30日15:43

Estimated Reading Time: __READING_TIME__ minutes

Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism
Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism

1. はじめに

 

タスクの同時実行管理に関する以前の議論に基づいて、今度はCeleryタスクにおけるもう一つの重要な側面であるレート制限について詳しく見ていきます。タスクの同時実行管理について理解したい場合は、レート制限について調べる前に、以前の記事 を参照してください。

 

2. ロックメカニズムを使用したレート制限

 

この議論では、Celeryタスクにおけるレート制限戦略に焦点を移します。以前は同時実行性について説明しましたが、今日の調査はロックベースのメカニズムを使用したレート制限の実装を中心に展開します。

 

3. レート制限タスクの実装

 

RateLimitTaskの実装は、以前に説明したUnique Taskの実装と非常によく似ていますが、__call__メソッド内のカスタムロジックが重要な違いです。以下は、レート制限の適用のための独自のロジックを示すコードスニペットです。

 

class RateLimitTask(Task):
    LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
    RATELIMIT = {
        "tasks": 4,
        "time": 60,  # 秒
    }

    def __call__(self, *args, **kwargs):
        rate_limit = self.RATELIMIT
        custom_rate_limit = getattr(self, "custom_rate_limit", None)
        if custom_rate_limit:
            rate_limit = custom_rate_limit

        base_task_id = self.generate_lock_id(*args, **kwargs)
        first_task_at_lock_id = f"{base_task_id}-first_task_at"
        count_tasks_lock_id = f"{base_task_id}-count_tasks"

        first_task_at = cache.get(first_task_at_lock_id)
        count_tasks = cache.get(count_tasks_lock_id, 0)

        now = datetime.now()

        if not first_task_at:
            first_task_at = now
            count_tasks = 0
            cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
            cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])

        time_from_first_task = (now - first_task_at).total_seconds()
        valid_to_run = (
            time_from_first_task < rate_limit["time"]
            and count_tasks < rate_limit["tasks"]
        )
        if valid_to_run:
            cache.incr(count_tasks_lock_id)
            return self.run(*args, **kwargs)

        logger.warning(
            f"レート制限に達しました: {count_tasks}/{rate_limit['tasks']} "
            f"{rate_limit['time']}秒あたり、task_id: {base_task_id}"
        )
        raise Ignore()

    # ... (その他のメソッドはUnique Taskの実装と同一のままです)

 

 

4. レート制限タスクの実際的な使用方法

 

レート制限の適用が重要なシナリオを考えてみましょう。RateLimitTaskは、以下に示すように、Celeryタスク内でレート制限を効率的に管理します。

 

from celery import Celery

# Celeryアプリケーションの初期化
app = Celery("example")
app.config_from_object("celeryconfig")

# GlintecoのRateLimitTaskを使用したサンプルタスクの定義
@app.task(base=
RateLimitTask)
def process_data(data_id):
    """
    Glintecoの
RateLimitTaskを使用する例
    """
    # data_idに基づいてデータ処理を実行します
    pass


このコードスニペットは、Celeryアプリケーション内でのGlintecoのRateLimitTaskの統合を示しています。Celeryタスクprocess_dataを定義し、そのベースをRateLimitTaskとして設定することで、タスクの実行はRateLimitTaskクラス内で指定された事前定義されたレート制限に自動的に準拠します。

 

この例は、GlintecoのRateLimitTaskがCeleryベースの環境内でタスク実行頻度をシームレスに制御し、タスクデータを効率的に処理しながら定義されたレート制限を確実に遵守する方法を示しています。

 

5. 完全な実装リファレンス

 

コミット履歴内でGlintecoのRateLimitTaskクラスの完全な実装を調べたい開発者は、Glintecoのサンプルリポジトリで確認できます。 Glintecoのサンプルコミット

 

この特定のコミットには、RateLimitTaskクラスの包括的な実装が含まれており、Celeryベースのワークフロー内での機能と使用方法を示しています。

 

さらに、Glintecoのサンプルリポジトリには、Python、Django、Flask、FastAPI、スクレイパーなどを網羅したさまざまな便利なスニペットとデモがホストされています。さまざまな実践的なコードサンプルとデモを探求してください。

 

6. まとめ

 

要約すると、RateLimitTaskはCeleryフレームワーク内で堅牢なソリューションとして機能し、タスク実行頻度を効果的に管理し、指定された時間枠内で同一のパラメータを持つタスクの同時実行を防止します。ロックベースのメカニズムを採用し、キャッシュシステムに格納されたカウンターを使用することで、このカスタムCeleryタスクは、事前定義されたレート制限を適用することにより、システムの安定性を確保します。RateLimitTaskは制御されたタスク実行を容易にし、最大限度に達すると後続のトリガーを無視し、タスク完了時にリソースの適切な解放を保証します。

 

最終的に、CeleryベースのアプリケーションにRateLimitTaskを統合することで、タスク実行をインテリジェントに調整することにより、システムのパフォーマンスを維持し、過負荷を防ぐための信頼できる手段が提供されます。

Tag list:
- python
- django
- concurrency
- rate limit
- celery
- message queue
- celery configuration

Related

Experience Django

Read more
Django Python

Read more
Python Experience

ChatGPT:教育の未来

Read more

Subscribe

Subscribe to our newsletter and never miss out lastest news.