地方在住IT系ニート

bkds

Prefectのメモ

はじめに

この記事では、**Pythonベースのワークフローオーケストレーションツール「Prefect」**を利用して、バッチ処理(定期実行タスク)を効率的に管理する方法をまとめました。 従来は cronAirflow を利用するケースが多くありましたが、最近では 軽量かつクラウド連携しやすい Prefect が注目を集めています。 ここでは、基本概念からインストール、ジョブ登録、スケジュール設定までを一通り整理しています。

Prefectとは

Prefect は、Pythonで記述できるワークフロー管理ツールです。データ処理・ETL・バッチ実行・クラウドAPI連携など、複数のタスクを柔軟に制御することができます。

主な特徴は以下のとおりです。

  • Pythonコードで定義可能
    YAMLやDSLではなく、Pythonそのものでタスクや依存関係を記述できます。
  • 柔軟なスケジューリング
    IntervalScheduleCronSchedule など、さまざまな形式でスケジュール設定が可能です。
  • ローカル or クラウド運用の両対応
    Prefect Cloud や Prefect Server(OSS版)を選択できます。
  • UIによる可視化と監視
    実行結果や依存関係をWeb UI上で確認できます。

インストール

PrefectはPythonパッケージとして提供されています。以下のコマンドで簡単に導入可能です。

pip install prefect

バージョンを確認して、正しくインストールできていることを確認します。

prefect version

基本構成

Prefectの基本構成は以下の要素で成り立ちます。

要素 説明
Flow 一連のタスク処理を表すPython関数
Task 処理単位を表す関数(オプション)
Deployment Flowをスケジュール・環境設定と結びつける定義
Worker Flowの実行を担当するプロセス

シンプルなFlowの例

以下は、Hello Worldを出力するシンプルなFlowの例です。

from prefect import flow, task

@task
def say_hello():
    print("Hello, Prefect!")

@flow
def hello_flow():
    say_hello()

if __name__ == "__main__":
    hello_flow()

このスクリプトを実行すると、タスクがFlowとしてPrefectのトラッキング下で実行されます。

デプロイメント作成

Prefectでは、CLIからFlowをデプロイします。

prefect deploy flows/job.py:job -n hourly-job -i 3600

上記コマンドは、prefect.yaml を自動生成し、Flowを1時間ごとに実行するデプロイメントを作成します。

prefect.yaml は以下のような形式です。

deployments:
  - name: hourly-job
    entrypoint: flows/job.py:job
    schedule:
      interval: 1h

ワーカーの起動

ジョブを自動実行するには、Prefect 3では**ワーカー(Worker)**を利用します。

prefect worker start -p default-agent-pool

このコマンドを実行すると、登録されたデプロイメントがスケジュールに従って順次実行されます。

メモ

  • PrefectはAirflowより軽量で、ローカル実行やDocker運用に向いています。
  • Prefectではエージェント (Agent)ワーカー (Worker) に統合されました。
  • スケジュール設定はYAML管理が基本となり、コード内スケジュール指定は非推奨です。
  • UI(Prefect ServerまたはCloud)からも、ジョブの状態やログを確認できます。
  • Gitリポジトリと組み合わせることで、GitOps的な運用も容易です。
<-- Back to home
にほんブログ村 IT技術ブログ IT技術メモへPVアクセスランキング にほんブログ村