diff --git a/config/dev.exs b/config/dev.exs index e4eaf5c5f5..8bf12f51f4 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -90,6 +90,19 @@ config :sanbase, Sanbase.ClickhouseRepo, pool_size: {:system, "CLICKHOUSE_POOL_SIZE", "3"}, show_sensitive_data_on_connection_error: true +config :sanbase, Sanbase.ChRepo, + adapter: Ecto.Adapters.ClickHouse, + loggers: [Ecto.LogEntry], + hostname: "clickhouse", + port: 8123, + database: "default", + username: "default", + password: "", + timeout: 60_000, + pool_size: {:system, "CLICKHOUSE_POOL_SIZE", "3"}, + # idle_interval: 60_000, + show_sensitive_data_on_connection_error: true + clickhouse_read_only_opts = [ adapter: ClickhouseEcto, loggers: [Ecto.LogEntry, Sanbase.Prometheus.EctoInstrumenter], diff --git a/lib/sanbase/application/application.ex b/lib/sanbase/application/application.ex index d7de8cc28c..1f7b21441c 100644 --- a/lib/sanbase/application/application.ex +++ b/lib/sanbase/application/application.ex @@ -299,6 +299,13 @@ defmodule Sanbase.Application do fn -> Sanbase.ClickhouseRepo.enabled?() end ), + # Start the main ChRepo. This is started in all + # pods as each pod will need it. + start_in_and_if( + fn -> Sanbase.ChRepo end, + [:dev, :prod], + fn -> Sanbase.ChRepo.enabled?() end + ), # Start the main clickhouse read-only repos clickhouse_readonly_children, diff --git a/lib/sanbase/balances/balance.ex b/lib/sanbase/balances/balance.ex index 8b4b2ed09d..1ccaf1d4e0 100644 --- a/lib/sanbase/balances/balance.ex +++ b/lib/sanbase/balances/balance.ex @@ -11,7 +11,7 @@ defmodule Sanbase.Balance do maybe_fill_gaps_last_seen_balance_ohlc: 1 ] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo alias Sanbase.Project @type slug :: String.t() @@ -202,7 +202,7 @@ defmodule Sanbase.Balance do Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} -> query = assets_held_by_address_query(address, table, opts) - case ClickhouseRepo.query_transform(query, transform_fn) do + case ChRepo.query_transform(query, transform_fn) do {:ok, data} -> {:cont, {:ok, acc ++ data}} {:error, error} -> {:halt, {:error, error}} end @@ -240,7 +240,7 @@ defmodule Sanbase.Balance do Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} -> query_struct = usd_value_address_change_query(address, datetime, table) - case ClickhouseRepo.query_transform(query_struct, transform_fn) do + case ChRepo.query_transform(query_struct, transform_fn) do {:ok, data} -> {:cont, {:ok, acc ++ data}} {:error, error} -> {:halt, {:error, error}} end @@ -265,7 +265,7 @@ defmodule Sanbase.Balance do Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} -> query_struct = usd_value_held_by_address_query(address, table) - case ClickhouseRepo.query_transform(query_struct, transform_fn) do + case ChRepo.query_transform(query_struct, transform_fn) do {:ok, data} -> {:cont, {:ok, acc ++ data}} {:error, error} -> {:halt, {:error, error}} end @@ -289,7 +289,7 @@ defmodule Sanbase.Balance do {:ok, table} <- balances_table(slug, infr) do query_struct = addresses_by_filter_query(slug, decimals, operator, threshold, table, opts) - ClickhouseRepo.query_transform(query_struct, fn [address, balance] -> + ChRepo.query_transform(query_struct, fn [address, balance] -> %{ address: address, balance: balance @@ -309,7 +309,7 @@ defmodule Sanbase.Balance do address = Sanbase.BlockchainAddress.to_internal_format(address) query_struct = first_datetime_query(address, slug, blockchain) - ClickhouseRepo.query_transform(query_struct, fn [unix] -> + ChRepo.query_transform(query_struct, fn [unix] -> DateTime.from_unix!(unix) end) |> maybe_unwrap_ok_value() @@ -343,7 +343,7 @@ defmodule Sanbase.Balance do def current_balance_top_addresses(slug, decimals, infrastructure, blockchain, table, opts) do query_struct = top_addresses_query(slug, decimals, blockchain, table, opts) - ClickhouseRepo.query_transform(query_struct, fn [address, balance] -> + ChRepo.query_transform(query_struct, fn [address, balance] -> %{ address: address, infrastructure: infrastructure, @@ -374,7 +374,7 @@ defmodule Sanbase.Balance do defp do_current_balance(addresses, slug, decimals, blockchain, table) do query_struct = current_balance_query(addresses, slug, decimals, blockchain, table) - ClickhouseRepo.query_transform(query_struct, fn [address, balance] -> + ChRepo.query_transform(query_struct, fn [address, balance] -> %{ address: address, balance: balance @@ -385,7 +385,7 @@ defmodule Sanbase.Balance do defp do_balance_change(addresses, slug, decimals, blockchain, from, to) do query_struct = balance_change_query(addresses, slug, decimals, blockchain, from, to) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [address, balance_start, balance_end, balance_change] -> %{ address: address, @@ -411,7 +411,7 @@ defmodule Sanbase.Balance do query_struct = last_balance_before_query(addresses, slug, decimals, blockchain, datetime) - case ClickhouseRepo.query_transform(query_struct, & &1) do + case ChRepo.query_transform(query_struct, & &1) do {:ok, list} -> # If an address does not own the given coin/token, it will be missing from the # result. Iterate it like this in order to fill the missing values with 0 @@ -437,7 +437,7 @@ defmodule Sanbase.Balance do query_struct = historical_balance_query(address, slug, decimals, blockchain, from, to, interval) - ClickhouseRepo.query_transform(query_struct, fn [unix, value, has_changed] -> + ChRepo.query_transform(query_struct, fn [unix, value, has_changed] -> %{ datetime: DateTime.from_unix!(unix), balance: value, @@ -469,7 +469,7 @@ defmodule Sanbase.Balance do query_struct = historical_balance_ohlc_query(address, slug, decimals, blockchain, from, to, interval) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [unix, open, high, low, close, has_changed] -> %{ diff --git a/lib/sanbase/balances/balance_sql_query.ex b/lib/sanbase/balances/balance_sql_query.ex index b54e4da51d..201fca3983 100644 --- a/lib/sanbase/balances/balance_sql_query.ex +++ b/lib/sanbase/balances/balance_sql_query.ex @@ -48,7 +48,7 @@ defmodule Sanbase.Balance.SqlQuery do balance, txID, computedAt - FROM {{table}} + FROM \{\{table:inline\}\} WHERE address IN (addresses_of_interest) AND dt >= toStartOfYear(from) AND dt <= to ), merged AS @@ -91,7 +91,7 @@ defmodule Sanbase.Balance.SqlQuery do argMaxIf(balance, (dt, txID, computedAt), dt <= {{from}}) / {{decimals}} AS start_balance, argMaxIf(balance, (dt, txID, computedAt), dt <= {{to}}) / {{decimals}} AS end_balance, end_balance - start_balance AS diff - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{maybe_selector_clause(blockchain, slug, "slug")} #{address_clause(addresses, argument_name: "addresses")} AND @@ -142,7 +142,7 @@ defmodule Sanbase.Balance.SqlQuery do toUnixTimestamp(intDiv(toUInt32(dt), {{interval}}) * {{interval}}) AS time, address, argMax(balance, (dt, txID, computedAt)) / {{decimals}} AS balance - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{maybe_selector_clause(blockchain, slug, "slug")} #{address_clause(addresses, argument_name: "addresses")} AND @@ -218,7 +218,7 @@ defmodule Sanbase.Balance.SqlQuery do max(balance) / {{decimals}} AS high, argMax(balance, (dt, txID, computedAt)) / {{decimals}} AS close, min(balance) / {{decimals}} AS low - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{maybe_selector_clause(blockchain, slug, "slug")} #{address_clause(addresses, argument_name: "addresses")} AND @@ -253,7 +253,7 @@ defmodule Sanbase.Balance.SqlQuery do SELECT address, argMax(balance, (dt, txID, computedAt)) / {{decimals}} - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{maybe_selector_clause(blockchain, slug, "slug")} #{address_clause(addresses, argument_name: "addresses")} @@ -273,7 +273,7 @@ defmodule Sanbase.Balance.SqlQuery do def first_datetime_query(address, slug, blockchain) when is_binary(address) do sql = """ SELECT toUnixTimestamp(min(dt)) - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{maybe_selector_clause(blockchain, slug, "slug")} #{address_clause(address, argument_name: "address")} @@ -457,7 +457,7 @@ defmodule Sanbase.Balance.SqlQuery do balance, txID, computedAt - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{address_clause(address, argument_name: "address")} ) @@ -498,7 +498,7 @@ defmodule Sanbase.Balance.SqlQuery do 0 ) AS current_balance, current_balance - previous_balance AS balance_change - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{address_clause(address, argument_name: "address")} #{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"} @@ -530,7 +530,7 @@ defmodule Sanbase.Balance.SqlQuery do balance, txID, computedAt - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{address_clause(address, argument_name: "address")} ) @@ -560,7 +560,7 @@ defmodule Sanbase.Balance.SqlQuery do SELECT {{slug}} AS name, argMax(balance, (dt, txID, computedAt)) / {{decimals}} AS balance - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{address_clause(address, argument_name: "address")} #{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"} @@ -646,7 +646,7 @@ defmodule Sanbase.Balance.SqlQuery do SELECT address, argMaxIf(balance, (dt, txID, computedAt), dt <= {{datetime}}) / {{decimals}} - FROM {{table}} + FROM \{\{table:inline\}\} WHERE #{maybe_selector_clause(blockchain, slug, "slug")} #{address_clause(addresses, argument_name: "addresses")} AND diff --git a/lib/sanbase/billing/subscription/san_burn_credit_trx.ex b/lib/sanbase/billing/subscription/san_burn_credit_trx.ex index 64e8f299e4..7bf855970e 100644 --- a/lib/sanbase/billing/subscription/san_burn_credit_trx.ex +++ b/lib/sanbase/billing/subscription/san_burn_credit_trx.ex @@ -5,7 +5,7 @@ defmodule Sanbase.Billing.Subscription.SanBurnCreditTransaction do import Ecto.Changeset - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo alias Sanbase.Accounts.EthAccount alias Sanbase.Accounts.User alias Sanbase.Repo @@ -72,7 +72,7 @@ defmodule Sanbase.Billing.Subscription.SanBurnCreditTransaction do def fetch_burn_trxs do query_struct = fetch_san_burns_query() - ClickhouseRepo.query_transform(query_struct, fn [timestamp, address, value, trx_id] -> + ChRepo.query_transform(query_struct, fn [timestamp, address, value, trx_id] -> %{ trx_datetime: DateTime.from_unix!(timestamp), address: address, diff --git a/lib/sanbase/blockchain_address/label_change/blockchain_adderss_label_change.ex b/lib/sanbase/blockchain_address/label_change/blockchain_adderss_label_change.ex index b51aa6f65f..a38314436c 100644 --- a/lib/sanbase/blockchain_address/label_change/blockchain_adderss_label_change.ex +++ b/lib/sanbase/blockchain_address/label_change/blockchain_adderss_label_change.ex @@ -1,11 +1,11 @@ defmodule Sanbase.BlockchainAddress.BlockchainAddressLabelChange do import __MODULE__.SqlQuery - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo def labels_list() do query_struct = labels_list_query() - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [label_fqn, display_name] -> origin = String.split(label_fqn, "/") |> List.first() %{name: label_fqn, human_readable_name: display_name, origin: origin} @@ -18,7 +18,7 @@ defmodule Sanbase.BlockchainAddress.BlockchainAddressLabelChange do query_struct = label_changes_query(address, blockchain, from, to) - ClickhouseRepo.query_transform(query_struct, fn [unix, address, label_fqn, sign] -> + ChRepo.query_transform(query_struct, fn [unix, address, label_fqn, sign] -> %{ datetime: DateTime.from_unix!(unix), address: address, diff --git a/lib/sanbase/ch_repo.ex b/lib/sanbase/ch_repo.ex new file mode 100644 index 0000000000..898a6724dc --- /dev/null +++ b/lib/sanbase/ch_repo.ex @@ -0,0 +1,342 @@ +defmodule Sanbase.ChRepo do + @moduledoc ~s""" + Module for interacting with the Clickhouse database. + + In case a read-only user is needed (as when the query to be executed + is provided by an external user), dynamically switch the pool of + connections with the one of a user with RO permissions: + `Sanbase.ClickhouseRepo.put_dynamic_repo(Sanbase.ClickhouseRepo.ReadOnly)` + """ + + env = Application.compile_env(:sanbase, :env) + @adapter if env == :test, do: Ecto.Adapters.Postgres, else: Ecto.Adapters.ClickHouse + + use Ecto.Repo, otp_app: :sanbase, adapter: @adapter + + alias Sanbase.Utils.Config + require Logger + + def enabled?() do + case Config.module_get(Sanbase.ClickhouseRepo, :clickhouse_repo_enabled?) do + true -> true + false -> false + nil -> System.get_env("CLICKHOUSE_REPO_ENABLED", "true") |> String.to_existing_atom() + end + end + + @doc """ + Dynamically loads the repository url from the + CLICKHOUSE_DATABASE_URL environment variable. + """ + def init(_, opts) do + pool_size = Config.module_get(__MODULE__, :pool_size) |> Sanbase.Math.to_integer() + + opts = + opts + |> Keyword.put(:pool_size, pool_size) + |> Keyword.put(:url, System.get_env("CLICKHOUSE_DATABASE_URL")) + + {:ok, opts} + end + + @doc ~s""" + Execute a query and apply `transform_fn/1` on each row of the result. + """ + @spec query_transform(Sanbase.Clickhouse.Query.t(), (list() -> any())) :: + {:ok, any()} | {:error, String.t()} + @spec query_transform(String.t(), list(), (list() -> any())) :: + {:ok, any()} | {:error, String.t()} + def query_transform(%Sanbase.Clickhouse.Query{} = query, transform_fn) do + query = Sanbase.Clickhouse.Query.put_format(query, nil) + + with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args_v2(query) do + query_transform(sql, args, transform_fn) + end + end + + def query_transform(query, args, transform_fn) do + IO.inspect("EXECUTING.....") + + try do + case execute_query_transform(query, args) do + {:ok, result} -> + IO.inspect(result, label: "__________________________________") + {:ok, Enum.map(result.rows, transform_fn)} + + {:error, error} -> + {:error, error} + end + rescue + e -> + IO.inspect({"================", e, Exception.message(e)}) + log_and_return_error_from_exception(e, "query_transform/3", __STACKTRACE__) + end + |> dbg() + end + + @doc ~s""" + Execute a query and apply the transform_fn on every row the result. + Return a map with the transformed rows alongside some metadata - + the query id, column names and a short summary of the used resources + """ + @spec query_transform_with_metadata(Sanbase.Clickhouse.Query.t(), (list() -> list())) :: + {:ok, Map.t()} | {:error, String.t()} + @spec query_transform_with_metadata(String.t(), list(), (list() -> list())) :: + {:ok, Map.t()} | {:error, String.t()} + def query_transform_with_metadata(%Sanbase.Clickhouse.Query{} = query, transform_fn) do + with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do + query_transform_with_metadata(sql, args, transform_fn) + end + end + + def query_transform_with_metadata(query, args, transform_fn) do + case execute_query_transform(query, args, propagate_error: true) do + {:ok, result} -> + {:ok, + %{ + rows: Enum.map(result.rows, transform_fn), + column_names: result.columns, + column_types: result.column_types, + query_id: result.query_id, + summary: result.summary + }} + + {:error, error} -> + {:error, error} + end + rescue + e -> + log_and_return_error_from_exception(e, "query_transform_with_metadata/3", __STACKTRACE__, + propagate_error: true + ) + end + + @doc ~s""" + Execute a query and reduce all the rows, starting with `init` as initial accumulator + and using `reduce` for every row + """ + @spec query_reduce(Sanbase.Clickhouse.Query.t(), acc, (list(), acc -> acc)) :: + {:ok, Map.t()} | {:error, String.t()} + when acc: any + @spec query_reduce(String.t(), list(), acc, (list(), acc -> acc)) :: + {:ok, Map.t()} | {:error, String.t()} + when acc: any + def query_reduce(%Sanbase.Clickhouse.Query{} = query, init, reducer) do + with {:ok, %{sql: sql, args: args}} <- Sanbase.Clickhouse.Query.get_sql_args(query) do + query_reduce(sql, args, init, reducer) + end + end + + def query_reduce(query, args, init, reducer) do + # ordered_params = order_params(query, args) + # sanitized_query = sanitize_query(query) + + # maybe_store_executed_clickhouse_sql(query, args) + # maybe_print_interpolated_query(query, args) + + case __MODULE__.query(query, args) do + {:ok, result} -> + {:ok, Enum.reduce(result.rows, init, reducer)} + + {:error, error} -> + log_and_return_error(error, "query_reduce/4") + end + rescue + e -> + log_and_return_error_from_exception(e, "query_reduce/4", __STACKTRACE__) + end + + defp execute_query_transform(query, args, opts \\ []) do + # ordered_params = order_params(query, args) + # sanitized_query = sanitize_query(query) + # + # maybe_store_executed_clickhouse_sql(sanitized_query, ordered_params) + # maybe_print_interpolated_query(sanitized_query, ordered_params) + + case __MODULE__.query(query, args) do + {:ok, result} -> + {:ok, result} + + {:error, error} -> + log_and_return_error(error, "query_transform/3", opts) + end + end + + @masked_error_message "Cannot execute database query. If issue persists please contact Santiment Support." + defp log_and_return_error_from_exception( + %{} = exception, + function_executed, + stacktrace, + opts \\ [] + ) do + propagate_error = Keyword.get(opts, :propagate_error, false) + + log_id = UUID.uuid4() + error_message = extract_error_from_stacktrace(stacktrace) || Exception.message(exception) + + Logger.warning(""" + [#{log_id}] Cannot execute ClickHouse #{function_executed}. Reason: #{error_message} + + Stacktrace: + #{Exception.format_stacktrace()} + """) + + {:error, "[#{log_id}] #{if propagate_error, do: error_message, else: @masked_error_message}"} + end + + defp log_and_return_error(error, function_executed, opts \\ []) do + propagate_error = Keyword.get(opts, :propagate_error, false) + log_id = UUID.uuid4() + + error_message = extract_error_from_error(error) + + Logger.warning( + "[#{log_id}] Cannot execute ClickHouse #{function_executed}. Reason: #{error_message}" + ) + + {:error, "[#{log_id}] #{if propagate_error, do: error_message, else: @masked_error_message}"} + end + + @doc ~s""" + Replace positional params denoted as `?1`, `?2`, etc. with just `?` as they + are not supported by ClickHouse. A complex regex is used as such character + sequences can apear inside strings in which case they should not be removed. + """ + def sanitize_query(query) do + query + |> IO.iodata_to_binary() + |> String.replace(~r/(\?([0-9]+))(?=(?:[^\\"']|[\\"'][^\\"']*[\\"'])*$)/, "?") + end + + @doc ~s""" + Add artificial support for positional parameters. Extract all occurences of `?1`, + `?2`, etc. in the query and reorder and duplicate the params so every param + in the list appears in order as if every positional param is just `?` + """ + def order_params(query, params) do + sanitised = + Regex.replace(~r/(([^\\]|^))["'].*?[^\\]['"]/, IO.iodata_to_binary(query), "\\g{1}") + + ordering = + Regex.scan(~r/\?([0-9]+)/, sanitised) + |> Enum.map(fn [_, x] -> String.to_integer(x) end) + + ordering_count = Enum.max_by(ordering, fn x -> x end, fn -> 0 end) + + if ordering_count != length(params) do + raise "\nError: number of params received (#{length(params)}) does not match expected (#{ordering_count})" + end + + ordered_params = + ordering + |> Enum.reduce([], fn ix, acc -> [Enum.at(params, ix - 1) | acc] end) + |> Enum.reverse() + + case ordered_params do + [] -> params + _ -> ordered_params + end + end + + defp extract_error_from_stacktrace(stacktrace) do + line_with_exception = + Enum.find_value(stacktrace, fn + {_, _, [<<_::binary>> = line | _], _} -> + if String.contains?(line, "DB::Exception"), do: line + + _ -> + nil + end) + + case line_with_exception do + nil -> nil + line -> transform_error_string(line) + end + end + + # %Clickhousex.Error{} is causing some errors + defp extract_error_from_error(%_{message: message}) do + transform_error_string(message) + end + + defp extract_error_from_error(error), do: error + + defp transform_error_string(error_str) do + case String.split(error_str, "DB::Exception: ") do + [error_str] -> + error_str + + [_ | _] = split_error -> + error = List.last(split_error) + + [error_msg, error_code, _version_str] = + Regex.split(~r|\([A-Z_]+\)|, error, include_captures: true, trim: true) + + error_msg = + case String.split(error_msg, "SETTINGS log_comment", parts: 2) do + [_] -> + error_msg + + [stripped_error_msg, _] -> + # Exclude the SETTINGS fragment from the error response + # so it is not shown in the result. The SETTINGS fragment + # are appended by the preprocessing done in the backend + # and not by the user, who will see the error + stripped_error_msg + end + + "#{error_code} #{error_msg}" |> String.trim() + end + end + + # If the `__store_executed_clickhouse_sql__` flag is set to true + # from the MetricResolver module, store the executed SQL query + # after interpolating the parameters in it. + defp maybe_store_executed_clickhouse_sql(query, params) do + if Process.get(:__store_executed_clickhouse_sql__, false) do + list = Process.get(:__executed_clickhouse_sql_list__, []) + + # Interpolate the parameters inside the query so it is easy to copy-paste + interpolated_query = get_interpolated_query(query, params) + Process.put(:__executed_clickhouse_sql_list__, [interpolated_query | list]) + + :ok + end + rescue + _ -> :ok + end + + case Mix.env() do + :dev -> + defp maybe_print_interpolated_query(query, params) do + # In dev env, if the PRINT_CLICKHOUSE_SQL env var is set to true/1 + # the interpolated query is printed to the console. + # This makes it much easier to copy/paste the query and share it + # with other people, or directly run it for debugging purposes + if System.get_env("PRINT_INTERPOLATED_CLICKHOUSE_SQL") in ["true", "1"] do + IO.puts( + IO.ANSI.format([ + :light_blue, + "---\n", + get_interpolated_query(query, params), + "\n---" + ]) + ) + end + end + + _ -> + defp maybe_print_interpolated_query(_query, _params), do: :ok + end + + defp get_interpolated_query(query, []), do: query + + defp get_interpolated_query(query, params) do + Clickhousex.Codec.Values.encode( + %Clickhousex.Query{param_count: length(params)}, + query, + params + ) + |> to_string() + end +end diff --git a/lib/sanbase/clickhouse/api_call_data.ex b/lib/sanbase/clickhouse/api_call_data.ex index a784e68e75..45c2fafbf3 100644 --- a/lib/sanbase/clickhouse/api_call_data.ex +++ b/lib/sanbase/clickhouse/api_call_data.ex @@ -9,7 +9,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do import Sanbase.Utils.Transform, only: [maybe_unwrap_ok_value: 1, maybe_apply_function: 2, maybe_sort: 3] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @doc ~s""" Get a timeseries with the total number of api calls made by a user in a given interval @@ -21,7 +21,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do def api_call_history(user_id, from, to, interval, auth_method \\ :apikey) do query_struct = api_call_history_query(user_id, from, to, interval, auth_method) - ClickhouseRepo.query_transform(query_struct, fn [t, count] -> + ChRepo.query_transform(query_struct, fn [t, count] -> %{ datetime: DateTime.from_unix!(t), api_calls_count: count @@ -38,7 +38,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do def api_call_count(user_id, from, to, auth_method \\ :apikey) do query_struct = api_call_count_query(user_id, from, to, auth_method) - ClickhouseRepo.query_transform(query_struct, fn [count] -> count end) + ChRepo.query_transform(query_struct, fn [count] -> count end) |> maybe_unwrap_ok_value() end @@ -47,7 +47,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do def active_users_count(from, to) do query_struct = active_users_count_query(from, to) - ClickhouseRepo.query_transform(query_struct, fn [value] -> value end) + ChRepo.query_transform(query_struct, fn [value] -> value end) |> maybe_unwrap_ok_value() end @@ -57,7 +57,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do until = Keyword.get(opts, :until, Timex.now()) query_struct = users_used_api_query(until) - ClickhouseRepo.query_transform(query_struct, fn [value] -> value end) + ChRepo.query_transform(query_struct, fn [value] -> value end) end @spec users_used_sansheets(Keyword.t()) :: @@ -66,7 +66,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do until = Keyword.get(opts, :until, Timex.now()) query_struct = users_used_sansheets_query(until) - ClickhouseRepo.query_transform(query_struct, fn [value] -> value end) + ChRepo.query_transform(query_struct, fn [value] -> value end) end @spec api_calls_count_per_user(Keyword.t()) :: @@ -75,7 +75,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do until = Keyword.get(opts, :until, Timex.now()) query_struct = api_calls_count_per_user_query(until) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [user_id, count], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [user_id, count], acc -> Map.put(acc, user_id, count) end) end @@ -85,7 +85,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do def api_metric_distribution() do query_struct = api_metric_distribution_query() - ClickhouseRepo.query_transform(query_struct, fn [metric, count] -> + ChRepo.query_transform(query_struct, fn [metric, count] -> %{metric: metric, count: count} end) |> maybe_unwrap_ok_value() @@ -96,7 +96,7 @@ defmodule Sanbase.Clickhouse.ApiCallData do def api_metric_distribution_per_user() do query_struct = api_metric_distribution_per_user_query() - ClickhouseRepo.query_reduce(query_struct, %{}, fn [user_id, metric, count], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [user_id, metric, count], acc -> Map.update(acc, user_id, %{}, fn map -> update_api_distribution_user_map(map, metric, count) end) diff --git a/lib/sanbase/clickhouse/exchange_address.ex b/lib/sanbase/clickhouse/exchange_address.ex index 17cd96205d..1e4072aeec 100644 --- a/lib/sanbase/clickhouse/exchange_address.ex +++ b/lib/sanbase/clickhouse/exchange_address.ex @@ -9,7 +9,7 @@ defmodule Sanbase.Clickhouse.ExchangeAddress do def exchange_names(blockchain, is_dex) when blockchain in @supported_blockchains do query_struct = exchange_names_query(blockchain, is_dex) - Sanbase.ClickhouseRepo.query_reduce(query_struct, [], fn [owner], acc -> + Sanbase.ChRepo.query_reduce(query_struct, [], fn [owner], acc -> case is_binary(owner) and owner != "" do true -> [owner | acc] false -> acc @@ -25,7 +25,7 @@ defmodule Sanbase.Clickhouse.ExchangeAddress do def exchange_addresses(blockchain, limit) when blockchain in @supported_blockchains do query_struct = exchange_addresses_query(blockchain, limit) - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [address, label, owner] -> + Sanbase.ChRepo.query_transform(query_struct, fn [address, label, owner] -> %{ address: address, is_dex: if(label == "decentralized_exchange", do: true, else: false), @@ -42,7 +42,7 @@ defmodule Sanbase.Clickhouse.ExchangeAddress do when blockchain in @supported_blockchains do query_struct = exchange_addresses_for_exchange_query(blockchain, owner, limit) - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [address] -> address end) + Sanbase.ChRepo.query_transform(query_struct, fn [address] -> address end) end def exchange_addresses_for_exchange(blockchain, _owner, _limit), diff --git a/lib/sanbase/clickhouse/exchanges/exchanges.ex b/lib/sanbase/clickhouse/exchanges/exchanges.ex index dd62956986..feaf65d975 100644 --- a/lib/sanbase/clickhouse/exchanges/exchanges.ex +++ b/lib/sanbase/clickhouse/exchanges/exchanges.ex @@ -8,7 +8,7 @@ defmodule Sanbase.Clickhouse.Exchanges do def top_exchanges_by_balance(%{slug: slug}, limit, _opts \\ []) when is_binary(slug) do query_struct = top_exchanges_by_balance_query(slug, limit) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [owner, label, balance, change_1d, change_7d, change_30d, first_seen_ts] -> first_seen_dt = if first_seen_ts, do: DateTime.from_unix!(first_seen_ts) @@ -35,7 +35,7 @@ defmodule Sanbase.Clickhouse.Exchanges do true -> query_struct = owners_by_slug_and_metric_query(metric, slug) - ClickhouseRepo.query_transform(query_struct, fn [owner] -> owner end) + ChRepo.query_transform(query_struct, fn [owner] -> owner end) false -> {:error, "The provided metric #{metric} is not a label-based metric"} diff --git a/lib/sanbase/clickhouse/fees/fees.ex b/lib/sanbase/clickhouse/fees/fees.ex index bbcb917ec3..59f1bd4fd9 100644 --- a/lib/sanbase/clickhouse/fees/fees.ex +++ b/lib/sanbase/clickhouse/fees/fees.ex @@ -2,12 +2,12 @@ defmodule Sanbase.Clickhouse.Fees do import Sanbase.Utils.Transform, only: [maybe_apply_function: 2] alias Sanbase.Project - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo def eth_fees_distribution(from, to, limit) do query_struct = eth_fees_distribution_query(from, to, limit) - ClickhouseRepo.query_transform(query_struct, & &1) + ChRepo.query_transform(query_struct, & &1) |> maybe_apply_function(&value_fees_list_to_result/1) end diff --git a/lib/sanbase/clickhouse/founders/founders.ex b/lib/sanbase/clickhouse/founders/founders.ex index fcad533f59..a14b95c803 100644 --- a/lib/sanbase/clickhouse/founders/founders.ex +++ b/lib/sanbase/clickhouse/founders/founders.ex @@ -3,7 +3,7 @@ defmodule Sanbase.Clickhouse.Founders do slugs = List.wrap(slug_or_slugs) query = get_founders_query(slugs) - Sanbase.ClickhouseRepo.query_transform(query, fn [name, project_slug] -> + Sanbase.ChRepo.query_transform(query, fn [name, project_slug] -> %{name: name, slug: project_slug} end) end diff --git a/lib/sanbase/clickhouse/github/github.ex b/lib/sanbase/clickhouse/github/github.ex index 15b7217c98..fd87123643 100644 --- a/lib/sanbase/clickhouse/github/github.ex +++ b/lib/sanbase/clickhouse/github/github.ex @@ -18,7 +18,7 @@ defmodule Sanbase.Clickhouse.Github do import Sanbase.Utils.Transform, only: [maybe_unwrap_ok_value: 1, maybe_apply_function: 2] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo alias Sanbase.Math require Logger @@ -50,11 +50,11 @@ defmodule Sanbase.Clickhouse.Github do def total_github_activity(organizations, from, to) do query_struct = total_github_activity_query(organizations, from, to) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [ - organization, - github_activity - ], - acc -> + ChRepo.query_reduce(query_struct, %{}, fn [ + organization, + github_activity + ], + acc -> Map.put(acc, organization, github_activity |> Math.to_integer(0)) end) end @@ -87,11 +87,11 @@ defmodule Sanbase.Clickhouse.Github do def total_dev_activity(organizations, from, to) do query_struct = total_dev_activity_query(organizations, from, to) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [ - organization, - dev_activity - ], - acc -> + ChRepo.query_reduce(query_struct, %{}, fn [ + organization, + dev_activity + ], + acc -> Map.put(acc, organization, dev_activity |> Math.to_integer(0)) end) end @@ -128,11 +128,11 @@ defmodule Sanbase.Clickhouse.Github do def total_dev_activity_contributors_count(organizations, from, to) do query_struct = total_dev_activity_contributors_count_query(organizations, from, to) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [ - organization, - dev_activity - ], - acc -> + ChRepo.query_reduce(query_struct, %{}, fn [ + organization, + dev_activity + ], + acc -> Map.put(acc, organization, dev_activity |> Math.to_integer(0)) end) end @@ -168,11 +168,11 @@ defmodule Sanbase.Clickhouse.Github do def total_github_activity_contributors_count(organizations, from, to) do query_struct = total_github_activity_contributors_count_query(organizations, from, to) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [ - organization, - dev_activity - ], - acc -> + ChRepo.query_reduce(query_struct, %{}, fn [ + organization, + dev_activity + ], + acc -> Map.put(acc, organization, dev_activity |> Math.to_integer(0)) end) end @@ -261,7 +261,7 @@ defmodule Sanbase.Clickhouse.Github do def first_datetime(organization_or_organizations) do query_struct = first_datetime_query(organization_or_organizations) - ClickhouseRepo.query_transform(query_struct, fn [timestamp] -> + ChRepo.query_transform(query_struct, fn [timestamp] -> timestamp |> DateTime.from_unix!() end) |> maybe_unwrap_ok_value() @@ -270,7 +270,7 @@ defmodule Sanbase.Clickhouse.Github do def last_datetime_computed_at(organization_or_organizations) do query_struct = last_datetime_computed_at_query(organization_or_organizations) - ClickhouseRepo.query_transform(query_struct, fn [datetime] -> + ChRepo.query_transform(query_struct, fn [datetime] -> datetime |> DateTime.from_unix!() end) |> maybe_unwrap_ok_value() @@ -352,7 +352,7 @@ defmodule Sanbase.Clickhouse.Github do defp do_dev_activity_contributors_count(organizations, from, to, interval) do query_struct = dev_activity_contributors_count_query(organizations, from, to, interval) - ClickhouseRepo.query_transform(query_struct, fn [datetime, contributors] -> + ChRepo.query_transform(query_struct, fn [datetime, contributors] -> %{ datetime: datetime |> DateTime.from_unix!(), contributors_count: contributors |> Math.to_integer(0) @@ -369,7 +369,7 @@ defmodule Sanbase.Clickhouse.Github do interval ) - ClickhouseRepo.query_transform(query_struct, fn [datetime, contributors] -> + ChRepo.query_transform(query_struct, fn [datetime, contributors] -> %{ datetime: datetime |> DateTime.from_unix!(), contributors_count: contributors |> Math.to_integer(0) @@ -378,7 +378,7 @@ defmodule Sanbase.Clickhouse.Github do end defp datetime_activity_execute(query_struct) do - ClickhouseRepo.query_transform(query_struct, fn [datetime, value] -> + ChRepo.query_transform(query_struct, fn [datetime, value] -> %{ datetime: datetime |> DateTime.from_unix!(), activity: value |> Math.to_integer(0) diff --git a/lib/sanbase/clickhouse/historical_balance/xrp_balance.ex b/lib/sanbase/clickhouse/historical_balance/xrp_balance.ex index 1a9f37d9e1..3bde517f0c 100644 --- a/lib/sanbase/clickhouse/historical_balance/xrp_balance.ex +++ b/lib/sanbase/clickhouse/historical_balance/xrp_balance.ex @@ -8,7 +8,7 @@ defmodule Sanbase.Clickhouse.HistoricalBalance.XrpBalance do import Sanbase.Clickhouse.HistoricalBalance.Utils import Sanbase.Metric.SqlQuery.Helper, only: [timerange_parameters: 3] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @table "xrp_balances" @@ -16,7 +16,7 @@ defmodule Sanbase.Clickhouse.HistoricalBalance.XrpBalance do def assets_held_by_address(address) do query_struct = current_balances_query([address], "XRP") - ClickhouseRepo.query_transform(query_struct, fn [^address, value] -> + ChRepo.query_transform(query_struct, fn [^address, value] -> %{ slug: "xrp", balance: value @@ -28,7 +28,7 @@ defmodule Sanbase.Clickhouse.HistoricalBalance.XrpBalance do def current_balance(addresses, _currency, _decimals) do query_struct = current_balances_query(addresses, "XRP") - ClickhouseRepo.query_transform(query_struct, fn [address, value] -> + ChRepo.query_transform(query_struct, fn [address, value] -> %{ address: address, balance: value @@ -66,7 +66,7 @@ defmodule Sanbase.Clickhouse.HistoricalBalance.XrpBalance do when is_binary(address) do query_struct = historical_balance_query(address, currency, issuer, from, to, interval) - ClickhouseRepo.query_transform(query_struct, fn [dt, balance, has_changed] -> + ChRepo.query_transform(query_struct, fn [dt, balance, has_changed] -> %{ datetime: DateTime.from_unix!(dt), balance: balance, @@ -93,7 +93,7 @@ defmodule Sanbase.Clickhouse.HistoricalBalance.XrpBalance do when is_binary(address_or_addresses) or is_list(address_or_addresses) do query_struct = balance_change_query(address_or_addresses, currency, issuer, from, to) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [address, balance_start, balance_end, balance_change] -> %{ address: address, @@ -127,7 +127,7 @@ defmodule Sanbase.Clickhouse.HistoricalBalance.XrpBalance do interval ) - ClickhouseRepo.query_transform(query_struct, fn [dt, change] -> + ChRepo.query_transform(query_struct, fn [dt, change] -> %{ datetime: DateTime.from_unix!(dt), balance_change: change @@ -139,7 +139,7 @@ defmodule Sanbase.Clickhouse.HistoricalBalance.XrpBalance do def last_balance_before(address, %{currency: currency, issuer: issuer}, _decimals, datetime) do query_struct = last_balance_before_query(address, currency, issuer, datetime) - case ClickhouseRepo.query_transform(query_struct, & &1) do + case ChRepo.query_transform(query_struct, & &1) do {:ok, [[balance]]} -> {:ok, balance} {:ok, []} -> {:ok, 0} {:error, error} -> {:error, error} diff --git a/lib/sanbase/clickhouse/label/label.ex b/lib/sanbase/clickhouse/label/label.ex index baf8e1575f..08bbe545c1 100644 --- a/lib/sanbase/clickhouse/label/label.ex +++ b/lib/sanbase/clickhouse/label/label.ex @@ -20,7 +20,7 @@ defmodule Sanbase.Clickhouse.Label do def label_fqns_with_asset(slug) do query_struct = label_fqns_with_asset_query(slug) - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [fqn, owner, version] -> + Sanbase.ChRepo.query_transform(query_struct, fn [fqn, owner, version] -> display_name = fqn |> String.trim_leading("#{owner}/") @@ -44,7 +44,7 @@ defmodule Sanbase.Clickhouse.Label do query_struct = addresses_by_label_fqns_query(label_fqns, blockchain) - Sanbase.ClickhouseRepo.query_reduce( + Sanbase.ChRepo.query_reduce( query_struct, %{}, fn [address, blockchain, label_fqn], acc -> @@ -65,7 +65,7 @@ defmodule Sanbase.Clickhouse.Label do query_struct = addresses_by_label_keys_query(label_keys, blockchain) - Sanbase.ClickhouseRepo.query_reduce( + Sanbase.ChRepo.query_reduce( query_struct, %{}, fn [address, blockchain, label_fqn], acc -> @@ -108,7 +108,7 @@ defmodule Sanbase.Clickhouse.Label do query_struct = addresses_labels_query(slug, blockchain, addresses) result = - Sanbase.ClickhouseRepo.query_reduce( + Sanbase.ChRepo.query_reduce( query_struct, %{}, fn [address, label, metadata], acc -> @@ -130,7 +130,7 @@ defmodule Sanbase.Clickhouse.Label do blockchain = slug_to_blockchain(slug) query_struct = addresses_labels_query(slug, blockchain, addresses) - Sanbase.ClickhouseRepo.query_reduce( + Sanbase.ChRepo.query_reduce( query_struct, %{}, fn [address, label, metadata], acc -> diff --git a/lib/sanbase/clickhouse/metadata_helper.ex b/lib/sanbase/clickhouse/metadata_helper.ex index c38a248035..c0cf1e3424 100644 --- a/lib/sanbase/clickhouse/metadata_helper.ex +++ b/lib/sanbase/clickhouse/metadata_helper.ex @@ -4,7 +4,7 @@ defmodule Sanbase.Clickhouse.MetadataHelper do anomalies metadata from ClickHouse """ - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo alias Sanbase.Clickhouse.MetricAdapter.Registry import Sanbase.Utils.Transform, only: [maybe_apply_function: 2] @@ -18,7 +18,7 @@ defmodule Sanbase.Clickhouse.MetadataHelper do query_struct = Sanbase.Clickhouse.Query.new("SELECT toUInt32(asset_id), name FROM asset_metadata", %{}) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [asset_id, slug], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [asset_id, slug], acc -> Map.put(acc, slug, asset_id) end) end) @@ -54,7 +54,7 @@ defmodule Sanbase.Clickhouse.MetadataHelper do query_struct = Sanbase.Clickhouse.Query.new("SELECT toUInt32(metric_id), name FROM metric_metadata", %{}) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [metric_id, name], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [metric_id, name], acc -> names = Map.get(Registry.metric_to_names_map(), name, [name]) names diff --git a/lib/sanbase/clickhouse/metric/histogram_metric.ex b/lib/sanbase/clickhouse/metric/histogram_metric.ex index 5d2d574c85..5acdc31075 100644 --- a/lib/sanbase/clickhouse/metric/histogram_metric.ex +++ b/lib/sanbase/clickhouse/metric/histogram_metric.ex @@ -5,7 +5,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do import Sanbase.Metric.SqlQuery.Helper, only: [asset_id_filter: 2] alias Sanbase.Metric - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @spent_coins_cost_histograms [ "price_histogram", @@ -40,7 +40,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do def histogram_data("age_distribution" = metric, %{slug: slug}, from, to, interval, limit) do query_struct = histogram_data_query(metric, slug, from, to, interval, limit) - ClickhouseRepo.query_transform(query_struct, fn [unix, value] -> + ChRepo.query_transform(query_struct, fn [unix, value] -> range_from = unix |> DateTime.from_unix!() range_to = @@ -58,7 +58,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do when metric in @spent_coins_cost_histograms do query_struct = histogram_data_query(metric, slug, from, to, interval, limit) - ClickhouseRepo.query_transform(query_struct, fn [price, amount] -> + ChRepo.query_transform(query_struct, fn [price, amount] -> %{ price: Sanbase.Math.to_float(price), value: Sanbase.Math.to_float(amount) @@ -71,7 +71,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do when metric in @eth2_string_label_float_value_metrics do query_struct = histogram_data_query(metric, slug, from, to, interval, limit) - ClickhouseRepo.query_transform(query_struct, fn [label, amount] -> + ChRepo.query_transform(query_struct, fn [label, amount] -> %{ label: label, value: Sanbase.Math.to_float(amount) @@ -83,7 +83,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do when metric in @eth2_string_address_string_label_float_value_metrics do query_struct = histogram_data_query(metric, slug, from, to, interval, limit) - ClickhouseRepo.query_transform(query_struct, fn [address, label, amount] -> + ChRepo.query_transform(query_struct, fn [address, label, amount] -> %{ address: address, label: label, @@ -96,7 +96,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do when metric in @eth2_datetime_staking_pools_integer_valuation_list do query_struct = histogram_data_query(metric, slug, from, to, interval, limit) - ClickhouseRepo.query_transform(query_struct, fn [timestamp, value] -> + ChRepo.query_transform(query_struct, fn [timestamp, value] -> %{ datetime: DateTime.from_unix!(timestamp), value: @@ -119,7 +119,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do params = %{selector: selector} Sanbase.Clickhouse.Query.new(sql, params) - |> ClickhouseRepo.query_transform(fn [timestamp] -> + |> ChRepo.query_transform(fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() @@ -153,7 +153,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do } Sanbase.Clickhouse.Query.new(sql, params) - |> ClickhouseRepo.query_transform(fn [timestamp] -> DateTime.from_unix!(timestamp) end) + |> ChRepo.query_transform(fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() end @@ -176,7 +176,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.HistogramMetric do sql = "SELECT toUnixTimestamp(max(dt)) FROM eth2_staking_transfers_v2" query_struct = Sanbase.Clickhouse.Query.new(sql, %{}) - ClickhouseRepo.query_transform(query_struct, fn [timestamp] -> + ChRepo.query_transform(query_struct, fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() diff --git a/lib/sanbase/clickhouse/metric/metric_adapter.ex b/lib/sanbase/clickhouse/metric/metric_adapter.ex index c3f97600bc..16136c4ea7 100644 --- a/lib/sanbase/clickhouse/metric/metric_adapter.ex +++ b/lib/sanbase/clickhouse/metric/metric_adapter.ex @@ -16,7 +16,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do alias __MODULE__.TableMetric alias __MODULE__.Registry - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @default_complexity_weight 0.3 @@ -90,7 +90,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do filters = get_filters(metric, opts) timeseries_data_per_slug_query(metric, slug, from, to, interval, aggregation, filters, opts) - |> ClickhouseRepo.query_reduce( + |> ChRepo.query_reduce( %{}, fn [timestamp, slug, value], acc -> datetime = DateTime.from_unix!(timestamp) @@ -134,7 +134,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do filters = Keyword.get(opts, :additional_filters, []) slugs_by_filter_query(metric, from, to, operator, threshold, aggregation, filters) - |> ClickhouseRepo.query_transform(fn [slug, _value] -> slug end) + |> ChRepo.query_transform(fn [slug, _value] -> slug end) end @impl Sanbase.Metric.Behaviour @@ -145,7 +145,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do filters = Keyword.get(opts, :additional_filters, []) slugs_order_query(metric, from, to, direction, aggregation, filters) - |> ClickhouseRepo.query_transform(fn [slug, _value] -> slug end) + |> ChRepo.query_transform(fn [slug, _value] -> slug end) end @impl Sanbase.Metric.Behaviour @@ -200,7 +200,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do @impl Sanbase.Metric.Behaviour def available_metrics(selector) do available_metrics_for_selector_query(selector) - |> ClickhouseRepo.query_transform(fn [metric] -> + |> ChRepo.query_transform(fn [metric] -> Map.get(Registry.metric_to_names_map(), metric) end) |> maybe_apply_function(fn metrics -> @@ -219,12 +219,12 @@ defmodule Sanbase.Clickhouse.MetricAdapter do fixed_parameters = Map.get(Registry.fixed_parameters_map(), metric) query_struct = available_label_fqns_for_fixed_parameters_query(metric, fixed_parameters) - Sanbase.ClickhouseRepo.query_transform(query_struct, & &1) + Sanbase.ChRepo.query_transform(query_struct, & &1) |> maybe_apply_function(&List.flatten/1) Map.get(Registry.table_map(), metric) == "labeled_intraday_metrics_v2" -> query_struct = available_label_fqns_for_labeled_intraday_metrics_query(metric) - Sanbase.ClickhouseRepo.query_transform(query_struct, & &1) + Sanbase.ChRepo.query_transform(query_struct, & &1) true -> {:ok, []} @@ -240,12 +240,12 @@ defmodule Sanbase.Clickhouse.MetricAdapter do query_struct = available_label_fqns_for_fixed_parameters_query(metric, slug, fixed_parameters) - Sanbase.ClickhouseRepo.query_transform(query_struct, & &1) + Sanbase.ChRepo.query_transform(query_struct, & &1) |> maybe_apply_function(&List.flatten/1) Map.get(Registry.table_map(), metric) == "labeled_intraday_metrics_v2" -> query_struct = available_label_fqns_for_labeled_intraday_metrics_query(metric, slug) - Sanbase.ClickhouseRepo.query_transform(query_struct, & &1) + Sanbase.ChRepo.query_transform(query_struct, & &1) true -> {:ok, []} @@ -280,7 +280,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do true -> first_datetime_query(metric, selector) - |> ClickhouseRepo.query_transform(fn [datetime] -> DateTime.from_unix!(datetime) end) + |> ChRepo.query_transform(fn [datetime] -> DateTime.from_unix!(datetime) end) |> maybe_unwrap_ok_value() end end @@ -293,7 +293,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do true -> last_datetime_computed_at_query(metric, selector) - |> ClickhouseRepo.query_transform(fn [datetime] -> DateTime.from_unix!(datetime) end) + |> ChRepo.query_transform(fn [datetime] -> DateTime.from_unix!(datetime) end) |> maybe_unwrap_ok_value() end end @@ -304,12 +304,12 @@ defmodule Sanbase.Clickhouse.MetricAdapter do defp get_available_slugs() do available_slugs_query() - |> ClickhouseRepo.query_transform(fn [slug] -> slug end) + |> ChRepo.query_transform(fn [slug] -> slug end) end defp get_available_slugs(metric) do available_slugs_for_metric_query(metric) - |> ClickhouseRepo.query_transform(fn [slug] -> slug end) + |> ChRepo.query_transform(fn [slug] -> slug end) end defp get_aggregated_timeseries_data(metric, slugs, from, to, aggregation, filters, opts) @@ -335,7 +335,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter do query_struct = aggregated_timeseries_data_query(metric, slugs, from, to, aggregation, filters, opts) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [slug, value, has_changed], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [slug, value, has_changed], acc -> value = if has_changed == 1, do: value, else: nil Map.put(acc, slug, value) end) diff --git a/lib/sanbase/clickhouse/metric/sql_query/metric_sql_query.ex b/lib/sanbase/clickhouse/metric/sql_query/metric_sql_query.ex index b78886854c..7070bda97a 100644 --- a/lib/sanbase/clickhouse/metric/sql_query/metric_sql_query.ex +++ b/lib/sanbase/clickhouse/metric/sql_query/metric_sql_query.ex @@ -50,7 +50,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.SqlQuery do SELECT dt, argMax(value, computed_at) AS value - FROM {{table}} + FROM {{table:inline}} PREWHERE #{finalized_data_filter_str(table, only_finalized_data)} #{fixed_parameters_str} @@ -163,7 +163,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.SqlQuery do asset_id, dt, argMax(value, computed_at) AS value2 - FROM {{table}} + FROM \{\{table:inline\}\} PREWHERE #{finalized_data_filter_str(table, only_finalized_data)} #{additional_filters} @@ -228,7 +228,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.SqlQuery do SELECT dt, asset_id, argMax(value, computed_at) AS value FROM ( SELECT dt, asset_id, metric_id, value, computed_at - FROM {{table}} + FROM \{\{table:inline\}\} PREWHERE #{finalized_data_filter_str(table, only_finalized_data)} #{additional_filters} diff --git a/lib/sanbase/clickhouse/metric/table_metric.ex b/lib/sanbase/clickhouse/metric/table_metric.ex index f6a2ff78a7..7656eb0eb7 100644 --- a/lib/sanbase/clickhouse/metric/table_metric.ex +++ b/lib/sanbase/clickhouse/metric/table_metric.ex @@ -2,7 +2,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.TableMetric do import Sanbase.Clickhouse.MetricAdapter.TableSqlQuery import Sanbase.Utils.Transform - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo def table_data(_metric, %{slug: []}, _from, _to) do {:ok, @@ -23,7 +23,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.TableMetric do slugs = List.wrap(slug_or_slugs) query_struct = table_data_query(metric, slugs, from, to) - ClickhouseRepo.query_transform(query_struct, fn [_label | tail] -> tail end) + ChRepo.query_transform(query_struct, fn [_label | tail] -> tail end) |> maybe_apply_function(&transform_table_data(&1, slugs)) end diff --git a/lib/sanbase/clickhouse/nft/nft_trade.ex b/lib/sanbase/clickhouse/nft/nft_trade.ex index e9c528670e..bda5d438a5 100644 --- a/lib/sanbase/clickhouse/nft/nft_trade.ex +++ b/lib/sanbase/clickhouse/nft/nft_trade.ex @@ -1,12 +1,12 @@ defmodule Sanbase.Clickhouse.NftTrade do import Sanbase.Utils.Transform, only: [maybe_unwrap_ok_value: 1] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo def get_trades_count(label_key, from, to) do query_struct = get_trades_count_query(label_key, from, to) - ClickhouseRepo.query_transform(query_struct, fn [count] -> count end) + ChRepo.query_transform(query_struct, fn [count] -> count end) |> maybe_unwrap_ok_value() end @@ -14,7 +14,7 @@ defmodule Sanbase.Clickhouse.NftTrade do when label_key in [:nft_influencer, :nft_whale] do query_struct = get_trades_query(label_key, from, to, opts) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn list -> [ @@ -71,7 +71,7 @@ defmodule Sanbase.Clickhouse.NftTrade do query_struct = fetch_label_query(contract, blockchain, "value") - case ClickhouseRepo.query_transform(query_struct, fn [label] -> label end) do + case ChRepo.query_transform(query_struct, fn [label] -> label end) do {:ok, [label]} when not is_nil(label) -> label _ -> nil end @@ -83,7 +83,7 @@ defmodule Sanbase.Clickhouse.NftTrade do query_struct = fetch_label_query(contract, blockchain, "search_text") - case ClickhouseRepo.query_transform(query_struct, fn [search_term] -> search_term end) do + case ChRepo.query_transform(query_struct, fn [search_term] -> search_term end) do {:ok, [search_term]} when not is_nil(search_term) -> search_term _ -> nil end diff --git a/lib/sanbase/clickhouse/project/project.ex b/lib/sanbase/clickhouse/project/project.ex index d1539280cf..1955d2fa7e 100644 --- a/lib/sanbase/clickhouse/project/project.ex +++ b/lib/sanbase/clickhouse/project/project.ex @@ -2,7 +2,7 @@ defmodule Sanbase.Clickhouse.Project do def projects_info(slugs) do query = projects_info_query(slugs) - Sanbase.ClickhouseRepo.query_reduce(query, %{}, fn [slug, full, summary], acc -> + Sanbase.ChRepo.query_reduce(query, %{}, fn [slug, full, summary], acc -> Map.put(acc, slug, %{full: full, summary: summary}) end) end diff --git a/lib/sanbase/clickhouse/query/query.ex b/lib/sanbase/clickhouse/query/query.ex index 5574534135..7a350040f4 100644 --- a/lib/sanbase/clickhouse/query/query.ex +++ b/lib/sanbase/clickhouse/query/query.ex @@ -78,7 +78,15 @@ defmodule Sanbase.Clickhouse.Query do %{struct | sql: sql} end + @spec put_format(t(), String.t()) :: t() + def put_format(struct, format) do + %{struct | format: format} + end + + @spec get_sql_text(t()) :: String.t() def get_sql_text(%__MODULE__{} = query), do: query.sql + + @spec get_sql_parameters(t()) :: map() def get_sql_parameters(%__MODULE__{} = query), do: query.parameters @spec put_sql(t(), parameters) :: t() @@ -138,6 +146,20 @@ defmodule Sanbase.Clickhouse.Query do end end + def get_sql_args_v2(%__MODULE__{} = query) do + query = preprocess_query(query) + + with {:ok, {sql, args}} <- + Sanbase.TemplateEngine.run_generate_positional_params_v2( + query.sql, + params: query.parameters, + env: query.environment + ) do + result = %{sql: sql, args: args} + {:ok, result} + end + end + # Private functions defp preprocess_query(query) do @@ -178,7 +200,9 @@ defmodule Sanbase.Clickhouse.Query do %{query | sql: sql} end - defp add_format(%{sql: sql, format: format} = query) do + defp add_format(%{format: nil} = struct), do: struct + + defp add_format(%{sql: sql, format: format} = query) when not is_nil(format) do sql = sql <> "\nFORMAT #{format}" %{query | sql: sql} end diff --git a/lib/sanbase/clickhouse/top_holders/metric_adapter.ex b/lib/sanbase/clickhouse/top_holders/metric_adapter.ex index aba59e0c66..b00e850d07 100644 --- a/lib/sanbase/clickhouse/top_holders/metric_adapter.ex +++ b/lib/sanbase/clickhouse/top_holders/metric_adapter.ex @@ -13,7 +13,7 @@ defmodule Sanbase.Clickhouse.TopHolders.MetricAdapter do alias Sanbase.Project - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @supported_infrastructures ["ETH"] @@ -67,7 +67,7 @@ defmodule Sanbase.Clickhouse.TopHolders.MetricAdapter do timeseries_data_params(selector, contract, infr, from, to, interval, decimals, opts) do result = timeseries_data_query(metric, params) - |> ClickhouseRepo.query_transform(fn [timestamp, value, has_changed] -> + |> ChRepo.query_transform(fn [timestamp, value, has_changed] -> %{datetime: DateTime.from_unix!(timestamp), value: value, has_changed: has_changed} end) @@ -192,7 +192,7 @@ defmodule Sanbase.Clickhouse.TopHolders.MetricAdapter do table = to_table(contract, infr) query_struct = first_datetime_query(table, contract) - ClickhouseRepo.query_transform(query_struct, fn [timestamp] -> + ChRepo.query_transform(query_struct, fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() @@ -207,7 +207,7 @@ defmodule Sanbase.Clickhouse.TopHolders.MetricAdapter do _query_struct = first_datetime_query(table, contract) query_struct = last_datetime_computed_at_query(table, contract) - ClickhouseRepo.query_transform(query_struct, fn [timestamp] -> + ChRepo.query_transform(query_struct, fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() diff --git a/lib/sanbase/clickhouse/top_holders/top_holders.ex b/lib/sanbase/clickhouse/top_holders/top_holders.ex index 16c38a1303..d271c287aa 100644 --- a/lib/sanbase/clickhouse/top_holders/top_holders.ex +++ b/lib/sanbase/clickhouse/top_holders/top_holders.ex @@ -3,7 +3,7 @@ defmodule Sanbase.Clickhouse.TopHolders do Uses ClickHouse to calculate the percent supply in exchanges, non exchanges and combined """ - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo alias Sanbase.Clickhouse.Label alias Sanbase.Project @@ -32,7 +32,7 @@ defmodule Sanbase.Clickhouse.TopHolders do def realtime_top_holders(slug, opts) do query_struct = realtime_top_holders_query(slug, opts) - ClickhouseRepo.query_transform(query_struct, &holder_transform_func/1) + ChRepo.query_transform(query_struct, &holder_transform_func/1) end @spec top_holders(String.t(), DateTime.t(), DateTime.t(), Keyword.t()) :: @@ -45,7 +45,7 @@ defmodule Sanbase.Clickhouse.TopHolders do query_struct <- top_holders_query(slug, contract, decimals, from, to, opts), {:ok, result} <- - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, &holder_transform_func/1 ), @@ -83,7 +83,7 @@ defmodule Sanbase.Clickhouse.TopHolders do interval ) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [dt, in_exchanges, outside_exchanges, in_top_holders_total] -> %{ @@ -123,7 +123,7 @@ defmodule Sanbase.Clickhouse.TopHolders do interval ) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [dt, in_exchanges, outside_exchanges, in_holders] -> %{ diff --git a/lib/sanbase/clickhouse/uniswap/metric_adapter.ex b/lib/sanbase/clickhouse/uniswap/metric_adapter.ex index 51e888c63f..93e22c6833 100644 --- a/lib/sanbase/clickhouse/uniswap/metric_adapter.ex +++ b/lib/sanbase/clickhouse/uniswap/metric_adapter.ex @@ -65,7 +65,7 @@ defmodule Sanbase.Clickhouse.Uniswap.MetricAdapter do ) do query_struct = histogram_data_query(metric, selector, from, to, interval, limit) - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [address, value] -> + Sanbase.ChRepo.query_transform(query_struct, fn [address, value] -> %{address: address, value: value} end) |> maybe_add_balances(from, to) diff --git a/lib/sanbase/contract_metric/metric_adapter.ex b/lib/sanbase/contract_metric/metric_adapter.ex index 4bf8f8dfce..b40988e4b1 100644 --- a/lib/sanbase/contract_metric/metric_adapter.ex +++ b/lib/sanbase/contract_metric/metric_adapter.ex @@ -30,7 +30,7 @@ defmodule Sanbase.Contract.MetricAdapter do @default_complexity_weight 1.0 @aggregations [:count] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @impl Sanbase.Metric.Behaviour def has_incomplete_data?(_metric), do: false @@ -48,7 +48,7 @@ defmodule Sanbase.Contract.MetricAdapter do def timeseries_data(metric, %{contract_address: contract_address}, from, to, interval, _opts) when is_binary(contract_address) do timeseries_data_query(metric, contract_address, from, to, interval) - |> ClickhouseRepo.query_transform(fn [unix, value] -> + |> ChRepo.query_transform(fn [unix, value] -> %{ datetime: DateTime.from_unix!(unix), value: value @@ -81,7 +81,7 @@ defmodule Sanbase.Contract.MetricAdapter do when is_binary(contract_address) do query_struct = first_datetime_query(contract_address) - ClickhouseRepo.query_transform(query_struct, fn [unix] -> + ChRepo.query_transform(query_struct, fn [unix] -> DateTime.from_unix!(unix) end) |> maybe_unwrap_ok_value() @@ -92,7 +92,7 @@ defmodule Sanbase.Contract.MetricAdapter do when is_binary(contract_address) do query_struct = last_datetime_computed_at_query(contract_address) - ClickhouseRepo.query_transform(query_struct, fn [unix] -> DateTime.from_unix!(unix) end) + ChRepo.query_transform(query_struct, fn [unix] -> DateTime.from_unix!(unix) end) |> maybe_unwrap_ok_value() end diff --git a/lib/sanbase/intercom/intercom.ex b/lib/sanbase/intercom/intercom.ex index 676f92286f..14b30a6c77 100644 --- a/lib/sanbase/intercom/intercom.ex +++ b/lib/sanbase/intercom/intercom.ex @@ -12,7 +12,7 @@ defmodule Sanbase.Intercom do alias Sanbase.Clickhouse.ApiCallData alias Sanbase.Accounts.EthAccount alias Sanbase.Repo - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo require Logger @@ -224,7 +224,7 @@ defmodule Sanbase.Intercom do query_struct = Sanbase.Clickhouse.Query.new(sql, params) - ClickhouseRepo.query_transform(query_struct, fn [dt, user_id, attributes] -> + ChRepo.query_transform(query_struct, fn [dt, user_id, attributes] -> %{ dt: dt, user_id: user_id, @@ -250,7 +250,7 @@ defmodule Sanbase.Intercom do query_struct = Sanbase.Clickhouse.Query.new(sql, params) - ClickhouseRepo.query_transform(query_struct, fn [dt, user_id, attributes] -> + ChRepo.query_transform(query_struct, fn [dt, user_id, attributes] -> %{ dt: dt, user_id: user_id, @@ -272,7 +272,7 @@ defmodule Sanbase.Intercom do query_struct = Sanbase.Clickhouse.Query.new(sql, params) - {:ok, user_ids} = ClickhouseRepo.query_transform(query_struct, fn [user_id] -> user_id end) + {:ok, user_ids} = ChRepo.query_transform(query_struct, fn [user_id] -> user_id end) user_ids end @@ -518,7 +518,7 @@ defmodule Sanbase.Intercom do GROUP BY user_id """ - Sanbase.ClickhouseRepo.query_transform(query, [], fn [user_id] -> user_id end) + Sanbase.ChRepo.query_transform(query, [], fn [user_id] -> user_id end) |> case do {:ok, result} -> result {:error, _} -> [] diff --git a/lib/sanbase/metric/latest_metric.ex b/lib/sanbase/metric/latest_metric.ex index 9403891dc4..2c2b35a4dc 100644 --- a/lib/sanbase/metric/latest_metric.ex +++ b/lib/sanbase/metric/latest_metric.ex @@ -60,7 +60,7 @@ defmodule Sanbase.Metric.LatestMetric do def get_data(table, metrics, slugs, opts) do query_struct = get_data_query(table, metrics, slugs, opts) - Sanbase.ClickhouseRepo.query_transform( + Sanbase.ChRepo.query_transform( query_struct, fn [slug, metric, value, dt_unix, computed_at_unix] -> %{ diff --git a/lib/sanbase/metric/transform.ex b/lib/sanbase/metric/transform.ex index 142a378aec..101c4d2137 100644 --- a/lib/sanbase/metric/transform.ex +++ b/lib/sanbase/metric/transform.ex @@ -69,7 +69,7 @@ defmodule Sanbase.Metric.Transform do def remove_missing_values({:error, error}), do: {:error, error} def exec_timeseries_data_query(%Sanbase.Clickhouse.Query{} = query) do - Sanbase.ClickhouseRepo.query_transform(query, fn + Sanbase.ChRepo.query_transform(query, fn [unix, value] -> %{datetime: DateTime.from_unix!(unix), value: value} @@ -83,6 +83,10 @@ defmodule Sanbase.Metric.Transform do close: close } } + + data -> + IO.inspect(data, label: "-----------------") + data end) end end diff --git a/lib/sanbase/prices/price.ex b/lib/sanbase/prices/price.ex index 9a655f3202..e65a3d8fee 100644 --- a/lib/sanbase/prices/price.ex +++ b/lib/sanbase/prices/price.ex @@ -19,7 +19,7 @@ defmodule Sanbase.Price do ] alias Sanbase.Project - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @default_source "coinmarketcap" @supported_sources ["coinmarketcap", "cryptocompare"] @@ -154,7 +154,7 @@ defmodule Sanbase.Price do query_struct = timeseries_data_query(slug_or_slugs, from, to, interval, source, aggregation) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [timestamp, price_usd, price_btc, marketcap_usd, volume_usd] -> %{ @@ -238,7 +238,7 @@ defmodule Sanbase.Price do aggregation ) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [timestamp, slug, value], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [timestamp, slug, value], acc -> datetime = DateTime.from_unix!(timestamp) elem = %{slug: slug, value: value} Map.update(acc, datetime, [elem], &[elem | &1]) @@ -271,7 +271,7 @@ defmodule Sanbase.Price do query_struct = aggregated_timeseries_data_query(slugs, from, to, source) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [slug, price_usd, price_btc, marketcap_usd, volume_usd, has_changed] -> %{ slug: slug, @@ -333,7 +333,7 @@ defmodule Sanbase.Price do query_struct = aggregated_metric_timeseries_data_query(slugs, metric, from, to, source, aggregation) - ClickhouseRepo.query_reduce(query_struct, %{}, fn + ChRepo.query_reduce(query_struct, %{}, fn [slug, value, has_changed], acc -> value = if has_changed == 1, do: value Map.put(acc, slug, value) @@ -362,7 +362,7 @@ defmodule Sanbase.Price do query_struct = aggregated_marketcap_and_volume_query(slugs, from, to, source, opts) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [slug, marketcap_usd, volume_usd, has_changed] -> %{ slug: slug, @@ -385,7 +385,7 @@ defmodule Sanbase.Price do def latest_prices_per_slug(slugs, source, limit_per_slug) when is_list(slugs) do query_struct = latest_prices_per_slug_query(slugs, source, limit_per_slug) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [slug, prices_usd, prices_btc], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [slug, prices_usd, prices_btc], acc -> acc |> Map.put({slug, "USD"}, prices_usd) |> Map.put({slug, "BTC"}, prices_btc) @@ -399,7 +399,7 @@ defmodule Sanbase.Price do query_struct = slugs_by_filter_query(metric, from, to, operator, threshold, aggregation, source) - ClickhouseRepo.query_transform(query_struct, fn [slug, _value] -> slug end) + ChRepo.query_transform(query_struct, fn [slug, _value] -> slug end) end end @@ -409,7 +409,7 @@ defmodule Sanbase.Price do query_struct = slugs_order_query(metric, from, to, direction, aggregation, source) - ClickhouseRepo.query_transform(query_struct, fn [slug, _value] -> slug end) + ChRepo.query_transform(query_struct, fn [slug, _value] -> slug end) end end @@ -424,7 +424,7 @@ defmodule Sanbase.Price do with {:ok, source} <- opts_to_source(opts) do query_struct = last_record_before_query(slug, datetime, source) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [price_usd, price_btc, marketcap_usd, volume_usd] -> %{ @@ -450,7 +450,7 @@ defmodule Sanbase.Price do with {:ok, source} <- opts_to_source(opts) do query_struct = ohlc_query(slug, from, to, source) - ClickhouseRepo.query_transform(query_struct, fn [open, high, low, close, has_changed] -> + ChRepo.query_transform(query_struct, fn [open, high, low, close, has_changed] -> %{ open_price_usd: open, high_price_usd: high, @@ -474,7 +474,7 @@ defmodule Sanbase.Price do with {:ok, source} <- opts_to_source(opts) do query_struct = timeseries_ohlc_data_query(slug, from, to, interval, source) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [timestamp, open, high, low, close] -> %{ @@ -523,7 +523,7 @@ defmodule Sanbase.Price do query_struct = combined_marketcap_and_volume_query(slugs, from, to, interval, source) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [timestamp, marketcap_usd, volume_usd, has_changed] -> %{ @@ -559,14 +559,14 @@ defmodule Sanbase.Price do with {:ok, source} <- opts_to_source(opts) do query_struct = slugs_with_volume_over_query(volume, source) - ClickhouseRepo.query_transform(query_struct, fn [slug] -> slug end) + ChRepo.query_transform(query_struct, fn [slug] -> slug end) end end def has_data?(slug) do query_struct = select_any_record_query(slug) - case ClickhouseRepo.query_transform(query_struct, & &1) do + case ChRepo.query_transform(query_struct, & &1) do {:ok, [_]} -> {:ok, true} {:ok, []} -> {:ok, false} {:error, error} -> {:error, error} @@ -585,7 +585,7 @@ defmodule Sanbase.Price do with {:ok, source} <- opts_to_source(opts) do query_struct = first_datetime_query(slug, source) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() @@ -595,7 +595,7 @@ defmodule Sanbase.Price do def last_datetime_computed_at(slug) do query_struct = last_datetime_computed_at_query(slug) - ClickhouseRepo.query_transform(query_struct, fn [datetime] -> + ChRepo.query_transform(query_struct, fn [datetime] -> DateTime.from_unix!(datetime) end) |> maybe_unwrap_ok_value() diff --git a/lib/sanbase/prices/price_pair/price_pair.ex b/lib/sanbase/prices/price_pair/price_pair.ex index 0da597df8a..e6e0fe8dae 100644 --- a/lib/sanbase/prices/price_pair/price_pair.ex +++ b/lib/sanbase/prices/price_pair/price_pair.ex @@ -10,7 +10,7 @@ defmodule Sanbase.PricePair do import Sanbase.Metric.Transform, only: [exec_timeseries_data_query: 1] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @default_source "cryptocompare" @@ -85,7 +85,7 @@ defmodule Sanbase.PricePair do query_struct = timeseries_data_per_slug_query(slugs, quote_asset, from, to, interval, source, aggregation) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [timestamp, slug, value], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [timestamp, slug, value], acc -> datetime = DateTime.from_unix!(timestamp) elem = %{slug: slug, value: value} Map.update(acc, datetime, [elem], &[elem | &1]) @@ -127,7 +127,7 @@ defmodule Sanbase.PricePair do query_struct = aggregated_timeseries_data_query(slugs, quote_asset, from, to, source, aggregation) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [slug, value, has_changed], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [slug, value, has_changed], acc -> # This way if the slug does not have any data still include it in the result # with value `nil`. This way the API can cache the result. In case one of the # aggregated_timeseries_data calls fails, the slugs in it won't be included @@ -146,7 +146,7 @@ defmodule Sanbase.PricePair do query_struct = slugs_by_filter_query(quote_asset, from, to, source, operator, threshold, aggregation) - ClickhouseRepo.query_transform(query_struct, fn [slug, _value] -> slug end) + ChRepo.query_transform(query_struct, fn [slug, _value] -> slug end) end def slugs_order(quote_asset, from, to, direction, opts \\ []) @@ -155,7 +155,7 @@ defmodule Sanbase.PricePair do aggregation = Keyword.get(opts, :aggregation) || :last source = Keyword.get(opts, :source, @default_source) query_struct = slugs_order_query(quote_asset, from, to, source, direction, aggregation) - ClickhouseRepo.query_transform(query_struct, fn [slug, _value] -> slug end) + ChRepo.query_transform(query_struct, fn [slug, _value] -> slug end) end @doc ~s""" @@ -167,7 +167,7 @@ defmodule Sanbase.PricePair do source = Keyword.get(opts, :source, @default_source) query_struct = last_record_before_query(slug, quote_asset, datetime, source) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [unix, value] -> %{ @@ -199,7 +199,7 @@ defmodule Sanbase.PricePair do source = Keyword.get(opts, :source) || @default_source query_struct = available_slugs_query(quote_asset, source) - ClickhouseRepo.query_transform(query_struct, fn [slug] -> slug end) + ChRepo.query_transform(query_struct, fn [slug] -> slug end) end def has_data?(slug, quote_asset, opts \\ []) @@ -208,7 +208,7 @@ defmodule Sanbase.PricePair do source = Keyword.get(opts, :source, @default_source) query_struct = select_any_record_query(slug, quote_asset, source) - ClickhouseRepo.query_transform(query_struct, & &1) + ChRepo.query_transform(query_struct, & &1) |> case do {:ok, [_]} -> {:ok, true} {:ok, []} -> {:ok, false} @@ -222,7 +222,7 @@ defmodule Sanbase.PricePair do source = Keyword.get(opts, :source, @default_source) query_struct = available_quote_assets_query(slug, source) - ClickhouseRepo.query_transform(query_struct, fn [quote_asset] -> quote_asset end) + ChRepo.query_transform(query_struct, fn [quote_asset] -> quote_asset end) end @doc ~s""" @@ -236,7 +236,7 @@ defmodule Sanbase.PricePair do source = Keyword.get(opts, :source, @default_source) query_struct = first_datetime_query(slug, quote_asset, source) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() @@ -246,7 +246,7 @@ defmodule Sanbase.PricePair do source = Keyword.get(opts, :source, @default_source) query_struct = last_datetime_computed_at_query(slug, quote_asset, source) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp] -> DateTime.from_unix!(timestamp) end) |> maybe_unwrap_ok_value() diff --git a/lib/sanbase/project/ecosystem/metric.ex b/lib/sanbase/project/ecosystem/metric.ex index 027b14c350..042b5ce381 100644 --- a/lib/sanbase/project/ecosystem/metric.ex +++ b/lib/sanbase/project/ecosystem/metric.ex @@ -16,7 +16,7 @@ defmodule Sanbase.Ecosystem.Metric do query = aggregated_timeseries_data_query(ecosystems, metric, from, to, aggregation) - case Sanbase.ClickhouseRepo.query_transform(query, & &1) do + case Sanbase.ChRepo.query_transform(query, & &1) do {:ok, data} -> result = Enum.map(data, fn [ecosystem, value] -> %{ecosystem: ecosystem, value: value} end) @@ -34,7 +34,7 @@ defmodule Sanbase.Ecosystem.Metric do query = timeseries_data_query(ecosystems, metric, from, to, interval, aggregation) - case Sanbase.ClickhouseRepo.query_transform(query, & &1) do + case Sanbase.ChRepo.query_transform(query, & &1) do {:ok, data} -> result = Enum.map(data, fn [ecosystem, dt, value] -> diff --git a/lib/sanbase/queries/autocomplete.ex b/lib/sanbase/queries/autocomplete.ex index 94327ace83..1881b8e2a7 100644 --- a/lib/sanbase/queries/autocomplete.ex +++ b/lib/sanbase/queries/autocomplete.ex @@ -1,5 +1,5 @@ defmodule Sanbase.Clickhouse.Autocomplete do - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo def get_data(opts \\ []) do Sanbase.ClickhouseRepo.put_dynamic_repo(Sanbase.ClickhouseRepo.ReadOnly) @@ -23,7 +23,7 @@ defmodule Sanbase.Clickhouse.Autocomplete do query_struct = Sanbase.Clickhouse.Query.new(sql, %{}) {:ok, result} = - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [ name, @@ -55,7 +55,7 @@ defmodule Sanbase.Clickhouse.Autocomplete do query_struct = Sanbase.Clickhouse.Query.new(sql, %{}) {:ok, result} = - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [ table, @@ -96,7 +96,7 @@ defmodule Sanbase.Clickhouse.Autocomplete do query_struct = Sanbase.Clickhouse.Query.new(sql, %{}) {:ok, result} = - ClickhouseRepo.query_transform(query_struct, fn [name, origin] -> + ChRepo.query_transform(query_struct, fn [name, origin] -> %{name: name, origin: origin} end) diff --git a/lib/sanbase/queries/executor/executor.ex b/lib/sanbase/queries/executor/executor.ex index d1da73d8b7..22414bb89b 100644 --- a/lib/sanbase/queries/executor/executor.ex +++ b/lib/sanbase/queries/executor/executor.ex @@ -21,7 +21,7 @@ defmodule Sanbase.Queries.Executor do %Clickhouse.Query{} = clickhouse_query = create_clickhouse_query(query, query_metadata, environment) - case Sanbase.ClickhouseRepo.query_transform_with_metadata( + case Sanbase.ChRepo.query_transform_with_metadata( clickhouse_query, &transform_result/1 ) do diff --git a/lib/sanbase/queries/query/query_execution.ex b/lib/sanbase/queries/query/query_execution.ex index 2bcf69dc96..be8ed37e0e 100644 --- a/lib/sanbase/queries/query/query_execution.ex +++ b/lib/sanbase/queries/query/query_execution.ex @@ -298,7 +298,7 @@ defmodule Sanbase.Queries.QueryExecution do Sanbase.ClickhouseRepo.put_dynamic_repo(Sanbase.ClickhouseRepo) - Sanbase.ClickhouseRepo.query_transform( + Sanbase.ChRepo.query_transform( query_struct, fn [ read_compressed_gb, diff --git a/lib/sanbase/signal/signal_adapter.ex b/lib/sanbase/signal/signal_adapter.ex index 08154628b9..7e6c6a1791 100644 --- a/lib/sanbase/signal/signal_adapter.ex +++ b/lib/sanbase/signal/signal_adapter.ex @@ -7,7 +7,7 @@ defmodule Sanbase.Signal.SignalAdapter do only: [maybe_unwrap_ok_value: 1, maybe_apply_function: 2] alias Sanbase.Signal.FileHandler - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @aggregations FileHandler.aggregations() @aggregation_map FileHandler.aggregation_map() @@ -66,7 +66,7 @@ defmodule Sanbase.Signal.SignalAdapter do def available_signals(%{slug: slug}) when is_binary(slug) do query_struct = available_signals_query(slug) - ClickhouseRepo.query_transform(query_struct, fn [signal] -> + ChRepo.query_transform(query_struct, fn [signal] -> Map.get(@signal_to_name_map, signal) end) |> maybe_apply_function(fn list -> Enum.reject(list, &is_nil/1) end) @@ -76,7 +76,7 @@ defmodule Sanbase.Signal.SignalAdapter do def available_slugs(signal) do query_struct = available_slugs_query(signal) - ClickhouseRepo.query_transform(query_struct, fn [slug] -> slug end) + ChRepo.query_transform(query_struct, fn [slug] -> slug end) end @impl Sanbase.Signal.Behaviour @@ -99,7 +99,7 @@ defmodule Sanbase.Signal.SignalAdapter do def first_datetime(signal, %{slug: slug}) when is_binary(slug) do query_struct = first_datetime_query(signal, slug) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [0] -> nil [timestamp] -> DateTime.from_unix!(timestamp) end) @@ -110,7 +110,7 @@ defmodule Sanbase.Signal.SignalAdapter do def raw_data(signals, selector, from, to) do query_struct = raw_data_query(signals, from, to) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [unix, signal, slug, value, metadata] -> metadata = @@ -146,7 +146,7 @@ defmodule Sanbase.Signal.SignalAdapter do query_struct = timeseries_data_query(signal, slugs, from, to, interval, aggregation) - ClickhouseRepo.query_transform(query_struct, fn [unix, value, metadata] -> + ChRepo.query_transform(query_struct, fn [unix, value, metadata] -> metadata = metadata |> List.wrap() @@ -178,7 +178,7 @@ defmodule Sanbase.Signal.SignalAdapter do query_struct = aggregated_timeseries_data_query(signal, slugs, from, to, aggregation) - ClickhouseRepo.query_reduce(query_struct, %{}, fn [slug, value], acc -> + ChRepo.query_reduce(query_struct, %{}, fn [slug, value], acc -> Map.put(acc, slug, value) end) end diff --git a/lib/sanbase/social_data/spikes/spikes.ex b/lib/sanbase/social_data/spikes/spikes.ex index a2afe5ecb4..1ad12da168 100644 --- a/lib/sanbase/social_data/spikes/spikes.ex +++ b/lib/sanbase/social_data/spikes/spikes.ex @@ -8,7 +8,7 @@ defmodule Sanbase.SocialData.Spikes do query = get_metric_spikes_explanations_query(metric, selector, from, to) - Sanbase.ClickhouseRepo.query_transform(query, fn [from, to, summary] -> + Sanbase.ChRepo.query_transform(query, fn [from, to, summary] -> %{ spike_start_datetime: DateTime.from_unix!(from), spike_end_datetime: DateTime.from_unix!(to), @@ -21,7 +21,7 @@ defmodule Sanbase.SocialData.Spikes do query = get_metric_spikes_explanations_count_query(metric, selector, from, to, interval) - Sanbase.ClickhouseRepo.query_transform(query, fn [dt, count] -> + Sanbase.ChRepo.query_transform(query, fn [dt, count] -> %{ datetime: DateTime.from_unix!(dt), count: count @@ -31,19 +31,19 @@ defmodule Sanbase.SocialData.Spikes do def available_assets() do query_struct = available_assets_query() - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [slug] -> slug end) + Sanbase.ChRepo.query_transform(query_struct, fn [slug] -> slug end) end def available_assets(metric) do query_struct = available_assets_query(metric) - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [slug] -> slug end) + Sanbase.ChRepo.query_transform(query_struct, fn [slug] -> slug end) end def available_metrics() do names_map = Sanbase.Clickhouse.MetricAdapter.Registry.metric_to_names_map() query_struct = available_metrics_query() - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [metric] -> + Sanbase.ChRepo.query_transform(query_struct, fn [metric] -> Map.get(names_map, metric, []) |> List.first() end) |> maybe_apply_function(fn list -> Enum.reject(list, &is_nil/1) end) @@ -53,7 +53,7 @@ defmodule Sanbase.SocialData.Spikes do names_map = Sanbase.Clickhouse.MetricAdapter.Registry.metric_to_names_map() query_struct = available_metrics_query(selector) - Sanbase.ClickhouseRepo.query_transform(query_struct, fn [metric] -> + Sanbase.ChRepo.query_transform(query_struct, fn [metric] -> Map.get(names_map, metric, []) |> List.first() end) |> maybe_apply_function(fn list -> Enum.reject(list, &is_nil/1) end) diff --git a/lib/sanbase/social_data/trending_words.ex b/lib/sanbase/social_data/trending_words.ex index 3658bfc59a..ed6bbab54e 100644 --- a/lib/sanbase/social_data/trending_words.ex +++ b/lib/sanbase/social_data/trending_words.ex @@ -23,7 +23,7 @@ defmodule Sanbase.SocialData.TrendingWords do import Sanbase.Utils.Transform, only: [maybe_apply_function: 2] import Sanbase.Metric.SqlQuery.Helper, only: [to_unix_timestamp: 3, dt_to_unix: 2] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @type word :: String.t() @type slug :: String.t() @@ -71,7 +71,7 @@ defmodule Sanbase.SocialData.TrendingWords do query_struct = get_trending_words_query(from, to, interval, size, source, word_type_filter) - ClickhouseRepo.query_reduce(query_struct, %{}, fn + ChRepo.query_reduce(query_struct, %{}, fn [ dt, word, @@ -150,7 +150,7 @@ defmodule Sanbase.SocialData.TrendingWords do # ranking query_struct = get_trending_words_query(from, to, interval, size, source, :all) - ClickhouseRepo.query_reduce(query_struct, %{}, fn + ChRepo.query_reduce(query_struct, %{}, fn [_dt, _word, nil, _score], acc -> acc @@ -218,7 +218,7 @@ defmodule Sanbase.SocialData.TrendingWords do source = if source == :all, do: default_source(), else: to_string(source) query_struct = word_trending_history_query(word, from, to, interval, size, source) - ClickhouseRepo.query_transform(query_struct, fn [dt, position] -> + ChRepo.query_transform(query_struct, fn [dt, position] -> position = if position > 0, do: position %{ @@ -242,7 +242,7 @@ defmodule Sanbase.SocialData.TrendingWords do source = if source == :all, do: default_source(), else: to_string(source) query_struct = project_trending_history_query(slug, from, to, interval, size, source) - ClickhouseRepo.query_transform(query_struct, fn [dt, position] -> + ChRepo.query_transform(query_struct, fn [dt, position] -> position = if position > 0, do: position %{ diff --git a/lib/sanbase/template_engine/template_engine.ex b/lib/sanbase/template_engine/template_engine.ex index 43a00557c7..38152529b5 100644 --- a/lib/sanbase/template_engine/template_engine.ex +++ b/lib/sanbase/template_engine/template_engine.ex @@ -121,6 +121,16 @@ defmodule Sanbase.TemplateEngine do end end + def run_generate_positional_params_v2(template, opts) do + params = Keyword.get(opts, :params, %{}) |> Map.new(fn {k, v} -> {to_string(k), v} end) + env = Keyword.get(opts, :env, Sanbase.SanLang.Environment.new()) + + with {:ok, captures} <- TemplateEngine.Captures.extract_captures(template), + {:ok, result} <- do_run_generate_positional_params_v2(template, captures, params, env) do + {:ok, result} + end + end + # Private defp do_run_generate_positional_params(template, captures, params, env) do @@ -131,7 +141,7 @@ defmodule Sanbase.TemplateEngine do fn %{code?: false} = capture_map, {template_acc, args_acc, errors, position} -> case get_value_from_params(capture_map.inner_content, params) do - {:ok, value} -> + {:ok, %{value: value}} -> template_acc = String.replace(template_acc, capture_map.key, "?#{position}") args_acc = [value | args_acc] {template_acc, args_acc, errors, position + 1} @@ -173,6 +183,90 @@ defmodule Sanbase.TemplateEngine do end end + defp do_run_generate_positional_params_v2(template, captures, params, env) do + {sql, args, errors, _position} = + Enum.reduce( + captures, + {template, _args = [], _errors = [], _position = 0}, + fn + %{code?: false} = capture_map, {template_acc, args_acc, errors, position} -> + case get_value_from_params(capture_map.inner_content, params) do + {:ok, %{value: value, modifier: modifier}} -> + type = ch_type_of(value) + + case modifier do + "inline" -> + if not Regex.match?(~r/[a-zA-Z0-9_]/, value) do + raise ArgumentError, + message: + "When inlining in a query, the value can only contain letters, numbers and _" + end + + template_acc = String.replace(template_acc, capture_map.key, value) + {template_acc, args_acc, errors, position} + + _ -> + template_acc = + String.replace(template_acc, capture_map.key, "{$#{position}:#{type}}") + + args_acc = [value | args_acc] + {template_acc, args_acc, errors, position + 1} + end + + :no_value -> + error = %{ + error: :missing_parameter, + key: capture_map.key + } + + errors = [error | errors] + {template_acc, args_acc, errors, position + 1} + end + + %{code?: true} = capture_map, {template_acc, args_acc, errors, position} -> + execution_result = CodeEvaluation.eval(capture_map, env) + type = ch_type_of(execution_result) + template_acc = String.replace(template_acc, capture_map.key, "{$#{position}:#{type}") + args_acc = [execution_result | args_acc] + {template_acc, args_acc, errors, position + 1} + end + ) + + case errors do + [] -> + {:ok, {sql, Enum.reverse(args)}} + + _ -> + missing_keys = Enum.map(errors, & &1.key) |> Enum.join(", ") + params_keys = Map.keys(params) + params_keys = if params_keys == [], do: "none", else: Enum.join(params_keys, ", ") + + error_str = + """ + One or more of the {{}} templates in the query text do not correspond to any of the parameters. + Template keys missing from the parameters: #{missing_keys}. Parameters' keys defined: #{params_keys} + """ + + {:error, error_str} + end + end + + @ch_types ["Bool", "String", "DateTime64", "UInt8", "UInt64", "Int64", "Float64", "Array"] + def ch_types(), do: @ch_types + + defp ch_type_of(value) do + cond do + is_boolean(value) -> "Bool" + is_binary(value) -> "String" + match?(%DateTime{}, value) or match?(%NaiveDateTime{}, value) -> "DateTime64" + is_integer(value) and value in [0..255] -> "UInt8" + is_integer(value) and value >= 0 -> "UInt64" + is_integer(value) and value < 0 -> "Int64" + is_float(value) -> "Float64" + is_list(value) -> "Array" + end + end + defp replace_template_key_with_value( template, %{key: key, inner_content: inner_content}, @@ -180,7 +274,7 @@ defmodule Sanbase.TemplateEngine do _env ) do case get_value_from_params(inner_content, params) do - {:ok, value} -> String.replace(template, key, stringify_value(value)) + {:ok, %{value: value}} -> String.replace(template, key, stringify_value(value)) :no_value -> template end end @@ -203,12 +297,20 @@ defmodule Sanbase.TemplateEngine do end case Map.get(params, key) do - nil -> :no_value - value -> {:ok, maybe_apply_modifier(value, key, modifier)} + nil -> + :no_value + + value -> + {:ok, + %{ + value: maybe_apply_modifier(value, key, modifier), + modifier: modifier + }} end end defp maybe_apply_modifier(value, _key, "human_readable"), do: human_readable(value) + defp maybe_apply_modifier(value, _key, "inline"), do: value defp maybe_apply_modifier(value, _key, nil), do: value defp maybe_apply_modifier(_, key, modifier), diff --git a/lib/sanbase/transfers/btc_transfers.ex b/lib/sanbase/transfers/btc_transfers.ex index 8056333cd5..0f87c6d982 100644 --- a/lib/sanbase/transfers/btc_transfers.ex +++ b/lib/sanbase/transfers/btc_transfers.ex @@ -1,5 +1,5 @@ defmodule Sanbase.Transfers.BtcTransfers do - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo @type transaction :: %{ from_address: String.t(), @@ -20,7 +20,7 @@ defmodule Sanbase.Transfers.BtcTransfers do def top_transfers(from, to, page, page_size, excluded_addresses \\ []) do query_struct = top_transfers_query(from, to, page, page_size, excluded_addresses) - Sanbase.ClickhouseRepo.query_transform( + Sanbase.ChRepo.query_transform( query_struct, fn [dt, to_address, value, trx_id] -> %{ @@ -49,7 +49,7 @@ defmodule Sanbase.Transfers.BtcTransfers do def top_wallet_transfers(wallets, from, to, page, page_size, type) do query_struct = top_wallet_transfers_query(wallets, from, to, page, page_size, type) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp, address, trx_hash, balance, old_balance, abs_value] -> # if the new balance is bigger then the address is the receiver {from_address, to_address} = diff --git a/lib/sanbase/transfers/erc20_transfers.ex b/lib/sanbase/transfers/erc20_transfers.ex index d5258195f2..ae97d3bbe6 100644 --- a/lib/sanbase/transfers/erc20_transfers.ex +++ b/lib/sanbase/transfers/erc20_transfers.ex @@ -6,7 +6,7 @@ defmodule Sanbase.Transfers.Erc20Transfers do import Sanbase.Utils.Transform import Sanbase.Transfers.Utils, only: [top_wallet_transfers_address_clause: 2] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo alias Sanbase.Project defguard is_non_neg_integer(int) when is_integer(int) and int > 0 @@ -29,7 +29,7 @@ defmodule Sanbase.Transfers.Erc20Transfers do opts = [page: page, page_size: page_size] query_struct = top_wallet_transfers_query(wallets, contract, from, to, decimals, type, opts) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp, from_address, to_address, trx_hash, trx_value] -> %{ datetime: DateTime.from_unix!(timestamp), @@ -61,7 +61,7 @@ defmodule Sanbase.Transfers.Erc20Transfers do opts = [page: page, page_size: page_size] query_struct = top_transfers_query(contract, from, to, decimals, excluded_addresses, opts) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [datetime, from_address, to_address, trx_hash, trx_value] -> %{ @@ -93,7 +93,7 @@ defmodule Sanbase.Transfers.Erc20Transfers do interval ) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [unix, incoming, outgoing] -> %{ @@ -110,7 +110,7 @@ defmodule Sanbase.Transfers.Erc20Transfers do query_struct = blockchain_address_transaction_volume_query(addresses, contract, decimals, from, to) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [address, incoming, outgoing] -> %{ @@ -135,7 +135,7 @@ defmodule Sanbase.Transfers.Erc20Transfers do def recent_transactions(address, opts) do query_struct = recent_transactions_query(address, opts) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp, from_address, to_address, trx_hash, trx_value, name, decimals] -> %{ datetime: DateTime.from_unix!(timestamp), @@ -162,7 +162,7 @@ defmodule Sanbase.Transfers.Erc20Transfers do defp execute_transfers_summary_query(type, address, contract, decimals, from, to, opts) do query_struct = transfers_summary_query(type, address, contract, decimals, from, to, opts) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [last_transfer_datetime, address, transaction_volume, transfers_count] -> %{ diff --git a/lib/sanbase/transfers/eth_transfers.ex b/lib/sanbase/transfers/eth_transfers.ex index 41411550fd..2bd1debf70 100644 --- a/lib/sanbase/transfers/eth_transfers.ex +++ b/lib/sanbase/transfers/eth_transfers.ex @@ -6,7 +6,7 @@ defmodule Sanbase.Transfers.EthTransfers do import Sanbase.Utils.Transform import Sanbase.Transfers.Utils, only: [top_wallet_transfers_address_clause: 2] - alias Sanbase.ClickhouseRepo + alias Sanbase.ChRepo require Logger @@ -36,7 +36,7 @@ defmodule Sanbase.Transfers.EthTransfers do opts = [page: page, page_size: page_size] query_struct = top_wallet_transfers_query(wallets, from, to, type, opts) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp, from_address, to_address, trx_hash, trx_value] -> %{ datetime: DateTime.from_unix!(timestamp), @@ -54,7 +54,7 @@ defmodule Sanbase.Transfers.EthTransfers do opts = [page: page, page_size: page_size] query_struct = top_transfers_query(from, to, opts) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp, from_address, to_address, trx_hash, trx_value] -> %{ datetime: DateTime.from_unix!(timestamp), @@ -75,7 +75,7 @@ defmodule Sanbase.Transfers.EthTransfers do def recent_transactions(address, opts) do query_struct = recent_transactions_query(address, opts) - ClickhouseRepo.query_transform(query_struct, fn + ChRepo.query_transform(query_struct, fn [timestamp, from_address, to_address, trx_hash, trx_value] -> %{ datetime: DateTime.from_unix!(timestamp), @@ -99,7 +99,7 @@ defmodule Sanbase.Transfers.EthTransfers do query_struct = blockchain_address_transaction_volume_over_time_query(addresses, from, to, interval) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [unix, incoming, outgoing] -> %{ @@ -117,7 +117,7 @@ defmodule Sanbase.Transfers.EthTransfers do defp execute_transfers_summary_query(type, address, from, to, opts) do query_struct = transfers_summary_query(type, address, from, to, opts) - ClickhouseRepo.query_transform( + ChRepo.query_transform( query_struct, fn [last_transfer_datetime, address, transaction_volumes, transfers_count] -> %{ diff --git a/lib/sanbase/twitter/twitter.ex b/lib/sanbase/twitter/twitter.ex index d433070cb1..5bdbba18af 100644 --- a/lib/sanbase/twitter/twitter.ex +++ b/lib/sanbase/twitter/twitter.ex @@ -5,7 +5,7 @@ defmodule Sanbase.Twitter do def timeseries_data(twitter_handle, from, to, interval) do timeseries_data_query(twitter_handle, from, to, interval) - |> Sanbase.ClickhouseRepo.query_transform(fn [dt, value] -> + |> Sanbase.ChRepo.query_transform(fn [dt, value] -> %{ datetime: DateTime.from_unix!(dt), value: value @@ -15,7 +15,7 @@ defmodule Sanbase.Twitter do def timeseries_data_per_handle(twitter_handles, from, to, interval) do timeseries_data_per_handle_query(twitter_handles, from, to, interval) - |> Sanbase.ClickhouseRepo.query_reduce( + |> Sanbase.ChRepo.query_reduce( %{}, fn [timestamp, slug, value], acc -> datetime = DateTime.from_unix!(timestamp) @@ -32,7 +32,7 @@ defmodule Sanbase.Twitter do def last_record(twitter_handle) do last_record_query(twitter_handle) - |> Sanbase.ClickhouseRepo.query_transform(fn [dt, value] -> + |> Sanbase.ChRepo.query_transform(fn [dt, value] -> %{ datetime: DateTime.from_unix!(dt), followers_count: value @@ -43,13 +43,13 @@ defmodule Sanbase.Twitter do def first_datetime(twitter_handle) do first_datetime_query(twitter_handle) - |> Sanbase.ClickhouseRepo.query_transform(fn [ts] -> DateTime.from_unix!(ts) end) + |> Sanbase.ChRepo.query_transform(fn [ts] -> DateTime.from_unix!(ts) end) |> maybe_unwrap_ok_value() end def last_datetime(twitter_handle) do last_datetime_query(twitter_handle) - |> Sanbase.ClickhouseRepo.query_transform(fn [ts] -> DateTime.from_unix!(ts) end) + |> Sanbase.ChRepo.query_transform(fn [ts] -> DateTime.from_unix!(ts) end) |> maybe_unwrap_ok_value() end diff --git a/lib/sanbase_web/controllers/data_controller.ex b/lib/sanbase_web/controllers/data_controller.ex index 0089fb3cf3..3f60037c79 100644 --- a/lib/sanbase_web/controllers/data_controller.ex +++ b/lib/sanbase_web/controllers/data_controller.ex @@ -255,7 +255,7 @@ defmodule SanbaseWeb.DataController do defp get_slug_to_asset_id_map() do query = "SELECT name AS slug, asset_id FROM asset_metadata FINAL" - case Sanbase.ClickhouseRepo.query_transform(query, [], & &1) do + case Sanbase.ChRepo.query_transform(query, [], & &1) do {:ok, result} -> map = Map.new(result, fn [slug, asset_id] -> {slug, asset_id} end) diff --git a/mix.exs b/mix.exs index a1c6ec1094..857aa40fe5 100644 --- a/mix.exs +++ b/mix.exs @@ -44,6 +44,8 @@ defmodule Sanbase.Mixfile do defp deps() do [ + {:ecto_ch, "~> 0.6.0"}, + ### {:absinthe_phoenix, "~> 2.0"}, {:absinthe_plug, "~> 1.5"}, {:absinthe, "~> 1.5"}, diff --git a/mix.lock b/mix.lock index 8c1191df99..009edbcf36 100644 --- a/mix.lock +++ b/mix.lock @@ -9,6 +9,7 @@ "castle": {:hex, :castle, "0.3.0", "47b1a550b2348a6d7e60e43ded1df19dca601ed21ef6f267c3dbb1b3a301fbf5", [:mix], [{:forecastle, "~> 0.1.0", [hex: :forecastle, repo: "hexpm", optional: false]}], "hexpm", "dbdc1c171520c4591101938a3d342dec70d36b7f5b102a5c138098581e35fcef"}, "castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"}, "certifi": {:hex, :certifi, "2.13.0", "e52be248590050b2dd33b0bb274b56678f9068e67805dca8aa8b1ccdb016bbf6", [:rebar3], [], "hexpm", "8f3d9533a0f06070afdfd5d596b32e21c6580667a492891851b0e2737bc507a1"}, + "ch": {:hex, :ch, "0.3.2", "89aca3fecb26704572b31d95a58277221e66a72d5ad8110bfa309e0db6c214a4", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "e9d8103d68c4c1c006724355f3c56c62940d4895ff9d8939a419521f19d401f2"}, "chacha20": {:hex, :chacha20, "1.0.4", "0359d8f9a32269271044c1b471d5cf69660c362a7c61a98f73a05ef0b5d9eb9e", [:mix], [], "hexpm", "2027f5d321ae9903f1f0da7f51b0635ad6b8819bc7fe397837930a2011bc2349"}, "cidr": {:hex, :cidr, "1.2.0", "a872a23d74ae9aaae1a3d677515e7ab0e86378fff30e02e6e005ebdc712a9398", [:mix], [], "hexpm", "ec053c45add51a798497e2ff329c45cc3ead9b390b8d712dec04eb7b20801c11"}, "circular_buffer": {:hex, :circular_buffer, "0.4.1", "477f370fd8cfe1787b0a1bade6208bbd274b34f1610e41f1180ba756a7679839", [:mix], [], "hexpm", "633ef2e059dde0d7b89bbab13b1da9d04c6685e80e68fbdf41282d4fae746b72"}, @@ -29,6 +30,7 @@ "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, "earmark": {:hex, :earmark, "1.4.47", "7e7596b84fe4ebeb8751e14cbaeaf4d7a0237708f2ce43630cfd9065551f94ca", [:mix], [], "hexpm", "3e96bebea2c2d95f3b346a7ff22285bc68a99fbabdad9b655aa9c6be06c698f8"}, "ecto": {:hex, :ecto, "3.12.5", "4a312960ce612e17337e7cefcf9be45b95a3be6b36b6f94dfb3d8c361d631866", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6eb18e80bef8bb57e17f5a7f068a1719fbda384d40fc37acb8eb8aeca493b6ea"}, + "ecto_ch": {:hex, :ecto_ch, "0.6.0", "2889ec4d2221e2f196e68f3d628bd952f43a1be81f92a8d3a23dd39ee886e338", [:mix], [{:ch, "~> 0.3.0", [hex: :ch, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "3cb1b48a348cad284bbd7361ae9c41f83f6c796e1bb72ef6367c752bd4f3d4b2"}, "ecto_enum": {:hex, :ecto_enum, "1.4.0", "d14b00e04b974afc69c251632d1e49594d899067ee2b376277efd8233027aec8", [:mix], [{:ecto, ">= 3.0.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "> 3.0.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:mariaex, ">= 0.0.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, ">= 0.0.0", [hex: :postgrex, repo: "hexpm", optional: true]}], "hexpm", "8fb55c087181c2b15eee406519dc22578fa60dd82c088be376d0010172764ee4"}, "ecto_psql_extras": {:hex, :ecto_psql_extras, "0.8.3", "0c1df205bd051eaf599b3671e75356b121aa71eac09b63ecf921cb1a080c072e", [:mix], [{:ecto_sql, "~> 3.7", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:postgrex, "> 0.16.0 and < 0.20.0", [hex: :postgrex, repo: "hexpm", optional: false]}, {:table_rex, "~> 3.1.1 or ~> 4.0.0", [hex: :table_rex, repo: "hexpm", optional: false]}], "hexpm", "d0e35ea160359e759a2993a00c3a5389a9ca7ece6df5d0753fa927f988c7351a"}, "ecto_sql": {:hex, :ecto_sql, "3.12.1", "c0d0d60e85d9ff4631f12bafa454bc392ce8b9ec83531a412c12a0d415a3a4d0", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "aff5b958a899762c5f09028c847569f7dfb9cc9d63bdb8133bff8a5546de6bf5"}, @@ -162,6 +164,7 @@ "tidewave": {:hex, :tidewave, "0.1.7", "a93c500a414cfd211c7058a2b4b22759fb8cde5d72c471a34f7046cd66a5a5e6", [:mix], [{:circular_buffer, "~> 0.4", [hex: :circular_buffer, repo: "hexpm", optional: false]}, {:igniter, ">= 0.5.47 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:plug, "~> 1.17", [hex: :plug, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "2cfe9c0c3295132cc682b3cd1c859f801bf2e4d02816618d0659f4d765d26435"}, "timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, + "tracer": {:hex, :tracer, "0.1.1", "d194c2ed0b9b788b8b6c23f4ec714abc3a9dfa04304220600b3fe9eaadd92494", [:mix], [], "hexpm", "9b9af20c8dbe6b3299187f904d751f04061a0b5744bcca510ffd4c6f64da7099"}, "tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"}, "ueberauth": {:hex, :ueberauth, "0.10.8", "ba78fbcbb27d811a6cd06ad851793aaf7d27c3b30c9e95349c2c362b344cd8f0", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f2d3172e52821375bccb8460e5fa5cb91cfd60b19b636b6e57e9759b6f8c10c1"}, "ueberauth_google": {:hex, :ueberauth_google, "0.12.1", "90cf49743588193334f7a00da252f92d90bfd178d766c0e4291361681fafec7d", [:mix], [{:oauth2, "~> 1.0 or ~> 2.0", [hex: :oauth2, repo: "hexpm", optional: false]}, {:ueberauth, "~> 0.10.0", [hex: :ueberauth, repo: "hexpm", optional: false]}], "hexpm", "7f7deacd679b2b66e3bffb68ecc77aa1b5396a0cbac2941815f253128e458c38"},