Airflowのメモ
目次
はじめに
Airflowはオープンソースでバッチ処理の開発・スケジュール・監視ができるアプリケーションです。
Pythonで構築されており、バッチ手順をコード化することでより柔軟に管理・作成することができます。
インストール
# Airflowを管理するディレクトリを設定
export AIRFLOW_HOME=~/airflow
# インストールするバージョンの指定
AIRFLOW_VERSION=2.6.2
# Pythonのバージョン取得
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# インストールに必要なファイルをファイルパスを取得
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# インストール
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
初期設定
Airflowに利用するデータベースとユーザを作成する。
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
Airflowの初期DBがSQLiteになっているので、MySQLに変更する。
[core]
executor = LocalExecutor
[database]
sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost/airflow_db
# データベースの初期化
airflow db init
# ユーザの登録
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email [email protected]
起動
バックグランドでスケジューラーとサーバを起動します。
airflow scheduler -D
airflow webserver -D
SQLiteからMySQLにデータ移行
AirflowのデフォルトDBがSQLiteのため、途中からMySQLに変更する方法をメモします。
特にデータを引き継ぐ必要がなければ、初期設定から実施すれば大丈夫です。
SQLiteのデータをそのままインサートすることはできないため、SQLite3 to MySQLを利用します。
-f
でSQLiteのデータベースファイルを指定し、-d
でMySQLのデータベースを指定します。
他のオプションは、MySQLへの接続に利用します。
pip install sqlite3-to-mysql
sqlite3mysql -f ~/airflow/airflow.db -d airflow_db -uroot -h localhost -p
Dagの確認
下記コマンドで現在登録されているDagを確認できます。
airflow dags list
エラーが有る場合は、下記コマンドで原因を確認できます。
airflow dags list-import-errors
Taskの確認
特定のDagに入っているTaskを確認するコマンドです。
# リスト形式
airflow tasks list {Dag名}
# ツリー形式
airflow tasks list {Dag名} --tree
ローカル実行
Taskをローカル実行するコマンドです。
airflow tasks test {Dag名} {Task名}
DAGファイルのメモ
Hashicorp Vault
from airflow.providers.hashicorp.hooks.vault import VaultHook
vaultHook = VaultHook()
secretDict = vaultHook.get_secret("secret_path")
Log Mask
from airflow.utils.log.secrets_masker import mask_secret
mask_secret(mask_variable)