Glinteco事例研究:ロック機構を用いたCeleryタスクにおけるレート制限の実装

By hientd, at: 2023年12月30日15:43

Estimated Reading Time: __READING_TIME__ minutes

Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism
Glinteco's Case Study: Implementing Rate Limiting in Celery Tasks using Lock Mechanism

1. はじめに

 

タスクの同時実行管理に関する以前の議論に基づいて、今度はCeleryタスクにおけるもう一つの重要な側面であるレート制限について詳しく見ていきます。タスクの同時実行管理について理解したい場合は、レート制限について調べる前に、以前の記事を参照してください。

 

2. ロックメカニズムによるレート制限

 

この議論では、Celeryタスクにおけるレート制限戦略に焦点を移します。以前は同時実行性について説明しましたが、今日の説明では、ロックベースのメカニズムを使用したレート制限の実装を中心に説明します。

 

3. レート制限タスクの実装

 

RateLimitTaskの実装は、以前説明した一意のタスクと非常によく似ていますが、__call__メソッド内のカスタムロジックが異なります。以下は、レート制限の適用に関する一意のロジックを示すコードスニペットです。

class RateLimitTask(Task):
    LOCK_KEY_TEMPLATE = "celery:rate_limit_task-{0}"
    RATELIMIT = {
        "tasks": 4,
        "time": 60,  # 秒
    }

    def __call__(self, *args, **kwargs):
        rate_limit = self.RATELIMIT
        custom_rate_limit = getattr(self, "custom_rate_limit", None)
        if custom_rate_limit:
            rate_limit = custom_rate_limit

        base_task_id = self.generate_lock_id(*args, **kwargs)
        first_task_at_lock_id = f"{base_task_id}-first_task_at"
        count_tasks_lock_id = f"{base_task_id}-count_tasks"

        first_task_at = cache.get(first_task_at_lock_id)
        count_tasks = cache.get(count_tasks_lock_id, 0)

        now = datetime.now()

        if not first_task_at:
            first_task_at = now
            count_tasks = 0
            cache.set(first_task_at_lock_id, first_task_at, rate_limit["time"])
            cache.set(count_tasks_lock_id, count_tasks, rate_limit["time"])

        time_from_first_task = (now - first_task_at).total_seconds()
        valid_to_run = (
            time_from_first_task < rate_limit["time"]
            and count_tasks < rate_limit["tasks"]
        )
        if valid_to_run:
            cache.incr(count_tasks_lock_id)
            return self.run(*args, **kwargs)

        logger.warning(
            f"レート制限に達しました: {count_tasks}/{rate_limit['tasks']} "
            f"{rate_limit['time']}秒あたり task_id: {base_task_id}"
        )
        raise Ignore()

    # ...(他のメソッドは一意のタスクの実装と同一のままです)

 

4. レート制限タスクの実用的な使用方法

 

レート制限が不可欠なシナリオを考えてみましょう。RateLimitTaskは、以下に示すように、Celeryタスク内でレート制限を効率的に管理します。

 

from celery import Celery

# Celeryアプリケーションの初期化
app = Celery("example")
app.config_from_object("celeryconfig")

# GlintecoのRateLimitTask を使用するサンプルタスクの定義
@app.task(base=
RateLimitTask)
def process_data(data_id):
    """
    Glintecoの
RateLimitTask を使用するサンプルタスク
    """
    # data_idに基づいてデータ処理を実行します
    pass


このコードスニペットは、Celeryアプリケーション内でのGlintecoのRateLimitTaskの統合を示しています。Celeryタスクprocess_dataを定義し、そのベースをRateLimitTaskとして設定することで、タスクの実行はRateLimitTaskクラス内で指定された事前に定義されたレート制限を自動的に遵守します。

この例は、GlintecoのRateLimitTaskがどのようにCeleryベースの環境内でタスク実行頻度をシームレスに制御し、タスクデータを効率的に処理しながら定義されたレート制限を遵守できるかを示しています。

 

5. 完全な実装リファレンス

 

コミット履歴内でGlintecoのRateLimitTaskクラスの完全な実装を調べたい開発者のために、GlintecoのSamplesリポジトリで見つけることができます:Glintecoのサンプルコミット

この特定のコミットには、RateLimitTaskクラスの包括的な実装が含まれており、Celeryベースのワークフロー内での機能と使用方法を示しています。

さらに、GlintecoのSamplesリポジトリには、Python、Django、Flask、FastAPI、Scraperなどを網羅したさまざまな便利なスニペットとデモが掲載されています。さまざまな実用的なコードサンプルとデモを探索してください。

 

6. まとめ

 

要約すると、RateLimitTaskはCeleryフレームワーク内で堅牢なソリューションとして機能し、タスクの実行頻度を効果的に管理し、指定された時間枠内で同一のパラメータを持つタスクの同時実行を防止します。ロックベースのメカニズムを採用し、キャッシュシステムに保存されたカウンターを利用することで、このカスタムCeleryタスクは、事前に定義されたレート制限を適用することでシステムの安定性を確保します。RateLimitTaskは制御されたタスク実行を容易にし、最大限度に達すると後続のトリガーを無視し、タスクの完了時にリソースが適切に解放されるようにします。

最終的に、CeleryベースのアプリケーションにRateLimitTaskを統合することで、タスク実行をインテリジェントに調整することにより、システムのパフォーマンスを維持し、過負荷を防ぐための信頼性の高い手段が提供されます。

Tag list:
- python
- django
- concurrency
- rate limit
- celery
- message queue
- celery configuration

Related

Experience Django

Read more
Django Python

Read more
Python Experience

ChatGPT:教育の未来

Read more

Subscribe

Subscribe to our newsletter and never miss out lastest news.