Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#397]: Preserve the order of the response messages when the stream response process handles ":consume_response" #396

Merged
8 changes: 4 additions & 4 deletions lib/grpc/client/adapters/mint/stream_response_process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
{{_, message}, rest} ->
# TODO add code here to handle compressor headers
response = codec.decode(message, res_mod)
new_responses = [{:ok, response} | responses]
new_responses = responses ++ [{:ok, response}]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a list, let's change the data structure to a queue: https://www.erlang.org/doc/apps/stdlib/queue.html

Erlang :queue is supposed to be O(1) for enqueueing and dequeueing. This change is O(len(responses)) for enqueueing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, migrated to erlang queue

new_state = %{state | buffer: rest, responses: new_responses}
{:reply, :ok, new_state, {:continue, :produce_response}}

Expand All @@ -117,7 +117,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
)
when type in @header_types do
state = update_compressor({type, headers}, state)
new_responses = [get_headers_response(headers, type) | responses]
new_responses = responses ++ [get_headers_response(headers, type)]
{:reply, :ok, %{state | responses: new_responses}, {:continue, :produce_response}}
end

Expand All @@ -131,7 +131,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do

case get_headers_response(headers, type) do
{:error, _rpc_error} = error ->
{:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}}
{:reply, :ok, %{state | responses: responses ++ [error]}, {:continue, :produce_response}}

_any ->
{:reply, :ok, state, {:continue, :produce_response}}
Expand All @@ -143,7 +143,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
_from,
%{responses: responses} = state
) do
{:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}}
{:reply, :ok, %{state | responses: responses ++ [error]}, {:continue, :produce_response}}
end

def handle_call({:consume_response, :done}, _from, state) do
Expand Down
21 changes: 21 additions & 0 deletions test/grpc/client/adapters/mint/stream_response_process_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,27 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
assert {:ok, build(:hello_reply_rpc)} == data
end

test "ordering bug", %{pid: pid} do
hello_luis =
<<0, 0, 0, 0, 12, 10, 10, 72, 101, 108, 108, 111, 32, 76, 117, 105, 115>>

hello_luit =
<<0, 0, 0, 0, 12, 10, 10, 72, 101, 108, 108, 111, 32, 76, 117, 105, 116>>

stream = StreamResponseProcess.build_stream(pid)
StreamResponseProcess.consume(pid, :data, hello_luis)
StreamResponseProcess.consume(pid, :data, hello_luit)
StreamResponseProcess.done(pid)

expected_elements =
[
ok: %Helloworld.HelloReply{message: "Hello Luis", __unknown_fields__: []},
ok: %Helloworld.HelloReply{message: "Hello Luit", __unknown_fields__: []}
]

assert Enum.to_list(stream) == expected_elements
end

test_with_params(
"emits headers to stream",
%{pid: pid},
Expand Down