Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make server stream work with mint client and add keepalive. #352

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/grpc/client/adapters/mint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ defmodule GRPC.Client.Adapters.Mint do
|> Keyword.get(:transport_opts, [])
|> Keyword.merge(ssl)

[transport_opts: Keyword.merge(@default_transport_opts, transport_opts)]
keep_alive = Keyword.get(opts, :keep_alive, false)

[transport_opts: Keyword.merge(@default_transport_opts, transport_opts), keep_alive: keep_alive]
Comment on lines +117 to +119
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, it's better if you add a question mark to this option, since, usually keep_alive is a provided as a number by convention.

Suggested change
keep_alive = Keyword.get(opts, :keep_alive, false)
[transport_opts: Keyword.merge(@default_transport_opts, transport_opts), keep_alive: keep_alive]
keep_alive? = Keyword.get(opts, :keep_alive?, false)
[transport_opts: Keyword.merge(@default_transport_opts, transport_opts), keep_alive?: keep_alive?]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's valuable to add a module doc describing this adapter-specific option. WDYT?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transport_opts is forwarded to gen_tcp or ssl. The Mint docs list the most common ones, but there are more of them.

https://www.erlang.org/doc/man/gen_tcp.html#type-option

Now that you mentioned, keepalive on gen_tcp doesn't have a underscore

end

defp connect_opts(_channel, opts) do
transport_opts = Keyword.get(opts, :transport_opts, [])
[transport_opts: Keyword.merge(@default_transport_opts, transport_opts)]
keep_alive = Keyword.get(opts, :keep_alive, false)

[transport_opts: Keyword.merge(@default_transport_opts, transport_opts), keep_alive: keep_alive]
end

defp mint_scheme(%Channel{scheme: "https"} = _channel), do: :https
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
alias GRPC.Client.Adapters.Mint.ConnectionProcess.State
alias GRPC.Client.Adapters.Mint.StreamResponseProcess

@keep_alive_timeout 60_000

require Logger

@connection_closed_error "the connection is closed"
Expand Down Expand Up @@ -49,7 +51,11 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
"""
@spec disconnect(pid :: pid()) :: :ok
def disconnect(pid) do
GenServer.call(pid, :disconnect)
if Process.alive?(pid) do
GenServer.call(pid, :disconnect)
else
:ok
end
end

@doc """
Expand All @@ -76,8 +82,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do

@impl true
def init({scheme, host, port, opts}) do
keep_alive = Keyword.get(opts, :keep_alive, false)

case Mint.HTTP.connect(scheme, host, port, opts) do
{:ok, conn} ->
if keep_alive do
Process.send_after(self(), :ping, @keep_alive_timeout)
end

{:ok, State.new(conn, opts[:parent])}

{:error, reason} ->
Expand All @@ -94,6 +106,9 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
def handle_call(:disconnect, _from, state) do
# TODO add a code to if disconnect is brutal we just stop if is friendly we wait for pending requests
{:ok, conn} = Mint.HTTP.close(state.conn)

finish_all_pending_requests(state)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch


{:stop, :normal, :ok, State.update_conn(state, conn)}
end

Expand Down Expand Up @@ -172,6 +187,34 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
end

@impl true
def handle_info(:ping, %{ping_ref: nil} = state) do
case Mint.HTTP2.ping(state.conn) do
{:ok, conn, ref} ->
Process.send_after(self(), :ping, @keep_alive_timeout)

state =
state
|> State.update_conn(conn)
|> State.set_ping_ref(ref)

{:noreply, state}

{:error, conn, _error} ->
state = State.update_conn(state, conn)
check_connection_status(state)
{:noreply, state}
Comment on lines +204 to +205
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You want the check_connection_status to handle the GS reply for you

Suggested change
check_connection_status(state)
{:noreply, state}
check_connection_status(state)

end
end

def handle_info(:ping, state) do
Logger.debug("No response received from ping stopping connection")
{:ok, conn} = Mint.HTTP.close(state.conn)

finish_all_pending_requests(state)

{:stop, :normal, State.update_conn(state, conn)}
end

def handle_info(message, state) do
case Mint.HTTP.stream(state.conn, message) do
:unknown ->
Expand Down Expand Up @@ -251,6 +294,10 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
state
end

defp process_response({:pong, request_ref}, %{ping_ref: request_ref} = state) do
State.set_ping_ref(state, nil)
end

defp process_response({:done, request_ref}, state) do
:ok =
state
Expand All @@ -261,6 +308,14 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
new_state
end

defp process_response({:error, request_ref, error}, state) do
pid = State.stream_response_pid(state, request_ref)
:ok = StreamResponseProcess.consume(pid, :error, error)
:ok = StreamResponseProcess.done(pid)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you're sending done/1 here? By default, even if a error happens, the connection will produce a :done message when the stream is closed by the server


state
end

defp chunk_body_and_enqueue_rest({request_ref, body, from}, state) do
{head, tail} = chunk_body(body, get_window_size(state.conn, request_ref))

Expand Down Expand Up @@ -391,7 +446,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess do
end

defp check_connection_status(state) do
if Mint.HTTP.open?(state.conn) do
if Mint.HTTP.open?(state.conn, :read) do
check_request_stream_queue(state)
else
finish_all_pending_requests(state)
Expand Down
9 changes: 7 additions & 2 deletions lib/grpc/client/adapters/mint/connection_process/state.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
defmodule GRPC.Client.Adapters.Mint.ConnectionProcess.State do
@moduledoc false

defstruct [:conn, :parent, requests: %{}, request_stream_queue: :queue.new()]
defstruct [:conn, :parent, :ping_ref, requests: %{}, request_stream_queue: :queue.new()]

@type t :: %__MODULE__{
conn: Mint.HTTP.t(),
requests: map(),
parent: pid()
parent: pid(),
ping_ref: Mint.Types.request_ref() | nil
}

def new(conn, parent) do
Expand All @@ -17,6 +18,10 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcess.State do
%{state | conn: conn}
end

def set_ping_ref(state, ref) do
%{state | ping_ref: ref}
end

def update_request_stream_queue(state, queue) do
%{state | request_stream_queue: queue}
end
Expand Down
3 changes: 2 additions & 1 deletion lib/grpc/client/adapters/mint/stream_response_process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
_from,
%{responses: responses} = state
) do
{:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}}

{:reply, :ok, %{state | done: true, responses: [error | responses]}, {:continue, :produce_response}}
end

def handle_call({:consume_response, :done}, _from, state) do
Expand Down