ArgoWorkflow で行う ETL 運用
ArgoWorkflow で行う ETL 運用

ArgoWorkflow で行う ETL 運用

はじめに

DROBE では Argo workflow を利用して ETL を行なっています

Workflow エンジンとして Argo workflow を選んだ経緯や弊社での使い方などを書いていこうと思います

ETL とは

ETL とは Extract (抽出), T (Transform), L (Load) の頭文字を繋げたもので、データウェアハウスなどを構築する際に行われるデータフローを指します

弊社では (本来の定義からは少し外れるかもしれないですが) ETL を、データにまつわるバッチ処理全般、というような意味で使っています

Argo Workflow とは

公式の説明が端的で的を得ているので引用します

Argo Workflows is an open source container-native workflow engine for orchestrating parallel jobs on Kubernetes.

意訳すると、Argo Workflow は kubernetest 上で並列実行可能な job をオーケストレーションする為の、コンテナネイティブなオープンソースのワークフローエンジンです、という感じかと思います

特徴としては以下のようなものになります

  • ワークフローの各ステップの実態はコンテナである
  • 複数ステップのワークフローをタスクのシーケンスとしてモデルしたり、タスク間の依存関係を DAG (directed acyclic graph) を用いて表現する
  • コンピューティングリソースが必要な機械学習の job やデータプロセスを簡単に実行可能

こちらに Argo Workflow を利用している会社やプロジェクトが列挙されています

Google や NVIDIA といった大きな企業や機械学習のワークフローエンジンである kubeflow も Argo Workflow を利用しているみたいです

公式の github repository はこちらです

なぜ Argo Workflow を採用したのか

Argo Workflow 採用前に持っていた課題感

DROBE では元々 AWS の Step Function + Glue を使って ETL を運用していましたが、以下のような課題がありました

1. ETL の実行時間が長い

DROBE では ETL の際に snapshot を取るテーブルが 100 以上ありますが、Step Function では並列に実行するように設定した task は同時に実行されてしまいます

これを素直に設定していまうと DB にコネクションが同時に貼られてしまい、too many connection などになってしまうので、仕方なく並列実行する task をグルーピングしまとめていました

Step Function での実行イメージ
Step Function での実行イメージ

この場合、例えば左図のようにグルーピングしていましたが Table A, B, C 全ての処理が終わらない限り、Table D, E, F の処理は始まりません

ここで例えば Table C が A, B に対して巨大で処理に時間が掛かるなどがあると、(A, B の処理が先に終わるので) DB のコネクション的には余裕があるにも関わらず Table C の処理が終わらない限り次の処理が始まらないので、待ち時間が長くなるという問題がありました

また、DAG のような複雑な task 同士の依存関係の定義をうまく表現する事が出来ず、結果として実行時間が長い要因の一つとなってしまっていました

2. 定義ファイルの管理が大変

Step Function では json を使って workflow を管理しますが、task が 100 を超えるなどなってくると、かなり巨大な json となり、管理が非常に大変になります

* もちろん jsonnet などを使って分割したものを事前に build して適用するといった事も可能だとは思います

細かいものなどを上げていくと他にも色々ありますが、大きくは上記 2 点だったと思います。振り返ると AWS Step Function は ETL のような巨大なワークフローを管理する為のものでは無かったんじゃないかなと思います。適材適所という観点でいうと少し無茶な事をしてしまっていたんじゃないかなと考えています

Argo Workflow を採用に至った理由

上記の課題を踏まえて、Argo Workflow を採用した理由は以下のような特徴を持っている事から課題を解決出来るのでは無いかと考えたからです

1. 並列実行可能な Task の数を柔軟に変更可能

これは非常に大事な機能だと思います。Task を全て同じ優先度として定義が出来るものの並列実行数は簡単に設定が出来ます。その結果、DB に負荷を掛け過ぎないようにしつつも、指定したリソースの範囲で出来る限り並列に Task を実行できるようになりました

2. Task 同士の依存関係を簡単に定義が出来る

Argo Workflow では yaml を使って、DAG を簡単に定義できます (https://argoproj.github.io/argo-workflows/examples/#dag)

1 の並列実行可能な Task 数の設定と合わせて、Step Function の時に行なっていたようなグルーピングは完全に必要なくなりました

以下のスクリーンショットのようにそれなりに複雑な依存関係も yaml を使って直感的に定義する事が可能です

DROBE の snapshot と transform の Workflow (一部)
DROBE の snapshot と transform の Workflow (一部)

3. Workflow の定義は yaml で比較的簡潔に記載が可能で、かつ分割も可能

Argo Workflow では、Workflow 自体を分割して定義して、それを一つの Workflow として動かす、といった事も可能です

ファイルを分割する事によって見通しが良くなりレビューなどもしやすいのと、コンフリクトの危険も減らす事ができます

DROBE での使い方

DROBE では主に以下の用途で Argo Workflow を利用しています

1. 正規化されたテーブルを分析しやすい形に非正規化し、データウェアハウス (弊社では BigQuery を利用) にアップロードする

データウェアハウスには Redash からアクセス可能にしておき、分析などは Redash 上で行っています

2. リアルタイム性がそこまで求められないような集計処理

具体的な集計としては以下のようなものがあります

  • 商品の購入率や返品率の集計
  • 在庫情報の集計と更新
  • ランキングの作成/更新
  • など

集計結果は Elasticsearch に入れてしまう事が多く、アプリケーションは Elasticsearch を通して集計結果にアクセスします

3. 機械学習で使うためにデータを使いやすい形にする処理

所謂 feature engineering の初段を行っています

イメージとしては、Kaggle などの問題でダウンロード出来るような形にまでデータエンジニアリングを行なっています

4. 機械学習の処理

実際にアプリケーションで使うための機械学習のバッチ処理も Argo Workflow 上で運用しています

処理結果は出来る限りアプリケーションからは疎結合になるように気をつけつつも、アプリケーションから参照できるような形にしています

DROBE における Argo Workflow の活用
DROBE における Argo Workflow の活用

さいごに

DROBE における Argo Workflow 採用の経緯や、使われ方の概要を記載しました