diff --git a/lib/sanbase/clickhouse_repo.ex b/lib/sanbase/clickhouse_repo.ex index 9cec66f88c..33429e257b 100644 --- a/lib/sanbase/clickhouse_repo.ex +++ b/lib/sanbase/clickhouse_repo.ex @@ -40,35 +40,45 @@ defmodule Sanbase.ClickhouseRepo do end @doc ~s""" - Execute a query and apply `transform_fn/1` on each row of the result. + Execute a query described by the Query struct or the query text and arguments. + + If the execution is successful, transform_fn/1 is used to transform each row + of the result. transform_fn/1 accepts as argument a single list, containing one + value per column. + + Example: + + ClickhouseRepo.query_transform( + "SELECT toUnixTimestamp(now())", + [], + fn [ts] -> %{datetime: DateTime.from_unix!(ts)} end + ) """ @spec query_transform(Sanbase.Clickhouse.Query.t(), (list() -> any())) :: {:ok, any()} | {:error, String.t()} @spec query_transform(String.t(), list(), (list() -> any())) :: {:ok, any()} | {:error, String.t()} - def query_transform(%Sanbase.Clickhouse.Query{} = query, transform_fn) do + def query_transform(%Sanbase.Clickhouse.Query{} = query, transform_fn) + when is_function(transform_fn, 1) do with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do query_transform(sql, args, transform_fn) end end def query_transform(query, args, transform_fn) do - case execute_query_transform(query, args) do + case execute_query_transform(query, args, caller: "query_transform/{2,3}") do {:ok, result} -> {:ok, Enum.map(result.rows, transform_fn)} {:error, error} -> {:error, error} end - rescue - e -> - log_and_return_error_from_exception(e, "query_transform/3", __STACKTRACE__) end @doc ~s""" Execute a query and apply the transform_fn on every row the result. Return a map with the transformed rows alongside some metadata - - the query id, column names and a short summary of the used resources + the clickhouse query id, column names and a short summary of the used resources """ @spec query_transform_with_metadata(Sanbase.Clickhouse.Query.t(), (list() -> list())) :: {:ok, Map.t()} | {:error, String.t()} @@ -81,7 +91,10 @@ defmodule Sanbase.ClickhouseRepo do end def query_transform_with_metadata(query, args, transform_fn) do - case execute_query_transform(query, args, propagate_error: true) do + case execute_query_transform(query, args, + caller: "query_transform_with_metadata/3", + propagate_error: true + ) do {:ok, result} -> {:ok, %{ @@ -114,11 +127,15 @@ defmodule Sanbase.ClickhouseRepo do when acc: any def query_reduce(%Sanbase.Clickhouse.Query{} = query, init, reducer) do with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do - query_reduce(sql, args, init, reducer) + execute_query_reduce(sql, args, init, reducer) end end def query_reduce(query, args, init, reducer) do + execute_query_reduce(query, args, init, reducer) + end + + defp execute_query_reduce(query, args, init, reducer, attempts_left \\ 1) do ordered_params = order_params(query, args) sanitized_query = sanitize_query(query) @@ -130,14 +147,18 @@ defmodule Sanbase.ClickhouseRepo do {:ok, Enum.reduce(result.rows, init, reducer)} {:error, error} -> - log_and_return_error(error, "query_reduce/4") + if retryable_error?(error) and attempts_left > 0 do + execute_query_reduce(query, args, init, reducer, attempts_left - 1) + else + log_and_return_error(error, "query_reduce/4") + end end rescue e -> log_and_return_error_from_exception(e, "query_reduce/4", __STACKTRACE__) end - defp execute_query_transform(query, args, opts \\ []) do + defp execute_query_transform(query, args, opts, attempts_left \\ 1) do ordered_params = order_params(query, args) sanitized_query = sanitize_query(query) @@ -149,11 +170,20 @@ defmodule Sanbase.ClickhouseRepo do {:ok, result} {:error, error} -> - log_and_return_error(error, "query_transform/3", opts) + if retryable_error?(error) and attempts_left > 0 do + execute_query_transform(query, args, opts, attempts_left - 1) + else + log_and_return_error(error, opts[:caller], opts) + end end + rescue + e -> + log_and_return_error_from_exception(e, opts[:caller], __STACKTRACE__, + propagate_error: Keyword.get(opts, :propagate_error, false) + ) end - @masked_error_message "Cannot execute database query. If issue persists please contact Santiment Support." + @masked_error_message "Cannot execute ClickHouse database query. If issue persists please contact Santiment Support." defp log_and_return_error_from_exception( %{} = exception, function_executed, @@ -330,4 +360,38 @@ defmodule Sanbase.ClickhouseRepo do ) |> to_string() end + + def retryable_error?(error) do + error_str = + case error do + e when is_binary(e) -> e + %Clickhousex.Error{message: message} -> message + end + + non_retryable_errors = [ + "(SYNTAX_ERROR)", + "(ILLEGAL_TYPE_OF_ARGUMENT)", + "(UNKNOWN_IDENTIFIER)", + "(ACCESS_DENIED)", + "(UNKNOWN_TABLE)", + "(MEMORY_LIMIT_EXCEEDED)", + "(AMBIGUOUS_COLUMN_NAME)", + ["number of params received", "does not match expected"] + ] + + has_non_retryable_error? = + Enum.any?(non_retryable_errors, fn + error when is_binary(error) -> + String.contains?(error_str, error) + + # in case of lists, all elements in the list must be present in the error + # can be done with regexes, too, but this is simpler + errors_list when is_list(errors_list) -> + Enum.all?(errors_list, fn error -> + String.contains?(error_str, error) + end) + end) + + not has_non_retryable_error? + end end diff --git a/lib/sanbase/queries/refresh/worker.ex b/lib/sanbase/queries/refresh/worker.ex index c61d81da4a..ee8f7fd364 100644 --- a/lib/sanbase/queries/refresh/worker.ex +++ b/lib/sanbase/queries/refresh/worker.ex @@ -28,32 +28,19 @@ defmodule Sanbase.Queries.RefreshWorker do defp maybe_remove_scheduled_job(result, nil), do: result defp maybe_remove_scheduled_job({:error, error_str}, scheduled_job) do - case retryable_error?(error_str) do - true -> - {:error, error_str} - - false -> - Oban.cancel_job(@oban_conf_name, scheduled_job) - {:error, error_str} + if clickhouse_error?(error_str) and Sanbase.ClickhouseRepo.retryable_error?(error_str) do + {:error, error_str} + else + Oban.cancel_job(@oban_conf_name, scheduled_job) + {:error, error_str} end end defp maybe_remove_scheduled_job(result, _), do: result - defp retryable_error?(error_str) do - non_retryable_errors = [ - "(SYNTAX_ERROR)", - "(ILLEGAL_TYPE_OF_ARGUMENT)", - "(UNKNOWN_IDENTIFIER)", - "(ACCESS_DENIED)", - "(UNKNOWN_TABLE)", - "(MEMORY_LIMIT_EXCEEDED)", - "(AMBIGUOUS_COLUMN_NAME)" - ] - - has_non_retryable_error? = - Enum.any?(non_retryable_errors, fn error -> String.contains?(error_str, error) end) - - not has_non_retryable_error? + defp clickhouse_error?(error_str) do + # Clickhouse errors returned from our ClickhouseRepo + # start with "Cannot execute ClickHouse database query" + String.contains?(error_str, "ClickHouse") end end diff --git a/test/sanbase/clickhouse/api_call_data_test.exs b/test/sanbase/clickhouse/api_call_data_test.exs index 89a5c4e6da..c54d61ff4a 100644 --- a/test/sanbase/clickhouse/api_call_data_test.exs +++ b/test/sanbase/clickhouse/api_call_data_test.exs @@ -71,7 +71,7 @@ defmodule Sanbase.Clickhouse.ApiCallDataTest do {:error, error} = ApiCallData.api_call_history(context.user.id, dt1, dt3, "1d", :all) - assert error =~ "Cannot execute database query." + assert error =~ "Cannot execute ClickHouse database query." end) =~ error_msg end) end diff --git a/test/sanbase/clickhouse/clickhouse_repo_test.exs b/test/sanbase/clickhouse/clickhouse_repo_test.exs index e9f423d1a2..a5d64187d4 100644 --- a/test/sanbase/clickhouse/clickhouse_repo_test.exs +++ b/test/sanbase/clickhouse/clickhouse_repo_test.exs @@ -14,7 +14,7 @@ defmodule Sanbase.Clickhouse.ClickhouseRepoTest do {:error, error} = ClickhouseRepo.query_transform("SELECT NOW()", [], & &1) assert error =~ - "Cannot execute database query. If issue persists please contact Santiment Support" + "Cannot execute ClickHouse database query. If issue persists please contact Santiment Support" # assert returned error message does not contain internal details refute error =~ error_msg @@ -38,7 +38,7 @@ defmodule Sanbase.Clickhouse.ClickhouseRepoTest do {:error, error} = ClickhouseRepo.query_transform("SELECT NOW()", [], & &1) assert error =~ - "Cannot execute database query. If issue persists please contact Santiment Support" + "Cannot execute ClickHouse database query. If issue persists please contact Santiment Support" # assert returned error message does not contain internal details refute error =~ error_msg diff --git a/test/sanbase/social_data/trending_words/trending_words_test.exs b/test/sanbase/social_data/trending_words/trending_words_test.exs index b712103f71..56c1968bfc 100644 --- a/test/sanbase/social_data/trending_words/trending_words_test.exs +++ b/test/sanbase/social_data/trending_words/trending_words_test.exs @@ -178,7 +178,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do |> Sanbase.Mock.run_with_mocks(fn -> {:error, error} = TrendingWords.get_trending_words(dt1, dt3, "1d", 2, :all) - assert error =~ "Cannot execute database query." + assert error =~ "Cannot execute ClickHouse database query." end) end end @@ -250,7 +250,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do |> Sanbase.Mock.run_with_mocks(fn -> {:error, error} = TrendingWords.get_currently_trending_words(10, :all) - assert error =~ "Cannot execute database query." + assert error =~ "Cannot execute ClickHouse database query." end) end end @@ -281,7 +281,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do {:error, error} = TrendingWords.get_word_trending_history("word", dt1, dt2, "1h", 10, :all) - assert error =~ "Cannot execute database query." + assert error =~ "Cannot execute ClickHouse database query." end) end end @@ -316,7 +316,7 @@ defmodule Sanbase.SocialData.TrendingWordsTest do {:error, error} = TrendingWords.get_project_trending_history(project.slug, dt1, dt2, "1h", 10, :all) - assert error =~ "Cannot execute database query." + assert error =~ "Cannot execute ClickHouse database query." end) end end