101
101
% % which members have joined and left during a membership update
102
102
all_members :: ordsets :ordset (nodename ()) | undefined
103
103
}).
104
+ -type state () :: # state {}.
104
105
105
106
% %%===================================================================
106
107
% %% API
118
119
start_link () ->
119
120
PeerService = application :get_env (plumtree , peer_service , partisan_peer_service ),
120
121
{ok , Members } = PeerService :members (),
121
- {InitEagers , InitLazys } = init_peers (Members ),
122
+ plumtree_util :log (debug , " peer sampling service members: ~p " , [Members ]),
123
+ % % the peer service has already sampled the members, we start off
124
+ % % with pure gossip (ie. all members are in the eager push list and lazy
125
+ % % list is empty)
126
+ InitEagers = Members ,
127
+ InitLazys = [],
128
+ plumtree_util :log (debug , " init peers, eager: ~p , lazy: ~p " ,
129
+ [InitEagers , InitLazys ]),
122
130
Mods = app_helper :get_env (plumtree , broadcast_mods , []),
123
131
Res = start_link (Members , InitEagers , InitLazys , Mods ),
124
132
PeerService :add_sup_callback (fun ? MODULE :update /1 ),
@@ -226,7 +234,7 @@ debug_get_tree(Root, Nodes) ->
226
234
% %%===================================================================
227
235
228
236
% % @private
229
- -spec init ([[any ()],...]) -> {ok , # state {} }.
237
+ -spec init ([[any ()], ...]) -> {ok , state () }.
230
238
init ([AllMembers , InitEagers , InitLazys , Mods ]) ->
231
239
schedule_lazy_tick (),
232
240
schedule_exchange_tick (),
@@ -239,7 +247,7 @@ init([AllMembers, InitEagers, InitLazys, Mods]) ->
239
247
{ok , State2 }.
240
248
241
249
% % @private
242
- -spec handle_call (term (), {pid (), term ()}, # state {}) -> {reply , term (), # state {} }.
250
+ -spec handle_call (term (), {pid (), term ()}, state ()) -> {reply , term (), state () }.
243
251
handle_call ({get_peers , Root }, _From , State ) ->
244
252
EagerPeers = all_peers (Root , State # state .eager_sets , State # state .common_eagers ),
245
253
LazyPeers = all_peers (Root , State # state .lazy_sets , State # state .common_lazys ),
@@ -253,67 +261,66 @@ handle_call({cancel_exchanges, WhichExchanges}, _From, State) ->
253
261
{reply , Cancelled , State }.
254
262
255
263
% % @private
256
- -spec handle_cast (term (), # state {}) -> {noreply , # state {} }.
264
+ -spec handle_cast (term (), state ()) -> {noreply , state () }.
257
265
handle_cast ({broadcast , MessageId , Message , Mod }, State ) ->
258
- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
259
- % lager:info("broadcast/3 messaged processed; messages remaining: ~p",
260
- % [MessageQueueLen]),
266
+ plumtree_util :log (debug , " received {broadcast, ~p , Msg, ~p }" ,
267
+ [MessageId , Mod ]),
261
268
State1 = eager_push (MessageId , Message , Mod , State ),
262
269
State2 = schedule_lazy_push (MessageId , Mod , State1 ),
263
270
{noreply , State2 };
264
271
handle_cast ({broadcast , MessageId , Message , Mod , Round , Root , From }, State ) ->
265
- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
266
- % lager:info("broadcast/6 messaged processed; messages remaining: ~p",
267
- % [MessageQueueLen]),
272
+ plumtree_util :log (debug , " received {broadcast, ~p , Msg, ~p , ~p , ~p , ~p }" ,
273
+ [MessageId , Mod , Round , Root , From ]),
268
274
Valid = Mod :merge (MessageId , Message ),
269
275
State1 = handle_broadcast (Valid , MessageId , Message , Mod , Round , Root , From , State ),
270
276
{noreply , State1 };
271
277
handle_cast ({prune , Root , From }, State ) ->
272
- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
273
- % lager:info("prune/2 messaged processed; messages remaining: ~p",
274
- % [MessageQueueLen]),
278
+ plumtree_util :log (debug , " received ~p " , [{prune , Root , From }]),
279
+ plumtree_util :log (debug , " moving peer ~p from eager to lazy" , [From ]),
275
280
State1 = add_lazy (From , Root , State ),
276
281
{noreply , State1 };
277
282
handle_cast ({i_have , MessageId , Mod , Round , Root , From }, State ) ->
278
- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
279
- % lager:info("i_have/5 messaged processed; messages remaining: ~p",
280
- % [MessageQueueLen]),
283
+ plumtree_util :log (debug , " received ~p " , [{i_have , MessageId , Mod , Round , Root , From }]),
281
284
Stale = Mod :is_stale (MessageId ),
282
285
State1 = handle_ihave (Stale , MessageId , Mod , Round , Root , From , State ),
283
286
{noreply , State1 };
284
287
handle_cast ({ignored_i_have , MessageId , Mod , Round , Root , From }, State ) ->
285
- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
286
- % lager:info("ignored_i_have/5 messaged processed; messages remaining: ~p",
287
- % [MessageQueueLen]),
288
+ plumtree_util :log (debug , " received ~p " , [{ignored_i_have , MessageId , Mod , Round , Root , From }]),
288
289
State1 = ack_outstanding (MessageId , Mod , Round , Root , From , State ),
289
290
{noreply , State1 };
290
291
handle_cast ({graft , MessageId , Mod , Round , Root , From }, State ) ->
291
- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len),
292
- % lager:info("graft/5 messaged processed; messages remaining: ~p",
293
- % [MessageQueueLen]),
292
+ plumtree_util :log (debug , " received ~p " , [{graft , MessageId , Mod , Round , Root , From }]),
294
293
Result = Mod :graft (MessageId ),
294
+ plumtree_util :log (debug , " graft(~p ): ~p " , [MessageId , Result ]),
295
295
State1 = handle_graft (Result , MessageId , Mod , Round , Root , From , State ),
296
296
{noreply , State1 };
297
- handle_cast ({update , Members }, State = # state {all_members = BroadcastMembers }) ->
298
- % {message_queue_len, MessageQueueLen} = process_info(self(), message_queue_len) ,
299
- % lager:info("update/1 messaged processed; messages remaining: ~p",
300
- % [MessageQueueLen ]),
297
+ handle_cast ({update , Members }, State = # state {all_members = BroadcastMembers ,
298
+ common_eagers = EagerPeers0 ,
299
+ common_lazys = LazyPeers }) ->
300
+ plumtree_util : log ( debug , " received ~p " , [{ update , Members } ]),
301
301
CurrentMembers = ordsets :from_list (Members ),
302
302
New = ordsets :subtract (CurrentMembers , BroadcastMembers ),
303
303
Removed = ordsets :subtract (BroadcastMembers , CurrentMembers ),
304
+ plumtree_util :log (debug , " new members: ~p " , [ordsets :to_list (New )]),
305
+ plumtree_util :log (debug , " removed members: ~p " , [ordsets :to_list (Removed )]),
304
306
State1 = case ordsets :size (New ) > 0 of
305
307
false ->
306
308
State ;
307
309
true ->
308
- {EagerPeers , LazyPeers } = init_peers (CurrentMembers ),
310
+ % % as per the paper (page 9):
311
+ % % "When a new member is detected, it is simply added to the set
312
+ % % of eagerPushPeers"
313
+ EagerPeers = ordsets :union (EagerPeers0 , New ),
314
+ plumtree_util :log (debug , " new peers, eager: ~p , lazy: ~p " ,
315
+ [EagerPeers , LazyPeers ]),
309
316
reset_peers (CurrentMembers , EagerPeers , LazyPeers , State )
310
317
end ,
311
318
State2 = neighbors_down (Removed , State1 ),
312
319
{noreply , State2 }.
313
320
314
321
% % @private
315
- -spec handle_info ('exchange_tick' | 'lazy_tick' | {'DOWN' , _ , 'process' , _ , _ }, # state {} ) ->
316
- {noreply , # state {} }.
322
+ -spec handle_info ('exchange_tick' | 'lazy_tick' | {'DOWN' , _ , 'process' , _ , _ }, state () ) ->
323
+ {noreply , state () }.
317
324
handle_info (lazy_tick , State ) ->
318
325
schedule_lazy_tick (),
319
326
_ = send_lazy (State ),
@@ -327,23 +334,26 @@ handle_info({'DOWN', Ref, process, _Pid, _Reason}, State=#state{exchanges=Exchan
327
334
{noreply , State # state {exchanges = Exchanges1 }}.
328
335
329
336
% % @private
330
- -spec terminate (term (), # state {} ) -> term ().
337
+ -spec terminate (term (), state () ) -> term ().
331
338
terminate (_Reason , _State ) ->
332
339
ok .
333
340
334
341
% % @private
335
- -spec code_change (term () | {down , term ()}, # state {} , term ()) -> {ok , # state {} }.
342
+ -spec code_change (term () | {down , term ()}, state () , term ()) -> {ok , state () }.
336
343
code_change (_OldVsn , State , _Extra ) ->
337
344
{ok , State }.
338
345
339
346
% %%===================================================================
340
347
% %% Internal functions
341
348
% %%===================================================================
342
349
handle_broadcast (false , _MessageId , _Message , Mod , _Round , Root , From , State ) -> % % stale msg
350
+ % % remove sender from eager and set as lazy
351
+ plumtree_util :log (debug , " moving peer ~p from eager to lazy" , [From ]),
343
352
State1 = add_lazy (From , Root , State ),
344
353
_ = send ({prune , Root , myself ()}, Mod , From ),
345
354
State1 ;
346
355
handle_broadcast (true , MessageId , Message , Mod , Round , Root , From , State ) -> % % valid msg
356
+ % % remove sender from lazy and set as eager
347
357
State1 = add_eager (From , Root , State ),
348
358
State2 = eager_push (MessageId , Message , Mod , Round + 1 , Root , From , State1 ),
349
359
schedule_lazy_push (MessageId , Mod , Round + 1 , Root , From , State2 ).
@@ -372,8 +382,8 @@ handle_graft({error, Reason}, _MessageId, Mod, _Round, _Root, _From, State) ->
372
382
lager :error (" unable to graft message from ~p . reason: ~p " , [Mod , Reason ]),
373
383
State .
374
384
375
- neighbors_down (Removed , State = # state {common_eagers = CommonEagers ,eager_sets = EagerSets ,
376
- common_lazys = CommonLazys ,lazy_sets = LazySets ,
385
+ neighbors_down (Removed , State = # state {common_eagers = CommonEagers , eager_sets = EagerSets ,
386
+ common_lazys = CommonLazys , lazy_sets = LazySets ,
377
387
outstanding = Outstanding }) ->
378
388
NewCommonEagers = ordsets :subtract (CommonEagers , Removed ),
379
389
NewCommonLazys = ordsets :subtract (CommonLazys , Removed ),
@@ -398,6 +408,7 @@ eager_push(MessageId, Message, Mod, State) ->
398
408
399
409
eager_push (MessageId , Message , Mod , Round , Root , From , State ) ->
400
410
Peers = eager_peers (Root , From , State ),
411
+ plumtree_util :log (debug , " eager push to peers: ~p " , [Peers ]),
401
412
_ = send ({broadcast , MessageId , Message , Mod , Round , Root , myself ()}, Mod , Peers ),
402
413
State .
403
414
@@ -406,6 +417,8 @@ schedule_lazy_push(MessageId, Mod, State) ->
406
417
407
418
schedule_lazy_push (MessageId , Mod , Round , Root , From , State ) ->
408
419
Peers = lazy_peers (Root , From , State ),
420
+ plumtree_util :log (debug , " scheduling lazy push to peers ~p : ~p " ,
421
+ [Peers , {MessageId , Mod , Round , Root , From }]),
409
422
add_all_outstanding (MessageId , Mod , Round , Root , Peers , State ).
410
423
411
424
send_lazy (# state {outstanding = Outstanding }) ->
@@ -416,6 +429,8 @@ send_lazy(Peer, Messages) ->
416
429
{MessageId , Mod , Round , Root } <- ordsets :to_list (Messages )].
417
430
418
431
send_lazy (MessageId , Mod , Round , Root , Peer ) ->
432
+ plumtree_util :log (debug , " sending lazy push ~p " ,
433
+ [{i_have , MessageId , Mod , Round , Root , myself ()}]),
419
434
send ({i_have , MessageId , Mod , Round , Root , myself ()}, Mod , Peer ).
420
435
421
436
maybe_exchange (State ) ->
@@ -425,7 +440,7 @@ maybe_exchange(State) ->
425
440
426
441
maybe_exchange (undefined , State ) ->
427
442
State ;
428
- maybe_exchange (Peer , State = # state {mods = [Mod | _ ],exchanges = Exchanges }) ->
443
+ maybe_exchange (Peer , State = # state {mods = [Mod | _ ], exchanges = Exchanges }) ->
429
444
% % limit the number of exchanges this node can start concurrently.
430
445
% % the exchange must (currently?) implement any "inbound" concurrency limits
431
446
ExchangeLimit = app_helper :get_env (plumtree , broadcast_start_exchange_limit , 1 ),
@@ -439,10 +454,10 @@ maybe_exchange(_Peer, State=#state{mods=[]}) ->
439
454
% % No registered handler.
440
455
State .
441
456
442
- exchange (Peer , State = # state {mods = [Mod | Mods ],exchanges = Exchanges }) ->
457
+ exchange (Peer , State = # state {mods = [Mod | Mods ], exchanges = Exchanges }) ->
443
458
State1 = case Mod :exchange (Peer ) of
444
459
{ok , Pid } ->
445
- lager : debug ( " started ~p exchange with ~p (~p )" , [Mod , Peer , Pid ]),
460
+ plumtree_util : log ( debug , " started ~p exchange with ~p (~p )" , [Mod , Peer , Pid ]),
446
461
Ref = monitor (process , Pid ),
447
462
State # state {exchanges = [{Mod , Peer , Ref , Pid } | Exchanges ]};
448
463
{error , _Reason } ->
@@ -569,7 +584,7 @@ update_peers(From, Root, EagerUpdate, LazyUpdate, State) ->
569
584
NewLazys = LazyUpdate (From , CurrentLazys ),
570
585
set_peers (Root , NewEagers , NewLazys , State ).
571
586
572
- set_peers (Root , Eagers , Lazys , State = # state {eager_sets = EagerSets ,lazy_sets = LazySets }) ->
587
+ set_peers (Root , Eagers , Lazys , State = # state {eager_sets = EagerSets , lazy_sets = LazySets }) ->
573
588
NewEagers = orddict :store (Root , Eagers , EagerSets ),
574
589
NewLazys = orddict :store (Root , Lazys , LazySets ),
575
590
State # state {eager_sets = NewEagers , lazy_sets = NewLazys }.
@@ -627,38 +642,6 @@ reset_peers(AllMembers, EagerPeers, LazyPeers, State) ->
627
642
all_members = ordsets :from_list (AllMembers )
628
643
}.
629
644
630
- init_peers (Members ) ->
631
- case length (Members ) of
632
- 1 ->
633
- % % Single member, must be ourselves
634
- InitEagers = [],
635
- InitLazys = [];
636
- 2 ->
637
- % % Two members, just eager push to the other
638
- InitEagers = Members -- [myself ()],
639
- InitLazys = [];
640
- N when N < 5 ->
641
- % % 2 to 4 members, start with a fully connected tree
642
- % % with cycles. it will be adjusted as needed
643
- Tree = plumtree_util :build_tree (1 , Members , [cycles ]),
644
- InitEagers = orddict :fetch (myself (), Tree ),
645
- InitLazys = [lists :nth (rand_compat :uniform (N - 2 ), Members -- [myself () | InitEagers ])];
646
- N when N < 10 ->
647
- % % 5 to 9 members, start with gossip tree used by
648
- % % riak_core_gossip. it will be adjusted as needed
649
- Tree = plumtree_util :build_tree (2 , Members , [cycles ]),
650
- InitEagers = orddict :fetch (myself (), Tree ),
651
- InitLazys = [lists :nth (rand_compat :uniform (N - 3 ), Members -- [myself () | InitEagers ])];
652
- N ->
653
- % % 10 or more members, use a tree similar to riak_core_gossip
654
- % % but with higher fanout (larger initial eager set size)
655
- NEagers = round (math :log (N ) + 1 ),
656
- Tree = plumtree_util :build_tree (NEagers , Members , [cycles ]),
657
- InitEagers = orddict :fetch (myself (), Tree ),
658
- InitLazys = [lists :nth (rand_compat :uniform (N - (NEagers + 1 )), Members -- [myself () | InitEagers ])]
659
- end ,
660
- {InitEagers , InitLazys }.
661
-
662
645
% % @private
663
646
myself () ->
664
647
node ().
0 commit comments