Skip to content

Como deve ser um flow de uma pipeline de conjunto

Laura Amaral edited this page Jul 2, 2024 · 29 revisions

To do:

  • criar uma pipeline default com todos esses passos

1. Check for updates:

Verifica na fonte original se existem dados mais atualizados que os metadados

  • cria uma task para puxar a data de atualização da fonte original get_table_source_max_date()
  • usar a task from pipelines.utils.metadata.tasks import check_if_data_is_outdated para comparar com metadados

To do:

  • Criar função para capturar data de atualização da fonte original em modelos genéricos:
    • ftp do governo
    • ckan - no nome do arquivo
    • criar função para download e extrair a data original diretamente do arquivo
  • achar uma maneira de separar os flows que atualizaram dados e os que não

2. Extract and Load:

Construir as tasks de modo que o código apenas extraia os dados e os envie para o BQ

  • caso for necessário renomear e incluir ou excluir colunas usar o utils/apply_architecture_to_dataframe
  • para download dos dados usar a função to_download que faz o download com o async o que melhora a velocidade da pipeline
  • para upload: create_table_and_upload_to_gcs

3. Transform:

A transformação dos dados rola via dbt na task execute dbt model

  • para criar yaml e arquivos .sql usar a função em /queries-basedosdados-dev/gists/create_yaml_file.py
  • Obs: caso o modelo DBT seja incremental e seja BDPro, é necessário remover todas as restrições para permitir que o dbt acesse a tabela. Em seguida, as restrições serão reativadas ao rodar o update_metadata(passo 5).
        pre_hook = "DROP ALL ROW ACCESS POLICIES ON {{ this }}",
        post_hook = [...]

To do:

  • macro de correção de municípios

4. Tests:

Depois da etapa de transformação, enquanto a pipeline está em desenvolvimento, é necessário passar por alguns testes essenciais:

  • Chaves únicas
  • Conexão com diretórios
  • Nenhuma coluna é inteiramente nula
  • Concordância com o dicionário

Esses testes já estão incorporados no script de construção do yaml

5. Update metadata e row_access_policy:

A função de update metadata já realiza a mudança do row acces policy. Essa única função faz a modificação nos metadados e no permissionamento do BQ, assim as duas coisas ficam sempre sincronizadas.

Usar from pipelines.utils.metadata.tasks import update_django_metadata