Pular para o conteúdo principal

Tutorial: Crie um ETL pipeline com o pipeline declarativo LakeFlow

Aprenda a criar e implantar um ETL (extrair, transformar e carregar) pipeline para a obtenção de dados usando LakeFlow Declarative pipeline e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados dos sistemas de origem, transformar esses dados com base nos requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros, e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.

Neste tutorial, o senhor usará o LakeFlow Declarative pipeline e o Auto Loader para:

  • Ingira dados de origem brutos em uma tabela de destino.
  • Transformar os dados brutos de origem e gravar os dados transformados em duas visualizações materializadas de destino.
  • Consulte os dados transformados.
  • Automatize o ETL pipeline com um trabalho Databricks.

Para obter mais informações sobre LakeFlow Declarative pipeline e Auto Loader, consulte LakeFlow Declarative pipeline e What is Auto Loader?

Requisitos

Para completar este tutorial, você deve atender aos seguintes requisitos:

Sobre o dataset

O site dataset usado neste exemplo é um subconjunto do conjunto de dados Million Song, uma coleção de recursos e metadados para faixas de música contemporânea. Esse dataset está disponível no conjunto de dados de amostra incluído em seu Databricks workspace.

Etapa 1: Criar um pipeline

Primeiro, crie um pipeline ETL no pipeline declarativo LakeFlow . O pipeline declarativo LakeFlow cria um pipeline resolvendo dependências definidas em arquivos (chamados de código-fonte ) usando a sintaxe do pipeline declarativo LakeFlow . Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários arquivos específicos de idioma no pipeline. Para saber mais, consulte o pipeline declarativoLakeFlow

Este tutorial usa compute serverless e Unity Catalog. Para todas as opções de configuração que não forem especificadas, use as configurações default . Se compute serverless não estiver habilitada ou não for suportada no seu workspace, você poderá concluir o tutorial conforme escrito usando as configurações compute default .

Para criar um novo ETL pipeline no pipeline LakeFlow Declarative, siga estas etapas:

  1. No seu workspace, clique em Ícone de mais. Novo na barra lateral, então selecione pipelineETL .
  2. Dê ao seu pipeline um nome exclusivo.
  3. Logo abaixo do nome, selecione o catálogo e o esquema default para os dados que você gerar. Você pode especificar outros destinos em suas transformações, mas este tutorial usa esses padrões. Você deve ter permissões para o catálogo e o esquema que você criar. Veja Requisitos.
  4. Para este tutorial, selecione começar com um arquivo vazio .
  5. Em Caminho da pasta , especifique um local para seus arquivos de origem ou aceite o default (sua pasta de usuário).
  6. Escolha Python ou SQL como a linguagem para seu primeiro arquivo de origem (um pipeline pode misturar e combinar linguagens, mas cada arquivo deve estar em uma única linguagem).
  7. Clique em Selecionar .

O editor de pipeline aparece para o novo pipeline. Um arquivo de origem vazio para seu idioma é criado, pronto para suas primeiras transformações.

o passo 2: Desenvolva a lógica do seu pipeline

Nesta etapa, você usará o LakeFlow Pipelines Editor para desenvolver e validar o código-fonte do pipeline declarativo LakeFlow interativamente.

O código usa Auto Loader para ingestão incremental de dados. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem. Para saber mais, consulte O que é o Auto Loader?

Um arquivo de código-fonte em branco é criado e configurado automaticamente para o pipeline. O arquivo é criado na pasta de transformações do seu pipeline. Por default, todos os arquivos *.py e *.sql na pasta transformações fazem parte da fonte do seu pipeline.

  1. Copie e cole o seguinte código no seu arquivo de origem. Certifique-se de usar o idioma que você selecionou para o arquivo no passo 1.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# Define the path to the source data
file_path = f"/databricks-datasets/songs/data-001/"

# Define a streaming table to ingest data from a volume
schema = StructType(
[
StructField("artist_id", StringType(), True),
StructField("artist_lat", DoubleType(), True),
StructField("artist_long", DoubleType(), True),
StructField("artist_location", StringType(), True),
StructField("artist_name", StringType(), True),
StructField("duration", DoubleType(), True),
StructField("end_of_fade_in", DoubleType(), True),
StructField("key", IntegerType(), True),
StructField("key_confidence", DoubleType(), True),
StructField("loudness", DoubleType(), True),
StructField("release", StringType(), True),
StructField("song_hotnes", DoubleType(), True),
StructField("song_id", StringType(), True),
StructField("start_of_fade_out", DoubleType(), True),
StructField("tempo", DoubleType(), True),
StructField("time_signature", DoubleType(), True),
StructField("time_signature_confidence", DoubleType(), True),
StructField("title", StringType(), True),
StructField("year", IntegerType(), True),
StructField("partial_sequence", IntegerType(), True)
]
)

@dlt.table(
comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
)
def songs_raw():
return (spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("sep","\t")
.load(file_path))

# Define a materialized view that validates data and renames a column
@dlt.table(
comment="Million Song Dataset with data cleaned and prepared for analysis."
)
@dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
@dlt.expect("valid_title", "song_title IS NOT NULL")
@dlt.expect("valid_duration", "duration > 0")
def songs_prepared():
return (
spark.read.table("songs_raw")
.withColumnRenamed("title", "song_title")
.select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
)

# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of songs released by the artists who released the most songs each year."
)
def top_artists_by_year():
return (
spark.read.table("songs_prepared")
.filter(expr("year > 0"))
.groupBy("artist_name", "year")
.count().withColumnRenamed("count", "total_number_of_songs")
.sort(desc("total_number_of_songs"), desc("year"))
)

Esta fonte inclui código para três consultas. Você também pode colocar essas consultas em arquivos separados para organizar os arquivos e codificar da maneira que preferir.

  1. Clique Ícone de reprodução. arquivo de execução ou pipelinede execução para iniciar uma atualização para o pipeline conectado. Com apenas um arquivo de origem no seu pipeline, eles são funcionalmente equivalentes.

Quando a atualização for concluída, o editor será atualizado com informações sobre seu pipeline.

  • O gráfico de pipeline (DAG), na barra lateral à direita do seu código, mostra três tabelas, songs_raw, songs_prepared e top_artists_by_year.
  • Um resumo da atualização é mostrado na parte superior do navegador ativo pipeline .
  • Os detalhes das tabelas geradas são mostrados no painel inferior, e você pode navegar pelos dados das tabelas selecionando uma.

Isso inclui dados brutos e limpos, bem como algumas análises simples para encontrar os principais artistas por ano. No próximo passo, você cria consultas ad-hoc para análise posterior em um arquivo separado no seu pipeline.

o passo 3: Explore o conjunto de dados criado pelo seu pipeline

Nesta etapa, você executa consultas ad-hoc nos dados processados no pipeline ETL para analisar os dados da música no Editor Databricks SQL . Essas consultas usam os registros preparados criados no passo anterior.

Primeiro, execute uma consulta que encontre os artistas que lançaram o maior número de músicas por ano desde 1990.

  1. Na barra lateral do navegador ativo pipeline , clique em Ícone de mais. Adicione então Exploração .

  2. Digite um nome e selecione SQL para o arquivo de exploração. Um SQL Notebook é criado em uma nova pasta explorations . Os arquivos na pasta explorations não são executados como parte de uma atualização pipeline por default. O SQL Notebook tem células que você pode executar juntas ou separadamente.

  3. Para criar uma tabela de artistas que lançaram mais músicas em cada ano após 1990, insira o seguinte código no novo arquivo SQL (se houver código de exemplo no arquivo, substitua-o). Como este Notebook não faz parte do pipeline, ele não usa o catálogo e o esquema default . Substitua o <catalog>.<schema> pelo catálogo e esquema que você usou como padrão para o pipeline:

    SQL
    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    -- replace with the catalog/schema you are using:
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC;
  4. Clique Ícone de reprodução. ou pressione Shift + Enter para executar esta consulta.

Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançante.

  1. Adicione o seguinte código à próxima célula no mesmo arquivo. Novamente, substitua o <catalog>.<schema> pelo catálogo e esquema que você usou como padrão para o pipeline:

    SQL
    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
    -- replace with the catalog/schema you are using:
    FROM <catalog>.<schema>.songs_prepared
    WHERE time_signature = 4 AND tempo between 100 and 140;
  2. Clique Ícone de reprodução. ou pressione Shift + Enter para executar esta consulta.

Etapa 4: Criar um trabalho para executar o pipeline

A seguir, crie um fluxo de trabalho para automatizar a ingestão de dados, processamento e análise dos passos usando um Databricks Job que é executado em um programar.

  1. Na parte superior do editor, escolha o botão programar .
  2. Se a caixa de diálogo do programa aparecer, escolha Adicionar programa .
  3. Isso abre a caixa de diálogo Novo programa , onde você pode criar um Job para executar seu pipeline em um programa.
  4. Opcionalmente, dê um nome ao trabalho.
  5. Por default, o programador é configurado para executar uma vez por dia. Você pode aceitar esse padrão ou definir seu próprio programa. Escolher Avançado lhe dá a opção de definir um horário específico para a execução do trabalho. Selecionar Mais opções permite que você crie notificações quando o trabalho for executado.
  6. Selecione Criar para aplicar as alterações e criar o trabalho.

Agora o Job será executado diariamente para manter seu pipeline atualizado. Você pode escolher programar novamente para view a lista de programas. Você pode gerenciar programas para seu pipeline a partir dessa caixa de diálogo, incluindo adicionar, editar ou remover programas.

Clicar no nome do programa (ou Job) leva você para a página do Job na lista Jobs & pipeline . A partir daí você pode view detalhes sobre a execução do Job, incluindo a história de execução, ou executar o Job imediatamente com o botão executar agora .

Consulte monitoramento e observabilidade para LakeFlow Jobs para obter mais informações sobre a execução de trabalhos.

Saiba mais