地方在住IT系ニート

bkds

Airflowのメモ

はじめに

Airflowはオープンソースでバッチ処理の開発・スケジュール・監視ができるアプリケーションです。
Pythonで構築されており、バッチ手順をコード化することでより柔軟に管理・作成することができます。

インストール

# Airflowを管理するディレクトリを設定
export AIRFLOW_HOME=~/airflow

# インストールするバージョンの指定
AIRFLOW_VERSION=2.6.2

# Pythonのバージョン取得
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# インストールに必要なファイルをファイルパスを取得
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# インストール
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

初期設定

Airflowに利用するデータベースとユーザを作成する。

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

Airflowの初期DBがSQLiteになっているので、MySQLに変更する。

[core]
executor = LocalExecutor

[database]
sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost/airflow_db
# データベースの初期化
airflow db init 

# ユーザの登録
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email [email protected]

起動

バックグランドでスケジューラーとサーバを起動します。

airflow scheduler -D
airflow webserver -D

SQLiteからMySQLにデータ移行

AirflowのデフォルトDBがSQLiteのため、途中からMySQLに変更する方法をメモします。
特にデータを引き継ぐ必要がなければ、初期設定から実施すれば大丈夫です。

SQLiteのデータをそのままインサートすることはできないため、SQLite3 to MySQLを利用します。
-fでSQLiteのデータベースファイルを指定し、-dでMySQLのデータベースを指定します。
他のオプションは、MySQLへの接続に利用します。

pip install sqlite3-to-mysql
sqlite3mysql -f ~/airflow/airflow.db -d airflow_db -uroot -h localhost -p

Dagの確認

下記コマンドで現在登録されているDagを確認できます。

airflow dags list

エラーが有る場合は、下記コマンドで原因を確認できます。

airflow dags list-import-errors

Taskの確認

特定のDagに入っているTaskを確認するコマンドです。

# リスト形式
airflow tasks list {Dag名}

# ツリー形式
airflow tasks list {Dag名} --tree

ローカル実行

Taskをローカル実行するコマンドです。

airflow tasks test {Dag名} {Task名}

DAGファイルのメモ

Hashicorp Vault

from airflow.providers.hashicorp.hooks.vault import VaultHook

vaultHook = VaultHook()
secretDict = vaultHook.get_secret("secret_path")

Log Mask

from airflow.utils.log.secrets_masker import mask_secret
mask_secret(mask_variable)
にほんブログ村 IT技術ブログ IT技術メモへPVアクセスランキング にほんブログ村