# Merge pull request #15 from jipc3/construccion
---
name: Dynamic Databricks Notebook Deploy

on:
  push:
    branches:
      - main

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v3

      - name: Install jq & curl
        run: sudo apt-get update && sudo apt-get install -y jq curl

      # Export each notebook from the origin workspace as raw SOURCE files
      # into notebooks_to_deploy/.
      - name: Export multiple notebooks (raw)
        run: |
          ORIGIN_HOST="${{ secrets.DATABRICKS_ORIGIN_HOST }}"
          ORIGIN_TOKEN="${{ secrets.DATABRICKS_ORIGIN_TOKEN }}"
          NOTEBOOK_BASE="/Workspace/Users/jeremypalma2022@gmail.com/Proyect_Databricks_GitHub"
          # Add more notebook paths here as needed.
          NOTEBOOK_PATHS=(
            "process/Ingest_supercias_compania"
            "process/Ingest_supercias_ranking"
            "process/Ingest_supercias_sector"
            "process/Ingest_supercias_segmento"
            "process/Load_supercias"
            "process/Transform_supercias"
            "scripts/Preparacion_Ambiente"
            "security/grants"
          )
          mkdir -p notebooks_to_deploy
          for nb_path in "${NOTEBOOK_PATHS[@]}"; do
            nb=$(basename "$nb_path")
            echo "Exportando $nb_path en modo raw..."
            # format=SOURCE + direct_download=true returns the raw notebook body.
            curl -s -X GET \
              -H "Authorization: Bearer $ORIGIN_TOKEN" \
              "$ORIGIN_HOST/api/2.0/workspace/export?path=$NOTEBOOK_BASE/$nb_path&format=SOURCE&direct_download=true" \
              --output "notebooks_to_deploy/$nb.py"
          done

      # Import every exported notebook into the destination workspace under
      # /prod/scripts/main, overwriting any previous version.
      - name: Deploy notebooks to Destination Workspace
        run: |
          DEST_HOST="${{ secrets.DATABRICKS_DEST_HOST }}"
          DEST_TOKEN="${{ secrets.DATABRICKS_DEST_TOKEN }}"
          DEST_BASE="/prod/scripts/main"
          # mkdirs is idempotent, so create the target folder once instead of
          # once per file (the original called it inside the loop).
          echo "Creando carpeta $DEST_BASE si no existe..."
          curl -s -X POST \
            -H "Authorization: Bearer $DEST_TOKEN" \
            -H "Content-Type: application/json" \
            -d "{\"path\":\"$DEST_BASE\"}" \
            "$DEST_HOST/api/2.0/workspace/mkdirs"
          for file in notebooks_to_deploy/*.py; do
            name=$(basename "$file" .py)
            dest_path="$DEST_BASE/$name"
            echo "Importando $file → $dest_path"
            response=$(curl -s -X POST \
              -H "Authorization: Bearer $DEST_TOKEN" \
              -H "Content-Type: multipart/form-data" \
              -F "path=$dest_path" \
              -F "format=SOURCE" \
              -F "language=PYTHON" \
              -F "overwrite=true" \
              -F "content=@$file" \
              "$DEST_HOST/api/2.0/workspace/import")
            echo "Response: $response"
          done

      # Remove any pre-existing job with the same name so the create step
      # below never produces duplicates.
      - name: Check if workflow exists and delete if necessary
        run: |
          DEST_HOST="${{ secrets.DATABRICKS_DEST_HOST }}"
          DEST_TOKEN="${{ secrets.DATABRICKS_DEST_TOKEN }}"
          WORKFLOW_NAME="WF_ADB"
          echo "Verificando si existe el workflow: $WORKFLOW_NAME"
          # NOTE(review): jobs/list is paginated; jobs beyond the first page
          # would not be found here — confirm job count stays small.
          workflows_response=$(curl -s -X GET \
            -H "Authorization: Bearer $DEST_TOKEN" \
            "$DEST_HOST/api/2.1/jobs/list")
          # jq may emit several ids (one per line) if duplicates already
          # exist; delete each one instead of splicing a multi-line value
          # into a single JSON payload.
          existing_job_ids=$(echo "$workflows_response" | jq -r --arg name "$WORKFLOW_NAME" '.jobs[]? | select(.settings.name == $name) | .job_id')
          if [ -n "$existing_job_ids" ] && [ "$existing_job_ids" != "null" ]; then
            for existing_job_id in $existing_job_ids; do
              echo "Workflow encontrado con ID: $existing_job_id. Eliminando..."
              delete_response=$(curl -s -X POST \
                -H "Authorization: Bearer $DEST_TOKEN" \
                -H "Content-Type: application/json" \
                -d "{\"job_id\": $existing_job_id}" \
                "$DEST_HOST/api/2.1/jobs/delete")
              echo "Delete response: $delete_response"
            done
          else
            echo "No se encontró workflow existente con nombre: $WORKFLOW_NAME"
          fi

      # Look up the production cluster and publish its id for later steps.
      - name: Get existing cluster ID
        run: |
          DEST_HOST="${{ secrets.DATABRICKS_DEST_HOST }}"
          DEST_TOKEN="${{ secrets.DATABRICKS_DEST_TOKEN }}"
          CLUSTER_NAME="Cluster_Produccion"
          echo "Buscando cluster existente: $CLUSTER_NAME"
          clusters_response=$(curl -s -X GET \
            -H "Authorization: Bearer $DEST_TOKEN" \
            "$DEST_HOST/api/2.0/clusters/list")
          echo "Clusters response: $clusters_response"
          cluster_id=$(echo "$clusters_response" | jq -r --arg name "$CLUSTER_NAME" '.clusters[]? | select(.cluster_name == $name) | .cluster_id')
          if [ "$cluster_id" != "" ] && [ "$cluster_id" != "null" ]; then
            echo "✅ Cluster encontrado: $CLUSTER_NAME con ID: $cluster_id"
            echo "CLUSTER_ID=$cluster_id" >> "$GITHUB_ENV"
          else
            echo "❌ No se encontró el cluster: $CLUSTER_NAME"
            echo "Clusters disponibles:"
            echo "$clusters_response" | jq -r '.clusters[]? | .cluster_name'
            exit 1
          fi

      # Build the multi-task job definition and create it via Jobs API 2.1.
      - name: Create Databricks Workflow WF_ADB
        run: |
          DEST_HOST="${{ secrets.DATABRICKS_DEST_HOST }}"
          DEST_TOKEN="${{ secrets.DATABRICKS_DEST_TOKEN }}"
          # Fixed: was "/py/scripts/main", which did not match the deploy
          # step's destination; it is now "/prod/scripts/main" and is used to
          # build every notebook_path below.
          DEST_BASE="/prod/scripts/main"
          CLUSTER_ID="${{ env.CLUSTER_ID }}"
          echo "Creando workflow: WF_ADB con cluster existente ID: $CLUSTER_ID"
          # Unquoted EOF so $CLUSTER_ID and $DEST_BASE expand inside the JSON.
          cat > workflow_config.json << EOF
          {
            "name": "WF_ADB",
            "format": "MULTI_TASK",
            "tasks": [
              {
                "task_key": "Preparacion_Ambiente",
                "description": "Ejecuta notebook Preparacion_Ambiente",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/Preparacion_Ambiente",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "storageName": "saccexplorer",
                    "catalogName": "unit_catalog_explorer"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2
              },
              {
                "task_key": "Ingest_supercias_compania",
                "description": "Ejecuta notebook Ingest_supercias_compania",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/Ingest_supercias_compania",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "storageName": "saccexplorer",
                    "catalogName": "unit_catalog_explorer",
                    "schemaName": "uc_bronze",
                    "containerName": "bronze"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2,
                "depends_on": [
                  {
                    "task_key": "Preparacion_Ambiente"
                  }
                ]
              },
              {
                "task_key": "Ingest_supercias_ranking",
                "description": "Ejecuta notebook Ingest_supercias_ranking",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/Ingest_supercias_ranking",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "storageName": "saccexplorer",
                    "catalogName": "unit_catalog_explorer",
                    "schemaName": "uc_bronze",
                    "containerName": "bronze"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2,
                "depends_on": [
                  {
                    "task_key": "Preparacion_Ambiente"
                  }
                ]
              },
              {
                "task_key": "Ingest_supercias_sector",
                "description": "Ejecuta notebook Ingest_supercias_sector",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/Ingest_supercias_sector",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "storageName": "saccexplorer",
                    "catalogName": "unit_catalog_explorer",
                    "schemaName": "uc_bronze",
                    "containerName": "bronze"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2,
                "depends_on": [
                  {
                    "task_key": "Preparacion_Ambiente"
                  }
                ]
              },
              {
                "task_key": "Ingest_supercias_segmento",
                "description": "Ejecuta notebook Ingest_supercias_segmento",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/Ingest_supercias_segmento",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "storageName": "saccexplorer",
                    "catalogName": "unit_catalog_explorer",
                    "schemaName": "uc_bronze",
                    "containerName": "bronze"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2,
                "depends_on": [
                  {
                    "task_key": "Preparacion_Ambiente"
                  }
                ]
              },
              {
                "task_key": "Transform_supercias",
                "description": "Ejecuta notebook Transform_supercias",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/Transform_supercias",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "storageName": "saccexplorer",
                    "catalogName": "unit_catalog_explorer",
                    "schema_source": "uc_bronze",
                    "schema_sink": "uc_silver",
                    "containerName": "silver"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2,
                "depends_on": [
                  {
                    "task_key": "Ingest_supercias_compania"
                  },
                  {
                    "task_key": "Ingest_supercias_ranking"
                  },
                  {
                    "task_key": "Ingest_supercias_sector"
                  },
                  {
                    "task_key": "Ingest_supercias_segmento"
                  }
                ]
              },
              {
                "task_key": "Load_supercias",
                "description": "Ejecuta notebook Load_supercias",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/Load_supercias",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "storageName": "saccexplorer",
                    "catalogName": "unit_catalog_explorer",
                    "schema_source": "uc_silver",
                    "schema_sink": "uc_golden",
                    "containerName": "golden"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2,
                "depends_on": [
                  {
                    "task_key": "Transform_supercias"
                  }
                ]
              },
              {
                "task_key": "Grants",
                "description": "Ejecuta notebook Grants",
                "notebook_task": {
                  "notebook_path": "$DEST_BASE/grants",
                  "source": "WORKSPACE",
                  "base_parameters": {
                    "catalogName": "unit_catalog_explorer",
                    "groupName": "ext_users"
                  }
                },
                "existing_cluster_id": "$CLUSTER_ID",
                "timeout_seconds": 3600,
                "max_retries": 2,
                "depends_on": [
                  {
                    "task_key": "Load_supercias"
                  }
                ]
              }
            ],
            "schedule": {
              "quartz_cron_expression": "0 0 8 * * ?",
              "timezone_id": "America/Lima",
              "pause_status": "PAUSED"
            },
            "email_notifications": {
              "on_failure": [],
              "on_success": [],
              "no_alert_for_skipped_runs": false
            },
            "webhook_notifications": {},
            "timeout_seconds": 7200,
            "max_concurrent_runs": 1,
            "tags": {
              "environment": "production",
              "created_by": "github_actions",
              "project": "automated_deployment",
              "cluster_used": "Cluster_Produccion"
            }
          }
          EOF
          # Create the job.
          create_response=$(curl -s -X POST \
            -H "Authorization: Bearer $DEST_TOKEN" \
            -H "Content-Type: application/json" \
            -d @workflow_config.json \
            "$DEST_HOST/api/2.1/jobs/create")
          echo "Workflow creation response: $create_response"
          job_id=$(echo "$create_response" | jq -r '.job_id')
          if [ "$job_id" != "" ] && [ "$job_id" != "null" ]; then
            echo "✅ Workflow 'WF_ADB' creado exitosamente con ID: $job_id"
            workflow_details=$(curl -s -X GET \
              -H "Authorization: Bearer $DEST_TOKEN" \
              "$DEST_HOST/api/2.1/jobs/get?job_id=$job_id")
            echo "Detalles del workflow:"
            echo "$workflow_details" | jq '.settings | {name, tasks: (.tasks | map({task_key, notebook_task: .notebook_task.notebook_path}))}'
          else
            echo "❌ Error al crear el workflow"
            echo "Response completo: $create_response"
            exit 1
          fi

      # Sanity-check the created job: task list, notebook paths, cluster id.
      - name: Validate Workflow Configuration
        run: |
          DEST_HOST="${{ secrets.DATABRICKS_DEST_HOST }}"
          DEST_TOKEN="${{ secrets.DATABRICKS_DEST_TOKEN }}"
          WORKFLOW_NAME="WF_ADB"
          echo "🔍 Validando la configuración del workflow creado..."
          workflows_list=$(curl -s -X GET \
            -H "Authorization: Bearer $DEST_TOKEN" \
            "$DEST_HOST/api/2.1/jobs/list")
          job_id=$(echo "$workflows_list" | jq -r --arg name "$WORKFLOW_NAME" '.jobs[]? | select(.settings.name == $name) | .job_id')
          if [ "$job_id" != "" ] && [ "$job_id" != "null" ]; then
            echo "✅ Workflow encontrado con ID: $job_id"
            job_details=$(curl -s -X GET \
              -H "Authorization: Bearer $DEST_TOKEN" \
              "$DEST_HOST/api/2.1/jobs/get?job_id=$job_id")
            echo "📋 Resumen del workflow:"
            echo "Nombre: $(echo "$job_details" | jq -r '.settings.name')"
            echo "Número de tareas: $(echo "$job_details" | jq '.settings.tasks | length')"
            echo ""
            echo "📝 Tareas configuradas:"
            echo "$job_details" | jq -r '.settings.tasks[] | "- " + .task_key + " → " + .notebook_task.notebook_path'
            echo ""
            echo "🖥️ Cluster configurado:"
            echo "Cluster ID: $(echo "$job_details" | jq -r '.settings.tasks[0].existing_cluster_id')"
            echo "Cluster Name: Cluster_Produccion (reutilizado)"
          else
            echo "❌ No se pudo encontrar el workflow creado"
            exit 1
          fi

      # Trigger the job and expose run/job ids for the monitoring step.
      - name: Execute Workflow WF_ADB
        run: |
          DEST_HOST="${{ secrets.DATABRICKS_DEST_HOST }}"
          DEST_TOKEN="${{ secrets.DATABRICKS_DEST_TOKEN }}"
          WORKFLOW_NAME="WF_ADB"
          echo "🚀 Ejecutando workflow: $WORKFLOW_NAME"
          workflows_list=$(curl -s -X GET \
            -H "Authorization: Bearer $DEST_TOKEN" \
            "$DEST_HOST/api/2.1/jobs/list")
          job_id=$(echo "$workflows_list" | jq -r --arg name "$WORKFLOW_NAME" '.jobs[]? | select(.settings.name == $name) | .job_id')
          if [ "$job_id" != "" ] && [ "$job_id" != "null" ]; then
            echo "✅ Workflow encontrado con ID: $job_id"
            run_response=$(curl -s -X POST \
              -H "Authorization: Bearer $DEST_TOKEN" \
              -H "Content-Type: application/json" \
              -d "{\"job_id\": $job_id}" \
              "$DEST_HOST/api/2.1/jobs/run-now")
            run_id=$(echo "$run_response" | jq -r '.run_id')
            if [ "$run_id" != "" ] && [ "$run_id" != "null" ]; then
              echo "🎯 Workflow ejecutado exitosamente!"
              echo "Run ID: $run_id"
              echo "WORKFLOW_RUN_ID=$run_id" >> "$GITHUB_ENV"
              echo "WORKFLOW_JOB_ID=$job_id" >> "$GITHUB_ENV"
              echo "🔗 URL del workflow: $DEST_HOST/jobs/$job_id/runs/$run_id"
            else
              echo "❌ Error al ejecutar el workflow"
              echo "Response: $run_response"
              exit 1
            fi
          else
            echo "❌ No se pudo encontrar el workflow para ejecutar"
            exit 1
          fi

      # Poll the run until it terminates, fails, or the 10-minute budget
      # is exhausted (timeout is not treated as a CI failure).
      - name: Monitor Workflow Execution
        run: |
          DEST_HOST="${{ secrets.DATABRICKS_DEST_HOST }}"
          DEST_TOKEN="${{ secrets.DATABRICKS_DEST_TOKEN }}"
          RUN_ID="${{ env.WORKFLOW_RUN_ID }}"
          JOB_ID="${{ env.WORKFLOW_JOB_ID }}"
          echo "📊 Monitoreando ejecución del workflow..."
          echo "Job ID: $JOB_ID"
          echo "Run ID: $RUN_ID"
          # Poll for at most 10 minutes (600 s), every 30 s.
          max_wait_time=600
          wait_time=0
          check_interval=30
          while [ $wait_time -lt $max_wait_time ]; do
            run_status=$(curl -s -X GET \
              -H "Authorization: Bearer $DEST_TOKEN" \
              "$DEST_HOST/api/2.1/jobs/runs/get?run_id=$RUN_ID")
            state=$(echo "$run_status" | jq -r '.state.life_cycle_state')
            result_state=$(echo "$run_status" | jq -r '.state.result_state // "RUNNING"')
            echo "⏱️ Estado actual: $state ($result_state) - Tiempo transcurrido: ${wait_time}s"
            # Per-task progress.
            echo "$run_status" | jq -r '.tasks[]? | " 📋 " + .task_key + ": " + .state.life_cycle_state + " (" + (.state.result_state // "RUNNING") + ")"'
            case "$state" in
              "TERMINATED")
                if [ "$result_state" = "SUCCESS" ]; then
                  echo "🎉 ¡Workflow completado exitosamente!"
                  echo ""
                  echo "📈 Resumen de ejecución:"
                  echo "$run_status" | jq -r '.tasks[]? | "✅ " + .task_key + " → " + (.state.result_state // "SUCCESS")'
                  # Total duration (Databricks timestamps are milliseconds).
                  start_time=$(echo "$run_status" | jq -r '.start_time')
                  end_time=$(echo "$run_status" | jq -r '.end_time')
                  if [ "$start_time" != "null" ] && [ "$end_time" != "null" ]; then
                    duration=$((($end_time - $start_time) / 1000))
                    echo "⏰ Duración total: ${duration} segundos"
                  fi
                  exit 0
                else
                  echo "❌ Workflow terminó con errores: $result_state"
                  echo "📋 Detalles de las tareas:"
                  echo "$run_status" | jq -r '.tasks[]? | "❌ " + .task_key + ": " + (.state.result_state // "UNKNOWN")'
                  exit 1
                fi
                ;;
              "INTERNAL_ERROR"|"SKIPPED")
                echo "❌ Workflow falló con estado: $state"
                exit 1
                ;;
              *)
                # States: PENDING, RUNNING, TERMINATING — keep polling.
                echo "⏳ Workflow aún ejecutándose..."
                ;;
            esac
            sleep $check_interval
            wait_time=$((wait_time + check_interval))
          done
          echo "⚠️ Timeout: El workflow aún se está ejecutando después de $max_wait_time segundos"
          echo "🔗 Verifica el estado en: $DEST_HOST/jobs/$JOB_ID/runs/$RUN_ID"
          echo "ℹ️ El workflow seguirá ejecutándose en Databricks"
          exit 0

      - name: Clean up
        run: |
          rm -rf notebooks_to_deploy
          rm -f workflow_config.json

      - name: Done
        run: |
          echo "🎉 ¡Despliegue y ejecución completados exitosamente!"
          echo ""
          echo "📊 Resumen:"
          echo "✅ Notebooks desplegados en /prod/scripts/main"
          echo "✅ Workflow creado: WF_ADB"
          echo "✅ Tareas configuradas:"
          echo "   - Preparacion_Ambiente"
          echo "   - Ingest_supercias_compania"
          echo "   - Ingest_supercias_ranking"
          echo "   - Ingest_supercias_sector"
          echo "   - Ingest_supercias_segmento"
          echo "   - Transform_supercias"
          echo "   - Load_supercias"
          echo "   - Grants"
          echo "✅ Cluster existente: Cluster_Produccion configurado"
          echo "🚀 Workflow ejecutado automáticamente"
          echo ""
          echo "🔗 Accede a tu workspace de Databricks para ver los resultados detallados"