Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#397]: Preserve the order of the response messages when the stream response process handles ":consume_response" #396

Merged
30 changes: 18 additions & 12 deletions lib/grpc/client/adapters/mint/stream_response_process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
grpc_stream: stream,
send_headers_or_trailers: send_headers_or_trailers?,
buffer: <<>>,
responses: [],
responses: :queue.new(),
done: false,
from: nil,
compressor: nil
Expand All @@ -100,7 +100,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
{{_, message}, rest} ->
# TODO add code here to handle compressor headers
response = codec.decode(message, res_mod)
new_responses = [{:ok, response} | responses]
new_responses = :queue.in({:ok, response}, responses)
new_state = %{state | buffer: rest, responses: new_responses}
{:reply, :ok, new_state, {:continue, :produce_response}}

Expand All @@ -117,7 +117,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
)
when type in @header_types do
state = update_compressor({type, headers}, state)
new_responses = [get_headers_response(headers, type) | responses]
new_responses = :queue.in(get_headers_response(headers, type), responses)
{:reply, :ok, %{state | responses: new_responses}, {:continue, :produce_response}}
end

Expand All @@ -131,7 +131,8 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do

case get_headers_response(headers, type) do
{:error, _rpc_error} = error ->
{:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}}
{:reply, :ok, %{state | responses: :queue.in(error, responses)},
{:continue, :produce_response}}

_any ->
{:reply, :ok, state, {:continue, :produce_response}}
Expand All @@ -143,7 +144,8 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
_from,
%{responses: responses} = state
) do
{:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}}
{:reply, :ok, %{state | responses: :queue.in(error, responses)},
{:continue, :produce_response}}
end

def handle_call({:consume_response, :done}, _from, state) do
Expand All @@ -152,19 +154,23 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do

@impl true
def handle_continue(:produce_response, state) do
case state do
%{from: nil} ->
no_responses? = :queue.is_empty(state.responses)
without_from? = is_nil(state.from)

cond do
without_from? ->
{:noreply, state}

%{from: from, responses: [], done: true} ->
GenServer.reply(from, nil)
no_responses? and state.done ->
GenServer.reply(state.from, nil)
{:stop, :normal, state}

%{responses: []} ->
no_responses? ->
{:noreply, state}

%{responses: [response | rest], from: from} ->
GenServer.reply(from, response)
true ->
{{:value, response}, rest} = :queue.out(state.responses)
GenServer.reply(state.from, response)
{:noreply, %{state | responses: rest, from: nil}}
end
end
Expand Down
11 changes: 6 additions & 5 deletions test/grpc/client/adapters/mint/connection_process_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
assert {:reply, :ok, new_state} = response
assert %{} == new_state.requests
response_state = :sys.get_state(response_pid)
assert [] == response_state.responses
assert :queue.to_list(response_state.responses) == []
assert true == response_state.done
end
end
Expand Down Expand Up @@ -342,8 +342,9 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do

response_state = :sys.get_state(response_pid)

assert [error: %Mint.TransportError{reason: :closed, __exception__: true}] ==
response_state.responses
assert :queue.to_list(response_state.responses) == [
error: %Mint.TransportError{reason: :closed, __exception__: true}
]
end
end

Expand Down Expand Up @@ -382,7 +383,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
assert new_state.conn.state == :closed
assert_receive {:elixir_grpc, :connection_down, pid}, 500
response_state = :sys.get_state(response_pid)
assert [error: "the connection is closed"] == response_state.responses
assert :queue.to_list(response_state.responses) == [error: "the connection is closed"]
assert true == response_state.done
assert pid == self()
end
Expand Down Expand Up @@ -410,7 +411,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
assert new_state.conn.state == :closed
assert_receive {:elixir_grpc, :connection_down, pid}, 500
response_state = :sys.get_state(response_pid)
assert [error: "the connection is closed"] == response_state.responses
assert :queue.to_list(response_state.responses) == [error: "the connection is closed"]
assert true == response_state.done
assert pid == self()
end
Expand Down
67 changes: 39 additions & 28 deletions test/grpc/client/adapters/mint/stream_response_process_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
done: false,
from: nil,
grpc_stream: build(:client_stream),
responses: [],
responses: :queue.new(),
compressor: nil,
send_headers_or_trailers: false
}
Expand Down Expand Up @@ -41,7 +41,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
state: state,
data: {_, _, full_message}
} do
expected_response_message = build(:hello_reply_rpc)
expected_response_message = {:ok, build(:hello_reply_rpc)}

response =
StreamResponseProcess.handle_call(
Expand All @@ -52,35 +52,32 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do

assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert new_state.buffer == <<>>
assert [{:ok, response_message}] = new_state.responses
assert expected_response_message == response_message
assert :queue.to_list(new_state.responses) == [expected_response_message]
end

test "append incoming message to existing buffer", %{state: state, data: {part1, part2, _}} do
state = %{state | buffer: part1}
expected_response_message = build(:hello_reply_rpc)
expected_response_message = {:ok, build(:hello_reply_rpc)}

response =
StreamResponseProcess.handle_call({:consume_response, {:data, part2}}, self(), state)

assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert new_state.buffer == <<>>
assert [{:ok, response_message}] = new_state.responses
assert expected_response_message == response_message
assert :queue.to_list(new_state.responses) == [expected_response_message]
end

test "decode message and put rest on buffer", %{state: state, data: {_, _, full}} do
extra_data = <<0, 1, 2>>
data = full <> extra_data
expected_response_message = build(:hello_reply_rpc)
expected_response_message = {:ok, build(:hello_reply_rpc)}

response =
StreamResponseProcess.handle_call({:consume_response, {:data, data}}, self(), state)

assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert new_state.buffer == extra_data
assert [{:ok, response_message}] = new_state.responses
assert expected_response_message == response_message
assert :queue.to_list(new_state.responses) == [expected_response_message]
end
end

Expand All @@ -106,9 +103,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
state
)

expected_error = {:error, %GRPC.RPCError{message: "Internal Server Error", status: 2}}

assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert [{:error, error}] = new_state.responses
assert %GRPC.RPCError{message: "Internal Server Error", status: 2} == error
assert :queue.to_list(new_state.responses) == [expected_error]
end,
do: [
{%{type: :headers, is_header_enabled: false}},
Expand Down Expand Up @@ -139,17 +137,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
state
)

assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert [{type_response, response_headers}] = new_state.responses
assert type == type_response
expected_response = {type, Map.new(headers)}

assert %{
"content-length" => "0",
"content-type" => "application/grpc+proto",
"grpc-message" => "",
"grpc-status" => "0",
"server" => "Cowboy"
} == response_headers
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert :queue.to_list(new_state.responses) == [expected_response]
end,
do: [{:headers}, {:trailers}]
)
Expand All @@ -174,7 +165,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
)

assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert [] == new_state.responses
assert :queue.is_empty(new_state.responses)
end,
do: [{:headers}, {:trailers}]
)
Expand Down Expand Up @@ -238,8 +229,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
)

assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
assert [response_error] = new_state.responses
assert response_error == error
assert :queue.to_list(new_state.responses) == [error]
end
end

Expand Down Expand Up @@ -282,11 +272,11 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
end

test "send response to caller when there are responses in the queue", %{state: state} do
state = %{state | from: {self(), :tag}, done: false, responses: [1, 2]}
state = %{state | from: {self(), :tag}, done: false, responses: :queue.from_list([1, 2])}
{:noreply, new_state} = StreamResponseProcess.handle_continue(:produce_response, state)
%{from: from, responses: responses} = new_state
assert nil == from
assert [2] == responses
assert is_nil(from)
assert :queue.to_list(responses) == [2]
assert_receive {:tag, 1}
end
end
Expand Down Expand Up @@ -321,6 +311,27 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
assert {:ok, build(:hello_reply_rpc)} == data
end

test "preserves response messages order", %{pid: pid} do
hello_luis =
<<0, 0, 0, 0, 12, 10, 10, 72, 101, 108, 108, 111, 32, 76, 117, 105, 115>>

bye_luis =
<<0, 0, 0, 0, 10, 10, 8, 66, 121, 101, 32, 76, 117, 105, 115>>

stream = StreamResponseProcess.build_stream(pid)
StreamResponseProcess.consume(pid, :data, hello_luis)
StreamResponseProcess.consume(pid, :data, bye_luis)
StreamResponseProcess.done(pid)

expected_elements =
[
ok: build(:hello_reply_rpc),
ok: build(:bye_reply_rpc)
]

assert Enum.to_list(stream) == expected_elements
end

test_with_params(
"emits headers to stream",
%{pid: pid},
Expand Down
4 changes: 4 additions & 0 deletions test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ defmodule GRPC.Factory do
def hello_reply_rpc_factory do
%Helloworld.HelloReply{message: "Hello Luis"}
end

def bye_reply_rpc_factory do
%Helloworld.HelloReply{message: "Bye Luis"}
end
end