地方在住IT系ニート

bkds

Prefectのインストールと使い方メモ【Airflowとの比較も解説】

はじめに

この記事では、Pythonベースのワークフローオーケストレーションツール「Prefect」を利用して、バッチ処理(定期実行タスク)を効率的に管理する方法をまとめました。

従来はcronAirflowを利用するケースが多くありますが、軽量かつPythonネイティブなPrefectが最近注目を集めています。

Prefectとは

Prefectは、Pythonで記述できるワークフロー管理ツールです。 データ処理・ETL・バッチ実行・クラウドAPI連携など、複数のタスクを柔軟に制御することができます。

主な特徴は以下のとおりです。

インストール

PrefectはPythonパッケージとして提供されています。以下のコマンドで簡単に導入可能です。

pip install prefect

バージョンを確認して、正しくインストールできていることを確認します。

prefect version

AirflowとPrefectの比較

項目AirflowPrefect
定義方法DAGオブジェクトで定義(独自DSL)通常のPython関数にデコレータを追加するだけ
セットアップScheduler・Webserver・DBなど複数コンポーネント必要最小1コンテナ(SQLite内蔵)で動作可能
動的ワークフロー静的なDAGのみ(条件分岐は複雑)Pythonの制御フローをそのまま使える
ローカルテスト環境構築が複雑通常のPythonスクリプトとして実行可能
コミュニティ大規模・成熟成長中
向いている用途大規模ETL・複雑なバッチ処理ML・データサイエンス・軽量バッチ

インストール

PrefectはPythonパッケージとして提供されています。

pip install prefect

バージョンを確認します。

prefect version

基本構成

Prefectの基本構成は以下の要素で成り立ちます。

要素説明
Flow一連のタスク処理を表すPython関数
Task処理単位を表す関数(オプション)
DeploymentFlowをスケジュール・環境設定と結びつける定義
WorkerFlowの実行を担当するプロセス

シンプルなFlowの例

以下は、Hello Worldを出力するシンプルなFlowの例です。

from prefect import flow, task

@task
def say_hello():
    print("Hello, Prefect!")

@flow
def hello_flow():
    say_hello()

if __name__ == "__main__":
    hello_flow()

このスクリプトを実行すると、タスクがFlowとしてPrefectのトラッキング下で実行されます。

Prefect ServerのUIを起動する

Prefect ServerのWeb UIをローカルで起動することができます。

prefect server start

起動後、http://localhost:4200にアクセスするとUIが表示されます。 FlowのRun履歴・ログ・スケジュールの管理をブラウザから確認できます。

デプロイメント作成

Prefectでは、CLIからFlowをデプロイします。

prefect deploy flows/job.py:job -n hourly-job -i 3600

上記コマンドは、prefect.yaml を自動生成し、Flowを1時間ごとに実行するデプロイメントを作成します。

prefect.yaml は以下のような形式です。

deployments:
  - name: hourly-job
    entrypoint: flows/job.py:job
    schedule:
      interval: 1h

ワーカーの起動

ジョブを自動実行するには**ワーカー(Worker)**を起動します。

prefect worker start -p default-agent-pool

このコマンドを実行すると、登録されたデプロイメントがスケジュールに従って順次実行されます。

トラブルシューティング

WorkerがFlowを実行しない

Prefect ServerとWorkerが正しく接続されているか確認します。

# Prefect ServerのURLを確認
prefect config view

# デフォルトのURLに設定
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

スケジュールが動作しない

prefect server startでサーバーが起動していることを確認してください。 サーバーが起動していない場合、スケジュールはトリガーされません。

メモ

まとめ

Prefectは@flow@taskデコレータを追加するだけで、既存のPythonコードをそのままワークフロー化できます。 Airflowより導入が簡単でローカルテストもしやすいため、個人開発や小〜中規模のバッチ処理に特に向いています。

<-- Back to home
にほんブログ村 IT技術ブログ IT技術メモへ PVアクセスランキング にほんブログ村