データアナリティクス部の大貫です。
Databricks には Lakeflow Spark 宣言型パイプライン(以下、SDP)という機能があります。簡単にパイプラインを開発できる機能なのですが、私は業務ではほとんど使用したことがなく、あまり馴染みのない機能でした。
しかし、SDPのチュートリアルを動かしてみたところ、どのような機能かイメージが付き、説明を読むだけではなく、実際に手を動かしてみることは非常に重要だなと改めて感じました。
そこで今回の記事では、SDPのチュートリアルの動かし方と、SDPがどのような機能か簡単にご紹介したいと思います。
ちなみに、SDP は Databricks Certified Data Engineer Professional の試験範囲となっています。(試験ガイドはこちらです。)
本記事では下記について記載していきたいと思います。
本記事は執筆時点(2026年6月時点)での内容です。
SDPは、SQLおよびPythonでバッチおよびストリーミング データ パイプラインを開発および実行するための宣言型フレームワークです。
簡単に言うと、『どう処理するか(手順)』を細かく書かなくても、『どんなデータが欲しいか(完成図)』を伝えるだけで、後はDatabricksが勝手に裏側を整えてくれる賢い仕組みです。
こちらに記載のチュートリアルを実際にやってみます。
実行するための前提条件が書かれていますので、もし上手くいかない場合は確認してみてください。
1.作成
1.1.「ジョブとパイプライン」を選択
1.2.「作成」を選択
1.3.「ETLパイプライン」を選択

2.パイプライン名の変更

3.カタログとスキーマの設定
3.1.パイプライン名の右側にあるカタログとスキーマをクリック

3.2.デフォルトで使用するカタログとスキーマを選択して保存

4.言語ドロップダウンリストから Python または SQL を選択
(今回はPythonを選択します)

5.「サンプルコードを使用」をクリック


6.パイプラインの実行



1.「+」アイコンをクリックし、「変換」を選択

2.新しいコードの追加
2.1.Python または SQLを選択(今回はPythonを選択)
2.2.ファイル名を”users_cleaned.py”に変更
2.3.マテリアライズドビューをクリック(サンプルコードが生成される)

3.公式ドキュメントでは”[ 作成 ] をクリック”とありますが、2の時点でファイルが作成されます
4.“users_cleaned.py”を下記の内容に編集(“sample_users_tutorial”は、実際のテーブル名に変更してください)
from pyspark import pipelines as dp
# Drop all rows that do not have an email address
@dp.materialized_view
@dp.expect_or_drop("no null emails", "email IS NOT NULL")
def users_cleaned():
return (
spark.read.table("sample_users_tutorial")
)

5.パイプラインを実行、新しく”users_cleaned”テーブルができたことを確認

from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, desc
# Get the top 100 users by number of bookings
@dp.materialized_view
def users_and_bookings():
return (
spark.read.table("users_cleaned")
.join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
.groupBy(col("name"))
.agg(count("booking_id").alias("booking_count"))
.orderBy(desc("booking_count"))
.limit(100)
)

コメント