From d0de1ebe9318d1a70747e7457593f00a150cdbd4 Mon Sep 17 00:00:00 2001 From: Alex <8507196+Fructokinase@users.noreply.github.com> Date: Fri, 3 Feb 2023 09:41:23 -0800 Subject: [PATCH] [Custom DC] Add a csv file trigger for controller runner PubSub (#181) * [Custom DC] Add a csv file trigger for controller runner PubSub * change controller trigger file from *.csv to process//trigger.txt * fix bugs * Fix import path * Fix type, improve comment --------- Co-authored-by: Alex Chen --- bigtable_automation/gcf/common.go | 50 +++++++- bigtable_automation/gcf/controller_trigger.go | 108 ++++++++++++++++++ bigtable_automation/gcf/custom_instance.go | 64 +++++++++-- bigtable_automation/gcf/go.mod | 1 + bigtable_automation/gcf/go.sum | 4 + bigtable_automation/terraform/main.tf | 4 + 6 files changed, 215 insertions(+), 16 deletions(-) create mode 100644 bigtable_automation/gcf/controller_trigger.go diff --git a/bigtable_automation/gcf/common.go b/bigtable_automation/gcf/common.go index d81d7157..f0882ff8 100644 --- a/bigtable_automation/gcf/common.go +++ b/bigtable_automation/gcf/common.go @@ -14,10 +14,10 @@ // Package gcf runs a GCF function that triggers in 2 scenarios: // -// 1) completion of prophet-flume job in borg. On triggering it sets up new -// cloud BT table, scales up BT cluster (if needed) and starts a dataflow job. -// 2) completion of BT cache ingestion dataflow job. It scales BT cluster down -// (if needed). +// 1. completion of prophet-flume job in borg. On triggering it sets up new +// cloud BT table, scales up BT cluster (if needed) and starts a dataflow job. +// 2. completion of BT cache ingestion dataflow job. It scales BT cluster down +// (if needed). 
// // There are two set of trigger functions defined: // - ProdBTImportController @@ -31,6 +31,7 @@ import ( "context" "fmt" "log" + "path/filepath" "strings" "time" @@ -42,6 +43,7 @@ import ( const ( createTableRetries = 3 columnFamily = "csv" + gsScheme = "gs://" ) // GCSEvent is the payload of a GCS event. @@ -128,3 +130,43 @@ func deleteBTTable(ctx context.Context, projectID, instance, table string) error } return adminClient.DeleteTable(ctx, table) } + +// ImportGCSPath holds GCS path info for custom dc v1. +// import name inside the pubsub message to trigger controller must match +// the import name in config.textproto (specified by customManifestPath). +// Ideally we should read import name from config.textproto directly, but +// since manifest protos are not public yet, we will use the folder name instead. +// +// Custom DC resource bucket MUST follow the following directory structure. +// {bucket}/{path}/{import name}/config/config.textproto +// {bucket}/{path}/{import name}/tmcf_csv/*.csv +// {bucket}/{path}/{import name}/tmcf_csv/*.tmcf +// {bucket}/{path}/{import name}/{other directories, e.g. cache, control, process} +type ImportGCSPath struct { + // GCS base path for a particular import. + // This should be under the resource bucket. + importName string +} + +func (p ImportGCSPath) ImportName() string { + return filepath.Base(p.importName) +} + +// ConfigPath must be {import base path}/config/config.textproto +func (p ImportGCSPath) ConfigPath() string { + return filepath.Join(p.importName, "config", "config.textproto") +} + +// DataDirectory is the expected location for tmcf and csvs. +// It is expected that csv files are dropped off in this directory. 
+func (p ImportGCSPath) DataDirectory() string { + return filepath.Join(p.importName, "tmcf_csv") +} + +func (p ImportGCSPath) CacheDirectory() string { + return filepath.Join(p.importName, "cache") +} + +func (p ImportGCSPath) ControlDirectory() string { + return filepath.Join(p.importName, "control") +} diff --git a/bigtable_automation/gcf/controller_trigger.go b/bigtable_automation/gcf/controller_trigger.go new file mode 100644 index 00000000..132d973f --- /dev/null +++ b/bigtable_automation/gcf/controller_trigger.go @@ -0,0 +1,108 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package gcf + +import ( + "context" + "fmt" + "log" + "path/filepath" + "strings" + + "cloud.google.com/go/pubsub" + "github.com/pkg/errors" +) + +const ( + dcManifestPath = "/memfile/core_resolved_mcfs_memfile/core_resolved_mcfs.binarypb" + controllerTriggerFile = "trigger.txt" +) + +type PublishConfig struct { + // TopicName is the full PubSub topic name in the following format + // projects/{project id}/topics/{topic id} + TopicName string +} + +func (p PublishConfig) ProjectID() string { + return strings.Split(p.TopicName, "/")[1] +} + +func (p PublishConfig) TopicID() string { + return strings.Split(p.TopicName, "/")[3] +} + +type CustomDCPubSubMsg struct { + importName string + dcManifestPath string + customManifestPath string + bigstoreDataDirectory string + bigstoreCacheDirectory string + bigstoreControlDirectory string +} + +func (s CustomDCPubSubMsg) String() string { + return fmt.Sprintf(`import_name=%s,\ +dc_manifest_path=%s,\ +custom_manifest_path=%s,\ +bigstore_data_directory=%s,\ +bigstore_cache_directory=%s,\ +bigstore_control_directory=%s,\ +`, + s.importName, + s.dcManifestPath, + s.customManifestPath, + s.bigstoreDataDirectory, + s.bigstoreCacheDirectory, + s.bigstoreControlDirectory, + ) +} + +// Publish publishes the current import config to a topic. +func (s CustomDCPubSubMsg) Publish(ctx context.Context, p PublishConfig) error { + client, err := pubsub.NewClient(ctx, p.ProjectID()) + if err != nil { + return err + } + defer client.Close() + + t := client.Topic(p.TopicID()) + res := t.Publish(ctx, &pubsub.Message{ + Data: []byte(s.String()), + }) + + id, err := res.Get(ctx) + if err != nil { + return errors.Wrap(err, "Failed to verify import notification msg was published") + } + log.Printf("PubSub import notification published with server generated msg id: %v\n", id) + return nil +} + +// triggerPath is .../{import name}/process/{subfolder}/trigger.txt +// process folder is currently only used for control purposes for 1 process: trigger controller. 
+func TriggerController(ctx context.Context, p PublishConfig, triggerPath string) error { + importName := filepath.Dir(filepath.Dir(filepath.Dir(triggerPath))) + path := ImportGCSPath{importName: importName} + + msg := CustomDCPubSubMsg{ + importName: path.ImportName(), + dcManifestPath: dcManifestPath, + customManifestPath: path.ConfigPath(), + bigstoreDataDirectory: path.DataDirectory(), + bigstoreCacheDirectory: path.CacheDirectory(), + bigstoreControlDirectory: path.ControlDirectory(), + } + return msg.Publish(ctx, p) +} diff --git a/bigtable_automation/gcf/custom_instance.go b/bigtable_automation/gcf/custom_instance.go index f287cd65..fd3a066a 100644 --- a/bigtable_automation/gcf/custom_instance.go +++ b/bigtable_automation/gcf/custom_instance.go @@ -18,12 +18,14 @@ import ( "context" "log" "os" + "path/filepath" "strings" "github.com/pkg/errors" ) -func customInternal(ctx context.Context, e GCSEvent) error { +// blobName is assumed to be under the correct path in "control". +func handleBTCache(ctx context.Context, blobName string) error { projectID := os.Getenv("projectID") bucket := os.Getenv("bucket") instance := os.Getenv("instance") @@ -41,20 +43,22 @@ func customInternal(ctx context.Context, e GCSEvent) error { if dataflowTemplate == "" { return errors.New("dataflowTemplate is not set in environment") } + if bucket == "" { + return errors.New("bucket is not set in environment") + } + + parts := strings.Split(blobName, "/") + // Get table ID. 
// e.Name should is like "**///control//launched.txt" - parts := strings.Split(e.Name, "/") - idxControl := len(parts) - 3 idxTable := len(parts) - 2 - if parts[idxControl] != "control" { - log.Printf("Ignore irrelevant trigger from file %s", e.Name) - return nil - } tableID := parts[idxTable] + + idxControl := len(parts) - 3 rootFolder := "gs://" + bucket + "/" + strings.Join(parts[0:idxControl], "/") - if strings.HasSuffix(e.Name, initFile) { - log.Printf("[%s] State Init", e.Name) + if strings.HasSuffix(blobName, initFile) { + log.Printf("[%s] State Init", blobName) // Called when the state-machine is at Init. Logic below moves it to Launched state. launchedPath := joinURL(rootFolder, "control", tableID, launchedFile) exist, err := doesObjectExist(ctx, launchedPath) @@ -84,14 +88,50 @@ func customInternal(ctx context.Context, e GCSEvent) error { } return err } - log.Printf("[%s] State Launched", e.Name) - } else if strings.HasSuffix(e.Name, completedFile) { + log.Printf("[%s] State Launched", blobName) + } else if strings.HasSuffix(blobName, completedFile) { // TODO: else, notify Mixer to load the BT table. - log.Printf("[%s] Completed work", e.Name) + log.Printf("[%s] Completed work", blobName) } return nil } +func handleControllerTrigger(ctx context.Context, blobPath string) error { + controllerTriggerTopic := os.Getenv("controllerTriggerTopic") + bucket := os.Getenv("bucket") + if controllerTriggerTopic == "" { + return errors.New("controllerTriggerTopic is not set in environment") + } + if bucket == "" { + return errors.New("bucket is not set in environment") + } + + bigstoreCSVPath := filepath.Join("/bigstore", bucket, blobPath) + log.Printf("Using PubSub topic: %s", controllerTriggerTopic) + pcfg := PublishConfig{TopicName: controllerTriggerTopic} + return TriggerController(ctx, pcfg, bigstoreCSVPath) +} + +// TODO(alex): refactor path -> event handler logic. 
+func customInternal(ctx context.Context, e GCSEvent) error { + parts := strings.Split(e.Name, "/") + idxControlOrProcess := len(parts) - 3 + if len(parts) < 3 { + log.Printf("Expected 3+ '/'-separated parts, got %s", e.Name) + log.Println("Ignoring as irrelevant file") + return nil + } + + if parts[idxControlOrProcess] == "control" { + return handleBTCache(ctx, e.Name) + } else if parts[idxControlOrProcess] == "process" && strings.HasSuffix(e.Name, controllerTriggerFile) { + return handleControllerTrigger(ctx, e.Name) + } else { + log.Printf("Ignore irrelevant trigger from file %s", e.Name) + return nil + } +} + // CustomBTImportController consumes a GCS event and runs an import state machine. func CustomBTImportController(ctx context.Context, e GCSEvent) error { err := customInternal(ctx, e) diff --git a/bigtable_automation/gcf/go.mod b/bigtable_automation/gcf/go.mod index 29b052c4..f47725d5 100644 --- a/bigtable_automation/gcf/go.mod +++ b/bigtable_automation/gcf/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( cloud.google.com/go/bigtable v1.13.0 + cloud.google.com/go/pubsub v1.28.0 cloud.google.com/go/storage v1.27.0 github.com/GoogleCloudPlatform/functions-framework-go v1.6.1 github.com/pkg/errors v0.9.1 diff --git a/bigtable_automation/gcf/go.sum b/bigtable_automation/gcf/go.sum index 610b9600..a8f1f7cb 100644 --- a/bigtable_automation/gcf/go.sum +++ b/bigtable_automation/gcf/go.sum @@ -278,6 +278,8 @@ cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIA cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= cloud.google.com/go/pubsub v1.26.0/go.mod h1:QgBH3U/jdJy/ftjPhTkyXNj543Tin1pRYcdcPRnFIRI= cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0= +cloud.google.com/go/pubsub v1.28.0 h1:XzabfdPx/+eNrsVVGLFgeUnQQKPGkMb8klRCeYK52is= +cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8= cloud.google.com/go/pubsublite v1.5.0/go.mod 
h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg= cloud.google.com/go/recaptchaenterprise v1.3.1/go.mod h1:OdD+q+y4XGeAlxRaMn1Y7/GveP6zmq76byL6tjPE7d4= cloud.google.com/go/recaptchaenterprise/v2 v2.1.0/go.mod h1:w9yVqajwroDNTfGuhmOjPDN//rZGySaf6PtFVcSCa7o= @@ -725,6 +727,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1044,6 +1047,7 @@ google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+ google.golang.org/genproto v0.0.0-20221117204609-8f9c96812029/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= +google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE= google.golang.org/genproto v0.0.0-20221206210731-b1a01be3a5f6 h1:AGXp12e/9rItf6/4QymU7WsAUwCf+ICW75cuR91nJIc= google.golang.org/genproto v0.0.0-20221206210731-b1a01be3a5f6/go.mod h1:1dOng4TWOomJrDGhpXjfCD35wQC6jnC7HpRmOFRqEV0= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/bigtable_automation/terraform/main.tf 
b/bigtable_automation/terraform/main.tf index 065bde59..4a3e1600 100644 --- a/bigtable_automation/terraform/main.tf +++ b/bigtable_automation/terraform/main.tf @@ -93,6 +93,10 @@ resource "google_cloudfunctions_function" "bt_automation" { dataflowTemplate = var.csv2bt_template_path tempLocation = format("gs://%s/dataflow/tmp", var.dc_resource_bucket) + + bucket = var.dc_resource_bucket + controllerTriggerTopic = "projects/datcom-204919/topics/private-import-notification-prod" + } depends_on = [