Skip to content

Commit

Permalink
[azure-blob-storage] Make container interpolated
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Amador <[email protected]>
  • Loading branch information
mfamador committed Oct 2, 2024
1 parent 1073300 commit 00a3897
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ The storage account SAS token. This field is ignored if `storage_connection_stri
=== `container`
The name of the container from which to download blobs.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
Expand Down
10 changes: 7 additions & 3 deletions internal/impl/azure/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func azureComponentSpec(forBlobStorage bool) *service.ConfigSpec {
return spec
}

func blobStorageClientFromParsed(pConf *service.ParsedConfig, container string) (*azblob.Client, bool, error) {
func blobStorageClientFromParsed(pConf *service.ParsedConfig, container *service.InterpolatedString) (*azblob.Client, bool, error) {
connectionString, err := pConf.FieldString(bscFieldStorageConnectionString)
if err != nil {
return nil, false, err
Expand All @@ -84,7 +84,7 @@ const (
blobEndpointExp = "https://%s.blob.core.windows.net"
)

func getBlobStorageClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken, container string) (*azblob.Client, bool, error) {
func getBlobStorageClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken string, container *service.InterpolatedString) (*azblob.Client, bool, error) {
var client *azblob.Client
var err error
var containerSASToken bool
Expand All @@ -103,7 +103,11 @@ func getBlobStorageClient(storageConnectionString, storageAccount, storageAccess
if strings.HasPrefix(storageSASToken, "sp=") {
// container SAS token
containerSASToken = true
serviceURL = fmt.Sprintf("%s/%s?%s", fmt.Sprintf(blobEndpointExp, storageAccount), container, storageSASToken)
c, err := container.TryString(service.NewMessage([]byte("")))
if err != nil {
return nil, false, fmt.Errorf("error getting container: %w", err)
}
serviceURL = fmt.Sprintf("%s/%s?%s", fmt.Sprintf(blobEndpointExp, storageAccount), c, storageSASToken)
} else {
// storage account SAS token
serviceURL = fmt.Sprintf("%s/%s", fmt.Sprintf(blobEndpointExp, storageAccount), storageSASToken)
Expand Down
17 changes: 10 additions & 7 deletions internal/impl/azure/input_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,20 @@ type bsiConfig struct {
}

func bsiConfigFromParsed(pConf *service.ParsedConfig) (conf bsiConfig, err error) {
if conf.Container, err = pConf.FieldString(bsiFieldContainer); err != nil {
var containerSASToken bool
container, err := pConf.FieldInterpolatedString(bsiFieldContainer)
if err != nil {
return
}
var containerSASToken bool
if conf.client, containerSASToken, err = blobStorageClientFromParsed(pConf, conf.Container); err != nil {
if conf.client, containerSASToken, err = blobStorageClientFromParsed(pConf, container); err != nil {
return
}
if containerSASToken {
// when using a container SAS token, the container is already implicit
conf.Container = ""
// if using a container SAS token, the container is already implicit
container, _ = service.NewInterpolatedString("")
}
if conf.Container, err = container.TryString(service.NewMessage([]byte(""))); err != nil {
return
}
if conf.Prefix, err = pConf.FieldString(bsiFieldPrefix); err != nil {
return
Expand Down Expand Up @@ -119,7 +123,7 @@ This input adds the following metadata fields to each message:
You can access these metadata fields using xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].`).
Fields(
service.NewStringField(bsiFieldContainer).
service.NewInterpolatedStringField(bsiFieldContainer).
Description("The name of the container from which to download blobs."),
service.NewStringField(bsiFieldPrefix).
Description("An optional path prefix, if set only objects with the prefix are consumed.").
Expand Down Expand Up @@ -367,7 +371,6 @@ func newAzureTargetBatchReader(ctx context.Context, conf bsiConfig) (*azureTarge
}
staticKeys.pager = pager
}

return &staticKeys, nil
}

Expand Down
6 changes: 1 addition & 5 deletions internal/impl/azure/output_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ func bsoConfigFromParsed(pConf *service.ParsedConfig) (conf bsoConfig, err error
return
}
var containerSASToken bool
c, err := conf.Container.TryString(service.NewMessage([]byte("")))
if err != nil {
return
}
if conf.client, containerSASToken, err = blobStorageClientFromParsed(pConf, c); err != nil {
if conf.client, containerSASToken, err = blobStorageClientFromParsed(pConf, conf.Container); err != nil {
return
}
if containerSASToken {
Expand Down

0 comments on commit 00a3897

Please sign in to comment.