チュートリアル:ETL Lakeflow宣言型パイプラインを使用して パイプラインを構築する
Lakeflow宣言型パイプラインとAuto Loaderを使用して、データ オーケストレーション用のETL (抽出、変換、読み込み) パイプラインを作成およびデプロイする方法を学習します。ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックやレコード重複排除などの要件に基づいてそのデータを変換し、データ ウェアハウスやデータレイクなどのターゲット システムにデータを書き込む手順を実装します。
このチュートリアルでは、Lakeflow宣言型パイプラインとAuto Loaderを使用し、次のことを行います。
- 生のソースデータをターゲットテーブルに取り込みます。
- 生のソース データを変換し、変換されたデータを 2 つのターゲット マテリアライズドビューに書き込みます。
- 変換されたデータをクエリします。
- Databricks ジョブを使用して ETL パイプラインを自動化します。
Lakeflow 宣言型パイプラインと Auto Loaderの詳細については、「Lakeflow 宣言型パイプライン」および「Auto Loaderとは」を参照してください。
必要条件
このチュートリアルを完了するには、以下の条件を満たす必要があります。
- Databricks ワークスペースにログインします。
- ワークスペースで Unity Catalog が有効になっていることを確認します。
- サーバレス コンピュートをアカウントで有効にしてください。サーバレス Lakeflow 宣言型パイプラインは、すべてのワークスペース リージョンで使用できるわけではありません。 利用可能なリージョンについては、「 利用可能な地域が限られている機能 」を参照してください。
- コンピュート リソースを作成する権限、またはコンピュート リソースにアクセスする権限を持っている。
- カタログに新しいスキーマを作成する権限がある。必要な権限は、
ALL PRIVILEGES
またはUSE CATALOG
およびCREATE SCHEMA
です。 - 既存のスキーマに新しいボリュームを作成するアクセス許可がある。必要な権限は、
ALL PRIVILEGES
またはUSE SCHEMA
およびCREATE VOLUME
です。
データセットについて
この例で使用されているデータセットは、現代音楽トラックの機能とメタデータのコレクションである Million Song データセットのサブセットです。 このデータセットは、Databricksワークスペースに含まれている サンプルデータセットに含まれています。
ステップ 1: パイプラインを作成する
まず、 LakeFlow宣言型パイプラインでETLパイプラインを作成します。 LakeFlow宣言型パイプラインは、 LakeFlow宣言型パイプライン構文を使用してファイル ( ソース コード と呼ばれる) で定義された依存関係を解決することによってパイプラインを作成します。 各ソース コード ファイルに含めることができる言語は 1 つだけですが、パイプラインには複数の言語固有のファイルを追加できます。詳細については、 LakeFlow宣言型パイプライン」を参照してください。
このチュートリアルでは、サーバレス コンピュート と Unity Catalogを使用します。 指定されていないすべての構成オプションについては、デフォルト設定を使用します。サーバレス コンピュートがワークスペースで有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記載されているとおりにチュートリアルを完了できます。
Lakeflow宣言型パイプラインで新しい ETL パイプラインを作成するには次の手順を実行します。
- ワークスペースで、
サイドバーで [新規] を クリックし、 [ETL パイプライン] を選択します。
- パイプラインに一意の名前を付けます。
- 名前のすぐ下で、生成するデータのデフォルトのカタログとスキーマを選択します。変換で他の宛先を指定することもできますが、このチュートリアルではこれらのデフォルトを使用します。作成するカタログとスキーマに対する権限が必要です。要件を参照してください。
- このチュートリアルでは、 「空のファイルから開始」 を選択します。
- [フォルダー パス] で、ソース ファイルの場所を指定するか、そのまま (ユーザー フォルダー) を受け入れます。
- 最初のソース ファイルの言語として Python または SQL を選択します (パイプラインでは言語を混在させることができますが、各ファイルは単一の言語である必要があります)。
- [選択] をクリックします。
新しいパイプラインのパイプライン エディターが表示されます。言語の空のソース ファイルが作成され、最初の変換の準備が整います。
ステップ 2: パイプライン ロジックを開発する
このステップでは、 LakeFlow Pipelines Editorを使用して、 LakeFlow宣言型パイプラインのソース コードを対話的に開発および検証します。
このコードでは、インクリメンタル データ取り込みに Auto Loader を使用します。 Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。 詳細については、Auto Loaderとはを参照してください。
パイプライン用に空のソース コード ファイルが自動的に作成され、構成されます。ファイルはパイプラインの変換フォルダーに作成されます。デフォルトでは、変換フォルダー内のすべての *.py ファイルと *.sql ファイルは、パイプラインのソースの一部になります。
- 次のコードをコピーしてソース ファイルに貼り付けます。必ずステップ 1 でファイルに対して選択した言語を使用してください。
- Python
- SQL
# Import modules
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
# Define the path to the source data
file_path = f"/databricks-datasets/songs/data-001/"
# Define a streaming table to ingest data from a volume
schema = StructType(
[
StructField("artist_id", StringType(), True),
StructField("artist_lat", DoubleType(), True),
StructField("artist_long", DoubleType(), True),
StructField("artist_location", StringType(), True),
StructField("artist_name", StringType(), True),
StructField("duration", DoubleType(), True),
StructField("end_of_fade_in", DoubleType(), True),
StructField("key", IntegerType(), True),
StructField("key_confidence", DoubleType(), True),
StructField("loudness", DoubleType(), True),
StructField("release", StringType(), True),
StructField("song_hotnes", DoubleType(), True),
StructField("song_id", StringType(), True),
StructField("start_of_fade_out", DoubleType(), True),
StructField("tempo", DoubleType(), True),
StructField("time_signature", DoubleType(), True),
StructField("time_signature_confidence", DoubleType(), True),
StructField("title", StringType(), True),
StructField("year", IntegerType(), True),
StructField("partial_sequence", IntegerType(), True)
]
)
@dlt.table(
comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
)
def songs_raw():
return (spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("sep","\t")
.load(file_path))
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="Million Song Dataset with data cleaned and prepared for analysis."
)
@dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
@dlt.expect("valid_title", "song_title IS NOT NULL")
@dlt.expect("valid_duration", "duration > 0")
def songs_prepared():
return (
spark.read.table("songs_raw")
.withColumnRenamed("title", "song_title")
.select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of songs released by the artists who released the most songs each year."
)
def top_artists_by_year():
return (
spark.read.table("songs_prepared")
.filter(expr("year > 0"))
.groupBy("artist_name", "year")
.count().withColumnRenamed("count", "total_number_of_songs")
.sort(desc("total_number_of_songs"), desc("year"))
)
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE songs_raw
COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
AS SELECT *
FROM STREAM read_files(
'/databricks-datasets/songs/data-001/part*',
format => "csv",
header => "false",
delimiter => "\t",
schema => """
artist_id STRING,
artist_lat DOUBLE,
artist_long DOUBLE,
artist_location STRING,
artist_name STRING,
duration DOUBLE,
end_of_fade_in DOUBLE,
key INT,
key_confidence DOUBLE,
loudness DOUBLE,
release STRING,
song_hotnes DOUBLE,
song_id STRING,
start_of_fade_out DOUBLE,
tempo DOUBLE,
time_signature INT,
time_signature_confidence DOUBLE,
title STRING,
year INT,
partial_sequence STRING
""",
schemaEvolutionMode => "none");
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
CONSTRAINT valid_duration EXPECT (duration > 0)
)
COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
FROM songs_raw;
-- Define a materialized view that has a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
AS SELECT
artist_name,
year,
COUNT(*) AS total_number_of_songs
FROM songs_prepared
WHERE year > 0
GROUP BY artist_name, year
ORDER BY total_number_of_songs DESC, year DESC;
このソースには 3 つのクエリのコードが含まれています。これらのクエリを別々のファイルに入れて、ファイルを整理し、好みの方法でコーディングすることもできます。
- クリック
ファイルを実行する か、 パイプラインを実行して、 接続されたパイプラインの更新を開始します。パイプラインにソース ファイルが 1 つしかない場合、これらは機能的に同等です。
更新が完了すると、エディターはパイプラインに関する情報で更新されます。
- コードの右側のサイドバーにあるパイプライン グラフ (DAG) には、3 つのテーブル
songs_raw
、songs_prepared
、top_artists_by_year
表示されます。 - 更新の概要は、パイプライン アセット ブラウザーの上部に表示されます。
- 生成されたテーブルの詳細は下部のペインに表示され、テーブルを選択してデータを参照できます。
これには、生のデータとクリーンアップされたデータのほか、年ごとにトップアーティストを見つけるための簡単な分析が含まれています。次のステップでは、パイプラインの別のファイルでさらに分析するためのアドホック クエリを作成します。
ステップ 3: パイプラインによって作成されたデータセットを探索する
このステップでは、 ETLパイプラインで処理されたデータに対してアドホック クエリを実行し、 Databricks SQLエディターで曲データを分析します。 これらのクエリは、前のステップで作成された準備済みレコードを使用します。
まず、1990 年以降に毎年最も多くの曲をリリースしたアーティストを見つけるクエリを実行します。
-
パイプラインアセットブラウザのサイドバーから、
追加し てから 探索します 。
-
名前 を入力し、探索ファイルとして SQL を選択します。新しい
explorations
フォルダーに SQL ノートブックが作成されます。explorations
フォルダ内のファイルは、デフォルトではパイプライン更新の一部として実行されません。SQL ノートブックには、一緒に実行することも、個別に実行することもできるセルがあります。 -
1990 年以降の各年に最も多くの曲をリリースしたアーティストのテーブルを作成するには、新しい SQL ファイルに次のコードを入力します (ファイル内にサンプル コードがある場合は置き換えます)。このノートブックはパイプラインの一部ではないため、デフォルトのカタログとスキーマは使用されません。
<catalog>.<schema>
、パイプラインのデフォルトとして使用したカタログとスキーマに置き換えます。SQL-- Which artists released the most songs each year in 1990 or later?
SELECT artist_name, total_number_of_songs, year
-- replace with the catalog/schema you are using:
FROM <catalog>.<schema>.top_artists_by_year
WHERE year >= 1990
ORDER BY total_number_of_songs DESC, year DESC; -
クリック
または、
Shift + Enter
を押してこのクエリを実行します。
次に、4/4 ビートとダンサブルなテンポの曲を見つける別のクエリを実行します。
-
同じファイル内の次のセルに次のコードを追加します。ここでも、
<catalog>.<schema>
、パイプラインのデフォルトとして使用したカタログとスキーマに置き換えます。SQL-- Find songs with a 4/4 beat and danceable tempo
SELECT artist_name, song_title, tempo
-- replace with the catalog/schema you are using:
FROM <catalog>.<schema>.songs_prepared
WHERE time_signature = 4 AND tempo between 100 and 140; -
クリック
または、
Shift + Enter
を押してこのクエリを実行します。
手順 4: パイプラインを実行するジョブを作成する
次に、スケジュールに従って実行するDatabricksジョブを使用して、データの取り込み、処理、分析ステップを自動化するワークフローを作成します。
- エディターの上部にある [スケジュール] ボタンを選択します。
- [スケジュール] ダイアログが表示されたら、 [スケジュールの追加] を 選択します。
- これにより、[ 新しいスケジュール] ダイアログが開き、スケジュールに従ってパイプラインを実行するジョブを作成できます。
- 必要に応じて、ジョブに名前を付けます。
- デフォルトでは、スケジュールは 1 日に 1 回実行されるように設定されています。このデフォルトを受け入れるか、独自のスケジュールを設定することができます。 「詳細」 を選択すると、ジョブを実行する特定の時間を設定するオプションが提供されます。 その他のオプション を選択すると、ジョブの実行時に通知を作成できます。
- 変更を適用してジョブを作成するには、 [作成] を選択します。
これで、ジョブは毎日実行され、パイプラインが最新の状態に保たれます。スケジュールのリストを表示するには、もう一度 「スケジュール」 を選択します。このダイアログから、スケジュールの追加、編集、削除など、パイプラインのスケジュールを管理できます。
スケジュール (またはジョブ) の名前をクリックすると、 [ジョブとパイプライン] リストのジョブのページに移動します。そこから、実行の履歴を含むジョブ実行に関する詳細を表示したり、 今すぐ実行 ボタンを使用してジョブをすぐに実行したりできます。
ジョブの実行の詳細については、「Lakeflowジョブのモニタリングと可観測性」を参照してください。
詳細情報
- Lakeflow 宣言型パイプラインを使用したデータ処理パイプラインの詳細については、「Lakeflow 宣言型パイプライン」を参照してください。
- Databricks ノートブックの詳細については、「Databricks ノートブック」を参照してください。
- Lakeflowジョブの詳細については、「ジョブとは」を参照してください。
- Delta Lake の詳細については、「 Databricks の Delta Lake とは」を参照してください。