From 9dfb3db18b0f292df1257e2b1dc48fa04b8a62aa Mon Sep 17 00:00:00 2001 From: Wesley Merkel Date: Wed, 9 Oct 2024 14:15:10 -0600 Subject: [PATCH 1/5] feat: add Azure Data Lake Gen2 Output --- go.mod | 3 +- go.sum | 6 +- internal/impl/azure/auth.go | 97 ++++++++++++++++++ internal/impl/azure/output_adls.go | 158 +++++++++++++++++++++++++++++ internal/plugins/info.csv | 1 + 5 files changed, 262 insertions(+), 3 deletions(-) create mode 100644 internal/impl/azure/output_adls.go diff --git a/go.mod b/go.mod index e2246163ec..7d7fed29b2 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,8 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3 github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0 - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 + github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 github.com/Azure/go-amqp v1.0.5 github.com/ClickHouse/clickhouse-go/v2 v2.27.1 diff --git a/go.sum b/go.sum index cf40c45181..a092a2cc97 100644 --- a/go.sum +++ b/go.sum @@ -87,8 +87,10 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xP github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 h1:Be6KInmFEKV81c0pOAEbRYehLMwmmGI1exuFj248AMk= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0/go.mod h1:WCPBHsOXfBVnivScjs2ypRfimjEW0qPVLGgJkZlrIOA= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek= +github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.1 h1:qhYAuEKEz8jDGV1Tyf3Gz44ppGZP53xJGi4qdH2PLsc= +github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.1/go.mod h1:8QbU31D+n+SDJDstJXJWnOBKAgv8ZTz/x7su5KQ31Kk= github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 h1:lJwNFV+xYjHREUTHJKx/ZF6CJSt9znxmLw9DqSTvyRU= github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0/go.mod h1:GfT0aGew8Qj5yiQVqOO5v7N8fanbJGyUoHqXg56qcVY= github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck= diff --git a/internal/impl/azure/auth.go b/internal/impl/azure/auth.go index a46557b631..437d8cd46e 100644 --- a/internal/impl/azure/auth.go +++ b/internal/impl/azure/auth.go @@ -17,6 +17,8 @@ package azure import ( "errors" "fmt" + "log" + "net/url" "os" "strings" @@ -25,6 +27,8 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/data/aztables" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake" + dlservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service" "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" ) @@ -80,8 +84,90 @@ func blobStorageClientFromParsed(pConf *service.ParsedConfig, container *service return getBlobStorageClient(connectionString, storageAccount, storageAccessKey, storageSASToken, container) } +func 
adlsClientFromParsed(pConf *service.ParsedConfig, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) { + connectionString, err := pConf.FieldString(bscFieldStorageConnectionString) + if err != nil { + return nil, false, err + } + storageAccount, err := pConf.FieldString(bscFieldStorageAccount) + if err != nil { + return nil, false, err + } + storageAccessKey, err := pConf.FieldString(bscFieldStorageAccessKey) + if err != nil { + return nil, false, err + } + storageSASToken, err := pConf.FieldString(bscFieldStorageSASToken) + if err != nil { + return nil, false, err + } + if storageAccount == "" && connectionString == "" { + return nil, false, errors.New("invalid azure storage account credentials") + } + return getADLSClient(connectionString, storageAccount, storageAccessKey, storageSASToken, fsName) +} + +func getADLSClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken string, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) { + if storageConnectionString != "" { + storageConnectionString := parseStorageConnectionString(storageConnectionString, storageAccount) + client, err := dlservice.NewClientFromConnectionString(storageConnectionString, nil) + if err != nil { + return nil, false, fmt.Errorf("creating new ADLS file client from connection string: %w", err) + } + return client, false, nil + } + + serviceURL := fmt.Sprintf(dfsEndpointExpr, storageAccount) + + if storageAccessKey != "" { + cred, err := azdatalake.NewSharedKeyCredential(storageAccount, storageAccessKey) + if err != nil { + return nil, false, fmt.Errorf("creating new shared key credential: %w", err) + } + client, err := dlservice.NewClientWithSharedKeyCredential(serviceURL, cred, nil) + if err != nil { + return nil, false, fmt.Errorf("creating new client from shared key credential: %w", err) + } + return client, false, nil + } + + if storageSASToken != "" { + var isFilesystemSASToken bool + if isServiceSASToken(storageSASToken) { + // container/filesystem scoped SAS token + isFilesystemSASToken = true + fsNameStr, err := fsName.TryString(service.NewMessage([]byte(""))) + if err != nil { + return nil, false, fmt.Errorf("interpolating filesystem name: %w", err) + } + serviceURL = fmt.Sprintf("%s/%s?%s", serviceURL, fsNameStr, storageSASToken) + } else { + // storage account SAS token + serviceURL = fmt.Sprintf("%s?%s", serviceURL, storageSASToken) + log.Println("serviceURL:", serviceURL) + } + client, err := dlservice.NewClientWithNoCredential(serviceURL, nil) + if err != nil { + return nil, false, fmt.Errorf("creating client with no credentials: %w", err) + } + return client, isFilesystemSASToken, nil + } + + // default credentials + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, false, fmt.Errorf("getting default Azure credentials: %w", err) + } + client, err := dlservice.NewClient(serviceURL, cred, nil) + if err != nil { + return nil, false, fmt.Errorf("creating client from default credentials: %w", err) + } + return client, false, err +} + const ( blobEndpointExp = "https://%s.blob.core.windows.net" + dfsEndpointExpr = "https://%s.dfs.core.windows.net" ) func getBlobStorageClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken string, container *service.InterpolatedString) (*azblob.Client, bool, error) { @@ -290,3 +376,14 @@ func getTablesServiceClient(account, accessKey, connectionString, storageSASToke } return client, err } + +func isServiceSASToken(token string) bool { + query, err := 
url.ParseQuery(token) + if err != nil { + return false + } + // 2024-10-09: `sr` parameter is present and required in service SAS tokens, + // and is not valid in storage account SAS tokens + // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#specify-the-signed-resource-blob-storage-only + return query.Has("sr") +} diff --git a/internal/impl/azure/output_adls.go b/internal/impl/azure/output_adls.go new file mode 100644 index 0000000000..39d555d40b --- /dev/null +++ b/internal/impl/azure/output_adls.go @@ -0,0 +1,158 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package azure + +import ( + "context" + "fmt" + + dlservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service" + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + // Azure Data Lake Storage Output Fields + adlsFieldFilesystem = "filesystem" + adlsFieldPath = "path" +) + +type adlsConfig struct { + client *dlservice.Client + path *service.InterpolatedString + filesystem *service.InterpolatedString +} + +func init() { + err := service.RegisterOutput("azure_data_lake_gen2", adlsSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, mif int, err error) { + var pConf *adlsConfig + if pConf, err = adlsConfigFromParsed(conf); err != nil { + return + } + if mif, err = conf.FieldMaxInFlight(); err != nil { + return + } + if out, err = newAzureADLSWriter(pConf, mgr.Logger()); err != nil { + return + } + return + }) + if err != nil { + panic(err) + } +} + +func adlsConfigFromParsed(pConf *service.ParsedConfig) (*adlsConfig, error) { + var conf adlsConfig + var err error + conf.filesystem, err = pConf.FieldInterpolatedString(adlsFieldFilesystem) + if err != nil { + return nil, err + } + conf.path, err = pConf.FieldInterpolatedString(adlsFieldPath) + if err != nil { + return nil, err + } + var isFilesystemSASToken bool + conf.client, isFilesystemSASToken, err = adlsClientFromParsed(pConf, conf.filesystem) + if err != nil { + return nil, err + } + if isFilesystemSASToken { + conf.filesystem, _ = service.NewInterpolatedString("") + } + return &conf, nil +} + +func newAzureADLSWriter(conf *adlsConfig, log *service.Logger) (*azureADLSWriter, error) { + return &azureADLSWriter{ + conf: conf, + log: log, + }, nil +} + +type azureADLSWriter struct { + conf *adlsConfig + log *service.Logger +} + +func (a *azureADLSWriter) Connect(ctx context.Context) error { + return nil +} + +func (a *azureADLSWriter) Write(ctx context.Context, msg *service.Message) error { + fsName, err := a.conf.filesystem.TryString(msg) + if err != nil { + return fmt.Errorf("interpolating filesystem name: %w", err) + } + path, err := a.conf.path.TryString(msg) + if err != nil { + return fmt.Errorf("interpolating file path: %w", err) + } + mBytes, err := msg.AsBytes() + if err != nil { + return fmt.Errorf("reading message body: %w", err) + } + + fileClient := a.conf.client.NewFileSystemClient(fsName).NewFileClient(path) + _, err = 
fileClient.Create(ctx, nil) + if err != nil { + return fmt.Errorf("creating file: %w", err) + } + err = fileClient.UploadBuffer(ctx, mBytes, nil) + if err != nil { + return fmt.Errorf("uploading message body: %w", err) + } + return nil +} + +func (a *azureADLSWriter) Close(ctx context.Context) error { + return nil +} + +func adlsSpec() *service.ConfigSpec { + return azureComponentSpec(true). + Beta(). + Version("4.37.0"). + Summary(`Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified with the `+"`"+adlsFieldPath+"`"+` field.`). + Description(` +In order to have a different path for each file you should use function +interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here], which are +calculated per message of a batch. + +Supports multiple authentication methods but only one of the following is required: + +- `+"`storage_connection_string`"+` +- `+"`storage_account` and `storage_access_key`"+` +- `+"`storage_account` and `storage_sas_token`"+` +- `+"`storage_account` to access via https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential[DefaultAzureCredential^]"+` + +If multiple are set then the `+"`storage_connection_string`"+` is given priority. + +If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+` parameter, please specify it in the +`+"`storage_account`"+` field.`+service.OutputPerformanceDocs(true, false)). + Fields( + service.NewInterpolatedStringField(adlsFieldFilesystem). + Description("The ADLS filesystem name for uploading the messages to."). + Example(`messages-${!timestamp("2006")}`), + service.NewInterpolatedStringField(adlsFieldPath). + Description("The path of each message to upload within the filesystem."). + Example(`${!count("files")}-${!timestamp_unix_nano()}.json`). + Example(`${!meta("kafka_key")}.json`). + Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`). 
+ Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`), + service.NewOutputMaxInFlightField(), + ) +} diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv index 4d20e58615..16f77cc00b 100644 --- a/internal/plugins/info.csv +++ b/internal/plugins/info.csv @@ -28,6 +28,7 @@ azure_blob_storage ,output ,azure_blob_storage ,3.36.0 ,certif azure_cosmosdb ,input ,azure_cosmosdb ,4.25.0 ,certified ,n ,y ,y azure_cosmosdb ,output ,azure_cosmosdb ,4.25.0 ,certified ,n ,y ,y azure_cosmosdb ,processor ,azure_cosmosdb ,4.25.0 ,certified ,n ,y ,y +azure_data_lake_gen2 ,output ,azure_data_lake_gen2 ,4.37.0 ,certified ,n ,n ,n azure_queue_storage ,input ,azure_queue_storage ,3.42.0 ,certified ,n ,y ,y azure_queue_storage ,output ,azure_queue_storage ,3.36.0 ,certified ,n ,y ,y azure_table_storage ,input ,azure_table_storage ,4.10.0 ,certified ,n ,y ,y From 88a17c97ad7e5c0035b9648674d1634845d75d4a Mon Sep 17 00:00:00 2001 From: Wesley Merkel Date: Wed, 9 Oct 2024 14:39:38 -0600 Subject: [PATCH 2/5] fix: update docs for azure_data_lake_gen2 output --- .../pages/outputs/azure_data_lake_gen2.adoc | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc diff --git a/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc b/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc new file mode 100644 index 0000000000..46dfec1fca --- /dev/null +++ b/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc @@ -0,0 +1,146 @@ += azure_data_lake_gen2 +:type: output +:status: beta +:categories: ["Services","Azure"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + +// © 2024 Redpanda Data Inc. + + +component_type_dropdown::[] + + +Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified with the `path` field. + +Introduced in version 4.37.0. + +```yml +# Config fields, showing default values +output: + label: "" + azure_data_lake_gen2: + storage_account: "" + storage_access_key: "" + storage_connection_string: "" + storage_sas_token: "" + filesystem: messages-${!timestamp("2006")} # No default (required) + path: ${!count("files")}-${!timestamp_unix_nano()}.txt + max_in_flight: 64 +``` + +In order to have a different path for each file you should use function +interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here], which are +calculated per message of a batch. + +Supports multiple authentication methods but only one of the following is required: + +- `storage_connection_string` +- `storage_account` and `storage_access_key` +- `storage_account` and `storage_sas_token` +- `storage_account` to access via https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential[DefaultAzureCredential^] + +If multiple are set then the `storage_connection_string` is given priority. + +If the `storage_connection_string` does not contain the `AccountName` parameter, please specify it in the +`storage_account` field. + +== Performance + +This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`. 
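
For example, a config that writes each message as a JSON document into a per-year filesystem, relying on the in-flight window for parallel uploads, might look like the following sketch (the connection string, account and document field names are placeholders; any of the authentication methods above works in its place):

```yml
output:
  azure_data_lake_gen2:
    # Placeholder connection string; storage_account plus a key, SAS token, or
    # DefaultAzureCredential are equally valid here.
    storage_connection_string: "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=...;EndpointSuffix=core.windows.net"
    filesystem: messages-${!timestamp("2006")}
    path: ${!json("doc.namespace")}/${!json("doc.id")}.json
    max_in_flight: 64
```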
+ +== Fields + +=== `storage_account` + +The storage account to access. This field is ignored if `storage_connection_string` is set. + + +*Type*: `string` + +*Default*: `""` + +=== `storage_access_key` + +The storage account access key. This field is ignored if `storage_connection_string` is set. + + +*Type*: `string` + +*Default*: `""` + +=== `storage_connection_string` + +A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set. + + +*Type*: `string` + +*Default*: `""` + +=== `storage_sas_token` + +The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set. + + +*Type*: `string` + +*Default*: `""` + +=== `filesystem` + +The ADLS filesystem name for uploading the messages to. +This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. + + +*Type*: `string` + + +```yml +# Examples + +filesystem: messages-${!timestamp("2006")} +``` + +=== `path` + +The path of each message to upload within the filesystem. +This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. + + +*Type*: `string` + +*Default*: `"${!count(\"files\")}-${!timestamp_unix_nano()}.txt"` + +```yml +# Examples + +path: ${!count("files")}-${!timestamp_unix_nano()}.json + +path: ${!meta("kafka_key")}.json + +path: ${!json("doc.namespace")}/${!json("doc.id")}.json +``` + +=== `max_in_flight` + +The maximum number of messages to have in flight at a given time. Increase this to improve throughput. + + +*Type*: `int` + +*Default*: `64` + + From 80dd98e02cd286e718b2c66a683b118498917550 Mon Sep 17 00:00:00 2001 From: Wesley Merkel Date: Tue, 15 Oct 2024 11:35:09 -0600 Subject: [PATCH 3/5] fix: small fixes to azure data lake output --- internal/impl/azure/auth.go | 2 - internal/impl/azure/output_adls.go | 71 +++++++++++++++--------------- internal/plugins/info.csv | 2 +- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/internal/impl/azure/auth.go b/internal/impl/azure/auth.go index 437d8cd46e..392f52b9fa 100644 --- a/internal/impl/azure/auth.go +++ b/internal/impl/azure/auth.go @@ -17,7 +17,6 @@ package azure import ( "errors" "fmt" - "log" "net/url" "os" "strings" @@ -144,7 +143,6 @@ func getADLSClient(storageConnectionString, storageAccount, storageAccessKey, st } else { // storage account SAS token serviceURL = fmt.Sprintf("%s?%s", serviceURL, storageSASToken) - log.Println("serviceURL:", serviceURL) } client, err := dlservice.NewClientWithNoCredential(serviceURL, nil) if err != nil { diff --git a/internal/impl/azure/output_adls.go b/internal/impl/azure/output_adls.go index 39d555d40b..ccee5fd871 100644 --- a/internal/impl/azure/output_adls.go +++ b/internal/impl/azure/output_adls.go @@ -22,6 +22,41 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" ) +func adlsSpec() *service.ConfigSpec { + return azureComponentSpec(true). + Beta(). + Version("4.38.0"). + Summary(`Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified with the `+"`"+adlsFieldPath+"`"+` field.`). + Description(` +In order to have a different path for each file you should use function +interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here], which are +calculated per message of a batch. 
+ +Supports multiple authentication methods but only one of the following is required: + +- `+"`storage_connection_string`"+` +- `+"`storage_account` and `storage_access_key`"+` +- `+"`storage_account` and `storage_sas_token`"+` +- `+"`storage_account` to access via https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential[DefaultAzureCredential^]"+` + +If multiple are set then the `+"`storage_connection_string`"+` is given priority. + +If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+` parameter, please specify it in the +`+"`storage_account`"+` field.`+service.OutputPerformanceDocs(true, false)). + Fields( + service.NewInterpolatedStringField(adlsFieldFilesystem). + Description("The ADLS filesystem name for uploading the messages to."). + Example(`messages-${!timestamp("2006")}`), + service.NewInterpolatedStringField(adlsFieldPath). + Description("The path of each message to upload within the filesystem."). + Example(`${!count("files")}-${!timestamp_unix_nano()}.json`). + Example(`${!meta("kafka_key")}.json`). + Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`). + Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`), + service.NewOutputMaxInFlightField(), + ) +} + const ( // Azure Data Lake Storage Output Fields adlsFieldFilesystem = "filesystem" @@ -71,6 +106,7 @@ func adlsConfigFromParsed(pConf *service.ParsedConfig) (*adlsConfig, error) { return nil, err } if isFilesystemSASToken { + // if using a container SAS token, the container is already implicit conf.filesystem, _ = service.NewInterpolatedString("") } return &conf, nil @@ -121,38 +157,3 @@ func (a *azureADLSWriter) Write(ctx context.Context, msg *service.Message) error func (a *azureADLSWriter) Close(ctx context.Context) error { return nil } - -func adlsSpec() *service.ConfigSpec { - return azureComponentSpec(true). - Beta(). - Version("4.37.0"). - Summary(`Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified with the `+"`"+adlsFieldPath+"`"+` field.`). - Description(` -In order to have a different path for each file you should use function -interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here], which are -calculated per message of a batch. - -Supports multiple authentication methods but only one of the following is required: - -- `+"`storage_connection_string`"+` -- `+"`storage_account` and `storage_access_key`"+` -- `+"`storage_account` and `storage_sas_token`"+` -- `+"`storage_account` to access via https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential[DefaultAzureCredential^]"+` - -If multiple are set then the `+"`storage_connection_string`"+` is given priority. - -If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+` parameter, please specify it in the -`+"`storage_account`"+` field.`+service.OutputPerformanceDocs(true, false)). - Fields( - service.NewInterpolatedStringField(adlsFieldFilesystem). - Description("The ADLS filesystem name for uploading the messages to."). - Example(`messages-${!timestamp("2006")}`), - service.NewInterpolatedStringField(adlsFieldPath). - Description("The path of each message to upload within the filesystem."). - Example(`${!count("files")}-${!timestamp_unix_nano()}.json`). - Example(`${!meta("kafka_key")}.json`). - Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`). 
- Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`), - service.NewOutputMaxInFlightField(), - ) -} diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv index 16f77cc00b..d10674896c 100644 --- a/internal/plugins/info.csv +++ b/internal/plugins/info.csv @@ -28,7 +28,7 @@ azure_blob_storage ,output ,azure_blob_storage ,3.36.0 ,certif azure_cosmosdb ,input ,azure_cosmosdb ,4.25.0 ,certified ,n ,y ,y azure_cosmosdb ,output ,azure_cosmosdb ,4.25.0 ,certified ,n ,y ,y azure_cosmosdb ,processor ,azure_cosmosdb ,4.25.0 ,certified ,n ,y ,y -azure_data_lake_gen2 ,output ,azure_data_lake_gen2 ,4.37.0 ,certified ,n ,n ,n +azure_data_lake_gen2 ,output ,azure_data_lake_gen2 ,4.38.0 ,certified ,n ,n ,n azure_queue_storage ,input ,azure_queue_storage ,3.42.0 ,certified ,n ,y ,y azure_queue_storage ,output ,azure_queue_storage ,3.36.0 ,certified ,n ,y ,y azure_table_storage ,input ,azure_table_storage ,4.10.0 ,certified ,n ,y ,y From d5cba932e75343035f259f516acfe775bd5c9a36 Mon Sep 17 00:00:00 2001 From: Wesley Merkel Date: Tue, 15 Oct 2024 11:42:03 -0600 Subject: [PATCH 4/5] fix: remove "ADLS" abbreviation It's not used very commonly, so this commit replaces uses of "ADLS" with DL/dataLake/etc instead. --- CHANGELOG.md | 1 + .../pages/outputs/azure_data_lake_gen2.adoc | 4 +- internal/impl/azure/auth.go | 8 ++-- .../{output_adls.go => output_data_lake.go} | 48 +++++++++---------- 4 files changed, 31 insertions(+), 30 deletions(-) rename internal/impl/azure/{output_adls.go => output_data_lake.go} (74%) diff --git a/CHANGELOG.md b/CHANGELOG.md index b974ac21fa..1e7e7ccbb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. - Field `checksum_algorithm` added to the `aws_s3` output. (@dom-lee-naimuri) - Field `nkey` added to `nats`, `nats_jetstream`, `nats_kv` and `nats_stream` components. (@ye11ow) - Field `private_key` added to the `snowflake_put` output. (@mihaitodor) +- New `azure_data_lake_gen2` output. (@ooesili) ### Fixed diff --git a/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc b/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc index 46dfec1fca..21e1f8461b 100644 --- a/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc +++ b/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc @@ -25,7 +25,7 @@ component_type_dropdown::[] Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified with the `path` field. -Introduced in version 4.37.0. +Introduced in version 4.38.0. ```yml # Config fields, showing default values @@ -101,7 +101,7 @@ The storage account SAS token. This field is ignored if `storage_connection_stri === `filesystem` -The ADLS filesystem name for uploading the messages to. +The data lake storage filesystem name for uploading the messages to. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. 
diff --git a/internal/impl/azure/auth.go b/internal/impl/azure/auth.go index 392f52b9fa..d25b7ddf86 100644 --- a/internal/impl/azure/auth.go +++ b/internal/impl/azure/auth.go @@ -83,7 +83,7 @@ func blobStorageClientFromParsed(pConf *service.ParsedConfig, container *service return getBlobStorageClient(connectionString, storageAccount, storageAccessKey, storageSASToken, container) } -func adlsClientFromParsed(pConf *service.ParsedConfig, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) { +func dlClientFromParsed(pConf *service.ParsedConfig, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) { connectionString, err := pConf.FieldString(bscFieldStorageConnectionString) if err != nil { return nil, false, err @@ -103,15 +103,15 @@ func adlsClientFromParsed(pConf *service.ParsedConfig, fsName *service.Interpola if storageAccount == "" && connectionString == "" { return nil, false, errors.New("invalid azure storage account credentials") } - return getADLSClient(connectionString, storageAccount, storageAccessKey, storageSASToken, fsName) + return getDLClient(connectionString, storageAccount, storageAccessKey, storageSASToken, fsName) } -func getADLSClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken string, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) { +func getDLClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken string, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) { if storageConnectionString != "" { storageConnectionString := parseStorageConnectionString(storageConnectionString, storageAccount) client, err := dlservice.NewClientFromConnectionString(storageConnectionString, nil) if err != nil { - return nil, false, fmt.Errorf("creating new ADLS file client from connection string: %w", err) + return nil, false, fmt.Errorf("creating new data lake file client from connection string: %w", err) } return client, false, nil } diff --git a/internal/impl/azure/output_adls.go b/internal/impl/azure/output_data_lake.go similarity index 74% rename from internal/impl/azure/output_adls.go rename to internal/impl/azure/output_data_lake.go index ccee5fd871..8f7db9dee2 100644 --- a/internal/impl/azure/output_adls.go +++ b/internal/impl/azure/output_data_lake.go @@ -22,11 +22,11 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" ) -func adlsSpec() *service.ConfigSpec { +func dataLakeSpec() *service.ConfigSpec { return azureComponentSpec(true). Beta(). Version("4.38.0"). - Summary(`Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified with the `+"`"+adlsFieldPath+"`"+` field.`). + Summary(`Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified with the `+"`"+dloFieldPath+"`"+` field.`). Description(` In order to have a different path for each file you should use function interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here], which are @@ -44,10 +44,10 @@ If multiple are set then the `+"`storage_connection_string`"+` is given priority If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+` parameter, please specify it in the `+"`storage_account`"+` field.`+service.OutputPerformanceDocs(true, false)). Fields( - service.NewInterpolatedStringField(adlsFieldFilesystem). - Description("The ADLS filesystem name for uploading the messages to."). 
+ service.NewInterpolatedStringField(dloFieldFilesystem). + Description("The data lake storage filesystem name for uploading the messages to."). Example(`messages-${!timestamp("2006")}`), - service.NewInterpolatedStringField(adlsFieldPath). + service.NewInterpolatedStringField(dloFieldPath). Description("The path of each message to upload within the filesystem."). Example(`${!count("files")}-${!timestamp_unix_nano()}.json`). Example(`${!meta("kafka_key")}.json`). @@ -59,27 +59,27 @@ If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+ const ( // Azure Data Lake Storage Output Fields - adlsFieldFilesystem = "filesystem" - adlsFieldPath = "path" + dloFieldFilesystem = "filesystem" + dloFieldPath = "path" ) -type adlsConfig struct { +type dloConfig struct { client *dlservice.Client path *service.InterpolatedString filesystem *service.InterpolatedString } func init() { - err := service.RegisterOutput("azure_data_lake_gen2", adlsSpec(), + err := service.RegisterOutput("azure_data_lake_gen2", dataLakeSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, mif int, err error) { - var pConf *adlsConfig - if pConf, err = adlsConfigFromParsed(conf); err != nil { + var pConf *dloConfig + if pConf, err = dloConfigFromParsed(conf); err != nil { return } if mif, err = conf.FieldMaxInFlight(); err != nil { return } - if out, err = newAzureADLSWriter(pConf, mgr.Logger()); err != nil { + if out, err = newAzureDataLakeWriter(pConf, mgr.Logger()); err != nil { return } return @@ -89,19 +89,19 @@ func init() { } } -func adlsConfigFromParsed(pConf *service.ParsedConfig) (*adlsConfig, error) { - var conf adlsConfig +func dloConfigFromParsed(pConf *service.ParsedConfig) (*dloConfig, error) { + var conf dloConfig var err error - conf.filesystem, err = pConf.FieldInterpolatedString(adlsFieldFilesystem) + conf.filesystem, err = pConf.FieldInterpolatedString(dloFieldFilesystem) if err != nil { return nil, err } - conf.path, err = pConf.FieldInterpolatedString(adlsFieldPath) + conf.path, err = pConf.FieldInterpolatedString(dloFieldPath) if err != nil { return nil, err } var isFilesystemSASToken bool - conf.client, isFilesystemSASToken, err = adlsClientFromParsed(pConf, conf.filesystem) + conf.client, isFilesystemSASToken, err = dlClientFromParsed(pConf, conf.filesystem) if err != nil { return nil, err } @@ -112,23 +112,23 @@ func adlsConfigFromParsed(pConf *service.ParsedConfig) (*adlsConfig, error) { return &conf, nil } -func newAzureADLSWriter(conf *adlsConfig, log *service.Logger) (*azureADLSWriter, error) { - return &azureADLSWriter{ +func newAzureDataLakeWriter(conf *dloConfig, log *service.Logger) (*azureDataLakeWriter, error) { + return &azureDataLakeWriter{ conf: conf, log: log, }, nil } -type azureADLSWriter struct { - conf *adlsConfig +type azureDataLakeWriter struct { + conf *dloConfig log *service.Logger } -func (a *azureADLSWriter) Connect(ctx context.Context) error { +func (a *azureDataLakeWriter) Connect(ctx context.Context) error { return nil } -func (a *azureADLSWriter) Write(ctx context.Context, msg *service.Message) error { +func (a *azureDataLakeWriter) Write(ctx context.Context, msg *service.Message) error { fsName, err := a.conf.filesystem.TryString(msg) if err != nil { return fmt.Errorf("interpolating filesystem name: %w", err) @@ -154,6 +154,6 @@ func (a *azureADLSWriter) Write(ctx context.Context, msg *service.Message) error return nil } -func (a *azureADLSWriter) Close(ctx context.Context) error { +func (a *azureDataLakeWriter) 
Close(ctx context.Context) error { return nil } From 53a60a5e6f07465a2914422a98bf9934b9097958 Mon Sep 17 00:00:00 2001 From: Wesley Merkel Date: Tue, 15 Oct 2024 12:46:34 -0600 Subject: [PATCH 5/5] fix: improve azure data lake output examples --- .../components/pages/outputs/azure_data_lake_gen2.adoc | 6 +++--- internal/impl/azure/output_data_lake.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc b/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc index 21e1f8461b..39511b4678 100644 --- a/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc +++ b/docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc @@ -37,7 +37,7 @@ output: storage_connection_string: "" storage_sas_token: "" filesystem: messages-${!timestamp("2006")} # No default (required) - path: ${!count("files")}-${!timestamp_unix_nano()}.txt + path: ${!counter()}-${!timestamp_unix_nano()}.txt max_in_flight: 64 ``` @@ -122,12 +122,12 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter *Type*: `string` -*Default*: `"${!count(\"files\")}-${!timestamp_unix_nano()}.txt"` +*Default*: `"${!counter()}-${!timestamp_unix_nano()}.txt"` ```yml # Examples -path: ${!count("files")}-${!timestamp_unix_nano()}.json +path: ${!counter()}-${!timestamp_unix_nano()}.json path: ${!meta("kafka_key")}.json diff --git a/internal/impl/azure/output_data_lake.go b/internal/impl/azure/output_data_lake.go index 8f7db9dee2..c6d24a68c6 100644 --- a/internal/impl/azure/output_data_lake.go +++ b/internal/impl/azure/output_data_lake.go @@ -49,10 +49,10 @@ If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+ Example(`messages-${!timestamp("2006")}`), service.NewInterpolatedStringField(dloFieldPath). Description("The path of each message to upload within the filesystem."). - Example(`${!count("files")}-${!timestamp_unix_nano()}.json`). + Example(`${!counter()}-${!timestamp_unix_nano()}.json`). Example(`${!meta("kafka_key")}.json`). Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`). - Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`), + Default(`${!counter()}-${!timestamp_unix_nano()}.txt`), service.NewOutputMaxInFlightField(), ) }
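
One behavior of the series worth illustrating: when `storage_sas_token` holds a service (filesystem-scoped) SAS — detected by `isServiceSASToken` via the `sr` query parameter — the filesystem name is interpolated once against an empty message and baked into the client URL at construction time, so per-message interpolation effectively applies only to `path`. A minimal sketch, with placeholder account and token values:

```yml
output:
  azure_data_lake_gen2:
    storage_account: myaccount                        # placeholder
    storage_sas_token: "sv=2024-05-04&sr=c&sig=AAAA"  # placeholder; `sr` marks a service SAS
    filesystem: messages                              # resolved once at client construction
    path: ${!counter()}-${!timestamp_unix_nano()}.json
```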