diff --git a/docs/modules/components/pages/outputs/gcp_bigquery.adoc b/docs/modules/components/pages/outputs/gcp_bigquery.adoc
index f451d6200e..e97143a8e2 100644
--- a/docs/modules/components/pages/outputs/gcp_bigquery.adoc
+++ b/docs/modules/components/pages/outputs/gcp_bigquery.adoc
@@ -104,10 +104,11 @@ By default Redpanda Connect will use a shared credentials file when connecting t
 
 == Format
 
-This output currently supports only CSV and NEWLINE_DELIMITED_JSON formats. Learn more about how to use GCP BigQuery with them here:
+This output currently supports only the CSV, NEWLINE_DELIMITED_JSON and PARQUET formats. Learn more about how to use GCP BigQuery with them here:
 
 - https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json[`NEWLINE_DELIMITED_JSON`^]
 - https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv[`CSV`^]
+- https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet[`PARQUET`^]
 
 Each message may contain multiple elements separated by newlines. For example a single message containing:
 
@@ -134,6 +135,10 @@ The same is true for the CSV format.
 
 For the CSV format when the field `csv.header` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.
 
+=== Parquet
+
+For the PARQUET format, data can be encoded using the `parquet_encode` processor, and each message sent to the output must be a complete Parquet file.
+
 == Performance
 
 This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`.
@@ -188,6 +195,7 @@ The format of each incoming message.
 Options:
 `NEWLINE_DELIMITED_JSON`
 , `CSV`
+, `PARQUET`
 .
 
 === `max_in_flight`
diff --git a/internal/impl/gcp/output_bigquery.go b/internal/impl/gcp/output_bigquery.go
index da98f0efb8..6010e36918 100644
--- a/internal/impl/gcp/output_bigquery.go
+++ b/internal/impl/gcp/output_bigquery.go
@@ -156,10 +156,11 @@ By default Redpanda Connect will use a shared credentials file when connecting t
 
 == Format
 
-This output currently supports only CSV and NEWLINE_DELIMITED_JSON formats. Learn more about how to use GCP BigQuery with them here:
+This output currently supports only the CSV, NEWLINE_DELIMITED_JSON and PARQUET formats. Learn more about how to use GCP BigQuery with them here:
 
 - ` + "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json[`NEWLINE_DELIMITED_JSON`^]" + `
 - ` + "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv[`CSV`^]" + `
+- ` + "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet[`PARQUET`^]" + `
 
 Each message may contain multiple elements separated by newlines. For example a single message containing:
 
@@ -184,12 +185,18 @@ The same is true for the CSV format.
 
 === CSV
 
-For the CSV format when the field ` + "`csv.header`" + ` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.` + service.OutputPerformanceDocs(true, true)).
+For the CSV format when the field ` + "`csv.header`" + ` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.
+
+=== Parquet
+
+For the PARQUET format, data can be encoded using the ` + "`parquet_encode`" + ` processor, and each message sent to the output must be a complete Parquet file.
+
+` + service.OutputPerformanceDocs(true, true)).
 		Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
 		Field(service.NewStringField("job_project").Description("The project ID in which jobs will be exectuted. If not set, project will be used.").Default("")).
 		Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
 		Field(service.NewStringField("table").Description("The table to insert messages to.")).
-		Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
+		Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV), string(bigquery.Parquet)).
			Description("The format of each incoming message.").
			Default(string(bigquery.JSON))).
		Field(service.NewIntField("max_in_flight").
@@ -276,7 +283,9 @@ type gcpBigQueryOutput struct {
	fieldDelimiterBytes []byte
	csvHeaderBytes      []byte
 
-	newLineBytes []byte
+	// If nil, the format is expected to be produced upstream by a processor and
+	// each message is a complete file that is loaded as-is.
+	newLineBytes []byte
 
	log *service.Logger
 }
@@ -289,7 +298,9 @@ func newGCPBigQueryOutput(
		conf: conf,
		log:  log,
	}
-
+	if conf.Format == string(bigquery.Parquet) {
+		return g, nil
+	}
	g.newLineBytes = []byte("\n")
	if conf.Format != string(bigquery.CSV) {
		return g, nil
	}
@@ -386,6 +397,41 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag
		return service.ErrNotConnected
	}
 
+	if g.newLineBytes == nil {
+		var batchErr *service.BatchError
+		setErr := func(idx int, err error) {
+			if batchErr == nil {
+				batchErr = service.NewBatchError(batch, err)
+			}
+			batchErr = batchErr.Failed(idx, err)
+		}
+		jobs := map[int]*bigquery.Job{}
+		for idx, msg := range batch {
+			msgBytes, err := msg.AsBytes()
+			if err != nil {
+				setErr(idx, err)
+				continue
+			}
+			job, err := g.createTableLoader(&msgBytes).Run(ctx)
+			if err != nil {
+				setErr(idx, err)
+				continue
+			}
+			jobs[idx] = job
+		}
+		for idx, job := range jobs {
+			status, err := job.Wait(ctx)
+			if err != nil {
+				setErr(idx, fmt.Errorf("error while waiting on bigquery job: %w", err))
+				continue
+			}
+			if err = errorFromStatus(status); err != nil {
+				setErr(idx, err)
+			}
+		}
+		return batchErr
+	}
+
	var data bytes.Buffer
	if g.csvHeaderBytes != nil {
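For reference, a minimal sketch of a pipeline using the new PARQUET format: messages are batched and each batch is encoded into a single Parquet file by the `parquet_encode` processor before reaching this output. The project, dataset, table names and the two-column schema are illustrative placeholders, not part of this change.

[source,yaml]
----
output:
  gcp_bigquery:
    project: my-project # placeholder
    dataset: my_dataset # placeholder
    table: my_table     # placeholder
    format: PARQUET
    batching:
      count: 100
      period: 10s
      processors:
        # Encodes the whole batch into one Parquet file, so the output
        # receives a single message containing a complete Parquet file.
        - parquet_encode:
            schema:
              - name: id
                type: INT64
              - name: name
                type: UTF8
            default_compression: zstd
----

Because this output starts one BigQuery load job per message when the format is PARQUET, batching before `parquet_encode` keeps the number of load jobs proportional to batches rather than to individual records.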