Skip to content

Commit 34c9818

Browse files
committed
Downgrade cachex as the new version gets OOM
1 parent 245ca01 commit 34c9818

7 files changed

Lines changed: 520 additions & 262 deletions

File tree

lib/sanbase_web/graphql/cache/cachex_provider.ex

Lines changed: 103 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
defmodule SanbaseWeb.Graphql.CachexProvider do
22
@behaviour SanbaseWeb.Graphql.CacheProvider
3+
@default_ttl_seconds 300
4+
5+
@max_lock_acquired_time_ms 60_000
36

47
import Cachex.Spec
5-
require Logger
8+
9+
@compile inline: [
10+
execute_cache_miss_function: 4,
11+
handle_execute_cache_miss_function: 4,
12+
obtain_lock: 3
13+
]
614

715
@impl SanbaseWeb.Graphql.CacheProvider
816
def start_link(opts) do
@@ -14,43 +22,21 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
1422
Supervisor.child_spec({Cachex, opts(opts)}, id: Keyword.fetch!(opts, :id))
1523
end
1624

17-
@default_max_entries 2_000_000
18-
@default_reclaim_ratio 0.3
19-
@default_limit_check_interval_ms 5000
20-
@default_ttl_seconds 300
21-
@default_expiration_interval_seconds 10
22-
2325
defp opts(opts) do
24-
max_entries = Keyword.get(opts, :max_entries, @default_max_entries)
25-
reclaim = Keyword.get(opts, :reclaim, @default_reclaim_ratio)
26-
27-
limit_interval_ms =
28-
Keyword.get(opts, :limit_check_interval_ms, @default_limit_check_interval_ms)
29-
30-
default_ttl = Keyword.get(opts, :default_ttl_seconds, @default_ttl_seconds)
31-
32-
expiration_interval =
33-
Keyword.get(opts, :expiration_interval_seconds, @default_expiration_interval_seconds)
34-
35-
ensure_opts_ets()
36-
:ets.insert(:sanbase_graphql_cachex_opts, {Keyword.fetch!(opts, :name), default_ttl})
37-
3826
[
3927
name: Keyword.fetch!(opts, :name),
40-
hooks: [
41-
hook(
42-
module: Cachex.Limit.Scheduled,
43-
args: {
44-
max_entries,
45-
[reclaim: reclaim],
46-
[frequency: limit_interval_ms]
47-
}
48-
)
49-
],
28+
# When the keys reach 2 million, remove 30% of the
29+
# least recently written keys
30+
limit: 2_000_000,
31+
policy: Cachex.Policy.LRW,
32+
reclaim: 0.3,
33+
# How often the Janitor process runs to clean the cache
34+
interval: 5000,
35+
# The default TTL of keys in the cache
5036
expiration:
5137
expiration(
52-
default: :timer.seconds(default_ttl),
53-
interval: :timer.seconds(expiration_interval),
38+
default: :timer.seconds(@default_ttl_seconds),
39+
interval: :timer.seconds(10),
5440
lazy: true
5541
)
5642
]
@@ -92,7 +78,8 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
9278
:ok
9379

9480
{:nocache, _} ->
95-
Process.put(:do_not_cache_query, true)
81+
Process.put(:has_nocache_field, true)
82+
9683
:ok
9784

9885
_ ->
@@ -103,97 +90,105 @@ defmodule SanbaseWeb.Graphql.CachexProvider do
10390
@impl SanbaseWeb.Graphql.CacheProvider
10491
def get_or_store(cache, key, func, cache_modify_middleware) do
10592
true_key = true_key(key)
106-
ttl = ttl_ms(cache, key)
107-
108-
result =
109-
Cachex.fetch(cache, true_key, fn ->
110-
case func.() do
111-
{:ok, _} = ok_tuple ->
112-
{:commit, compress_value(ok_tuple), [expire: ttl]}
11393

114-
{:error, _} = error ->
115-
{:ignore, error}
94+
case Cachex.get(cache, true_key) do
95+
{:ok, compressed_value} when is_binary(compressed_value) ->
96+
decompress_value(compressed_value)
11697

117-
{:nocache, value} ->
118-
# Do not put the :do_not_cache_query flag here as it
119-
# is executed inside a Courier process. Set it afterwards
120-
# when handling the result
121-
{:ignore, {:nocache, value}}
98+
_ ->
99+
execute_cache_miss_function(cache, key, func, cache_modify_middleware)
100+
end
101+
end
122102

123-
{:middleware, _middleware_module, _args} = tuple ->
124-
{:ignore, cache_modify_middleware.(cache, key, tuple)}
125-
end
126-
end)
103+
defp execute_cache_miss_function(cache, key, func, cache_modify_middleware) do
104+
# This is the only place where we need to have the transactional get_or_store
105+
# mechanism. Cachex.fetch! runs in multiple processes, which causes issues
106+
# when testing. Cachex.transaction has a non-configurable timeout. We actually
107+
# can achieve the required behavior by manually getting and releasing the lock.
108+
# The transactional guarantees are not needed.
109+
cache_record = Cachex.Services.Overseer.ensure(cache)
110+
111+
# Start a process that will handle the unlock in case this process terminates
112+
# without releasing the lock. The process is not linked to the current one so
113+
# it can continue to live and do its job even if this process terminates.
114+
{:ok, unlocker_pid} =
115+
__MODULE__.Unlocker.start(max_lock_acquired_time_ms: @max_lock_acquired_time_ms)
116+
117+
unlock_fun = fn -> Cachex.Services.Locksmith.unlock(cache_record, [true_key(key)]) end
118+
119+
try do
120+
true = obtain_lock(cache_record, [true_key(key)])
121+
_ = GenServer.cast(unlocker_pid, {:unlock_after, unlock_fun})
122+
123+
case Cachex.get(cache, true_key(key)) do
124+
{:ok, compressed_value} when is_binary(compressed_value) ->
125+
# First check if the result has not been stored while waiting for the lock.
126+
decompress_value(compressed_value)
127+
128+
_ ->
129+
handle_execute_cache_miss_function(
130+
cache,
131+
key,
132+
_result = func.(),
133+
cache_modify_middleware
134+
)
135+
end
136+
after
137+
true = unlock_fun.()
138+
# We expect the process to unlock only in case we don't reach here for some reason.
139+
# If we're here we can kill the process. If the process has already unlocked
140+
_ = GenServer.cast(unlocker_pid, :stop)
141+
end
142+
end
127143

128-
case result do
129-
{:commit, compressed} when is_binary(compressed) ->
130-
decompress_value(compressed)
144+
defp obtain_lock(cache_record, keys, attempt \\ 0)
131145

132-
{:ok, compressed} when is_binary(compressed) ->
133-
decompress_value(compressed)
146+
defp obtain_lock(_cache_record, _keys, 30) do
147+
raise("Obtaining cache lock failed because of timeout")
148+
end
134149

135-
{:error, error} ->
136-
# Transforms like :no_cache -> "Specified cache not running"
137-
error_msg = if is_atom(error), do: Cachex.Error.long_form(error), else: error
138-
{:error, error_msg}
150+
defp obtain_lock(cache_record, keys, attempt) do
151+
case Cachex.Services.Locksmith.lock(cache_record, keys) do
152+
false ->
153+
# In case the lock cannot be obtained, try again after some time
154+
# In the beginning the next attempt is scheduled in an exponential
155+
# backoff fashion - 10, 130, 375, 709, etc. milliseconds
156+
# The backoff is capped at 2 seconds
157+
sleep_ms = (:math.pow(attempt * 20, 1.6) + 10) |> trunc()
158+
sleep_ms = Enum.min([sleep_ms, 2000])
159+
160+
Process.sleep(sleep_ms)
161+
obtain_lock(cache_record, keys, attempt + 1)
162+
163+
true ->
164+
true
165+
end
166+
end
139167

140-
{:ignore, {:error, error}} ->
141-
# Transforms like :no_cache -> "Specified cache not running"
142-
error_msg = if is_atom(error), do: Cachex.Error.long_form(error), else: error
143-
{:error, error_msg}
168+
defp handle_execute_cache_miss_function(cache, key, result, cache_modify_middleware) do
169+
case result do
170+
{:middleware, _, _} = tuple ->
171+
cache_modify_middleware.(cache, key, tuple)
144172

145-
{:ignore, {:nocache, value}} ->
146-
Process.put(:do_not_cache_query, true)
173+
{:nocache, value} ->
174+
Process.put(:has_nocache_field, true)
147175
value
148176

149-
{:ignore, value} ->
150-
value
177+
{:error, _} = error ->
178+
error
179+
180+
{:ok, _value} = ok_tuple ->
181+
cache_item(cache, key, ok_tuple)
182+
ok_tuple
151183
end
152184
end
153185

154-
defp ttl_ms(_cache, {_key, ttl}) when is_integer(ttl), do: :timer.seconds(ttl)
155-
defp ttl_ms(cache, _key), do: :timer.seconds(default_ttl_seconds(cache))
156-
157186
defp cache_item(cache, {key, ttl}, value) when is_integer(ttl) do
158-
Cachex.put(cache, key, compress_value(value), expire: :timer.seconds(ttl))
187+
Cachex.put(cache, key, compress_value(value), ttl: :timer.seconds(ttl))
159188
end
160189

161190
defp cache_item(cache, key, value) do
162-
Cachex.put(cache, key, compress_value(value),
163-
expire: :timer.seconds(default_ttl_seconds(cache))
164-
)
165-
end
166-
167-
defp default_ttl_seconds(cache) do
168-
case :ets.lookup(:sanbase_graphql_cachex_opts, cache) do
169-
[{^cache, ttl}] -> ttl
170-
[] -> @default_ttl_seconds
171-
end
172-
rescue
173-
_ ->
174-
Logger.error(
175-
"CachexProvider: Could not get default TTL from ETS for cache #{cache}, using default #{@default_ttl_seconds}"
176-
)
177-
178-
@default_ttl_seconds
179-
end
180-
181-
defp ensure_opts_ets() do
182-
case :ets.whereis(:sanbase_graphql_cachex_opts) do
183-
:undefined ->
184-
try do
185-
:ets.new(:sanbase_graphql_cachex_opts, [:named_table, :public, :set])
186-
catch
187-
:error, {:badarg, _} -> :ok
188-
:error, :badarg -> :ok
189-
:error, %ArgumentError{} -> :ok
190-
end
191-
192-
_ ->
193-
:ok
194-
end
195-
196-
:ok
191+
Cachex.put(cache, key, compress_value(value), ttl: :timer.seconds(@default_ttl_seconds))
197192
end
198193

199194
defp true_key({key, ttl}) when is_integer(ttl), do: key

0 commit comments

Comments
 (0)