Skip to content

Commit

Permalink
[Custom DC] Add a csv file trigger for controller runner PubSub (#181)
Browse files Browse the repository at this point in the history
* [Custom DC] Add a csv file trigger for controller runner PubSub

* change controller trigger file from *.csv to process/<import_name>/trigger.txt

* fix bugs

* Fix import path

* Fix type, improve comment

---------

Co-authored-by: Alex Chen <[email protected]>
  • Loading branch information
Fructokinase and Alex Chen authored Feb 3, 2023
1 parent d2d3fdd commit d0de1eb
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 16 deletions.
50 changes: 46 additions & 4 deletions bigtable_automation/gcf/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

// Package gcf runs a GCF function that triggers in 2 scenarios:
//
// 1) completion of prophet-flume job in borg. On triggering it sets up new
// cloud BT table, scales up BT cluster (if needed) and starts a dataflow job.
// 2) completion of BT cache ingestion dataflow job. It scales BT cluster down
// (if needed).
// 1. completion of prophet-flume job in borg. On triggering it sets up new
// cloud BT table, scales up BT cluster (if needed) and starts a dataflow job.
// 2. completion of BT cache ingestion dataflow job. It scales BT cluster down
// (if needed).
//
// There are two set of trigger functions defined:
// - ProdBTImportController
Expand All @@ -31,6 +31,7 @@ import (
"context"
"fmt"
"log"
"path/filepath"
"strings"
"time"

Expand All @@ -42,6 +43,7 @@ import (
const (
createTableRetries = 3
columnFamily = "csv"
gsScheme = "gs://"
)

// GCSEvent is the payload of a GCS event.
Expand Down Expand Up @@ -128,3 +130,43 @@ func deleteBTTable(ctx context.Context, projectID, instance, table string) error
}
return adminClient.DeleteTable(ctx, table)
}

// ImportGCSPath holds GCS path info for custom dc v1.
// import name inside the pubsub message to trigger controller must match
// the import name in config.textproto (specified by customManifestPath).
// Ideally we should read import name from config.text proto directly, but
// since manifest protos are not public yet, we will use the folder name instead.
//
// Custom DC resource bucket MUST follow the following directory structure.
// <resource bucket name>/<some path>/<import name>/config/config.textproto
// <resource bucket name>/<some path>/<import name>/tmcf_csv/*.csv
// <resource bucket name>/<some path>/<import name>/tmcf_csv/*.tmcf
// <resource bucket name>/<some path>/<import name>/<other folders like control, cache>
type ImportGCSPath struct {
// GCS base path for a particular import.
// This should be under the resource bucket.
importName string
}

func (p ImportGCSPath) ImportName() string {
return filepath.Base(p.importName)
}

// ConfigPath must be <base>/config/config.textproto
func (p ImportGCSPath) ConfigPath() string {
return filepath.Join(p.importName, "config", "config.textproto")
}

// DataDirectory is the expected location for tmcf and csvs.
// It is expected that csv files are dropped off in this directory.
func (p ImportGCSPath) DataDirectory() string {
return filepath.Join(p.importName, "tmcf_csv")
}

func (p ImportGCSPath) CacheDirectory() string {
return filepath.Join(p.importName, "cache")
}

func (p ImportGCSPath) ControlDirectory() string {
return filepath.Join(p.importName, "control")
}
108 changes: 108 additions & 0 deletions bigtable_automation/gcf/controller_trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcf

import (
"context"
"fmt"
"log"
"path/filepath"
"strings"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
)

const (
dcManifestPath = "/memfile/core_resolved_mcfs_memfile/core_resolved_mcfs.binarypb"
controllerTriggerFile = "trigger.txt"
)

type PublishConfig struct {
// TopicName is the full PubSub topic name in the following format
// projects/<project id>/topics/<topic id>
TopicName string
}

func (p PublishConfig) ProjectID() string {
return strings.Split(p.TopicName, "/")[1]
}

func (p PublishConfig) TopicID() string {
return strings.Split(p.TopicName, "/")[3]
}

type CustomDCPubSubMsg struct {
importName string
dcManifestPath string
customManifestPath string
bigstoreDataDirectory string
bigstoreCacheDirectory string
bigstoreControlDirectory string
}

func (s CustomDCPubSubMsg) String() string {
return fmt.Sprintf(`import_name=%s,\
dc_manifest_path=%s,\
custom_manifest_path=%s,\
bigstore_data_directory=%s,\
bigstore_cache_directory=%s,\
bigstore_control_directory=%s,\
`,
s.importName,
s.dcManifestPath,
s.customManifestPath,
s.bigstoreDataDirectory,
s.bigstoreCacheDirectory,
s.bigstoreControlDirectory,
)
}

// Publish publishes the current import config to a topic.
func (s CustomDCPubSubMsg) Publish(ctx context.Context, p PublishConfig) error {
client, err := pubsub.NewClient(ctx, p.ProjectID())
if err != nil {
return err
}
defer client.Close()

t := client.Topic(p.TopicID())
res := t.Publish(ctx, &pubsub.Message{
Data: []byte(s.String()),
})

id, err := res.Get(ctx)
if err != nil {
return errors.Wrap(err, "Failed to verify import notification msg was published")
}
log.Printf("PubSub import notification published with server generated msg id: %v\n", id)
return nil
}

// triggerPath is .../<import_name>/process/<import id>/trigger.txt
// process folder is currently only used for control purposes for 1 process: trigger controller.
func TriggerController(ctx context.Context, p PublishConfig, triggerPath string) error {
importName := filepath.Dir(filepath.Dir(filepath.Dir(triggerPath)))
path := ImportGCSPath{importName: importName}

msg := CustomDCPubSubMsg{
importName: path.ImportName(),
dcManifestPath: dcManifestPath,
customManifestPath: path.ConfigPath(),
bigstoreDataDirectory: path.DataDirectory(),
bigstoreCacheDirectory: path.CacheDirectory(),
bigstoreControlDirectory: path.ControlDirectory(),
}
return msg.Publish(ctx, p)
}
64 changes: 52 additions & 12 deletions bigtable_automation/gcf/custom_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"context"
"log"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"
)

func customInternal(ctx context.Context, e GCSEvent) error {
// blobName is assumed to be under the correct path in "control".
func handleBTCache(ctx context.Context, blobName string) error {
projectID := os.Getenv("projectID")
bucket := os.Getenv("bucket")
instance := os.Getenv("instance")
Expand All @@ -41,20 +43,22 @@ func customInternal(ctx context.Context, e GCSEvent) error {
if dataflowTemplate == "" {
return errors.New("dataflowTemplate is not set in environment")
}
if bucket == "" {
return errors.New("bucket is not set in environment")
}

parts := strings.Split(blobName, "/")

// Get table ID.
// e.Name should is like "**/<user>/<import>/control/<table_id>/launched.txt"
parts := strings.Split(e.Name, "/")
idxControl := len(parts) - 3
idxTable := len(parts) - 2
if parts[idxControl] != "control" {
log.Printf("Ignore irrelevant trigger from file %s", e.Name)
return nil
}
tableID := parts[idxTable]

idxControl := len(parts) - 3
rootFolder := "gs://" + bucket + "/" + strings.Join(parts[0:idxControl], "/")

if strings.HasSuffix(e.Name, initFile) {
log.Printf("[%s] State Init", e.Name)
if strings.HasSuffix(blobName, initFile) {
log.Printf("[%s] State Init", blobName)
// Called when the state-machine is at Init. Logic below moves it to Launched state.
launchedPath := joinURL(rootFolder, "control", tableID, launchedFile)
exist, err := doesObjectExist(ctx, launchedPath)
Expand Down Expand Up @@ -84,14 +88,50 @@ func customInternal(ctx context.Context, e GCSEvent) error {
}
return err
}
log.Printf("[%s] State Launched", e.Name)
} else if strings.HasSuffix(e.Name, completedFile) {
log.Printf("[%s] State Launched", blobName)
} else if strings.HasSuffix(blobName, completedFile) {
// TODO: else, notify Mixer to load the BT table.
log.Printf("[%s] Completed work", e.Name)
log.Printf("[%s] Completed work", blobName)
}
return nil
}

func handleControllerTrigger(ctx context.Context, blobPath string) error {
controllerTriggerTopic := os.Getenv("controllerTriggerTopic")
bucket := os.Getenv("bucket")
if controllerTriggerTopic == "" {
return errors.New("controllerTriggerTopic is not set in environment")
}
if bucket == "" {
return errors.New("bucket is not set in environment")
}

bigstoreCSVPath := filepath.Join("/bigstore", bucket, blobPath)
log.Printf("Using PubSub topic: %s", controllerTriggerTopic)
pcfg := PublishConfig{TopicName: controllerTriggerTopic}
return TriggerController(ctx, pcfg, bigstoreCSVPath)
}

// TODO(alex): refactor path -> event handler logic.
func customInternal(ctx context.Context, e GCSEvent) error {
parts := strings.Split(e.Name, "/")
idxControlOrProcess := len(parts) - 3
if len(parts) < 3 {
log.Printf("Expected 3+ '/'-separated parts, got %s", e.Name)
log.Println("Ignoring as irrelevant file")
return nil
}

if parts[idxControlOrProcess] == "control" {
return handleBTCache(ctx, e.Name)
} else if parts[idxControlOrProcess] == "process" && strings.HasSuffix(e.Name, controllerTriggerFile) {
return handleControllerTrigger(ctx, e.Name)
} else {
log.Printf("Ignore irrelevant trigger from file %s", e.Name)
return nil
}
}

// CustomBTImportController consumes a GCS event and runs an import state machine.
func CustomBTImportController(ctx context.Context, e GCSEvent) error {
err := customInternal(ctx, e)
Expand Down
1 change: 1 addition & 0 deletions bigtable_automation/gcf/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
cloud.google.com/go/bigtable v1.13.0
cloud.google.com/go/pubsub v1.28.0
cloud.google.com/go/storage v1.27.0
github.com/GoogleCloudPlatform/functions-framework-go v1.6.1
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 4 additions & 0 deletions bigtable_automation/gcf/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIA
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.26.0/go.mod h1:QgBH3U/jdJy/ftjPhTkyXNj543Tin1pRYcdcPRnFIRI=
cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0=
cloud.google.com/go/pubsub v1.28.0 h1:XzabfdPx/+eNrsVVGLFgeUnQQKPGkMb8klRCeYK52is=
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg=
cloud.google.com/go/recaptchaenterprise v1.3.1/go.mod h1:OdD+q+y4XGeAlxRaMn1Y7/GveP6zmq76byL6tjPE7d4=
cloud.google.com/go/recaptchaenterprise/v2 v2.1.0/go.mod h1:w9yVqajwroDNTfGuhmOjPDN//rZGySaf6PtFVcSCa7o=
Expand Down Expand Up @@ -725,6 +727,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1044,6 +1047,7 @@ google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+
google.golang.org/genproto v0.0.0-20221117204609-8f9c96812029/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg=
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg=
google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg=
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE=
google.golang.org/genproto v0.0.0-20221206210731-b1a01be3a5f6 h1:AGXp12e/9rItf6/4QymU7WsAUwCf+ICW75cuR91nJIc=
google.golang.org/genproto v0.0.0-20221206210731-b1a01be3a5f6/go.mod h1:1dOng4TWOomJrDGhpXjfCD35wQC6jnC7HpRmOFRqEV0=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
4 changes: 4 additions & 0 deletions bigtable_automation/terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ resource "google_cloudfunctions_function" "bt_automation" {

dataflowTemplate = var.csv2bt_template_path
tempLocation = format("gs://%s/dataflow/tmp", var.dc_resource_bucket)

bucket = var.dc_resource_bucket
controllerTriggerTopic = "projects/datcom-204919/topics/private-import-notification-prod"

}

depends_on = [
Expand Down

0 comments on commit d0de1eb

Please sign in to comment.