Merge pull request #2924 from redpanda-data/azure-data-lake-output
feat: add Azure Data Lake Gen2 Output
ooesili authored Oct 15, 2024
2 parents d610765 + 53a60a5 commit fa30dbb
Showing 7 changed files with 408 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
- Field `checksum_algorithm` added to the `aws_s3` output. (@dom-lee-naimuri)
- Field `nkey` added to `nats`, `nats_jetstream`, `nats_kv` and `nats_stream` components. (@ye11ow)
- Field `private_key` added to the `snowflake_put` output. (@mihaitodor)
- New `azure_data_lake_gen2` output. (@ooesili)

### Fixed

146 changes: 146 additions & 0 deletions docs/modules/components/pages/outputs/azure_data_lake_gen2.adoc
@@ -0,0 +1,146 @@
= azure_data_lake_gen2
:type: output
:status: beta
:categories: ["Services","Azure"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
// © 2024 Redpanda Data Inc.
component_type_dropdown::[]
Sends message parts as files to an Azure Data Lake Gen2 filesystem. Each file is uploaded with the filename specified by the `path` field.
Introduced in version 4.38.0.
```yml
# Config fields, showing default values
output:
label: ""
azure_data_lake_gen2:
storage_account: ""
storage_access_key: ""
storage_connection_string: ""
storage_sas_token: ""
filesystem: messages-${!timestamp("2006")} # No default (required)
path: ${!counter()}-${!timestamp_unix_nano()}.txt
max_in_flight: 64
```
To write each file to a different path, use the function
interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here], which are
calculated per message of a batch.
Supports multiple authentication methods, but only one of the following is required:
- `storage_connection_string`
- `storage_account` and `storage_access_key`
- `storage_account` and `storage_sas_token`
- `storage_account` to access via https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential[DefaultAzureCredential^]
If multiple are set, `storage_connection_string` takes priority.
If the connection string does not contain the `AccountName` parameter, specify it in the
`storage_account` field.
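For illustration, here are minimal sketches of three of these methods (the account name, key, and connection string below are placeholders, not working credentials):
```yml
# Connection string (takes priority if set)
output:
  azure_data_lake_gen2:
    storage_connection_string: "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=<key>;EndpointSuffix=core.windows.net"
    filesystem: messages
---
# Storage account and access key
output:
  azure_data_lake_gen2:
    storage_account: myaccount
    storage_access_key: <key>
    filesystem: messages
---
# Storage account only, authenticating via DefaultAzureCredential
output:
  azure_data_lake_gen2:
    storage_account: myaccount
    filesystem: messages
```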
== Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`.
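For instance, a hedged sketch raising the cap (the value `256` is an assumption; tune it against your workload and network):
```yml
output:
  azure_data_lake_gen2:
    storage_account: myaccount # placeholder
    filesystem: messages
    max_in_flight: 256 # assumption: more parallel uploads for higher throughput
```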
== Fields
=== `storage_account`
The storage account to access. This field is ignored if `storage_connection_string` is set.
*Type*: `string`
*Default*: `""`
=== `storage_access_key`
The storage account access key. This field is ignored if `storage_connection_string` is set.
*Type*: `string`
*Default*: `""`
=== `storage_connection_string`
A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set.
*Type*: `string`
*Default*: `""`
=== `storage_sas_token`
The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set.
*Type*: `string`
*Default*: `""`
=== `filesystem`
The name of the Data Lake Storage filesystem to upload messages to.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
filesystem: messages-${!timestamp("2006")}
```
=== `path`
The path within the filesystem to upload each message to.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
*Default*: `"${!counter()}-${!timestamp_unix_nano()}.txt"`
```yml
# Examples
path: ${!counter()}-${!timestamp_unix_nano()}.json
path: ${!meta("kafka_key")}.json
path: ${!json("doc.namespace")}/${!json("doc.id")}.json
```
=== `max_in_flight`
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
*Type*: `int`
*Default*: `64`
3 changes: 2 additions & 1 deletion go.mod
@@ -12,7 +12,8 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0
github.com/Azure/go-amqp v1.0.5
github.com/ClickHouse/clickhouse-go/v2 v2.27.1
6 changes: 4 additions & 2 deletions go.sum
@@ -87,8 +87,10 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xP
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 h1:Be6KInmFEKV81c0pOAEbRYehLMwmmGI1exuFj248AMk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0/go.mod h1:WCPBHsOXfBVnivScjs2ypRfimjEW0qPVLGgJkZlrIOA=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek=
github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.1 h1:qhYAuEKEz8jDGV1Tyf3Gz44ppGZP53xJGi4qdH2PLsc=
github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.2.1/go.mod h1:8QbU31D+n+SDJDstJXJWnOBKAgv8ZTz/x7su5KQ31Kk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 h1:lJwNFV+xYjHREUTHJKx/ZF6CJSt9znxmLw9DqSTvyRU=
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0/go.mod h1:GfT0aGew8Qj5yiQVqOO5v7N8fanbJGyUoHqXg56qcVY=
github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck=
95 changes: 95 additions & 0 deletions internal/impl/azure/auth.go
@@ -17,6 +17,7 @@ package azure
import (
"errors"
"fmt"
"net/url"
"os"
"strings"

@@ -25,6 +26,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/data/aztables"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
dlservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
)

@@ -80,8 +83,89 @@ func blobStorageClientFromParsed(pConf *service.ParsedConfig, container *service
return getBlobStorageClient(connectionString, storageAccount, storageAccessKey, storageSASToken, container)
}

func dlClientFromParsed(pConf *service.ParsedConfig, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) {
connectionString, err := pConf.FieldString(bscFieldStorageConnectionString)
if err != nil {
return nil, false, err
}
storageAccount, err := pConf.FieldString(bscFieldStorageAccount)
if err != nil {
return nil, false, err
}
storageAccessKey, err := pConf.FieldString(bscFieldStorageAccessKey)
if err != nil {
return nil, false, err
}
storageSASToken, err := pConf.FieldString(bscFieldStorageSASToken)
if err != nil {
return nil, false, err
}
if storageAccount == "" && connectionString == "" {
return nil, false, errors.New("invalid azure storage account credentials")
}
return getDLClient(connectionString, storageAccount, storageAccessKey, storageSASToken, fsName)
}

func getDLClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken string, fsName *service.InterpolatedString) (*dlservice.Client, bool, error) {
if storageConnectionString != "" {
storageConnectionString := parseStorageConnectionString(storageConnectionString, storageAccount)
client, err := dlservice.NewClientFromConnectionString(storageConnectionString, nil)
if err != nil {
return nil, false, fmt.Errorf("creating new data lake file client from connection string: %w", err)
}
return client, false, nil
}

serviceURL := fmt.Sprintf(dfsEndpointExpr, storageAccount)

if storageAccessKey != "" {
cred, err := azdatalake.NewSharedKeyCredential(storageAccount, storageAccessKey)
if err != nil {
return nil, false, fmt.Errorf("creating new shared key credential: %w", err)
}
client, err := dlservice.NewClientWithSharedKeyCredential(serviceURL, cred, nil)
if err != nil {
return nil, false, fmt.Errorf("creating new client from shared key credential: %w", err)
}
return client, false, nil
}

if storageSASToken != "" {
var isFilesystemSASToken bool
if isServiceSASToken(storageSASToken) {
// container/filesystem scoped SAS token
isFilesystemSASToken = true
fsNameStr, err := fsName.TryString(service.NewMessage([]byte("")))
if err != nil {
return nil, false, fmt.Errorf("interpolating filesystem name: %w", err)
}
serviceURL = fmt.Sprintf("%s/%s?%s", serviceURL, fsNameStr, storageSASToken)
} else {
// storage account SAS token
serviceURL = fmt.Sprintf("%s?%s", serviceURL, storageSASToken)
}
client, err := dlservice.NewClientWithNoCredential(serviceURL, nil)
if err != nil {
return nil, false, fmt.Errorf("creating client with no credentials: %w", err)
}
return client, isFilesystemSASToken, nil
}

// default credentials
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, false, fmt.Errorf("getting default Azure credentials: %w", err)
}
client, err := dlservice.NewClient(serviceURL, cred, nil)
if err != nil {
return nil, false, fmt.Errorf("creating client from default credentials: %w", err)
}
return client, false, nil
}

const (
blobEndpointExp = "https://%s.blob.core.windows.net"
dfsEndpointExpr = "https://%s.dfs.core.windows.net"
)

func getBlobStorageClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken string, container *service.InterpolatedString) (*azblob.Client, bool, error) {
@@ -290,3 +374,14 @@ func getTablesServiceClient(account, accessKey, connectionString, storageSASToke
}
return client, err
}

func isServiceSASToken(token string) bool {
query, err := url.ParseQuery(token)
if err != nil {
return false
}
// 2024-10-09: `sr` parameter is present and required in service SAS tokens,
// and is not valid in storage account SAS tokens
// https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#specify-the-signed-resource-blob-storage-only
return query.Has("sr")
}
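
For illustration, a self-contained sketch of this heuristic (the token strings are made-up placeholders, not real SAS tokens):
```go
package main

import (
	"fmt"
	"net/url"
)

// isServiceSASToken mirrors the helper above: a service (container/filesystem)
// SAS token carries the `sr` (signed resource) query parameter, which is not
// valid in storage account SAS tokens.
func isServiceSASToken(token string) bool {
	query, err := url.ParseQuery(token)
	if err != nil {
		return false
	}
	return query.Has("sr")
}

func main() {
	// Hypothetical token values for illustration only.
	fmt.Println(isServiceSASToken("sv=2022-11-02&sr=c&sig=placeholder"))         // true: service SAS
	fmt.Println(isServiceSASToken("sv=2022-11-02&ss=b&srt=sco&sig=placeholder")) // false: account SAS
}
```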