diff --git a/bindings/elixir/lib/fluss/config.ex b/bindings/elixir/lib/fluss/config.ex index f12f61e4..d02a428f 100644 --- a/bindings/elixir/lib/fluss/config.ex +++ b/bindings/elixir/lib/fluss/config.ex @@ -33,17 +33,33 @@ defmodule Fluss.Config do @enforce_keys [:bootstrap_servers] defstruct bootstrap_servers: nil, + writer_acks: nil, writer_batch_size: nil, writer_batch_timeout_ms: nil, + writer_bucket_no_key_assigner: nil, + writer_buffer_memory_size: nil, + writer_buffer_wait_timeout_ms: nil, writer_dynamic_batch_size_enabled: nil, - writer_dynamic_batch_size_min: nil + writer_dynamic_batch_size_min: nil, + writer_enable_idempotence: nil, + writer_max_inflight_requests_per_bucket: nil, + writer_request_max_size: nil, + writer_retries: nil @type t :: %__MODULE__{ bootstrap_servers: String.t(), + writer_acks: String.t() | nil, writer_batch_size: non_neg_integer() | nil, writer_batch_timeout_ms: non_neg_integer() | nil, + writer_bucket_no_key_assigner: :sticky | :round_robin | nil, + writer_buffer_memory_size: non_neg_integer() | nil, + writer_buffer_wait_timeout_ms: non_neg_integer() | nil, writer_dynamic_batch_size_enabled: boolean() | nil, - writer_dynamic_batch_size_min: non_neg_integer() | nil + writer_dynamic_batch_size_min: non_neg_integer() | nil, + writer_enable_idempotence: boolean() | nil, + writer_max_inflight_requests_per_bucket: non_neg_integer() | nil, + writer_request_max_size: non_neg_integer() | nil, + writer_retries: non_neg_integer() | nil } @spec new(String.t()) :: t() @@ -58,6 +74,10 @@ defmodule Fluss.Config do def set_bootstrap_servers(%__MODULE__{} = config, servers) when is_binary(servers), do: %{config | bootstrap_servers: servers} + @spec set_writer_acks(t(), String.t()) :: t() + def set_writer_acks(%__MODULE__{} = config, acks) when is_binary(acks), + do: %{config | writer_acks: acks} + @spec set_writer_batch_size(t(), non_neg_integer()) :: t() def set_writer_batch_size(%__MODULE__{} = config, size) when is_integer(size), do: %{config | writer_batch_size: size} @@ -66,6 +86,19 @@ defmodule Fluss.Config do def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms), do: %{config | writer_batch_timeout_ms: ms} + @spec set_writer_bucket_no_key_assigner(t(), :sticky | :round_robin) :: t() + def set_writer_bucket_no_key_assigner(%__MODULE__{} = config, assigner) + when assigner in [:sticky, :round_robin], + do: %{config | writer_bucket_no_key_assigner: assigner} + + @spec set_writer_buffer_memory_size(t(), non_neg_integer()) :: t() + def set_writer_buffer_memory_size(%__MODULE__{} = config, size) when is_integer(size), + do: %{config | writer_buffer_memory_size: size} + + @spec set_writer_buffer_wait_timeout_ms(t(), non_neg_integer()) :: t() + def set_writer_buffer_wait_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms), + do: %{config | writer_buffer_wait_timeout_ms: ms} + @spec set_writer_dynamic_batch_size_enabled(t(), boolean()) :: t() def set_writer_dynamic_batch_size_enabled(%__MODULE__{} = config, enabled) when is_boolean(enabled), @@ -75,6 +108,24 @@ defmodule Fluss.Config do def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) when is_integer(size), do: %{config | writer_dynamic_batch_size_min: size} + @spec set_writer_enable_idempotence(t(), boolean()) :: t() + def set_writer_enable_idempotence(%__MODULE__{} = config, enabled) + when is_boolean(enabled), + do: %{config | writer_enable_idempotence: enabled} + + @spec set_writer_max_inflight_requests_per_bucket(t(), non_neg_integer()) :: t() + def set_writer_max_inflight_requests_per_bucket(%__MODULE__{} = config, n) + when is_integer(n), + do: %{config | writer_max_inflight_requests_per_bucket: n} + + @spec set_writer_request_max_size(t(), non_neg_integer()) :: t() + def set_writer_request_max_size(%__MODULE__{} = config, size) when is_integer(size), + do: %{config | writer_request_max_size: size} + + @spec set_writer_retries(t(), non_neg_integer()) :: t() + def set_writer_retries(%__MODULE__{} = config, n) when is_integer(n), + do: %{config | writer_retries: n} + @spec get_bootstrap_servers(t()) :: String.t() def get_bootstrap_servers(%__MODULE__{bootstrap_servers: servers}), do: servers end diff --git a/bindings/elixir/native/fluss_nif/src/config.rs b/bindings/elixir/native/fluss_nif/src/config.rs index 8bbdfad9..60034d9b 100644 --- a/bindings/elixir/native/fluss_nif/src/config.rs +++ b/bindings/elixir/native/fluss_nif/src/config.rs @@ -15,18 +15,34 @@ // specific language governing permissions and limitations // under the License. -use fluss::config::Config; -use rustler::NifStruct; +use fluss::config::{Config, NoKeyAssigner}; +use rustler::{NifStruct, NifUnitEnum}; + +/// Bucket-assigner strategy for tables without bucket keys. +/// Maps to fluss::config::NoKeyAssigner. +#[derive(NifUnitEnum)] +pub enum NifNoKeyAssigner { + Sticky, + RoundRobin, +} /// Decoded from `%Fluss.Config{}` Elixir struct. #[derive(NifStruct)] #[module = "Fluss.Config"] pub struct NifConfig { pub bootstrap_servers: String, + pub writer_acks: Option, pub writer_batch_size: Option, pub writer_batch_timeout_ms: Option, + pub writer_bucket_no_key_assigner: Option, + pub writer_buffer_memory_size: Option, + pub writer_buffer_wait_timeout_ms: Option, pub writer_dynamic_batch_size_enabled: Option, pub writer_dynamic_batch_size_min: Option, + pub writer_enable_idempotence: Option, + pub writer_max_inflight_requests_per_bucket: Option, + pub writer_request_max_size: Option, + pub writer_retries: Option, } impl NifConfig { @@ -47,6 +63,33 @@ impl NifConfig { if let Some(size) = self.writer_dynamic_batch_size_min { config.writer_dynamic_batch_size_min = size; } + if let Some(acks) = self.writer_acks { + config.writer_acks = acks; + } + if let Some(assigner) = self.writer_bucket_no_key_assigner { + config.writer_bucket_no_key_assigner = match assigner { + NifNoKeyAssigner::Sticky => NoKeyAssigner::Sticky, + NifNoKeyAssigner::RoundRobin => NoKeyAssigner::RoundRobin, + }; + } + if let Some(memory_size) = self.writer_buffer_memory_size { + config.writer_buffer_memory_size = memory_size as usize; + } + if let Some(timeout_ms) = self.writer_buffer_wait_timeout_ms { + config.writer_buffer_wait_timeout_ms = timeout_ms; + } + if let Some(enabled) = self.writer_enable_idempotence { + config.writer_enable_idempotence = enabled; + } + if let Some(requests_limit) = self.writer_max_inflight_requests_per_bucket { + config.writer_max_inflight_requests_per_bucket = requests_limit as usize; + } + if let Some(max_size) = self.writer_request_max_size { + config.writer_request_max_size = max_size; + } + if let Some(retries) = self.writer_retries { + config.writer_retries = retries; + } config } } diff --git a/bindings/elixir/test/config_test.exs b/bindings/elixir/test/config_test.exs new file mode 100644 index 00000000..344c6478 --- /dev/null +++ b/bindings/elixir/test/config_test.exs @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.ConfigTest do + use ExUnit.Case, async: true + + test "new/1 creates config with bootstrap_servers; all other fields default to nil" do + config = Fluss.Config.new("localhost:9123") + assert config == %Fluss.Config{bootstrap_servers: "localhost:9123"} + end + + test "set_writer_acks/2 sets the acks value" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_acks("all") + + assert config.writer_acks == "all" + end + + test "set_writer_bucket_no_key_assigner/2 sets a valid assigner" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_bucket_no_key_assigner(:sticky) + + assert config.writer_bucket_no_key_assigner == :sticky + end + + test "set_writer_bucket_no_key_assigner/2 only accepts :sticky or :round_robin" do + assert_raise FunctionClauseError, fn -> + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_bucket_no_key_assigner(:custom) + end + end + + test "set_writer_buffer_memory_size/2 sets the buffer memory size" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_buffer_memory_size(67_108_864) + + assert config.writer_buffer_memory_size == 67_108_864 + end + + test "set_writer_buffer_wait_timeout_ms/2 sets the wait timeout" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_buffer_wait_timeout_ms(5_000) + + assert config.writer_buffer_wait_timeout_ms == 5_000 + end + + test "set_writer_enable_idempotence/2 sets the idempotence flag" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_enable_idempotence(false) + + assert config.writer_enable_idempotence == false + end + + test "set_writer_max_inflight_requests_per_bucket/2 sets the inflight limit" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_max_inflight_requests_per_bucket(3) + + assert config.writer_max_inflight_requests_per_bucket == 3 + end + + test "set_writer_request_max_size/2 sets the request max size" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_request_max_size(2_097_152) + + assert config.writer_request_max_size == 2_097_152 + end + + test "set_writer_retries/2 sets the retry count" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_retries(5) + + assert config.writer_retries == 5 + end + + test "setters chain correctly" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_writer_acks("all") + |> Fluss.Config.set_writer_retries(3) + |> Fluss.Config.set_writer_bucket_no_key_assigner(:round_robin) + + assert config.writer_acks == "all" + assert config.writer_retries == 3 + assert config.writer_bucket_no_key_assigner == :round_robin + end +end