-
-
Notifications
You must be signed in to change notification settings - Fork 125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ping command for PubSub connections #274
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is gonna be a useful addition. However, few notes:
- We need to handle
:ping
calls when the gen_statem is in thedisconnected
state. - We need tests.
- Instead of sending a
PING
command with a message, could we just keep a queue of pendingPING
commands, and for everyPONG
that the gen_statem receives we send it to the next caller waiting in the queue? Redis is single-threaded so it will reply toPING
s in order anyway.
lib/redix/pubsub/connection.ex
Outdated
{:ok, task_pid} = | ||
Task.start(fn -> | ||
res = | ||
receive do | ||
{:pong, ^random_string} -> :ok | ||
after | ||
5000 -> :error | ||
end | ||
|
||
:ok = :gen_statem.reply(from, res) | ||
end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We won't need the task here if we go with a queue of pending clients.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the feedback.
Would the caller get an :error
after 5000 ms without the task?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flow to implement here is using gen_statem timeouts and processe aliases, see
Timeout specifies how long to wait for a response. If no response is received within the specified time, this function returns timeout. Assuming that the server executes on a node supporting aliases (introduced in OTP 24) the request will also be abandoned. That is, no response will be received after a time-out. Otherwise, a stray response might be received at a later time.
from the gen_statem
docs. Basically you don't care about timeouts, you just reply to every pong and BEAM throws away the responses for procs that timed out already
This allows for periodic checking of the underlying TCP connection. According to the Redis documentation, PING is one of very few commands that can be issued on a pubsub connection: "Once the client enters the subscribed state it is not supposed to issue any other commands, except for additional SUBSCRIBE, SSUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, SUNSUBSCRIBE, PUNSUBSCRIBE, PING, RESET and QUIT commands"
I've added some tests, and actually discovered that the reply format is different if you're subscribed to any channels or not. I've also removed the task, and replaced it with a callback that is sent after 5000 ms. EDIT: |
@whatyouhide Just a little reminder about this one. We've run my branch in production at work the last week and haven't encountered any troubles thus far. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going in the right direction, fantastic! Left a few more structural comments 🙃
Upon a successful reply from the server, :ok will be returned. If no reply is received within 5 seconds, :error will be returned. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Upon a successful reply from the server, :ok will be returned. If no reply is received within 5 seconds, :error will be returned. | |
Upon a successful reply from the server, returns `:ok`. If no reply is received within the given timeout, returns `:error`. |
|
||
""" | ||
@spec ping(connection()) :: :ok | :error | ||
def ping(conn) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to make the timeout here configurable, and not hardcoded to 5 seconds. Do this:
def ping(conn, timeout \\ 5000) when is_integer(timeout) and timeout >= 0 do
and trickle that down to the implementation.
reply = :error | ||
{:keep_state_and_data, {:reply, from, reply}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reply = :error | |
{:keep_state_and_data, {:reply, from, reply}} | |
{:keep_state_and_data, {:reply, from, :error}} |
@@ -230,6 +259,19 @@ defmodule Redix.PubSub.Connection do | |||
end | |||
end | |||
|
|||
def connected({:call, from}, :ping, data) do | |||
random_string = :crypto.strong_rand_bytes(12) |> Base.encode64() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do the same amount of bytes as in UUIDs.
random_string = :crypto.strong_rand_bytes(12) |> Base.encode64() | |
random_string = Base.encode64(:crypto.strong_rand_bytes(16)) |
def connected({:call, from}, :ping, data) do | ||
random_string = :crypto.strong_rand_bytes(12) |> Base.encode64() | ||
pipeline = [["PING", "ping:#{random_string}"]] | ||
data.transport.send(data.socket, Enum.map(pipeline, &Protocol.pack/1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to handle if this returns something that is not :ok
here, and disconnect. We do it in other places too.
pipeline = [["PING", "ping:#{random_string}"]] | ||
data.transport.send(data.socket, Enum.map(pipeline, &Protocol.pack/1)) | ||
|
||
Process.send_after(self(), {:ping_callback, random_string}, 5000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use send_after/3
here. Instead, use :gen_statem
timeout actions. Return a timeout action from the return value of this function and handle the timeout instead of the message later on. Cancel the timeout if you get a ping with :cancel
. The :gen_statem
docs are the best resource for this 🙃
@@ -15,7 +15,8 @@ defmodule Redix.PubSub.Connection do | |||
:connected_address, | |||
:client_id, | |||
subscriptions: %{}, | |||
monitors: %{} | |||
monitors: %{}, | |||
pings: [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you want a :queue
here rather than a list, right?
This allows for periodic checking of the underlying TCP connection.
According to the Redis documentation, PING is one of very few commands that can be issued on a pubsub connection:
"Once the client enters the subscribed state it is not supposed to issue any other commands, except for additional SUBSCRIBE, SSUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, SUNSUBSCRIBE, PUNSUBSCRIBE, PING, RESET and QUIT commands"
I was not sure if the chosen flow is the best one, ie. sending a serialized PID as part of the payload and using that for sending the reply back to the waiting task.