-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make server stream work with mint client and add keepalive. #352
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -11,6 +11,8 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
alias GRPC.Client.Adapters.Mint.ConnectionProcess.State | ||||||||
alias GRPC.Client.Adapters.Mint.StreamResponseProcess | ||||||||
|
||||||||
@keep_alive_timeout 60_000 | ||||||||
|
||||||||
require Logger | ||||||||
|
||||||||
@connection_closed_error "the connection is closed" | ||||||||
|
@@ -49,7 +51,11 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
""" | ||||||||
@spec disconnect(pid :: pid()) :: :ok | ||||||||
def disconnect(pid) do | ||||||||
GenServer.call(pid, :disconnect) | ||||||||
if Process.alive?(pid) do | ||||||||
GenServer.call(pid, :disconnect) | ||||||||
else | ||||||||
:ok | ||||||||
end | ||||||||
end | ||||||||
|
||||||||
@doc """ | ||||||||
|
@@ -76,8 +82,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
|
||||||||
@impl true | ||||||||
def init({scheme, host, port, opts}) do | ||||||||
keep_alive = Keyword.get(opts, :keep_alive, false) | ||||||||
|
||||||||
case Mint.HTTP.connect(scheme, host, port, opts) do | ||||||||
{:ok, conn} -> | ||||||||
if keep_alive do | ||||||||
Process.send_after(self(), :ping, @keep_alive_timeout) | ||||||||
end | ||||||||
|
||||||||
{:ok, State.new(conn, opts[:parent])} | ||||||||
|
||||||||
{:error, reason} -> | ||||||||
|
@@ -94,6 +106,9 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
def handle_call(:disconnect, _from, state) do | ||||||||
# TODO add a code to if disconnect is brutal we just stop if is friendly we wait for pending requests | ||||||||
{:ok, conn} = Mint.HTTP.close(state.conn) | ||||||||
|
||||||||
finish_all_pending_requests(state) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch |
||||||||
|
||||||||
{:stop, :normal, :ok, State.update_conn(state, conn)} | ||||||||
end | ||||||||
|
||||||||
|
@@ -172,6 +187,34 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
end | ||||||||
|
||||||||
@impl true | ||||||||
def handle_info(:ping, %{ping_ref: nil} = state) do | ||||||||
case Mint.HTTP2.ping(state.conn) do | ||||||||
{:ok, conn, ref} -> | ||||||||
Process.send_after(self(), :ping, @keep_alive_timeout) | ||||||||
|
||||||||
state = | ||||||||
state | ||||||||
|> State.update_conn(conn) | ||||||||
|> State.set_ping_ref(ref) | ||||||||
|
||||||||
{:noreply, state} | ||||||||
|
||||||||
{:error, conn, _error} -> | ||||||||
state = State.update_conn(state, conn) | ||||||||
check_connection_status(state) | ||||||||
{:noreply, state} | ||||||||
Comment on lines
+204
to
+205
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You want the
Suggested change
|
||||||||
end | ||||||||
end | ||||||||
|
||||||||
def handle_info(:ping, state) do | ||||||||
Logger.debug("No response received from ping stopping connection") | ||||||||
{:ok, conn} = Mint.HTTP.close(state.conn) | ||||||||
|
||||||||
finish_all_pending_requests(state) | ||||||||
|
||||||||
{:stop, :normal, State.update_conn(state, conn)} | ||||||||
end | ||||||||
|
||||||||
def handle_info(message, state) do | ||||||||
case Mint.HTTP.stream(state.conn, message) do | ||||||||
:unknown -> | ||||||||
|
@@ -251,6 +294,10 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
state | ||||||||
end | ||||||||
|
||||||||
defp process_response({:pong, request_ref}, %{ping_ref: request_ref} = state) do | ||||||||
State.set_ping_ref(state, nil) | ||||||||
end | ||||||||
|
||||||||
defp process_response({:done, request_ref}, state) do | ||||||||
:ok = | ||||||||
state | ||||||||
|
@@ -261,6 +308,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
new_state | ||||||||
end | ||||||||
|
||||||||
defp process_response({:error, request_ref, error}, state) do | ||||||||
pid = State.stream_response_pid(state, request_ref) | ||||||||
:ok = StreamResponseProcess.consume(pid, :error, error) | ||||||||
:ok = StreamResponseProcess.done(pid) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why you're sending |
||||||||
|
||||||||
state | ||||||||
end | ||||||||
|
||||||||
defp chunk_body_and_enqueue_rest({request_ref, body, from}, state) do | ||||||||
{head, tail} = chunk_body(body, get_window_size(state.conn, request_ref)) | ||||||||
|
||||||||
|
@@ -391,7 +446,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do | |||||||
end | ||||||||
|
||||||||
defp check_connection_status(state) do | ||||||||
if Mint.HTTP.open?(state.conn) do | ||||||||
if Mint.HTTP.open?(state.conn, :read) do | ||||||||
check_request_stream_queue(state) | ||||||||
else | ||||||||
finish_all_pending_requests(state) | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For clarity, it's better if you add a question mark to this option, since, usually
keep_alive
is a provided as a number by convention.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it's valuable to add a module doc describing this adapter-specific option. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/elixir-mint/mint/blob/main/lib/mint/http.ex#L276-L364
can't see any keep_alive option here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transport_opts
is forwarded togen_tcp
orssl
. The Mint docs list the most common ones, but there are more of them.https://www.erlang.org/doc/man/gen_tcp.html#type-option
Now that you mentioned,
keepalive
on gen_tcp doesn't have a underscore