Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ config :sanbase, Sanbase.ClickhouseRepo,
pool_size: {:system, "CLICKHOUSE_POOL_SIZE", "3"},
show_sensitive_data_on_connection_error: true

config :sanbase, Sanbase.ChRepo,
adapter: Ecto.Adapters.ClickHouse,
loggers: [Ecto.LogEntry],
hostname: "clickhouse",
port: 8123,
database: "default",
username: "default",
password: "",
timeout: 60_000,
pool_size: {:system, "CLICKHOUSE_POOL_SIZE", "3"},
# idle_interval: 60_000,
show_sensitive_data_on_connection_error: true

clickhouse_read_only_opts = [
adapter: ClickhouseEcto,
loggers: [Ecto.LogEntry, Sanbase.Prometheus.EctoInstrumenter],
Expand Down
7 changes: 7 additions & 0 deletions lib/sanbase/application/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@ defmodule Sanbase.Application do
fn -> Sanbase.ClickhouseRepo.enabled?() end
),

# Start the main ChRepo. This is started in all
# pods as each pod will need it.
start_in_and_if(
fn -> Sanbase.ChRepo end,
[:dev, :prod],
fn -> Sanbase.ChRepo.enabled?() end
),
# Start the main clickhouse read-only repos
clickhouse_readonly_children,

Expand Down
24 changes: 12 additions & 12 deletions lib/sanbase/balances/balance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Sanbase.Balance do
maybe_fill_gaps_last_seen_balance_ohlc: 1
]

alias Sanbase.ClickhouseRepo
alias Sanbase.ChRepo
alias Sanbase.Project

@type slug :: String.t()
Expand Down Expand Up @@ -202,7 +202,7 @@ defmodule Sanbase.Balance do
Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} ->
query = assets_held_by_address_query(address, table, opts)

case ClickhouseRepo.query_transform(query, transform_fn) do
case ChRepo.query_transform(query, transform_fn) do
{:ok, data} -> {:cont, {:ok, acc ++ data}}
{:error, error} -> {:halt, {:error, error}}
end
Expand Down Expand Up @@ -240,7 +240,7 @@ defmodule Sanbase.Balance do
Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} ->
query_struct = usd_value_address_change_query(address, datetime, table)

case ClickhouseRepo.query_transform(query_struct, transform_fn) do
case ChRepo.query_transform(query_struct, transform_fn) do
{:ok, data} -> {:cont, {:ok, acc ++ data}}
{:error, error} -> {:halt, {:error, error}}
end
Expand All @@ -265,7 +265,7 @@ defmodule Sanbase.Balance do
Enum.reduce_while(tables, {:ok, []}, fn table, {:ok, acc} ->
query_struct = usd_value_held_by_address_query(address, table)

case ClickhouseRepo.query_transform(query_struct, transform_fn) do
case ChRepo.query_transform(query_struct, transform_fn) do
{:ok, data} -> {:cont, {:ok, acc ++ data}}
{:error, error} -> {:halt, {:error, error}}
end
Expand All @@ -289,7 +289,7 @@ defmodule Sanbase.Balance do
{:ok, table} <- balances_table(slug, infr) do
query_struct = addresses_by_filter_query(slug, decimals, operator, threshold, table, opts)

ClickhouseRepo.query_transform(query_struct, fn [address, balance] ->
ChRepo.query_transform(query_struct, fn [address, balance] ->
%{
address: address,
balance: balance
Expand All @@ -309,7 +309,7 @@ defmodule Sanbase.Balance do
address = Sanbase.BlockchainAddress.to_internal_format(address)
query_struct = first_datetime_query(address, slug, blockchain)

ClickhouseRepo.query_transform(query_struct, fn [unix] ->
ChRepo.query_transform(query_struct, fn [unix] ->
DateTime.from_unix!(unix)
end)
|> maybe_unwrap_ok_value()
Expand Down Expand Up @@ -343,7 +343,7 @@ defmodule Sanbase.Balance do
def current_balance_top_addresses(slug, decimals, infrastructure, blockchain, table, opts) do
query_struct = top_addresses_query(slug, decimals, blockchain, table, opts)

ClickhouseRepo.query_transform(query_struct, fn [address, balance] ->
ChRepo.query_transform(query_struct, fn [address, balance] ->
%{
address: address,
infrastructure: infrastructure,
Expand Down Expand Up @@ -374,7 +374,7 @@ defmodule Sanbase.Balance do
defp do_current_balance(addresses, slug, decimals, blockchain, table) do
query_struct = current_balance_query(addresses, slug, decimals, blockchain, table)

ClickhouseRepo.query_transform(query_struct, fn [address, balance] ->
ChRepo.query_transform(query_struct, fn [address, balance] ->
%{
address: address,
balance: balance
Expand All @@ -385,7 +385,7 @@ defmodule Sanbase.Balance do
defp do_balance_change(addresses, slug, decimals, blockchain, from, to) do
query_struct = balance_change_query(addresses, slug, decimals, blockchain, from, to)

ClickhouseRepo.query_transform(query_struct, fn
ChRepo.query_transform(query_struct, fn
[address, balance_start, balance_end, balance_change] ->
%{
address: address,
Expand All @@ -411,7 +411,7 @@ defmodule Sanbase.Balance do

query_struct = last_balance_before_query(addresses, slug, decimals, blockchain, datetime)

case ClickhouseRepo.query_transform(query_struct, & &1) do
case ChRepo.query_transform(query_struct, & &1) do
{:ok, list} ->
# If an address does not own the given coin/token, it will be missing from the
# result. Iterate it like this in order to fill the missing values with 0
Expand All @@ -437,7 +437,7 @@ defmodule Sanbase.Balance do
query_struct =
historical_balance_query(address, slug, decimals, blockchain, from, to, interval)

ClickhouseRepo.query_transform(query_struct, fn [unix, value, has_changed] ->
ChRepo.query_transform(query_struct, fn [unix, value, has_changed] ->
%{
datetime: DateTime.from_unix!(unix),
balance: value,
Expand Down Expand Up @@ -469,7 +469,7 @@ defmodule Sanbase.Balance do
query_struct =
historical_balance_ohlc_query(address, slug, decimals, blockchain, from, to, interval)

ClickhouseRepo.query_transform(
ChRepo.query_transform(
query_struct,
fn [unix, open, high, low, close, has_changed] ->
%{
Expand Down
22 changes: 11 additions & 11 deletions lib/sanbase/balances/balance_sql_query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule Sanbase.Balance.SqlQuery do
balance,
txID,
computedAt
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE address IN (addresses_of_interest) AND dt >= toStartOfYear(from) AND dt <= to
),
merged AS
Expand Down Expand Up @@ -91,7 +91,7 @@ defmodule Sanbase.Balance.SqlQuery do
argMaxIf(balance, (dt, txID, computedAt), dt <= {{from}}) / {{decimals}} AS start_balance,
argMaxIf(balance, (dt, txID, computedAt), dt <= {{to}}) / {{decimals}} AS end_balance,
end_balance - start_balance AS diff
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{maybe_selector_clause(blockchain, slug, "slug")}
#{address_clause(addresses, argument_name: "addresses")} AND
Expand Down Expand Up @@ -142,7 +142,7 @@ defmodule Sanbase.Balance.SqlQuery do
toUnixTimestamp(intDiv(toUInt32(dt), {{interval}}) * {{interval}}) AS time,
address,
argMax(balance, (dt, txID, computedAt)) / {{decimals}} AS balance
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{maybe_selector_clause(blockchain, slug, "slug")}
#{address_clause(addresses, argument_name: "addresses")} AND
Expand Down Expand Up @@ -218,7 +218,7 @@ defmodule Sanbase.Balance.SqlQuery do
max(balance) / {{decimals}} AS high,
argMax(balance, (dt, txID, computedAt)) / {{decimals}} AS close,
min(balance) / {{decimals}} AS low
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{maybe_selector_clause(blockchain, slug, "slug")}
#{address_clause(addresses, argument_name: "addresses")} AND
Expand Down Expand Up @@ -253,7 +253,7 @@ defmodule Sanbase.Balance.SqlQuery do
SELECT
address,
argMax(balance, (dt, txID, computedAt)) / {{decimals}}
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{maybe_selector_clause(blockchain, slug, "slug")}
#{address_clause(addresses, argument_name: "addresses")}
Expand All @@ -273,7 +273,7 @@ defmodule Sanbase.Balance.SqlQuery do
def first_datetime_query(address, slug, blockchain) when is_binary(address) do
sql = """
SELECT toUnixTimestamp(min(dt))
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{maybe_selector_clause(blockchain, slug, "slug")}
#{address_clause(address, argument_name: "address")}
Expand Down Expand Up @@ -457,7 +457,7 @@ defmodule Sanbase.Balance.SqlQuery do
balance,
txID,
computedAt
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{address_clause(address, argument_name: "address")}
)
Expand Down Expand Up @@ -498,7 +498,7 @@ defmodule Sanbase.Balance.SqlQuery do
0
) AS current_balance,
current_balance - previous_balance AS balance_change
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{address_clause(address, argument_name: "address")}
#{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"}
Expand Down Expand Up @@ -530,7 +530,7 @@ defmodule Sanbase.Balance.SqlQuery do
balance,
txID,
computedAt
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{address_clause(address, argument_name: "address")}
)
Expand Down Expand Up @@ -560,7 +560,7 @@ defmodule Sanbase.Balance.SqlQuery do
SELECT
{{slug}} AS name,
argMax(balance, (dt, txID, computedAt)) / {{decimals}} AS balance
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{address_clause(address, argument_name: "address")}
#{if Keyword.get(opts, :show_assets_with_zero_balance, false), do: "", else: "HAVING balance > 0"}
Expand Down Expand Up @@ -646,7 +646,7 @@ defmodule Sanbase.Balance.SqlQuery do
SELECT
address,
argMaxIf(balance, (dt, txID, computedAt), dt <= {{datetime}}) / {{decimals}}
FROM {{table}}
FROM \{\{table:inline\}\}
WHERE
#{maybe_selector_clause(blockchain, slug, "slug")}
#{address_clause(addresses, argument_name: "addresses")} AND
Expand Down
4 changes: 2 additions & 2 deletions lib/sanbase/billing/subscription/san_burn_credit_trx.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Sanbase.Billing.Subscription.SanBurnCreditTransaction do

import Ecto.Changeset

alias Sanbase.ClickhouseRepo
alias Sanbase.ChRepo
alias Sanbase.Accounts.EthAccount
alias Sanbase.Accounts.User
alias Sanbase.Repo
Expand Down Expand Up @@ -72,7 +72,7 @@ defmodule Sanbase.Billing.Subscription.SanBurnCreditTransaction do
def fetch_burn_trxs do
query_struct = fetch_san_burns_query()

ClickhouseRepo.query_transform(query_struct, fn [timestamp, address, value, trx_id] ->
ChRepo.query_transform(query_struct, fn [timestamp, address, value, trx_id] ->
%{
trx_datetime: DateTime.from_unix!(timestamp),
address: address,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
defmodule Sanbase.BlockchainAddress.BlockchainAddressLabelChange do
import __MODULE__.SqlQuery
alias Sanbase.ClickhouseRepo
alias Sanbase.ChRepo

def labels_list() do
query_struct = labels_list_query()

ClickhouseRepo.query_transform(query_struct, fn
ChRepo.query_transform(query_struct, fn
[label_fqn, display_name] ->
origin = String.split(label_fqn, "/") |> List.first()
%{name: label_fqn, human_readable_name: display_name, origin: origin}
Expand All @@ -18,7 +18,7 @@ defmodule Sanbase.BlockchainAddress.BlockchainAddressLabelChange do

query_struct = label_changes_query(address, blockchain, from, to)

ClickhouseRepo.query_transform(query_struct, fn [unix, address, label_fqn, sign] ->
ChRepo.query_transform(query_struct, fn [unix, address, label_fqn, sign] ->
%{
datetime: DateTime.from_unix!(unix),
address: address,
Expand Down
Loading