Skip to content

Commit 5bcb9aa

Browse files
committed
Allow for configurable periods of lazy push and exchange
1 parent 781c9df commit 5bcb9aa

File tree

2 files changed

+66
-18
lines changed

2 files changed

+66
-18
lines changed

include/plumtree.hrl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
%% -------------------------------------------------------------------
2+
%%
3+
%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
4+
%%
5+
%% This file is provided to you under the Apache License,
6+
%% Version 2.0 (the "License"); you may not use this file
7+
%% except in compliance with the License. You may obtain
8+
%% a copy of the License at
9+
%%
10+
%% http://www.apache.org/licenses/LICENSE-2.0
11+
%%
12+
%% Unless required by applicable law or agreed to in writing,
13+
%% software distributed under the License is distributed on an
14+
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
%% KIND, either express or implied. See the License for the
16+
%% specific language governing permissions and limitations
17+
%% under the License.
18+
%%
19+
%% -------------------------------------------------------------------
20+
21+
-define(DEFAULT_LAZY_TICK_PERIOD, 1000).
22+
-define(DEFAULT_EXCHANGE_TICK_PERIOD, 10000).

src/plumtree_broadcast.erl

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
%% API
2525
-export([start_link/0,
26-
start_link/4,
26+
start_link/5,
2727
broadcast/2,
2828
update/1,
2929
broadcast_members/0,
@@ -99,7 +99,15 @@
9999

100100
%% Set of all known members. Used to determine
101101
%% which members have joined and left during a membership update
102-
all_members :: ordsets:ordset(nodename()) | undefined
102+
all_members :: ordsets:ordset(nodename()) | undefined,
103+
104+
%% Lazy tick period in milliseconds. On every tick all outstanding
105+
%% lazy pushes are sent out
106+
lazy_tick_period :: non_neg_integer(),
107+
108+
%% Exchange tick period in milliseconds that may or may not occur
109+
exchange_tick_period :: non_neg_integer()
110+
103111
}).
104112
-type state() :: #state{}.
105113

@@ -117,6 +125,10 @@
117125
%% to generate membership updates as the ring changes.
118126
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
119127
start_link() ->
128+
LazyTickPeriod = application:get_env(plumtree, lazy_tick_period,
129+
?DEFAULT_LAZY_TICK_PERIOD),
130+
ExchangeTickPeriod = application:get_env(plumtree, exchange_tick_period,
131+
?DEFAULT_EXCHANGE_TICK_PERIOD),
120132
PeerService = application:get_env(plumtree, peer_service, partisan_peer_service),
121133
{ok, Members} = PeerService:members(),
122134
plumtree_util:log(debug, "peer sampling service members: ~p", [Members]),
@@ -128,7 +140,9 @@ start_link() ->
128140
plumtree_util:log(debug, "init peers, eager: ~p, lazy: ~p",
129141
[InitEagers, InitLazys]),
130142
Mods = app_helper:get_env(plumtree, broadcast_mods, []),
131-
Res = start_link(Members, InitEagers, InitLazys, Mods),
143+
Res = start_link(Members, InitEagers, InitLazys, Mods,
144+
[{lazy_tick_period, LazyTickPeriod},
145+
{exchange_tick_period, ExchangeTickPeriod}]),
132146
PeerService:add_sup_callback(fun ?MODULE:update/1),
133147
Res.
134148

@@ -141,14 +155,20 @@ start_link() ->
141155
%% `InitEagers' and `InitLazys' must also be subsets of `InitMembers'. `Mods' is
142156
%% a list of modules that may be handlers for broadcasted messages. All modules in
143157
%% `Mods' should implement the `plumtree_broadcast_handler' behaviour.
158+
%% `Opts' is a proplist with the following possible options:
159+
%% Flush all outstanding lazy pushes period (in milliseconds)
160+
%% {`lazy_tick_period', non_neg_integer()}
161+
%% Possibly perform an exchange period (in milliseconds)
162+
%% {`exchange_tick_period', non_neg_integer()}
144163
%%
145164
%% NOTE: When starting the server using start_link/2 no automatic membership update from
146165
%% ring_events is registered. Use start_link/0.
147-
-spec start_link([nodename()], [nodename()], [nodename()], [module()]) ->
166+
-spec start_link([nodename()], [nodename()], [nodename()], [module()],
167+
proplists:proplist()) ->
148168
{ok, pid()} | ignore | {error, term()}.
149-
start_link(InitMembers, InitEagers, InitLazys, Mods) ->
169+
start_link(InitMembers, InitEagers, InitLazys, Mods, Opts) ->
150170
gen_server:start_link({local, ?SERVER}, ?MODULE,
151-
[InitMembers, InitEagers, InitLazys, Mods], []).
171+
[InitMembers, InitEagers, InitLazys, Mods, Opts], []).
152172

153173
%% @doc Broadcasts a message originating from this node. The message will be delivered to
154174
%% each node at least once. The `Mod' passed is responsible for handling the message on remote
@@ -235,13 +255,17 @@ debug_get_tree(Root, Nodes) ->
235255

236256
%% @private
237257
-spec init([[any()], ...]) -> {ok, state()}.
238-
init([AllMembers, InitEagers, InitLazys, Mods]) ->
239-
schedule_lazy_tick(),
240-
schedule_exchange_tick(),
258+
init([AllMembers, InitEagers, InitLazys, Mods, Opts]) ->
259+
LazyTickPeriod = proplists:get_value(lazy_tick_period, Opts),
260+
ExchangeTickPeriod = proplists:get_value(exchange_tick_period, Opts),
261+
schedule_lazy_tick(LazyTickPeriod),
262+
schedule_exchange_tick(ExchangeTickPeriod),
241263
State1 = #state{
242264
outstanding = orddict:new(),
243265
mods = lists:usort(Mods),
244-
exchanges=[]
266+
exchanges=[],
267+
lazy_tick_period = LazyTickPeriod,
268+
exchange_tick_period = ExchangeTickPeriod
245269
},
246270
State2 = reset_peers(AllMembers, InitEagers, InitLazys, State1),
247271
{ok, State2}.
@@ -321,12 +345,14 @@ handle_cast({update, Members}, State=#state{all_members=BroadcastMembers,
321345
%% @private
322346
-spec handle_info('exchange_tick' | 'lazy_tick' | {'DOWN', _, 'process', _, _}, state()) ->
323347
{noreply, state()}.
324-
handle_info(lazy_tick, State) ->
325-
schedule_lazy_tick(),
348+
handle_info(lazy_tick,
349+
#state{lazy_tick_period = Period} = State) ->
350+
schedule_lazy_tick(Period),
326351
_ = send_lazy(State),
327352
{noreply, State};
328-
handle_info(exchange_tick, State) ->
329-
schedule_exchange_tick(),
353+
handle_info(exchange_tick,
354+
#state{exchange_tick_period = Period= State) ->
355+
schedule_exchange_tick(Period),
330356
State1 = maybe_exchange(State),
331357
{noreply, State1};
332358
handle_info({'DOWN', Ref, process, _Pid, _Reason}, State=#state{exchanges=Exchanges}) ->
@@ -623,11 +649,11 @@ send(Msg, Mod, P) ->
623649
%% TODO: add debug logging
624650
%% gen_server:cast({?SERVER, P}, Msg).
625651

626-
schedule_lazy_tick() ->
627-
schedule_tick(lazy_tick, broadcast_lazy_timer, 1000).
652+
schedule_lazy_tick(Period) ->
653+
schedule_tick(lazy_tick, broadcast_lazy_timer, Period).
628654

629-
schedule_exchange_tick() ->
630-
schedule_tick(exchange_tick, broadcast_exchange_timer, 10000).
655+
schedule_exchange_tick(Period) ->
656+
schedule_tick(exchange_tick, broadcast_exchange_timer, Period).
631657

632658
schedule_tick(Message, Timer, Default) ->
633659
TickMs = app_helper:get_env(plumtree, Timer, Default),

0 commit comments

Comments
 (0)