-
Notifications
You must be signed in to change notification settings - Fork 13
deleteFlows
Atenção: Para execução dos scripts descritos neste tutorial é preciso ter as variáveis de ambiente do Prefect configuradas (ver README no repositório principal) e a versão 4.0.0 do Selenium
É comum ter que deletar flows em produção ou em desenvolvimento depois de uma alteração específica do Flow ou depois uma alteração geral em todas as pipelines. É possível deletar esses flows na própria UI do Prefect clicando no ícone de lixeira no canto superior direito da página do Flow. O problema dessa solução, contudo, é que quando você precisa deletar muitos flows, você irá gastar muito tempo deletando um por um. Felizmente, é possível usar o cliente em python do prefect para deletar vários flows de uma vez.
Para deletar apenas um flow via python, use o seguinte snippet:
from prefect.client import Client
flow_id = "{flow_hash}"
client = Client()
client.graphql(
"""
mutation {
delete_flow(input: {flow_id: "%s"}) {
success
}
}
""" % flow_id
)
Em que a variável flow_id
é o id do flow que você deseja deletar. Esse id é criado no momento de registro do flow e também pode ser consultado na UI do prefect.
Para deletar todas as versões de um dado flow, basta alterar o argumento no método client.graphql
:
flow_group_id = "{flow_hash}"
client = Client()
client.graphql(
"""
mutation {
delete_flow_group(input: {flow_group_id: "%s"}) {
success
}
}
Dessa vez, porém, deve-se usar o id do grupo do Flow de interesse, também disponível na UI.
É fácil notar que é possível deletar todos os flows em produção se soubermos de antemão o valor dos ids dos grupos. Uma forma de obter esses ids é fazer um web scraping simples na UI do Prefect e então deletar os flows, como no snippet abaixo:
from prefect.client import Client
from selenium import webdriver
from selenium.webdriver.common.by import By
from time import sleep
options = webdriver.ChromeOptions()
options.headless = True
options.add_experimental_option("prefs", {
"download": {"prompt_for_download": False} })
options.add_experimental_option('useAutomationExtension', False)
driver = webdriver.Chrome(options=options)
url = 'http://prefect-ui.prefect.svc.cluster.local:8080/default?flows'
driver.get(url)
sleep(10)
elements = driver.find_elements_by_class_name("link")
flow_group_ids=[]
for element in elements:
link = element.get_attribute('href')
print(link)
if link.__contains__('flow'):
flow_group_ids.append(link.split('/')[-1])
client = Client()
for flow_group_id in flow_group_ids:
client.graphql(
"""
mutation {
delete_flow_group(input: {flow_group_id: "%s"}) {
success
}
}
""" % flow_group_id
)
print(f'delete {flow_group_id}')
driver.close()
Outra forma de cancelar as runs de flows arquivados é capturando os id's dessas runs via api UI do Prefect e então deletá-las, como no snippet abaixo:
import pandas as pd
import json
import requests
from prefect import Client
### pra esse código funcionar precisa:
# ativar a .venv e carregar as variáveis de .env
# source /pipelines/.venv/bin/activate
# source /pipelines/.env
client = Client()
get_scheduled_flow_runs_from_archived_flows = """{
flow(where: {archived: {_eq: true}, flow_runs: {auto_scheduled: {_eq: true}}}){
id
flow_runs(where: {state: {_eq: "Scheduled"}}){
state
id
scheduled_start_time
}
}
}"""
r = client.graphql(
query= get_scheduled_flow_runs_from_archived_flows
)
def delete_flow_run_mutation(flow_run_id):
mutation = """
mutation {
delete_flow_run(input: {flow_run_id: "%s"}) {
success
}
}
""" % flow_run_id
return mutation
for flow in r['data']['flow']:
print("\nflow_id: "+flow['id'])
for scheduled_run in flow["flow_runs"]:
flow_run_id = scheduled_run["id"]
mutation = delete_flow_run_mutation(flow_run_id=flow_run_id)
client.graphql(query= mutation)
print(f'\t- delete {flow_run_id}\t- scheduled_start_time: {scheduled_run["scheduled_start_time"]}')