From b6eec24e5e643621c4912a40e3a851684be403b2 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Tue, 10 Dec 2024 20:57:17 +0000 Subject: [PATCH] snowflake: add some totally wicked examples --- .../pages/outputs/snowflake_streaming.adoc | 56 ++++++++++++++++--- .../snowflake/output_snowflake_streaming.go | 15 +++-- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/docs/modules/components/pages/outputs/snowflake_streaming.adoc b/docs/modules/components/pages/outputs/snowflake_streaming.adoc index 41ba4ff581..79c94be4d2 100644 --- a/docs/modules/components/pages/outputs/snowflake_streaming.adoc +++ b/docs/modules/components/pages/outputs/snowflake_streaming.adoc @@ -42,9 +42,9 @@ output: account: ORG-ACCOUNT # No default (required) user: "" # No default (required) role: ACCOUNTADMIN # No default (required) - database: "" # No default (required) - schema: "" # No default (required) - table: "" # No default (required) + database: MY_DATABASE # No default (required) + schema: PUBLIC # No default (required) + table: MY_TABLE # No default (required) private_key: "" # No default (optional) private_key_file: "" # No default (optional) private_key_pass: "" # No default (optional) @@ -83,9 +83,9 @@ output: account: ORG-ACCOUNT # No default (required) user: "" # No default (required) role: ACCOUNTADMIN # No default (required) - database: "" # No default (required) - schema: "" # No default (required) - table: "" # No default (required) + database: MY_DATABASE # No default (required) + schema: PUBLIC # No default (required) + table: MY_TABLE # No default (required) private_key: "" # No default (optional) private_key_file: "" # No default (optional) private_key_pass: "" # No default (optional) @@ -113,9 +113,9 @@ output: check: "" processors: [] # No default (optional) max_in_flight: 4 - channel_prefix: "" # No default (optional) - channel_name: "" # No default (optional) - offset_token: "" # No default (optional) + channel_prefix: channel-${HOST} # No default (optional) + channel_name: partition-${!@kafka_partition} # No default (optional) + offset_token: offset-${!"%016X".format(@kafka_offset)} # No default (optional) ``` -- @@ -347,6 +347,12 @@ The Snowflake database to ingest data into. *Type*: `string` +```yml +# Examples + +database: MY_DATABASE +``` + === `schema` The Snowflake schema to ingest data into. @@ -355,6 +361,12 @@ The Snowflake schema to ingest data into. *Type*: `string` +```yml +# Examples + +schema: PUBLIC +``` + === `table` The Snowflake table to ingest data into. @@ -363,6 +375,12 @@ The Snowflake table to ingest data into. *Type*: `string` +```yml +# Examples + +table: MY_TABLE +``` + === `private_key` The PEM encoded private RSA key to use for authenticating with Snowflake. Either this or `private_key_file` must be specified. @@ -604,6 +622,12 @@ NOTE: There is a limit of 10,000 streams per table - if using more than 10k stre *Type*: `string` +```yml +# Examples + +channel_prefix: channel-${HOST} +``` + === `channel_name` The channel name to use. @@ -621,6 +645,12 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter *Type*: `string` +```yml +# Examples + +channel_name: partition-${!@kafka_partition} +``` + === `offset_token` The offset token to use for exactly once delivery of data in the pipeline. When data is sent on a channel, each message in a batch's offset token @@ -639,4 +669,12 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter *Type*: `string` +```yml +# Examples + +offset_token: offset-${!"%016X".format(@kafka_offset)} + +offset_token: postgres-${!@lsn} +``` + diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index 314300e423..923d2bf122 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -98,9 +98,9 @@ You can monitor the output batch size using the `+"`snowflake_compressed_output_ `).Example("ORG-ACCOUNT"), service.NewStringField(ssoFieldUser).Description("The user to run the Snowpipe Stream as. See https://docs.snowflake.com/en/user-guide/admin-user-management[Snowflake Documentation^] on how to create a user."), service.NewStringField(ssoFieldRole).Description("The role for the `user` field. The role must have the https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#required-access-privileges[required privileges^] to call the Snowpipe Streaming APIs. See https://docs.snowflake.com/en/user-guide/admin-user-management#user-roles[Snowflake Documentation^] for more information about roles.").Example("ACCOUNTADMIN"), - service.NewStringField(ssoFieldDB).Description("The Snowflake database to ingest data into."), - service.NewStringField(ssoFieldSchema).Description("The Snowflake schema to ingest data into."), - service.NewStringField(ssoFieldTable).Description("The Snowflake table to ingest data into."), + service.NewStringField(ssoFieldDB).Description("The Snowflake database to ingest data into.").Example("MY_DATABASE"), + service.NewStringField(ssoFieldSchema).Description("The Snowflake schema to ingest data into.").Example("PUBLIC"), + service.NewStringField(ssoFieldTable).Description("The Snowflake table to ingest data into.").Example("MY_TABLE"), service.NewStringField(ssoFieldKey).Description("The PEM encoded private RSA key to use for authenticating with Snowflake. Either this or `private_key_file` must be specified.").Optional().Secret(), service.NewStringField(ssoFieldKeyFile).Description("The file to load the private RSA key from. This should be a `.p8` PEM encoded file. Either this or `private_key` must be specified.").Optional(), service.NewStringField(ssoFieldKeyPass).Description("The RSA key passphrase if the RSA key is encrypted.").Optional().Secret(), @@ -138,7 +138,8 @@ This option is mutually exclusive with `+"`"+ssoFieldChannelName+"`"+`. NOTE: There is a limit of 10,000 streams per table - if using more than 10k streams please reach out to Snowflake support.`). Optional(). - Advanced(), + Advanced(). + Example(`channel-${HOST}`), service.NewInterpolatedStringField(ssoFieldChannelName). Description(`The channel name to use. Duplicate channel names will result in errors and prevent multiple instances of Redpanda Connect from writing at the same time. @@ -150,7 +151,8 @@ This option is mutually exclusive with `+"`"+ssoFieldChannelPrefix+"`"+`. NOTE: There is a limit of 10,000 streams per table - if using more than 10k streams please reach out to Snowflake support.`). Optional(). - Advanced(), + Advanced(). + Examples(`partition-${!@kafka_partition}`), service.NewInterpolatedStringField(ssoFieldOffsetToken). Description(`The offset token to use for exactly once delivery of data in the pipeline. When data is sent on a channel, each message in a batch's offset token is compared to the latest token for a channel. If the offset token is lexicographically less than the latest in the channel, it's assumed the message is a duplicate and @@ -163,7 +165,8 @@ NOTE: It's assumed that messages within a batch are in increasing order by offse For more information about offset tokens, see https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#offset-tokens[^Snowflake Documentation]`). Optional(). - Advanced(), + Advanced(). + Examples(`offset-${!"%016X".format(@kafka_offset)}`, `postgres-${!@lsn}`), ). LintRule(`root = match { this.exists("private_key") && this.exists("private_key_file") => [ "both `+"`private_key`"+` and `+"`private_key_file`"+` can't be set simultaneously" ],