Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions bindings/elixir/lib/fluss/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,33 @@ defmodule Fluss.Config do

@enforce_keys [:bootstrap_servers]
defstruct bootstrap_servers: nil,
writer_acks: nil,
writer_batch_size: nil,
writer_batch_timeout_ms: nil,
writer_bucket_no_key_assigner: nil,
writer_buffer_memory_size: nil,
writer_buffer_wait_timeout_ms: nil,
writer_dynamic_batch_size_enabled: nil,
writer_dynamic_batch_size_min: nil
writer_dynamic_batch_size_min: nil,
writer_enable_idempotence: nil,
writer_max_inflight_requests_per_bucket: nil,
writer_request_max_size: nil,
writer_retries: nil

@type t :: %__MODULE__{
bootstrap_servers: String.t(),
writer_acks: String.t() | nil,
writer_batch_size: non_neg_integer() | nil,
writer_batch_timeout_ms: non_neg_integer() | nil,
writer_bucket_no_key_assigner: :sticky | :round_robin | nil,
writer_buffer_memory_size: non_neg_integer() | nil,
writer_buffer_wait_timeout_ms: non_neg_integer() | nil,
writer_dynamic_batch_size_enabled: boolean() | nil,
writer_dynamic_batch_size_min: non_neg_integer() | nil
writer_dynamic_batch_size_min: non_neg_integer() | nil,
writer_enable_idempotence: boolean() | nil,
writer_max_inflight_requests_per_bucket: non_neg_integer() | nil,
writer_request_max_size: non_neg_integer() | nil,
writer_retries: non_neg_integer() | nil
}

@spec new(String.t()) :: t()
Expand All @@ -58,6 +74,10 @@ defmodule Fluss.Config do
def set_bootstrap_servers(%__MODULE__{} = config, servers) when is_binary(servers),
do: %{config | bootstrap_servers: servers}

@spec set_writer_acks(t(), String.t()) :: t()
def set_writer_acks(%__MODULE__{} = config, acks) when is_binary(acks),
do: %{config | writer_acks: acks}

@spec set_writer_batch_size(t(), non_neg_integer()) :: t()
def set_writer_batch_size(%__MODULE__{} = config, size) when is_integer(size),
do: %{config | writer_batch_size: size}
Expand All @@ -66,6 +86,19 @@ defmodule Fluss.Config do
def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms),
do: %{config | writer_batch_timeout_ms: ms}

@spec set_writer_bucket_no_key_assigner(t(), :sticky | :round_robin) :: t()
def set_writer_bucket_no_key_assigner(%__MODULE__{} = config, assigner)
when assigner in [:sticky, :round_robin],
do: %{config | writer_bucket_no_key_assigner: assigner}

@spec set_writer_buffer_memory_size(t(), non_neg_integer()) :: t()
def set_writer_buffer_memory_size(%__MODULE__{} = config, size) when is_integer(size),
do: %{config | writer_buffer_memory_size: size}

@spec set_writer_buffer_wait_timeout_ms(t(), non_neg_integer()) :: t()
def set_writer_buffer_wait_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms),
do: %{config | writer_buffer_wait_timeout_ms: ms}

@spec set_writer_dynamic_batch_size_enabled(t(), boolean()) :: t()
def set_writer_dynamic_batch_size_enabled(%__MODULE__{} = config, enabled)
when is_boolean(enabled),
Expand All @@ -75,6 +108,24 @@ defmodule Fluss.Config do
def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) when is_integer(size),
do: %{config | writer_dynamic_batch_size_min: size}

@spec set_writer_enable_idempotence(t(), boolean()) :: t()
def set_writer_enable_idempotence(%__MODULE__{} = config, enabled)
when is_boolean(enabled),
do: %{config | writer_enable_idempotence: enabled}

@spec set_writer_max_inflight_requests_per_bucket(t(), non_neg_integer()) :: t()
def set_writer_max_inflight_requests_per_bucket(%__MODULE__{} = config, n)
when is_integer(n),
do: %{config | writer_max_inflight_requests_per_bucket: n}

@spec set_writer_request_max_size(t(), non_neg_integer()) :: t()
def set_writer_request_max_size(%__MODULE__{} = config, size) when is_integer(size),
do: %{config | writer_request_max_size: size}

@spec set_writer_retries(t(), non_neg_integer()) :: t()
def set_writer_retries(%__MODULE__{} = config, n) when is_integer(n),
do: %{config | writer_retries: n}

@spec get_bootstrap_servers(t()) :: String.t()
def get_bootstrap_servers(%__MODULE__{bootstrap_servers: servers}), do: servers
end
47 changes: 45 additions & 2 deletions bindings/elixir/native/fluss_nif/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,34 @@
// specific language governing permissions and limitations
// under the License.

use fluss::config::Config;
use rustler::NifStruct;
use fluss::config::{Config, NoKeyAssigner};
use rustler::{NifStruct, NifUnitEnum};

/// Bucket-assigner strategy for tables without bucket keys.
/// Maps to fluss::config::NoKeyAssigner.
#[derive(NifUnitEnum)]
pub enum NifNoKeyAssigner {
Sticky,
RoundRobin,
}

/// Decoded from `%Fluss.Config{}` Elixir struct.
#[derive(NifStruct)]
#[module = "Fluss.Config"]
pub struct NifConfig {
pub bootstrap_servers: String,
pub writer_acks: Option<String>,
pub writer_batch_size: Option<i32>,
pub writer_batch_timeout_ms: Option<i64>,
pub writer_bucket_no_key_assigner: Option<NifNoKeyAssigner>,
pub writer_buffer_memory_size: Option<u64>,
pub writer_buffer_wait_timeout_ms: Option<u64>,
pub writer_dynamic_batch_size_enabled: Option<bool>,
pub writer_dynamic_batch_size_min: Option<i32>,
pub writer_enable_idempotence: Option<bool>,
pub writer_max_inflight_requests_per_bucket: Option<u64>,
pub writer_request_max_size: Option<i32>,
pub writer_retries: Option<i32>,
}

impl NifConfig {
Expand All @@ -47,6 +63,33 @@ impl NifConfig {
if let Some(size) = self.writer_dynamic_batch_size_min {
config.writer_dynamic_batch_size_min = size;
}
if let Some(acks) = self.writer_acks {
config.writer_acks = acks;
}
if let Some(assigner) = self.writer_bucket_no_key_assigner {
config.writer_bucket_no_key_assigner = match assigner {
NifNoKeyAssigner::Sticky => NoKeyAssigner::Sticky,
NifNoKeyAssigner::RoundRobin => NoKeyAssigner::RoundRobin,
};
}
if let Some(memory_size) = self.writer_buffer_memory_size {
config.writer_buffer_memory_size = memory_size as usize;
}
if let Some(timeout_ms) = self.writer_buffer_wait_timeout_ms {
config.writer_buffer_wait_timeout_ms = timeout_ms;
}
if let Some(enabled) = self.writer_enable_idempotence {
config.writer_enable_idempotence = enabled;
}
if let Some(requests_limit) = self.writer_max_inflight_requests_per_bucket {
config.writer_max_inflight_requests_per_bucket = requests_limit as usize;
}
if let Some(max_size) = self.writer_request_max_size {
config.writer_request_max_size = max_size;
}
if let Some(retries) = self.writer_retries {
config.writer_retries = retries;
}
config
}
}
108 changes: 108 additions & 0 deletions bindings/elixir/test/config_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

defmodule Fluss.ConfigTest do
use ExUnit.Case, async: true

test "new/1 creates config with bootstrap_servers; all other fields default to nil" do
config = Fluss.Config.new("localhost:9123")
assert config == %Fluss.Config{bootstrap_servers: "localhost:9123"}
end

test "set_writer_acks/2 sets the acks value" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_acks("all")

assert config.writer_acks == "all"
end

test "set_writer_bucket_no_key_assigner/2 sets a valid assigner" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_bucket_no_key_assigner(:sticky)

assert config.writer_bucket_no_key_assigner == :sticky
end

test "set_writer_bucket_no_key_assigner/2 only accepts :sticky or :round_robin" do
assert_raise FunctionClauseError, fn ->
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_bucket_no_key_assigner(:custom)
end
end

test "set_writer_buffer_memory_size/2 sets the buffer memory size" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_buffer_memory_size(67_108_864)

assert config.writer_buffer_memory_size == 67_108_864
end

test "set_writer_buffer_wait_timeout_ms/2 sets the wait timeout" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_buffer_wait_timeout_ms(5_000)

assert config.writer_buffer_wait_timeout_ms == 5_000
end

test "set_writer_enable_idempotence/2 sets the idempotence flag" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_enable_idempotence(false)

assert config.writer_enable_idempotence == false
end

test "set_writer_max_inflight_requests_per_bucket/2 sets the inflight limit" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_max_inflight_requests_per_bucket(3)

assert config.writer_max_inflight_requests_per_bucket == 3
end

test "set_writer_request_max_size/2 sets the request max size" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_request_max_size(2_097_152)

assert config.writer_request_max_size == 2_097_152
end

test "set_writer_retries/2 sets the retry count" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_retries(5)

assert config.writer_retries == 5
end

test "setters chain correctly" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_writer_acks("all")
|> Fluss.Config.set_writer_retries(3)
|> Fluss.Config.set_writer_bucket_no_key_assigner(:round_robin)

assert config.writer_acks == "all"
assert config.writer_retries == 3
assert config.writer_bucket_no_key_assigner == :round_robin
end
end
Loading