Skip to content

Commit 57d60e8

Browse files
committed
Cancel work when request times out.
This addresses Issue inaka#109, and then some. When a request is made, shotgun:request makes a fresh reference and passes that along with the work to the shotgun FSM. When a request times out, shotgun:request now sends an async request to cancel the now-useless unit of work, passing along the reference for identification, before notifying the client of the request timeout. Things to know: * Gun has its own request queue. So, if a request has made it to gun, cancellation of the request will only squelch future messages that would have come from that request. It is -however- safe to bounce back to at_rest (as we now do), as messages from the cancelled request will not be sent back to the FSM. * Removal of queued requests makes use of queue:filter. For large queues, this is kinda not fast. Minor changes: * state typedef moved up with the other typedefs. * enqueue_work_or_stop slightly simplified.
1 parent a2e9008 commit 57d60e8

File tree

1 file changed

+65
-20
lines changed

1 file changed

+65
-20
lines changed

src/shotgun.erl

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@
101101
data => binary()
102102
}.
103103

104+
-type state() :: #{}.
105+
104106
-export_type([event/0]).
105107

106108
%% @doc Starts the application and all the ones it depends on.
@@ -237,6 +239,7 @@ put(Pid, Uri, Headers0, Body, Options) ->
237239
-spec request(pid(), http_verb(), iodata(), headers(), iodata(), options()) ->
238240
result().
239241
request(Pid, get, Uri, Headers0, Body, Options) ->
242+
Ref = make_ref(),
240243
try
241244
check_uri(Uri),
242245
#{handle_event := HandleEvent,
@@ -249,22 +252,29 @@ request(Pid, get, Uri, Headers0, Body, Options) ->
249252
true ->
250253
{get_async,
251254
{HandleEvent, AsyncMode},
252-
{Uri, Headers, Body}};
255+
{Ref, Uri, Headers, Body}};
253256
false ->
254-
{get, {Uri, Headers, Body}}
257+
{get, {Ref, Uri, Headers, Body}}
255258
end,
256259
gen_fsm:sync_send_event(Pid, Event, Timeout)
257260
catch
261+
exit:{timeout, Rest} ->
262+
gen_fsm:send_all_state_event(Pid, {cancel_req, Ref}),
263+
{error, {timeout, Rest}};
258264
_:Reason -> {error, Reason}
259265
end;
260266
request(Pid, Method, Uri, Headers0, Body, Options) ->
267+
Ref = make_ref(),
261268
try
262269
check_uri(Uri),
263270
#{headers := Headers, timeout := Timeout} =
264271
process_options(Options, Headers0, Method),
265-
Event = {Method, {Uri, Headers, Body}},
272+
Event = {Method, {Ref, Uri, Headers, Body}},
266273
gen_fsm:sync_send_event(Pid, Event, Timeout)
267274
catch
275+
exit:{timeout, Rest} ->
276+
gen_fsm:send_all_state_event(Pid, {cancel_req, Ref}),
277+
{error, {timeout, Rest}};
268278
_:Reason -> {error, Reason}
269279
end.
270280

@@ -299,8 +309,6 @@ parse_event(EventBin) ->
299309
%% gen_fsm callbacks
300310
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
301311

302-
-type state() :: #{}.
303-
304312
%% @private
305313
-spec init([term()]) ->
306314
{ok, at_rest, state()} | {stop, gun_open_timeout} | {stop, gun_open_failed}.
@@ -332,7 +340,26 @@ init([Host, Port, Type, Opts]) ->
332340
end.
333341

334342
%% @private
335-
-spec handle_event(shutdown, atom(), state()) -> {stop, normal, state()}.
343+
-spec handle_event(shutdown | {cancel_req, reference()},
344+
StateName::atom(), state()) ->
345+
{stop, normal, state()} | {next_state, at_rest, state()} |
346+
{next_state, StateName::atom(), state()}.
347+
%Cancel our current unit of work...
348+
handle_event({cancel_req, ReqRef}, _,
349+
#{req_ref := ReqRef, stream := StreamRef, pid := Pid} = State) ->
350+
%This isn't guaranteed to cancel the pending request, but it will
351+
%prevent us from getting messages about it in the future.
352+
ok = gun:cancel(Pid, StreamRef),
353+
{next_state, at_rest, State, 0};
354+
%Or unqueue a queued unit of work...
355+
handle_event({cancel_req, ReqRef}, StateName, StateData) ->
356+
NewStateData = remove_request(ReqRef, StateData),
357+
case StateName of
358+
at_rest ->
359+
{next_state, at_rest, NewStateData, 0};
360+
_ ->
361+
{next_state, StateName, NewStateData}
362+
end;
336363
handle_event(shutdown, _StateName, StateData) ->
337364
{stop, normal, StateData}.
338365

@@ -393,26 +420,30 @@ at_rest(timeout, State) ->
393420
ok = gen_fsm:send_event(self(), Work),
394421
{next_state, at_rest, NewState}
395422
end;
396-
at_rest({get_async, {HandleEvent, AsyncMode}, Args, From},
423+
at_rest({get_async, {HandleEvent, AsyncMode},
424+
{ReqRef, _, _, _} = Args, From},
397425
State = #{pid := Pid}) ->
398426
StreamRef = do_http_verb(get, Pid, Args),
399427
CleanState = clean_state(State),
400428
NewState = CleanState#{
401429
from => From,
430+
req_ref => ReqRef,
402431
pid => Pid,
403432
stream => StreamRef,
404433
handle_event => HandleEvent,
405434
async => true,
406435
async_mode => AsyncMode
407436
},
408437
{next_state, wait_response, NewState};
409-
at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) ->
438+
at_rest({HttpVerb, {ReqRef, _, _, _} = Args, From},
439+
State = #{pid := Pid}) ->
410440
StreamRef = do_http_verb(HttpVerb, Pid, Args),
411441
CleanState = clean_state(State),
412442
NewState = CleanState#{
413443
pid => Pid,
414444
stream => StreamRef,
415-
from => From
445+
from => From,
446+
req_ref => ReqRef
416447
},
417448
{next_state, wait_response, NewState}.
418449

@@ -520,10 +551,12 @@ receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) ->
520551
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
521552

522553
%% @private
523-
clean_state() -> clean_state(queue:new()).
554+
clean_state() ->
555+
clean_state(queue:new()).
524556

525557
%% @private
526-
-spec clean_state(map()) -> map(); (queue:queue()) -> map().
558+
-spec clean_state(map()) -> map();
559+
(queue:queue()) -> map().
527560
clean_state(State) when is_map(State) ->
528561
clean_state(get_pending_reqs(State));
529562
clean_state(Reqs) ->
@@ -539,12 +572,13 @@ clean_state(Reqs) ->
539572
async => false,
540573
async_mode => binary,
541574
buffer => <<"">>,
575+
req_ref => undefined,
542576
pending_requests => Reqs
543577
}.
544578

545579
%% @private
546580
-spec do_http_verb(http_verb(), pid(), tuple()) -> reference().
547-
do_http_verb(Method, Pid, {Uri, Headers, Body}) ->
581+
do_http_verb(Method, Pid, {_, Uri, Headers, Body}) ->
548582
MethodStr = string:to_upper(atom_to_list(Method)),
549583
MethodBin = list_to_binary(MethodStr),
550584
gun:request(Pid, MethodBin, Uri, Headers, Body).
@@ -642,17 +676,14 @@ check_uri(U) ->
642676
end.
643677

644678
%% @private
645-
enqueue_work_or_stop(FSM = at_rest, Event, From, State) ->
646-
enqueue_work_or_stop(FSM, Event, From, State, 0);
647-
enqueue_work_or_stop(FSM, Event, From, State) ->
648-
enqueue_work_or_stop(FSM, Event, From, State, infinity).
649-
650-
%% @private
651-
enqueue_work_or_stop(FSM, Event, From, State, Timeout) ->
679+
enqueue_work_or_stop(StateName, Event, From, State) ->
652680
case create_work(Event, From) of
653681
{ok, Work} ->
654682
NewState = append_work(Work, State),
655-
{next_state, FSM, NewState, Timeout};
683+
case StateName of
684+
at_rest -> {next_state, at_rest, NewState, 0};
685+
_ -> {next_state, StateName, NewState}
686+
end;
656687
not_work ->
657688
{stop, {unexpected, Event}, State}
658689
end.
@@ -690,3 +721,17 @@ append_work(Work, State) ->
690721
%% @private
691722
get_pending_reqs(State) ->
692723
maps:get(pending_requests, State).
724+
725+
%% @private
726+
-spec remove_request(reference(), state()) -> state().
727+
remove_request(ReqRef, #{pending_requests := PendingReqs} = StateData) ->
728+
NewPendingReqs =
729+
queue:filter(fun(E) ->
730+
case E of
731+
%Handle get_async
732+
{_, _, {Ref, _, _, _}, _} -> Ref /= ReqRef;
733+
%Handle all other work.
734+
{_, {Ref, _, _, _}, _} -> Ref /= ReqRef
735+
end
736+
end, PendingReqs),
737+
StateData#{pending_requests := NewPendingReqs}.

0 commit comments

Comments
 (0)