Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
394 changes: 140 additions & 254 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 7 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ serial_test = { version = "3.2" }

[dependencies]
cfg-if.workspace = true
reqwest_0_12_24 = { package = "reqwest", version = "0.12.24", features = ["json"] }
clap.workspace = true
indoc.workspace = true
paste.workspace = true
Expand Down Expand Up @@ -282,15 +283,11 @@ aws-smithy-runtime-api = { version = "1.7.3", default-features = false, optional
aws-smithy-types = { version = "1.2.11", default-features = false, features = ["rt-tokio"], optional = true }

# Azure
azure_core = { version = "0.25", default-features = false, features = ["reqwest", "hmac_openssl"], optional = true }
azure_identity = { version = "0.25", default-features = false, features = ["reqwest"], optional = true }
azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"] }
azure_identity = { version = "0.30", default-features = false, optional = true }

# Azure Storage
azure_storage = { version = "0.21", default-features = false, optional = true }
azure_storage_blobs = { version = "0.21", default-features = false, optional = true }

# Needed to bridge with outdated version of azure_core used in azure_storage*
azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
azure_storage_blob = { version = "0.7", default-features = false, optional = true }


# OpenDAL
Expand Down Expand Up @@ -456,10 +453,8 @@ openssl-src = { version = "300", default-features = false, features = ["force-en
approx = "0.5.1"
assert_cmd = { version = "2.0.17", default-features = false }
aws-smithy-runtime = { version = "1.8.3", default-features = false, features = ["tls-rustls"] }
azure_core = { version = "0.25", default-features = false, features = ["reqwest", "hmac_openssl", "azurite_workaround"] }
azure_identity = { version = "0.25", default-features = false, features = ["reqwest"] }
azure_storage = { version = "0.21", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
azure_storage_blobs = { version = "0.21", default-features = false, features = ["enable_reqwest", "hmac_openssl", "azurite_workaround"] }
azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"] }
azure_identity = { version = "0.30", default-features = false }
base64 = "0.22.1"
criterion = { version = "0.7.0", features = ["html_reports", "async_tokio"] }
itertools.workspace = true
Expand Down Expand Up @@ -841,7 +836,7 @@ sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"]
sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
sinks-axiom = ["sinks-http"]
sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"]
sinks-azure_blob = ["dep:azure_storage_blob"]
sinks-azure_monitor_logs = []
sinks-blackhole = []
sinks-chronicle = []
Expand Down
7 changes: 4 additions & 3 deletions src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use azure_storage_blobs::prelude::*;
use azure_storage_blob::BlobContainerClient;
use tower::ServiceBuilder;
use vector_lib::{
codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
Expand Down Expand Up @@ -165,10 +165,11 @@ impl GenerateConfig for AzureBlobSinkConfig {
#[async_trait::async_trait]
#[typetag::serde(name = "azure_blob")]
impl SinkConfig for AzureBlobSinkConfig {
async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
cx.proxy(),
)?;

let healthcheck = azure_common::config::build_healthcheck(
Expand All @@ -193,7 +194,7 @@ const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
const DEFAULT_FILENAME_APPEND_UUID: bool = true;

impl AzureBlobSinkConfig {
pub fn build_processor(&self, client: Arc<ContainerClient>) -> crate::Result<VectorSink> {
pub fn build_processor(&self, client: Arc<BlobContainerClient>) -> crate::Result<VectorSink> {
let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, AzureBlobRetryLogic)
Expand Down
108 changes: 64 additions & 44 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::{
};

use azure_core::http::StatusCode;
use azure_core_for_storage::prelude::Range;
use azure_storage_blobs::prelude::*;

use azure_storage_blob::BlobContainerClient;
use bytes::{Buf, BytesMut};
use flate2::read::GzDecoder;
use futures::{Stream, StreamExt, stream};
Expand Down Expand Up @@ -36,6 +36,7 @@ async fn azure_blob_healthcheck_passed() {
let client = azure_common::config::build_client(
config.connection_string.clone().into(),
config.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.expect("Failed to create client");

Expand All @@ -55,6 +56,7 @@ async fn azure_blob_healthcheck_unknown_container() {
let client = azure_common::config::build_client(
config.connection_string.clone().into(),
config.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.expect("Failed to create client");

Expand Down Expand Up @@ -83,8 +85,8 @@ async fn azure_blob_insert_lines_into_blob() {
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(blob.properties.content_type, String::from("text/plain"));
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(content_type, Some(String::from("text/plain")));
assert_eq!(lines, blob_lines);
}

Expand All @@ -108,12 +110,9 @@ async fn azure_blob_insert_json_into_blob() {
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(blob.properties.content_encoding, None);
assert_eq!(
blob.properties.content_type,
String::from("application/x-ndjson")
);
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(content_encoding, None);
assert_eq!(content_type, Some(String::from("application/x-ndjson")));
let expected = events
.iter()
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
Expand Down Expand Up @@ -243,6 +242,7 @@ impl AzureBlobSinkConfig {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.expect("Failed to create client");

Expand All @@ -260,47 +260,69 @@ impl AzureBlobSinkConfig {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.unwrap();
let response = client
.list_blobs()
.prefix(prefix)
.max_results(NonZeroU32::new(1000).unwrap())
.delimiter("/")
.include_metadata(true)
.into_stream()

// Use the new SDK's pager to fetch the first page of results and collect the blob names from it.
let mut pager = client
.list_blobs(None)
.expect("Failed to start list blobs pager");
let page = pager
.next()
.await
.expect("Failed to fetch blobs")
.unwrap();

response
.blobs
.blobs()
.map(|blob| blob.name.clone())
.collect::<Vec<_>>()
.into_body();

// Best-effort extraction of names from the page body.
// Depending on SDK struct names, this may need tweaking:
// ListBlobsFlatSegmentResponse { segment: { blob_items: [{ name, .. }, ..] }, .. }
let names = page
.segment
.blob_items
.into_iter()
.map(|b| b.name)
.filter(|name| name.starts_with(&prefix))
.collect::<Vec<_>>();

names
}

pub async fn get_blob(&self, blob: String) -> (Blob, Vec<String>) {
pub async fn get_blob(&self, blob: String) -> (Option<String>, Option<String>, Vec<String>) {
use azure_storage_blob::clients::BlobClient as _;
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.unwrap();
let response = client
.blob_client(blob)
.get()
.range(Range::new(0, 1024 * 1024))
.into_stream()
.next()

let blob_client = client.blob_client(&blob);

// Fetch properties to obtain content-type and content-encoding
let props = blob_client
.get_properties(None)
.await
.expect("Failed to get blob")
.unwrap();
.expect("Failed to get blob properties")
.into_body();

(
response.blob,
self.get_blob_content(response.data.collect().await.unwrap().to_vec()),
)
let content_type = props.content_type.clone();
let content_encoding = props.content_encoding.clone();

// Download the blob content; no range option is passed, so the whole blob is requested.
let downloaded = blob_client
.download(None)
.await
.expect("Failed to download blob");
let data = downloaded
.into_body()
.data
.collect()
.await
.expect("Failed to read blob body")
.to_vec();

(content_type, content_encoding, self.get_blob_content(data))
}

fn get_blob_content(&self, data: Vec<u8>) -> Vec<String> {
Expand All @@ -320,17 +342,15 @@ impl AzureBlobSinkConfig {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.unwrap();
let request = client
.create()
.public_access(PublicAccess::None)
.into_future();
let result = client.create_container(None).await;

let response = match request.await {
let response = match result {
Ok(_) => Ok(()),
Err(error) => match error.as_http_error() {
Some(http_error) if http_error.status() as u16 == StatusCode::Conflict => Ok(()),
Err(error) => match error.http_status() {
Some(status) if status.as_u16() == StatusCode::Conflict => Ok(()),
_ => Err(error),
},
};
Expand Down
Loading
Loading