Merge pull request #3039 from redpanda-data/bigquery-parquet
rockwotj authored Nov 29, 2024
2 parents 5a64fed + 64f5763 commit dbf3418
Showing 2 changed files with 60 additions and 6 deletions.
10 changes: 9 additions & 1 deletion docs/modules/components/pages/outputs/gcp_bigquery.adoc
@@ -104,10 +104,11 @@ By default Redpanda Connect will use a shared credentials file when connecting t
== Format
This output currently supports only CSV and NEWLINE_DELIMITED_JSON formats. Learn more about how to use GCP BigQuery with them here:
This output currently supports only CSV, NEWLINE_DELIMITED_JSON and PARQUET formats. Learn more about how to use GCP BigQuery with them here:
- https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json[`NEWLINE_DELIMITED_JSON`^]
- https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv[`CSV`^]
- https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet[`PARQUET`^]
Each message may contain multiple elements separated by newlines. For example, a single message containing:
@@ -134,6 +135,12 @@ The same is true for the CSV format.
For the CSV format, when the field `csv.header` is specified, a header row will be inserted as the first line of each message batch. If this field is not provided, the first message of each message batch must include a header line.
=== Parquet
For Parquet, data can be encoded using the `parquet_encode` processor, and each message sent to the output must be a complete Parquet file.
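As an illustration only (not part of this commit), here is a minimal end-to-end sketch that wires `parquet_encode` into this output via the Go stream builder API. The project, dataset, table, schema, and generated data are placeholders, and the component bundle import path is an assumption:

package main

import (
	"context"
	"log"

	"github.com/redpanda-data/benthos/v4/public/service"

	// Assumed bundle import that registers parquet_encode and gcp_bigquery.
	_ "github.com/redpanda-data/connect/v4/public/components/all"
)

func main() {
	builder := service.NewStreamBuilder()
	// Generate a few structured messages, encode each batch as a Parquet
	// file, then load the resulting files into BigQuery.
	if err := builder.SetYAML(`
input:
  generate:
    count: 10
    mapping: 'root = {"id": counter(), "name": "example"}'
pipeline:
  processors:
    - parquet_encode:
        schema:
          - { name: id, type: INT64 }
          - { name: name, type: UTF8 }
output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: my_table
    format: PARQUET
`); err != nil {
		log.Fatal(err)
	}
	stream, err := builder.Build()
	if err != nil {
		log.Fatal(err)
	}
	if err := stream.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}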
== Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the field `max_in_flight`.
@@ -188,6 +195,7 @@ The format of each incoming message.
Options:
`NEWLINE_DELIMITED_JSON`
, `CSV`
, `PARQUET`
.
=== `max_in_flight`
56 changes: 51 additions & 5 deletions internal/impl/gcp/output_bigquery.go
@@ -156,10 +156,11 @@ By default Redpanda Connect will use a shared credentials file when connecting t
== Format
This output currently supports only CSV and NEWLINE_DELIMITED_JSON formats. Learn more about how to use GCP BigQuery with them here:
This output currently supports only CSV, NEWLINE_DELIMITED_JSON and PARQUET formats. Learn more about how to use GCP BigQuery with them here:
- ` + "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json[`NEWLINE_DELIMITED_JSON`^]" + `
- ` + "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv[`CSV`^]" + `
- ` + "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet[`PARQUET`^]" + `
Each message may contain multiple elements separated by newlines. For example, a single message containing:
@@ -184,12 +185,18 @@ The same is true for the CSV format.
=== CSV
For the CSV format when the field ` + "`csv.header`" + ` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.` + service.OutputPerformanceDocs(true, true)).
For the CSV format, when the field ` + "`csv.header`" + ` is specified, a header row will be inserted as the first line of each message batch. If this field is not provided, the first message of each message batch must include a header line.
=== Parquet
For Parquet, data can be encoded using the ` + "`parquet_encode`" + ` processor, and each message sent to the output must be a complete Parquet file.
` + service.OutputPerformanceDocs(true, true)).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
Field(service.NewStringField("job_project").Description("The project ID in which jobs will be exectuted. If not set, project will be used.").Default("")).
Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
Field(service.NewStringField("table").Description("The table to insert messages to.")).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV), string(bigquery.Parquet)).
Description("The format of each incoming message.").
Default(string(bigquery.JSON))).
Field(service.NewIntField("max_in_flight").
@@ -276,7 +283,9 @@ type gcpBigQueryOutput struct {

fieldDelimiterBytes []byte
csvHeaderBytes []byte
newLineBytes []byte
// If nil, this format is expected to be produced upstream by a processor, and
// each message is a complete file that needs to be loaded as-is.
newLineBytes []byte

log *service.Logger
}
@@ -289,7 +298,9 @@ func newGCPBigQueryOutput(
conf: conf,
log: log,
}

if conf.Format == string(bigquery.Parquet) {
return g, nil
}
g.newLineBytes = []byte("\n")
if conf.Format != string(bigquery.CSV) {
return g, nil
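Before moving on to the write path, a quick regression sketch for this branch. Assumptions: the constructor's signature is `(conf gcpBigQueryOutputConfig, log *service.Logger)` as the fragment above suggests, a nil logger is tolerated, and the test lives in the same package so it can reach unexported fields:

// output_bigquery_parquet_test.go (sketch)
package gcp

import (
	"testing"

	"cloud.google.com/go/bigquery"
)

func TestParquetDisablesNewlineFraming(t *testing.T) {
	g, err := newGCPBigQueryOutput(gcpBigQueryOutputConfig{
		Format: string(bigquery.Parquet),
	}, nil)
	if err != nil {
		t.Fatal(err)
	}
	if g.newLineBytes != nil {
		t.Error("expected nil newLineBytes for PARQUET so WriteBatch takes the whole-file load path")
	}
}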
@@ -386,6 +397,41 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag
return service.ErrNotConnected
}

if g.newLineBytes == nil {
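// Parquet (whole-file) path: each message is already a complete file, so we
// submit one load job per message instead of concatenating the batch.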
var batchErr *service.BatchError
setErr := func(idx int, err error) {
if batchErr == nil {
batchErr = service.NewBatchError(batch, err)
}
batchErr = batchErr.Failed(idx, err)
}
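// Submit one load job per message, recording failures by batch index so
// the remaining messages can still be attempted.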
jobs := map[int]*bigquery.Job{}
for idx, msg := range batch {
msgBytes, err := msg.AsBytes()
if err != nil {
setErr(idx, err)
continue
}
job, err := g.createTableLoader(&msgBytes).Run(ctx)
if err != nil {
setErr(idx, err)
continue
}
jobs[idx] = job
}
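// Wait for every submitted job and fold any failures into the batch error.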
for idx, job := range jobs {
status, err := job.Wait(ctx)
if err != nil {
setErr(idx, fmt.Errorf("error while waiting on bigquery job: %w", err))
continue
}
if err = errorFromStatus(status); err != nil {
setErr(idx, err)
}
}
return batchErr
}

var data bytes.Buffer

if g.csvHeaderBytes != nil {
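The `errorFromStatus` helper called in the new code falls outside this hunk. As a rough sketch only (the real helper elsewhere in this package may differ), it plausibly folds a finished job's status into a single error; `fmt` and `bigquery` are already imported by this file:

func errorFromStatus(status *bigquery.JobStatus) error {
	// Err returns the terminal failure recorded on the job, if any;
	// status.Errors carries per-record detail worth surfacing alongside it.
	if err := status.Err(); err != nil {
		return fmt.Errorf("bigquery job failed: %w (details: %v)", err, status.Errors)
	}
	return nil
}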
