Skip to content

Commit

Permalink
Merge pull request #2927 from redpanda-data/mihaitodor-add-private-ke…
Browse files Browse the repository at this point in the history
…y-field-snowflake-put

Add `private_key` field to `snowflake_put` output
  • Loading branch information
Jeffail authored Oct 10, 2024
2 parents 5b829dc + 147d7da commit 126b388
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
### Added

- Field `checksum_algorithm` added to the `aws_s3` output. (@dom-lee-naimuri)
- Field `private_key` added to the `snowflake_put` output. (@mihaitodor)

### Fixed

Expand Down
19 changes: 17 additions & 2 deletions docs/modules/components/pages/outputs/snowflake_put.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ output:
cloud: aws # No default (optional)
user: "" # No default (required)
password: "" # No default (optional)
private_key: "" # No default (optional)
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
role: "" # No default (required)
Expand Down Expand Up @@ -80,6 +81,7 @@ output:
cloud: aws # No default (optional)
user: "" # No default (required)
password: "" # No default (optional)
private_key: "" # No default (optional)
private_key_file: "" # No default (optional)
private_key_pass: "" # No default (optional)
role: "" # No default (required)
Expand Down Expand Up @@ -514,12 +516,25 @@ This field contains sensitive information that usually shouldn't be added to a c
*Type*: `string`
=== `private_key`
The private SSH key. `private_key_pass` is required when using encrypted keys.
[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====
*Type*: `string`
=== `private_key_file`
The path to a file containing the private SSH key.
The path to a file containing the private SSH key. `private_key_pass` is required when using encrypted keys.
*Type*: `string`
Expand Down Expand Up @@ -669,7 +684,7 @@ Requires version v4.12.0 or newer
=== `snowpipe`
An optional Snowpipe name. Use the `<snowpipe>` part from `<database>.<schema>.<snowpipe>`.
An optional Snowpipe name. Use the `<snowpipe>` part from `<database>.<schema>.<snowpipe>`. `private_key` or `private_key_file` must be set when using this feature.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
Expand Down
45 changes: 31 additions & 14 deletions internal/impl/snowflake/output_snowflake_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ and it must be set to the `+"`<cloud>`"+` part of the Account Identifier
`).Example("aws").Example("gcp").Example("azure").Optional()).
Field(service.NewStringField("user").Description("Username.")).
Field(service.NewStringField("password").Description("An optional password.").Optional().Secret()).
Field(service.NewStringField("private_key_file").Description("The path to a file containing the private SSH key.").Optional()).
Field(service.NewStringField("private_key").Description("The private SSH key. `private_key_pass` is required when using encrypted keys.").Optional().Secret()).
Field(service.NewStringField("private_key_file").Description("The path to a file containing the private SSH key. `private_key_pass` is required when using encrypted keys.").Optional()).
Field(service.NewStringField("private_key_pass").Description("An optional private SSH key passphrase.").Optional().Secret()).
Field(service.NewStringField("role").Description("Role.")).
Field(service.NewStringField("database").Description("Database.")).
Expand All @@ -229,13 +230,14 @@ and it must be set to the `+"`<cloud>`"+` part of the Account Identifier
string(CompressionTypeZstandard): "Messages must be pre-compressed using the Zstandard algorithm. Default `file_extension`: `zst`.",
}).Description("Compression type.").Default(string(CompressionTypeAuto))).
Field(service.NewInterpolatedStringField("request_id").Description("Request ID. Will be assigned a random UUID (v4) string if not set or empty.").Optional().Default("").Version("v4.12.0")).
Field(service.NewInterpolatedStringField("snowpipe").Description(`An optional Snowpipe name. Use the `+"`<snowpipe>`"+` part from `+"`<database>.<schema>.<snowpipe>`"+`.`).Optional()).
Field(service.NewInterpolatedStringField("snowpipe").Description("An optional Snowpipe name. Use the `<snowpipe>` part from `<database>.<schema>.<snowpipe>`. `private_key` or `private_key_file` must be set when using this feature.").Optional()).
Field(service.NewBoolField("client_session_keep_alive").Description("Enable Snowflake keepalive mechanism to prevent the client session from expiring after 4 hours (error 390114).").Advanced().Default(false)).
Field(service.NewBatchPolicyField("batching")).
Field(service.NewIntField("max_in_flight").Description("The maximum number of parallel message batches to have in flight at any given time.").Default(1)).
LintRule(`root = match {
this.exists("password") && this.password != "" && this.exists("private_key_file") && this.private_key_file != "" => [ "both `+"`password`"+` and `+"`private_key_file`"+` can't be set simultaneously" ],
this.exists("snowpipe") && this.snowpipe != "" && (!this.exists("private_key_file") || this.private_key_file == "") => [ "`+"`private_key_file`"+` is required when setting `+"`snowpipe`"+`" ],
(!this.exists("password") || this.password == "") && (!this.exists("private_key") || this.private_key == "") && (!this.exists("private_key_file") || this.private_key_file == "") => [ "either `+"`password`"+` or `+"`private_key`"+` or `+"`private_key_file`"+` must be set" ],
this.exists("password") && this.password != "" && (this.exists("private_key") && this.private_key != "" || this.exists("private_key_file") && this.private_key_file != "") => [ "only one of `+"`password`"+`, `+"`private_key`"+` and `+"`private_key_file`"+` can be set" ],
this.exists("snowpipe") && this.snowpipe != "" && !((this.exists("private_key") && this.private_key != "") || (this.exists("private_key_file") && this.private_key_file != "")) => [ "either `+"`private_key`"+` or `+"`private_key_file`"+` must be set when using `+"`snowpipe`"+`" ],
}`).
Example("Kafka / realtime brokers", "Upload message batches from realtime brokers such as Kafka persisting the batch partition and offsets in the stage path and filename similarly to the https://docs.snowflake.com/en/user-guide/kafka-connector-ts.html#step-1-view-the-copy-history-for-the-table[Kafka Connector scheme^] and call Snowpipe to load them into a table. When batching is configured at the input level, it is done per-partition.", `
input:
Expand Down Expand Up @@ -415,25 +417,28 @@ func init() {

//------------------------------------------------------------------------------

// getPrivateKey reads and parses the private key
// getPrivateKeyFromFile reads and parses the private key
// Inspired from https://github.com/chanzuckerberg/terraform-provider-snowflake/blob/c07d5820bea7ac3d8a5037b0486c405fdf58420e/pkg/provider/provider.go#L367
func getPrivateKey(f fs.FS, path, passphrase string) (*rsa.PrivateKey, error) {
func getPrivateKeyFromFile(f fs.FS, path, passphrase string) (*rsa.PrivateKey, error) {
privateKeyBytes, err := service.ReadFile(f, path)
if err != nil {
return nil, fmt.Errorf("failed to read private key %s: %s", path, err)
}
if len(privateKeyBytes) == 0 {
return nil, errors.New("private key is empty")
}
return getPrivateKey(privateKeyBytes, passphrase)
}

func getPrivateKey(privateKeyBytes []byte, passphrase string) (*rsa.PrivateKey, error) {
privateKeyBlock, _ := pem.Decode(privateKeyBytes)
if privateKeyBlock == nil {
return nil, errors.New("could not parse private key, key is not in PEM format")
}

if privateKeyBlock.Type == "ENCRYPTED PRIVATE KEY" {
if passphrase == "" {
return nil, errors.New("private key requires a passphrase, but private_key_passphrase was not supplied")
return nil, errors.New("private key requires a passphrase, but private_key_pass was not supplied")
}

// Only keys encrypted with pbes2 http://oid-info.com/get/1.2.840.113549.1.5.13 are supported.
Expand Down Expand Up @@ -647,20 +652,32 @@ func newSnowflakeWriterFromConfig(conf *service.ParsedConfig, mgr *service.Resou

authenticator := gosnowflake.AuthTypeJwt
if password == "" {
var privateKeyFile string
if privateKeyFile, err = conf.FieldString("private_key_file"); err != nil {
return nil, fmt.Errorf("failed to parse private_key_file: %s", err)
}

var privateKeyPass string
if conf.Contains("private_key_pass") {
if privateKeyPass, err = conf.FieldString("private_key_pass"); err != nil {
return nil, fmt.Errorf("failed to parse private_key_pass: %s", err)
}
}

if s.privateKey, err = getPrivateKey(mgr.FS(), privateKeyFile, privateKeyPass); err != nil {
return nil, fmt.Errorf("failed to read private key: %s", err)
var privateKey string
if conf.Contains("private_key") {
if privateKey, err = conf.FieldString("private_key"); err != nil {
return nil, fmt.Errorf("failed to parse private_key: %s", err)
}
}
if privateKey != "" {
if s.privateKey, err = getPrivateKey([]byte(privateKey), privateKeyPass); err != nil {
return nil, fmt.Errorf("failed to read private key: %s", err)
}
} else {
var privateKeyFile string
if privateKeyFile, err = conf.FieldString("private_key_file"); err != nil {
return nil, fmt.Errorf("failed to parse private_key_file: %s", err)
}

if s.privateKey, err = getPrivateKeyFromFile(mgr.FS(), privateKeyFile, privateKeyPass); err != nil {
return nil, fmt.Errorf("failed to read private key: %s", err)
}
}

if s.publicKeyFingerprint, err = calculatePublicKeyFingerprint(s.privateKey); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/snowflake/output_snowflake_put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ snowpipe: '` + tc.snowpipe + `'
privateKeyPath: "resources/ssh_keys/snowflake_rsa_key.p8",
stage: "@test_stage",
compression: "NONE",
errConfigContains: "failed to read private key: private key requires a passphrase, but private_key_passphrase was not supplied",
errConfigContains: "failed to read private key: private key requires a passphrase, but private_key_pass was not supplied",
},
{
name: "executes snowflake query without compression",
Expand Down

0 comments on commit 126b388

Please sign in to comment.