Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 90 additions & 22 deletions lib/sanbase/clickhouse_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,26 @@ defmodule Sanbase.ClickhouseRepo do
use Ecto.Repo, otp_app: :sanbase, adapter: @adapter

alias Sanbase.Utils.Config

require Logger

# The Application supervisor checks if ths Repo is enabled. It is included in
# the supervision tree only if this returns true
def enabled?() do
case Config.module_get(__MODULE__, :clickhouse_repo_enabled?) do
true -> true
false -> false
nil -> System.get_env("CLICKHOUSE_REPO_ENABLED", "true") |> String.to_existing_atom()
true ->
true

false ->
false

nil ->
env_var = System.get_env("CLICKHOUSE_REPO_ENABLED", "true")

case Config.parse_boolean_value(env_var) do
flag when is_boolean(flag) -> flag
nil -> raise("Invalid env var CLICKHOUSE_REPO_ENABLED value: #{inspect(env_var)}")
end
end
end

Expand All @@ -33,7 +46,14 @@ defmodule Sanbase.ClickhouseRepo do
{:ok, opts}
end

def query_transform(query, args, transform_fn) do
@doc ~s"""
Execute the provided query with the given arguments.

If the execution is successful, transform_fn/1 is used to transform each row
of the result. transform_fn/1 accepts as argument a single list, containing one
value per column.
"""
def query_transform(query, args, transform_fn) when is_function(transform_fn, 1) do
case execute_query_transform(query, args) do
{:ok, result} -> {:ok, Enum.map(result.rows, transform_fn)}
{:error, error} -> {:error, error}
Expand All @@ -44,11 +64,17 @@ defmodule Sanbase.ClickhouseRepo do
end

@doc ~s"""
Execute a query and apply the transform_fn on every row the result.
Execute a query with the provided arguments. The result is enriched with some
metadata.

If the execution is successful, transform_fn/1 is used to transform each row
of the result. transform_fn/1 accepts as argument a single list, containing one
value per column. The resultcontains the same number of rows as the original result from the database.

Return a map with the transformed rows alongside some metadata -
the query id, column names and a short summary of the used resources
the clickhouse query id, column names and a short summary of the used resources
"""
def query_transform_with_metadata(query, args, transform_fn) do
def query_transform_with_metadata(query, args, transform_fn, attempts_left \\ 2) do
case execute_query_transform(query, args, propagate_error: true) do
{:ok, result} ->
{:ok,
Expand All @@ -61,7 +87,10 @@ defmodule Sanbase.ClickhouseRepo do
}}

{:error, error} ->
{:error, error}
case attempts_left > 0 and retryable_error?(error) do
0 -> log_and_return_error(error, "query_transform/3")
_ -> query_transform_with_metadata(query, args, transform_fn, attempts_left - 1)
end
end
rescue
e ->
Expand All @@ -70,15 +99,29 @@ defmodule Sanbase.ClickhouseRepo do
)
end

def query_reduce(query, args, init, reducer) do
@doc ~s"""
Execute a query with the provided arguments

If the execution is successful, reducer/2 is used to reduce the result, starting
with init_acc as the initial accumular. One example usage is to transform the
result containing lists of asset and value to a map where the asset is the key
and the value is the value.
"""
def query_reduce(query, args, init_acc, reducer, attempts_left \\ 2) do
ordered_params = order_params(query, args)
sanitized_query = sanitize_query(query)

maybe_store_executed_clickhouse_sql(sanitized_query, ordered_params)

case __MODULE__.query(sanitized_query, ordered_params) do
{:ok, result} -> {:ok, Enum.reduce(result.rows, init, reducer)}
{:error, error} -> log_and_return_error(error, "query_reduce/4")
{:ok, result} ->
{:ok, Enum.reduce(result.rows, init_acc, reducer)}

{:error, error} ->
case attempts_left > 0 and retryable_error?(error) do
true -> query_reduce(query, args, init_acc, reducer, attempts_left - 1)
false -> log_and_return_error(inspect(error), "query_reduce/4")
end
end
rescue
e ->
Expand Down Expand Up @@ -110,7 +153,9 @@ defmodule Sanbase.ClickhouseRepo do
propagate_error = Keyword.get(opts, :propagate_error, false)

log_id = UUID.uuid4()
error_message = extract_error_from_stacktrace(stacktrace) || Exception.message(exception)

{_error_code, error_message} = extract_error_and_code_from_stacktrace(stacktrace)
error_message = error_message || Exception.message(exception)

Logger.warning("""
[#{log_id}] Cannot execute ClickHouse #{function_executed}. Reason: #{error_message}
Expand All @@ -126,7 +171,7 @@ defmodule Sanbase.ClickhouseRepo do
propagate_error = Keyword.get(opts, :propagate_error, false)
log_id = UUID.uuid4()

error_message = extract_error_from_error(error)
{_error_code, error_message} = extract_error_code_and_message_from_error(error)

Logger.warning(
"[#{log_id}] Cannot execute ClickHouse #{function_executed}. Reason: #{error_message}"
Expand Down Expand Up @@ -176,7 +221,7 @@ defmodule Sanbase.ClickhouseRepo do
end
end

defp extract_error_from_stacktrace(stacktrace) do
defp extract_error_and_code_from_stacktrace(stacktrace) do
line_with_exception =
Enum.find_value(stacktrace, fn
{_, _, [<<_::binary>> = line | _], _} ->
Expand All @@ -187,29 +232,32 @@ defmodule Sanbase.ClickhouseRepo do
end)

case line_with_exception do
nil -> nil
line -> transform_error_string(line)
nil ->
{nil, nil}

line ->
{_error_code, _error_msg} = get_error_code_and_message(line)
end
end

defp extract_error_from_error(%Clickhousex.Error{message: message}) do
transform_error_string(message)
defp extract_error_code_and_message_from_error(%Clickhousex.Error{message: message}) do
{_error_code, _error_msg} = get_error_code_and_message(message)
end

defp extract_error_from_error(error), do: error
defp extract_error_code_and_message_from_error(error), do: {nil, error}

defp transform_error_string(error_str) do
defp get_error_code_and_message(error_str) do
case String.split(error_str, "DB::Exception: ") do
[error_str] ->
error_str
{nil, error_str}

[_ | _] = split_error ->
error = List.last(split_error)

[error_msg, error_code, _version_str] =
Regex.split(~r|\([A-Z_]+\)|, error, include_captures: true, trim: true)

"#{error_code} #{error_msg}" |> String.trim()
{error_code, "#{error_code} #{error_msg}" |> String.trim()}
end
end

Expand All @@ -236,4 +284,24 @@ defmodule Sanbase.ClickhouseRepo do
rescue
_ -> :ok
end

defp retryable_error?(error) do
case circuit_breaker_broken?() do
true ->
false

false ->
{error_code, _error_message} =
cond do
error =~ "Transport Error: :timeout" -> true
true -> false
end
end
end

defp circuit_breaker_broken?() do
# if too many errors happened in the last X seconds, do not attempt retries
# as the error is not here.
false
end
end