Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/sanbase/application/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ defmodule Sanbase.Application do
[
id: :sanbase_generic_cache,
name: Sanbase.Cache.name(),
ttl_check_interval: :timer.seconds(30),
ttl_check_interval: :timer.seconds(15),
global_ttl: :timer.minutes(5),
acquire_lock_timeout: 60_000
]},
Expand Down
10 changes: 10 additions & 0 deletions lib/sanbase/project/social_volume_query/social_volume_query.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
defmodule Sanbase.Project.SocialVolumeQuery do
use Ecto.Schema

import Ecto.Query
import Ecto.Changeset

alias Sanbase.Project

@file_name "forbidden_social_volume_query_words.csv"
Expand All @@ -26,6 +28,14 @@ defmodule Sanbase.Project.SocialVolumeQuery do
|> unique_constraint(:project_id)
end

@doc """
Returns all social volume query records for the given list of project ids.
"""
def by_project_ids(project_ids) do
  query = from(svq in __MODULE__, where: svq.project_id in ^project_ids)

  Sanbase.Repo.all(query)
end

def default_query_parts(%Project{} = project) do
%Project{ticker: ticker, name: name, slug: slug} = project

Expand Down
162 changes: 73 additions & 89 deletions lib/sanbase_web/graphql/cache/cachex_provider.ex
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
defmodule SanbaseWeb.Graphql.CachexProvider do
@behaviour SanbaseWeb.Graphql.CacheProvider
@default_ttl_seconds 300
import Cachex.Spec

@max_lock_acquired_time_ms 60_000
@behaviour SanbaseWeb.Graphql.CacheProvider

import Cachex.Spec
@default_ttl_seconds 300

@compile inline: [
execute_cache_miss_function: 4,
handle_execute_cache_miss_function: 4,
obtain_lock: 3
]
@compile inline: [execute_cache_miss_function: 4]

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Starts a Cachex cache with the given options.
"""
@spec start_link(keyword()) :: {:ok, pid()} | {:error, term()}
def start_link(opts) do
  # Normalize the caller-supplied options into the full Cachex option set
  # before starting the cache process.
  opts
  |> opts()
  |> Cachex.start_link()
end

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Returns a child spec for the Cachex cache.
"""
@spec child_spec(keyword()) :: Supervisor.child_spec()
def child_spec(opts) do
  # Build the Cachex child tuple first, then override the supervisor child id
  # so multiple caches can run under the same supervisor.
  cachex_child = {Cachex, opts(opts)}
  child_id = Keyword.fetch!(opts, :id)

  Supervisor.child_spec(cachex_child, id: child_id)
end
Expand All @@ -27,7 +30,7 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
name: Keyword.fetch!(opts, :name),
# When the keys reach 2 million, remove 30% of the
# least recently written keys
limit: 2_000_000,
limit: 200_000,
policy: Cachex.Policy.LRW,
reclaim: 0.3,
# How often the Janitor process runs to clean the cache
Expand All @@ -43,24 +46,40 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
end

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Returns the size of the cache in megabytes.
"""
@spec size(atom()) :: float()
def size(cache) do
  # Cachex reports memory usage in bytes; convert to megabytes, rounded to 2 dp.
  {:ok, bytes} = Cachex.inspect(cache, {:memory, :bytes})
  megabytes = bytes / (1024 * 1024)

  Float.round(megabytes, 2)
end

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Returns the number of items in the cache.
"""
@spec count(atom()) :: integer()
def count(cache) do
  # Assert the :ok tag so an unexpected Cachex error surfaces as a MatchError.
  {:ok, entries} = Cachex.size(cache)
  entries
end

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Clears all items from the cache.
"""
@spec clear_all(atom()) :: :ok
def clear_all(cache) do
  # Cachex.clear/1 returns {:ok, cleared_count}; we only surface :ok.
  {:ok, _cleared_count} = Cachex.clear(cache)
  :ok
end

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Retrieves a value from the cache by key.
"""
@spec get(atom(), term()) :: term() | nil
def get(cache, key) do
case Cachex.get(cache, true_key(key)) do
{:ok, compressed_value} when is_binary(compressed_value) ->
Expand All @@ -72,6 +91,10 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
end

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Stores a value in the cache by key.
"""
@spec store(atom(), term(), term()) :: :ok
def store(cache, key, value) do
case value do
{:error, _} ->
Expand All @@ -88,6 +111,10 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
end

@impl SanbaseWeb.Graphql.CacheProvider
@doc """
Retrieves a value from the cache by key, or stores the result of the given function if the key is not found.
"""
@spec get_or_store(atom(), term(), (-> term()), (atom(), term(), term() -> term())) :: term()
def get_or_store(cache, key, func, cache_modify_middleware) do
true_key = true_key(key)

Expand All @@ -100,95 +127,52 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
end
end

defp execute_cache_miss_function(cache, key, func, cache_modify_middleware) do
# This is the only place where we need to have the transactional get_or_store
# mechanism. Cachex.fetch! is running in multiple processes, which causes issues
# when testing. Cachex.transaction has a non-configurable timeout. We actually
# can achieve the required behavior by manually getting and realeasing the lock.
# The transactional guarantees are not needed.
cache_record = Cachex.Services.Overseer.ensure(cache)

# Start a process that will handle the unlock in case this process terminates
# without releasing the lock. The process is not linked to the current one so
# it can continue to live and do its job even if this process terminates.
{:ok, unlocker_pid} =
__MODULE__.Unlocker.start(max_lock_acquired_time_ms: @max_lock_acquired_time_ms)

unlock_fun = fn -> Cachex.Services.Locksmith.unlock(cache_record, [true_key(key)]) end

try do
true = obtain_lock(cache_record, [true_key(key)])
_ = GenServer.cast(unlocker_pid, {:unlock_after, unlock_fun})

case Cachex.get(cache, true_key(key)) do
{:ok, compressed_value} when is_binary(compressed_value) ->
# First check if the result has not been stored while waiting for the lock.
decompress_value(compressed_value)

_ ->
handle_execute_cache_miss_function(
cache,
key,
_result = func.(),
cache_modify_middleware
)
end
after
true = unlock_fun.()
# We expect the process to unlock only in case we don't reach here for some reason.
# If we're here we can kill the process. If the process has already unlocked
_ = GenServer.cast(unlocker_pid, :stop)
end
end

defp obtain_lock(cache_record, keys, attempt \\ 0)

defp obtain_lock(_cache_record, _keys, 30) do
raise("Obtaining cache lock failed because of timeout")
end

defp obtain_lock(cache_record, keys, attempt) do
case Cachex.Services.Locksmith.lock(cache_record, keys) do
false ->
# In case the lock cannot be obtained, try again after some time
# In the beginning the next attempt is scheduled in an exponential
# backoff fashion - 10, 130, 375, 709, etc. milliseconds
# The backoff is capped at 2 seconds
sleep_ms = (:math.pow(attempt * 20, 1.6) + 10) |> trunc()
sleep_ms = Enum.min([sleep_ms, 2000])

Process.sleep(sleep_ms)
obtain_lock(cache_record, keys, attempt + 1)

true ->
true
end
end
# NOTE(review): the pasted diff interleaved the deleted
# `handle_execute_cache_miss_function/4` lines into this new implementation,
# leaving the span syntactically invalid; this is the reconstructed added code.
# `@doc false` is dropped because docs are always discarded (with a compiler
# warning) on private functions.
@spec execute_cache_miss_function(atom(), term(), (-> term()), (atom(), term(), term() ->
        term())) :: term()
defp execute_cache_miss_function(cache, key, func, cache_modify_middleware)
     when is_function(func, 0) and is_function(cache_modify_middleware, 3) do
  # Cachex.fetch/3 executes the fallback under a per-key lock, so concurrent
  # callers for the same key run `func` only once.
  Cachex.fetch(
    cache,
    key,
    fn ->
      case func.() do
        # Middleware tuples are resolved by the caller-provided middleware
        # and are never cached here.
        {:middleware, _, _} = tuple ->
          {:ignore, cache_modify_middleware.(cache, key, tuple)}

        # The value is explicitly marked as not cacheable. Record that in the
        # process dictionary so callers can detect it.
        {:nocache, value} ->
          Process.put(:has_nocache_field, true)
          {:ignore, value}

        # Errors are returned to the caller but never cached.
        {:error, _} = error ->
          {:ignore, error}

        # Success: store via cache_item/3 (which applies compression, TTL and
        # the size limit) and return the tuple. `:ignore` prevents Cachex from
        # committing a second, uncompressed copy itself.
        {:ok, _value} = ok_tuple ->
          cache_item(cache, key, ok_tuple)
          {:ignore, ok_tuple}
      end
    end
  )
  |> elem(1)
end

# Stores `value` under `key` with an explicit per-item TTL (in seconds).
# NOTE(review): the pasted diff left the deleted one-line `Cachex.put(...,
# ttl: ...)` body above the new size-guarded body, which would have performed
# a duplicate unconditional put; the stale line is removed here.
defp cache_item(cache, {key, ttl}, value) when is_integer(ttl) do
  compressed_value = compress_value(value)

  # Do not cache items if their compressed size is > 500kb
  if byte_size(compressed_value) < 500_000 do
    Cachex.put(cache, key, compressed_value, expire: :timer.seconds(ttl))
  end
end

# Stores `value` under `key` with the module-default TTL.
# NOTE(review): the pasted diff left the deleted one-line `Cachex.put(...,
# ttl: ...)` body above the new size-guarded body, which would have performed
# a duplicate unconditional put; the stale line is removed here.
defp cache_item(cache, key, value) do
  compressed_value = compress_value(value)

  # Do not cache items if their compressed size is > 500kb
  if byte_size(compressed_value) < 500_000 do
    Cachex.put(cache, key, compressed_value, expire: :timer.seconds(@default_ttl_seconds))
  end
end

defp true_key({key, ttl}) when is_integer(ttl), do: key
Expand Down
74 changes: 69 additions & 5 deletions lib/sanbase_web/graphql/dataloader/postgres_dataloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ defmodule SanbaseWeb.Graphql.PostgresDataloader do

alias Sanbase.Repo
alias Sanbase.Comment
alias Sanbase.Model.{MarketSegment, Infrastructure}
alias Sanbase.Model.MarketSegment
alias Sanbase.Model.Infrastructure
alias Sanbase.Project
alias Sanbase.Project.ContractAddress
alias Sanbase.Project.SourceSlugMapping
alias Sanbase.ProjectEthAddress

def data() do
Dataloader.KV.new(&query/2)
Expand Down Expand Up @@ -62,17 +67,28 @@ defmodule SanbaseWeb.Graphql.PostgresDataloader do
Map.new(users, &{&1.id, &1})
end

def query(:market_segment, market_segment_ids) do
market_segment_ids = Enum.to_list(market_segment_ids)
# Dataloader batch: maps each project id to the list of its market segment
# names.
# NOTE(review): the pasted diff interleaved the deleted `:market_segment`
# clause (the `market_segment_ids` variable, old `where` and old `Enum.map`)
# into this new clause; this is the reconstructed added code. The trailing
# `|> Map.new()` is dropped: Enum.group_by/3 already returns a map, so it was
# a no-op.
def query(:market_segments, project_ids) do
  project_ids = Enum.to_list(project_ids)

  from(ms in MarketSegment,
    join: p in assoc(ms, :projects),
    where: p.id in ^project_ids,
    select: {p.id, ms.name}
  )
  |> Repo.all()
  |> Enum.group_by(
    fn {project_id, _segment} -> project_id end,
    fn {_project_id, segment} -> segment end
  )
end

# Dataloader batch: maps each project id to its social volume query record.
def query(:social_volume_query, project_ids) do
  project_ids
  |> Enum.to_list()
  |> Sanbase.Project.SocialVolumeQuery.by_project_ids()
  |> Map.new(fn svq -> {svq.project_id, svq} end)
end

def query(:infrastructure, infrastructure_ids) do
infrastructure_ids = Enum.to_list(infrastructure_ids)

Expand Down Expand Up @@ -187,6 +203,54 @@ defmodule SanbaseWeb.Graphql.PostgresDataloader do
|> Enum.into(%{}, fn %{slug: slug} = project -> {slug, project} end)
end

# Dataloader batch: maps each project id to the address string of its main
# contract, chosen by ContractAddress.list_to_main_contract_address/1.
def query(:main_contract_address, project_ids) do
  project_ids = Enum.to_list(project_ids)

  contracts_query =
    from(contract in ContractAddress,
      where: contract.project_id in ^project_ids,
      select: {contract.project_id, contract}
    )

  contracts_query
  |> Repo.all()
  |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
  |> Map.new(fn {project_id, contracts} ->
    main_contract = ContractAddress.list_to_main_contract_address(contracts)
    {project_id, main_contract.address}
  end)
end

# Dataloader batch: maps each project id to the list of its contract
# address records.
def query(:contract_addresses, project_ids) do
  project_ids = Enum.to_list(project_ids)

  contracts_query =
    from(contract in ContractAddress,
      where: contract.project_id in ^project_ids,
      select: {contract.project_id, contract}
    )

  contracts_query
  |> Repo.all()
  |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
end

# Dataloader batch: maps each project id to the list of its ETH address
# records.
def query(:eth_addresses, project_ids) do
  project_ids = Enum.to_list(project_ids)

  addresses_query =
    from(address in ProjectEthAddress,
      where: address.project_id in ^project_ids,
      select: {address.project_id, address}
    )

  addresses_query
  |> Repo.all()
  |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
end

# Dataloader batch: maps each project id to the list of its source-slug
# mapping records.
def query(:source_slug_mappings, project_ids) do
  project_ids = Enum.to_list(project_ids)

  mappings_query =
    from(mapping in SourceSlugMapping,
      where: mapping.project_id in ^project_ids,
      select: {mapping.project_id, mapping}
    )

  mappings_query
  |> Repo.all()
  |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
end

# Private functions

def get_comments_count(ids_mapset, module, field) do
Expand Down
10 changes: 8 additions & 2 deletions lib/sanbase_web/graphql/dataloader/sanbase_dataloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,20 @@ defmodule SanbaseWeb.Graphql.SanbaseDataloader do
:current_user_address_details,
:infrastructure,
:insights_count_per_user,
:market_segment,
:market_segments,
:project_by_slug,
:traded_on_exchanges_count,
:traded_on_exchanges,
:social_volume_query,
# Users
:users_by_id,
# Founders
:available_founders_per_slug
:available_founders_per_slug,
# Contract addresses
:main_contract_address,
:contract_addresses,
:eth_addresses,
:source_slug_mappings
# Trending Words
]

Expand Down
Loading