Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/sanbase_web/graphql/cache/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule SanbaseWeb.Graphql.Cache do

alias __MODULE__, as: CacheMod
# alias SanbaseWeb.Graphql.ConCacheProvider, as: CacheProvider
# CachexProvider uses Cachex.fetch (fallback runs in worker → breaks Dataloader)
alias SanbaseWeb.Graphql.CachexProvider, as: CacheProvider

@ttl 300
Expand Down
211 changes: 108 additions & 103 deletions lib/sanbase_web/graphql/cache/cachex_provider.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
defmodule SanbaseWeb.Graphql.CachexProvider do
@behaviour SanbaseWeb.Graphql.CacheProvider
@default_ttl_seconds 300

@max_lock_acquired_time_ms 60_000

import Cachex.Spec

@compile inline: [
execute_cache_miss_function: 4,
handle_execute_cache_miss_function: 4,
obtain_lock: 3
]
require Logger

@impl SanbaseWeb.Graphql.CacheProvider
def start_link(opts) do
Expand All @@ -22,21 +14,43 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
Supervisor.child_spec({Cachex, opts(opts)}, id: Keyword.fetch!(opts, :id))
end

@default_max_entries 2_000_000
@default_reclaim_ratio 0.3
@default_limit_check_interval_ms 5000
@default_ttl_seconds 300
@default_expiration_interval_seconds 10

defp opts(opts) do
max_entries = Keyword.get(opts, :max_entries, @default_max_entries)
reclaim = Keyword.get(opts, :reclaim, @default_reclaim_ratio)

limit_interval_ms =
Keyword.get(opts, :limit_check_interval_ms, @default_limit_check_interval_ms)

default_ttl = Keyword.get(opts, :default_ttl_seconds, @default_ttl_seconds)

expiration_interval =
Keyword.get(opts, :expiration_interval_seconds, @default_expiration_interval_seconds)

ensure_opts_ets()
:ets.insert(:sanbase_graphql_cachex_opts, {Keyword.fetch!(opts, :name), default_ttl})

[
name: Keyword.fetch!(opts, :name),
# When the keys reach 2 million, remove 30% of the
# least recently written keys
limit: 2_000_000,
policy: Cachex.Policy.LRW,
reclaim: 0.3,
# How often the Janitor process runs to clean the cache
interval: 5000,
# The default TTL of keys in the cache
hooks: [
hook(
module: Cachex.Limit.Scheduled,
args: {
max_entries,
[reclaim: reclaim],
[frequency: limit_interval_ms]
}
)
],
expiration:
expiration(
default: :timer.seconds(@default_ttl_seconds),
interval: :timer.seconds(10),
default: :timer.seconds(default_ttl),
interval: :timer.seconds(expiration_interval),
lazy: true
)
]
Expand Down Expand Up @@ -78,8 +92,7 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
:ok

{:nocache, _} ->
Process.put(:has_nocache_field, true)

Process.put(:do_not_cache_query, true)
:ok

_ ->
Expand All @@ -90,105 +103,97 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
@impl SanbaseWeb.Graphql.CacheProvider
def get_or_store(cache, key, func, cache_modify_middleware) do
true_key = true_key(key)
ttl = ttl_ms(cache, key)

case Cachex.get(cache, true_key) do
{:ok, compressed_value} when is_binary(compressed_value) ->
decompress_value(compressed_value)
result =
Cachex.fetch(cache, true_key, fn ->
case func.() do
{:ok, _} = ok_tuple ->
{:commit, compress_value(ok_tuple), [expire: ttl]}

_ ->
execute_cache_miss_function(cache, key, func, cache_modify_middleware)
end
end
{:error, _} = error ->
{:ignore, error}

defp execute_cache_miss_function(cache, key, func, cache_modify_middleware) do
# This is the only place where we need to have the transactional get_or_store
# mechanism. Cachex.fetch! is running in multiple processes, which causes issues
# when testing. Cachex.transaction has a non-configurable timeout. We actually
# can achieve the required behavior by manually getting and realeasing the lock.
# The transactional guarantees are not needed.
cache_record = Cachex.Services.Overseer.ensure(cache)

# Start a process that will handle the unlock in case this process terminates
# without releasing the lock. The process is not linked to the current one so
# it can continue to live and do its job even if this process terminates.
{:ok, unlocker_pid} =
__MODULE__.Unlocker.start(max_lock_acquired_time_ms: @max_lock_acquired_time_ms)

unlock_fun = fn -> Cachex.Services.Locksmith.unlock(cache_record, [true_key(key)]) end

try do
true = obtain_lock(cache_record, [true_key(key)])
_ = GenServer.cast(unlocker_pid, {:unlock_after, unlock_fun})

case Cachex.get(cache, true_key(key)) do
{:ok, compressed_value} when is_binary(compressed_value) ->
# First check if the result has not been stored while waiting for the lock.
decompress_value(compressed_value)

_ ->
handle_execute_cache_miss_function(
cache,
key,
_result = func.(),
cache_modify_middleware
)
end
after
true = unlock_fun.()
# We expect the process to unlock only in case we don't reach here for some reason.
# If we're here we can kill the process. If the process has already unlocked
_ = GenServer.cast(unlocker_pid, :stop)
end
end
{:nocache, value} ->
# Do not put the :do_not_cache_query flag here as is
# is executed inside a Courier process. Set it afterwards
# when handling the result
{:ignore, {:nocache, value}}

defp obtain_lock(cache_record, keys, attempt \\ 0)
{:middleware, _middleware_module, _args} = tuple ->
{:ignore, cache_modify_middleware.(cache, key, tuple)}
end
end)

defp obtain_lock(_cache_record, _keys, 30) do
raise("Obtaining cache lock failed because of timeout")
end
case result do
{:commit, compressed} when is_binary(compressed) ->
decompress_value(compressed)

defp obtain_lock(cache_record, keys, attempt) do
case Cachex.Services.Locksmith.lock(cache_record, keys) do
false ->
# In case the lock cannot be obtained, try again after some time
# In the beginning the next attempt is scheduled in an exponential
# backoff fashion - 10, 130, 375, 709, etc. milliseconds
# The backoff is capped at 2 seconds
sleep_ms = (:math.pow(attempt * 20, 1.6) + 10) |> trunc()
sleep_ms = Enum.min([sleep_ms, 2000])

Process.sleep(sleep_ms)
obtain_lock(cache_record, keys, attempt + 1)

true ->
true
end
end
{:ok, compressed} when is_binary(compressed) ->
decompress_value(compressed)

defp handle_execute_cache_miss_function(cache, key, result, cache_modify_middleware) do
case result do
{:middleware, _, _} = tuple ->
cache_modify_middleware.(cache, key, tuple)
{:error, error} ->
# Transforms like :no_cache -> "Specified cache not running"
error_msg = if is_atom(error), do: Cachex.Error.long_form(error), else: error
{:error, error_msg}

{:nocache, value} ->
Process.put(:has_nocache_field, true)
value
{:ignore, {:error, error}} ->
# Transforms like :no_cache -> "Specified cache not running"
error_msg = if is_atom(error), do: Cachex.Error.long_form(error), else: error
{:error, error_msg}

{:error, _} = error ->
error
{:ignore, {:nocache, value}} ->
Process.put(:do_not_cache_query, true)
value

{:ok, _value} = ok_tuple ->
cache_item(cache, key, ok_tuple)
ok_tuple
{:ignore, value} ->
value
end
end

defp ttl_ms(_cache, {_key, ttl}) when is_integer(ttl), do: :timer.seconds(ttl)
defp ttl_ms(cache, _key), do: :timer.seconds(default_ttl_seconds(cache))

defp cache_item(cache, {key, ttl}, value) when is_integer(ttl) do
Cachex.put(cache, key, compress_value(value), ttl: :timer.seconds(ttl))
Cachex.put(cache, key, compress_value(value), expire: :timer.seconds(ttl))
end

defp cache_item(cache, key, value) do
Cachex.put(cache, key, compress_value(value), ttl: :timer.seconds(@default_ttl_seconds))
Cachex.put(cache, key, compress_value(value),
expire: :timer.seconds(default_ttl_seconds(cache))
)
end

defp default_ttl_seconds(cache) do
case :ets.lookup(:sanbase_graphql_cachex_opts, cache) do
[{^cache, ttl}] -> ttl
[] -> @default_ttl_seconds
end
rescue
_ ->
Logger.error(
"CachexProvider: Could not get default TTL from ETS for cache #{cache}, using default #{@default_ttl_seconds}"
)

@default_ttl_seconds
end

defp ensure_opts_ets() do
case :ets.whereis(:sanbase_graphql_cachex_opts) do
:undefined ->
try do
:ets.new(:sanbase_graphql_cachex_opts, [:named_table, :public, :set])
catch
:error, {:badarg, _} -> :ok
:error, :badarg -> :ok
:error, %ArgumentError{} -> :ok
end

_ ->
:ok
end

:ok
end

defp true_key({key, ttl}) when is_integer(ttl), do: key
Expand Down
Loading