Prefectのメモ
はじめに
この記事では、**Pythonベースのワークフローオーケストレーションツール「Prefect」**を利用して、バッチ処理(定期実行タスク)を効率的に管理する方法をまとめました。
従来は cron
や Airflow
を利用するケースが多くありましたが、最近では 軽量かつクラウド連携しやすい Prefect が注目を集めています。
ここでは、基本概念からインストール、ジョブ登録、スケジュール設定までを一通り整理しています。
Prefectとは
Prefect は、Pythonで記述できるワークフロー管理ツールです。データ処理・ETL・バッチ実行・クラウドAPI連携など、複数のタスクを柔軟に制御することができます。
主な特徴は以下のとおりです。
- Pythonコードで定義可能
YAMLやDSLではなく、Pythonそのものでタスクや依存関係を記述できます。 - 柔軟なスケジューリング
IntervalSchedule
やCronSchedule
など、さまざまな形式でスケジュール設定が可能です。 - ローカル or クラウド運用の両対応
Prefect Cloud や Prefect Server(OSS版)を選択できます。 - UIによる可視化と監視
実行結果や依存関係をWeb UI上で確認できます。
インストール
PrefectはPythonパッケージとして提供されています。以下のコマンドで簡単に導入可能です。
pip install prefect
バージョンを確認して、正しくインストールできていることを確認します。
prefect version
基本構成
Prefectの基本構成は以下の要素で成り立ちます。
要素 | 説明 |
---|---|
Flow | 一連のタスク処理を表すPython関数 |
Task | 処理単位を表す関数(オプション) |
Deployment | Flowをスケジュール・環境設定と結びつける定義 |
Worker | Flowの実行を担当するプロセス |
シンプルなFlowの例
以下は、Hello Worldを出力するシンプルなFlowの例です。
from prefect import flow, task
@task
def say_hello():
print("Hello, Prefect!")
@flow
def hello_flow():
say_hello()
if __name__ == "__main__":
hello_flow()
このスクリプトを実行すると、タスクがFlowとしてPrefectのトラッキング下で実行されます。
デプロイメント作成
Prefectでは、CLIからFlowをデプロイします。
prefect deploy flows/job.py:job -n hourly-job -i 3600
上記コマンドは、prefect.yaml
を自動生成し、Flowを1時間ごとに実行するデプロイメントを作成します。
prefect.yaml
は以下のような形式です。
deployments:
- name: hourly-job
entrypoint: flows/job.py:job
schedule:
interval: 1h
ワーカーの起動
ジョブを自動実行するには、Prefect 3では**ワーカー(Worker)**を利用します。
prefect worker start -p default-agent-pool
このコマンドを実行すると、登録されたデプロイメントがスケジュールに従って順次実行されます。
メモ
- PrefectはAirflowより軽量で、ローカル実行やDocker運用に向いています。
- Prefectではエージェント (Agent) が ワーカー (Worker) に統合されました。
- スケジュール設定はYAML管理が基本となり、コード内スケジュール指定は非推奨です。
- UI(Prefect ServerまたはCloud)からも、ジョブの状態やログを確認できます。
- Gitリポジトリと組み合わせることで、GitOps的な運用も容易です。