30
30
is_stale /1 ,
31
31
graft /1 ,
32
32
exchange /1 ]).
33
+
33
34
% % gen_server callbacks
34
35
-export ([init /1 ,
35
36
handle_call /3 ,
40
41
41
42
% % API
42
43
-export ([start_link /0 ,
43
- read /1 ]).
44
+ get /1 ,
45
+ put /2 ]).
44
46
45
47
-record (state , {}).
46
- -type state () :: # state {}.
48
+ -type state () :: # state {}.
47
49
48
50
-spec start_link () -> ok .
49
51
start_link () ->
50
52
{ok , _ } = gen_server :start_link ({local , ? SERVER }, ? MODULE ,
51
53
[], []),
52
54
ok .
53
55
54
- -spec read (Key :: any ()) -> {ok , any ()} | {error , not_found }.
55
- read (Key ) ->
56
- case ets :lookup (? MODULE , Key ) of
57
- [{Key , Value }] ->
58
- % lager:info("read key ~p: ~p",
59
- % [Key, Value]),
60
- {ok , Value };
61
- _ ->
62
- lager :info (" unable to find key: ~p " ,
63
- [Key ]),
64
- {error , not_found }
56
+ -spec get (Key :: any ()) -> {error , not_found } | {ok , any ()}.
57
+ get (Key ) ->
58
+ case dbread (Key ) of
59
+ undefined -> {error , not_found };
60
+ Obj ->
61
+ {ok , plumtree_test_object :value (Obj )}
65
62
end .
66
63
64
+ -spec put (Key :: any (),
65
+ Value :: any ()) -> ok .
66
+ put (Key , Value ) ->
67
+ Existing = dbread (Key ),
68
+ UpdatedObj = plumtree_test_object :modify (Existing , Value , this_server_id ()),
69
+ dbwrite (Key , UpdatedObj ),
70
+ plumtree_broadcast :broadcast ({Key , UpdatedObj }, plumtree_test_broadcast_handler ),
71
+ ok .
72
+
67
73
% %%===================================================================
68
74
% %% gen_server callbacks
69
75
% %%===================================================================
70
76
71
77
% % @private
72
78
-spec init ([[any ()], ...]) -> {ok , state ()}.
73
79
init ([]) ->
74
- msgs_seen = ets :new (msgs_seen , [named_table , set , public ,
75
- {keypos , 1 },
76
- {read_concurrency , true }]),
77
80
? MODULE = ets :new (? MODULE , [named_table , set , public ,
78
81
{keypos , 1 },
79
82
{read_concurrency , true }]),
@@ -111,61 +114,50 @@ code_change(_OldVsn, State, _Extra) ->
111
114
112
115
% % Return a two-tuple of message id and payload from a given broadcast
113
116
-spec broadcast_data (any ()) -> {any (), any ()}.
114
- broadcast_data ({Key , _Value } = Data ) ->
115
- MsgId = erlang : phash2 ( Data ) ,
117
+ broadcast_data ({Key , Object } ) ->
118
+ MsgId = { Key , plumtree_test_object : context ( Object )} ,
116
119
lager :info (" broadcast_data(~p ), msg id: ~p " ,
117
- [Data , MsgId ]),
118
- true = ets :insert (msgs_seen , {MsgId , Key }),
119
- true = ets :insert (? MODULE , Data ),
120
- {MsgId , Data }.
120
+ [Object , MsgId ]),
121
+ {MsgId , Object }.
121
122
122
123
% % Given the message id and payload, merge the message in the local state.
123
124
% % If the message has already been received return `false', otherwise return `true'
124
125
-spec merge (any (), any ()) -> boolean ().
125
- merge (MsgId , {Key , _Value } = Payload ) ->
126
- case ets :lookup (msgs_seen , MsgId ) of
127
- [{MsgId , _ }] ->
128
- lager :info (" msg with id ~p has already been seen" ,
129
- [MsgId ]),
130
- false ;
131
- _ ->
132
- lager :info (" merging(~p , ~p ) in local state" ,
133
- [MsgId , Payload ]),
134
- % % insert the message in the local state
135
- true = ets :insert (? MODULE , Payload ),
136
- % % mark this message as been seen
137
- true = ets :insert_new (msgs_seen , {MsgId , Key }),
126
+ merge ({Key , _Context } = MsgId , RemoteObj ) ->
127
+ lager :info (" merge msg id ~p , object: ~p " ,
128
+ [MsgId , RemoteObj ]),
129
+ Existing = dbread (Key ),
130
+ case plumtree_test_object :reconcile (RemoteObj , Existing ) of
131
+ false -> false ;
132
+ {true , Reconciled } ->
133
+ dbwrite (Key , Reconciled ),
138
134
true
139
135
end .
140
136
141
137
% % Return true if the message (given the message id) has already been received.
142
138
% % `false' otherwise
143
139
-spec is_stale (any ()) -> boolean ().
144
- is_stale (MsgId ) ->
145
- case ets :lookup (msgs_seen , MsgId ) of
146
- [{MsgId , _ }] ->
147
- lager :info (" is_stale(~p ): ~p " ,
148
- [MsgId , true ]),
149
- true ;
150
- _ ->
151
- lager :info (" is_stale(~p ): ~p " ,
152
- [MsgId , false ]),
153
- false
154
- end .
140
+ is_stale ({Key , Context }) ->
141
+ Existing = dbread (Key ),
142
+ plumtree_test_object :is_stale (Context , Existing ).
155
143
156
144
% % Return the message associated with the given message id. In some cases a message
157
145
% % has already been sent with information that subsumes the message associated with the given
158
146
% % message id. In this case, `stale' is returned.
159
147
-spec graft (any ()) -> stale | {ok , any ()} | {error , any ()}.
160
- graft (MsgId ) ->
161
- % lager:info("graft(~p)",
162
- % [MsgId]),
163
- case ets :lookup (msgs_seen , MsgId ) of
164
- [{MsgId , Key }] ->
165
- [{Key ,Msg }] = ets :lookup (? MODULE , Key ),
166
- {ok , {Key , Msg }};
167
- _ ->
168
- {error , not_found }
148
+ graft ({Key , Context }) ->
149
+ case dbread (Key ) of
150
+ undefined ->
151
+ % % this *really* should not happen
152
+ lager :alert (" unable to graft key ~p , could not find it" ,
153
+ [Key ]),
154
+ {error , not_found };
155
+ Object ->
156
+ LocalContext = plumtree_test_object :context (Object ),
157
+ case LocalContext =:= Context of
158
+ true -> {ok , Object };
159
+ false -> stale
160
+ end
169
161
end .
170
162
171
163
% % Trigger an exchange between the local handler and the handler on the given node.
@@ -176,3 +168,24 @@ graft(MsgId) ->
176
168
-spec exchange (node ()) -> {ok , pid ()} | {error , term ()}.
177
169
exchange (_Node ) ->
178
170
{ok , self ()}.
171
+
172
+ % % @private
173
+ -spec dbread (Key :: any ()) -> any () | undefined .
174
+ dbread (Key ) ->
175
+ case ets :lookup (? MODULE , Key ) of
176
+ [{Key , Object }] ->
177
+ Object ;
178
+ _ ->
179
+ undefined
180
+ end .
181
+
182
+ % % @private
183
+ -spec dbwrite (Key :: any (),
184
+ Value :: any ()) -> any ().
185
+ dbwrite (Key , Object ) ->
186
+ ets :insert (? MODULE , {Key , Object }),
187
+ Object .
188
+
189
+ % % @private
190
+ this_server_id () -> node ().
191
+
0 commit comments