-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkvs.go
284 lines (260 loc) · 10.7 KB
/
kvs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package main
import (
"encoding/json"
"io"
"net/http"
"github.com/DistributedClocks/GoVector/govec/vclock"
"github.com/labstack/echo/v4"
)
// KVS_PUT_Request is the JSON body accepted by PUT /kvs/<key>.
//
// The same shape is used for client requests and for replica-to-replica
// broadcasts; the two are told apart by whether FromRepilca is empty.
type KVS_PUT_Request struct {
	// Data is the value to store (JSON field "value"); any JSON type is accepted.
	Data interface{} `json:"value"`

	// Type is the caller-supplied type tag (JSON field "type") stored next to Data.
	Type string `json:"type"`

	// CausalMetaData is the sender's vector clock in string form
	// (JSON field "causal-metadata"); it may be empty for a client's first request.
	CausalMetaData string `json:"causal-metadata"`

	// FromRepilca is the sending replica's address (JSON field "from-replica").
	// It is empty, and omitted when marshalled, for requests that come from a client.
	// NOTE(review): the identifier is misspelled ("Repilca"); it is kept as-is
	// because code outside this block refers to it by this name.
	FromRepilca string `json:"from-replica,omitempty"`
}
// KVS_GET_DELETE_Request is the JSON body accepted by GET and DELETE /kvs/<key>.
type KVS_GET_DELETE_Request struct {
	// CausalMetaData is the sender's vector clock in string form
	// (JSON field "causal-metadata"); it may be empty.
	CausalMetaData string `json:"causal-metadata"`

	// FromRepilca is the sending replica's address (JSON field "from-replica").
	// It is empty, and omitted when marshalled, for requests that come from a client.
	// NOTE(review): the identifier is misspelled ("Repilca"); it is kept as-is
	// because code outside this block refers to it by this name.
	FromRepilca string `json:"from-replica,omitempty"`
}
// putKey handles PUT /kvs/<key>: it creates or replaces a key-value pair.
//
// Flow:
//  1. Read and decode the JSON body (KVS_PUT_Request).
//  2. Locate the owning shard on HASH_RING. If this node is not in that shard,
//     a replica broadcast only merges the sender's vector clock, while a client
//     request is forwarded to a node of the owning shard.
//  3. Validate the key (at most 50 bytes) and the value (non-nil, not "").
//  4. Enforce causal delivery: replica broadcasts are checked with
//     compareReplicasVC; client requests with a non-empty clock are accepted
//     only if the clock compares as Concurrent or Equal to MY_VECTOR_CLOCK.
//     Client writes then tick this node's clock entry and are broadcast to
//     CURRENT_VIEW asynchronously.
//  5. Store the value under KVSmutex.
//
// Responses: 201 "created" or 200 "replaced" (with causal-metadata and
// shard-id), 400 for a malformed body/metadata/key/value, and 503 when the
// causal dependencies are not yet satisfied.
func putKey(c echo.Context) error {
	// Keep the raw bytes: they are passed on verbatim if the request is forwarded.
	body, err := io.ReadAll(c.Request().Body)
	if err != nil {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "Failed to read request body"})
	}

	var input KVS_PUT_Request
	if jsonErr := json.Unmarshal(body, &input); jsonErr != nil {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid JSON format"})
	}

	// Determine which shard owns the key.
	key := c.Param("key")
	shardid := HASH_RING.LocateKey([]byte(key)).String()

	if shardid != MY_SHARD_ID {
		// A client request for a foreign key is forwarded to the owning shard.
		if input.FromRepilca == "" {
			return forwardRequest(c, choseNodeFromShard(shardid), "kvs/"+key, body)
		}
		// A replica broadcast for a key owned by another shard carries no data
		// for this node; only the sender's clock is merged into the local one.
		senderVC, err := NewVClockFromString(input.CausalMetaData)
		if err != nil {
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid metadata format"})
		}
		MY_VECTOR_CLOCK.Merge(senderVC)
		return c.JSON(http.StatusOK, map[string]string{"result": "vector clock updated"})
	}

	// len counts bytes, so the limit is 50 bytes rather than 50 characters.
	if len(key) > 50 {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "Key is too long"})
	}

	// A missing/null "value" decodes to nil; an empty string is rejected as well.
	if input.Data == nil || input.Data == "" {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "PUT request does not specify a value"})
	}

	// senderVC stays nil when a client supplies no causal metadata.
	var senderVC vclock.VClock

	if input.FromRepilca != "" {
		// Request replicated from another replica.
		senderPos := input.FromRepilca

		senderVC, err = NewVClockFromString(input.CausalMetaData)
		if err != nil {
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid metadata format"})
		}

		// Refuse the write until compareReplicasVC reports that the sender's
		// clock is deliverable against the local clock.
		if !compareReplicasVC(senderVC, MY_VECTOR_CLOCK, senderPos) {
			return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "Causal dependencies not satisfied; try again later"})
		}

		MY_VECTOR_CLOCK.Merge(senderVC)
	} else {
		// Request issued directly by a client.
		if input.CausalMetaData != "" {
			senderVC, err = NewVClockFromString(input.CausalMetaData)
			if err != nil {
				return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid metadata format"})
			}

			// Deliver only when the client's clock compares as Concurrent or
			// Equal to the local clock; otherwise ask the client to retry.
			if !(senderVC.Compare(MY_VECTOR_CLOCK, vclock.Concurrent) || senderVC.Compare(MY_VECTOR_CLOCK, vclock.Equal)) {
				return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "Causal dependencies not satisfied; try again later"})
			}
		}

		// Fold the client's clock into the local one, then record this write
		// as a new event of this replica.
		MY_VECTOR_CLOCK.Merge(senderVC)
		MY_VECTOR_CLOCK.Tick(SOCKET_ADDRESS)

		// Re-stamp the request as coming from this replica and replicate it
		// in the background.
		input.FromRepilca = SOCKET_ADDRESS
		input.CausalMetaData = MY_VECTOR_CLOCK.ReturnVCString()
		// The marshal error is deliberately ignored: input was just decoded
		// from JSON, so it contains only JSON-encodable values.
		jsonData, _ := json.Marshal(input)
		go broadcast("PUT", "kvs/"+key, jsonData, CURRENT_VIEW)
	}

	// Probe and write inside one critical section, so the created/replaced
	// answer is consistent and the map is never read while another handler
	// is writing to it.
	KVSmutex.Lock()
	_, existed := KVStore[key]
	KVStore[key] = Value{input.Data, input.Type}
	KVSmutex.Unlock()

	if existed {
		return c.JSON(http.StatusOK, map[string]string{"result": "replaced", "causal-metadata": MY_VECTOR_CLOCK.ReturnVCString(), "shard-id": MY_SHARD_ID})
	}
	return c.JSON(http.StatusCreated, map[string]string{"result": "created", "causal-metadata": MY_VECTOR_CLOCK.ReturnVCString(), "shard-id": MY_SHARD_ID})
}
// getKey handles GET /kvs/<key>: it returns the value stored under the key.
//
// The JSON body (KVS_GET_DELETE_Request) may carry the client's vector clock.
// Requests for keys owned by another shard are forwarded to a node of that
// shard. When a clock is supplied, the read is served only if the clock
// compares as Concurrent, Equal or Descendant against MY_VECTOR_CLOCK;
// otherwise 503 is returned together with the local clock.
//
// Responses: 200 "found" (with value, causal-metadata and shard-id), 400 for
// a malformed body or metadata, 404 when the key is absent, and 503 when the
// causal dependencies are not yet satisfied.
func getKey(c echo.Context) error {
	// Resolve the owning shard before touching the body.
	key := c.Param("key")
	owner := HASH_RING.LocateKey([]byte(key)).String()

	// The raw bytes are kept so the request can be forwarded unchanged.
	raw, err := io.ReadAll(c.Request().Body)
	if err != nil {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "Failed to read request body"})
	}

	var req KVS_GET_DELETE_Request
	if json.Unmarshal(raw, &req) != nil {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid JSON format"})
	}

	// Keys that live on another shard are served by that shard.
	if owner != MY_SHARD_ID {
		return forwardRequest(c, choseNodeFromShard(owner), "kvs/"+key, raw)
	}

	// The causal check is skipped entirely when the client sends no metadata.
	if req.CausalMetaData != "" {
		var clientVC vclock.VClock
		clientVC, err = NewVClockFromString(req.CausalMetaData)
		if err != nil {
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid metadata format"})
		}

		// Evaluated left to right with short-circuiting: Concurrent, Equal, Descendant.
		deliverable := clientVC.Compare(MY_VECTOR_CLOCK, vclock.Concurrent) ||
			clientVC.Compare(MY_VECTOR_CLOCK, vclock.Equal) ||
			clientVC.Compare(MY_VECTOR_CLOCK, vclock.Descendant)
		if !deliverable {
			return c.JSON(http.StatusServiceUnavailable, map[string]string{
				"error": "Causal dependencies not satisfied; try again later",
				"vc":    MY_VECTOR_CLOCK.ReturnVCString(),
			})
		}
	}

	// Hold the lock only for the map lookup itself.
	KVSmutex.Lock()
	entry, found := KVStore[key]
	KVSmutex.Unlock()

	if !found {
		return c.JSON(http.StatusNotFound, map[string]string{"error": "Key does not exist"})
	}

	// Data is returned as stored, so the JSON type of the value is preserved.
	reply := map[string]interface{}{
		"result":          "found",
		"value":           entry.Data,
		"causal-metadata": MY_VECTOR_CLOCK.ReturnVCString(),
		"shard-id":        MY_SHARD_ID,
	}
	return c.JSON(http.StatusOK, reply)
}
// deleteKey handles DELETE /kvs/<key>: it removes the key from the store.
//
// Flow:
//  1. Read and decode the JSON body (KVS_GET_DELETE_Request).
//  2. Locate the owning shard on HASH_RING. If this node is not in that shard,
//     a replica broadcast only merges the sender's vector clock, while a client
//     request is forwarded to a node of the owning shard.
//  3. Enforce causal delivery: replica broadcasts are checked with
//     compareReplicasVC; client requests with a non-empty clock are accepted
//     only if the clock compares as Concurrent or Equal to MY_VECTOR_CLOCK.
//     Client deletes then tick this node's clock entry and are broadcast to
//     CURRENT_VIEW asynchronously as DELETE requests.
//  4. Remove the key under KVSmutex.
//
// Responses: 200 "deleted" (with causal-metadata and shard-id), 400 for a
// malformed body or metadata, 404 when the key does not exist, and 503 when
// the causal dependencies are not yet satisfied.
func deleteKey(c echo.Context) error {
	// Keep the raw bytes: they are passed on verbatim if the request is forwarded.
	body, err := io.ReadAll(c.Request().Body)
	if err != nil {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "Failed to read request body"})
	}

	var input KVS_GET_DELETE_Request
	if jsonErr := json.Unmarshal(body, &input); jsonErr != nil {
		return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid JSON format"})
	}

	// Determine which shard owns the key.
	key := c.Param("key")
	shardid := HASH_RING.LocateKey([]byte(key)).String()

	if shardid != MY_SHARD_ID {
		// A client request for a foreign key is forwarded to the owning shard.
		if input.FromRepilca == "" {
			return forwardRequest(c, choseNodeFromShard(shardid), "kvs/"+key, body)
		}
		// A replica broadcast for a key owned by another shard carries nothing
		// to delete here; only the sender's clock is merged into the local one.
		senderVC, err := NewVClockFromString(input.CausalMetaData)
		if err != nil {
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid metadata format"})
		}
		MY_VECTOR_CLOCK.Merge(senderVC)
		return c.JSON(http.StatusOK, map[string]string{"result": "vector clock updated"})
	}

	// senderVC stays nil when a client supplies no causal metadata.
	var senderVC vclock.VClock

	if input.FromRepilca != "" {
		// Request replicated from another replica.
		senderPos := input.FromRepilca

		senderVC, err = NewVClockFromString(input.CausalMetaData)
		if err != nil {
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid metadata format"})
		}

		// Refuse the delete until compareReplicasVC reports that the sender's
		// clock is deliverable against the local clock.
		if !compareReplicasVC(senderVC, MY_VECTOR_CLOCK, senderPos) {
			return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "Causal dependencies not satisfied; try again later"})
		}

		MY_VECTOR_CLOCK.Merge(senderVC)
	} else {
		// Request issued directly by a client.
		if input.CausalMetaData != "" {
			senderVC, err = NewVClockFromString(input.CausalMetaData)
			if err != nil {
				return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid metadata format"})
			}

			// Deliver only when the client's clock compares as Concurrent or
			// Equal to the local clock; otherwise ask the client to retry.
			if !(senderVC.Compare(MY_VECTOR_CLOCK, vclock.Concurrent) || senderVC.Compare(MY_VECTOR_CLOCK, vclock.Equal)) {
				return c.JSON(http.StatusServiceUnavailable, map[string]string{"error": "Causal dependencies not satisfied; try again later"})
			}
		}

		// Fold the client's clock into the local one, then record this delete
		// as a new event of this replica.
		MY_VECTOR_CLOCK.Merge(senderVC)
		MY_VECTOR_CLOCK.Tick(SOCKET_ADDRESS)

		// Re-stamp the request as coming from this replica and replicate it
		// in the background. The method must be DELETE so that peers run
		// deleteKey: the payload has no "value", so a PUT would be rejected
		// by putKey's missing-value check.
		input.FromRepilca = SOCKET_ADDRESS
		input.CausalMetaData = MY_VECTOR_CLOCK.ReturnVCString()
		// The marshal error is deliberately ignored: input holds only strings.
		jsonData, _ := json.Marshal(input)
		go broadcast("DELETE", "kvs/"+key, jsonData, CURRENT_VIEW)
	}

	// Check and delete inside one critical section, so the map is never read
	// while another handler is writing to it. delete is a no-op for a
	// missing key.
	KVSmutex.Lock()
	_, ok := KVStore[key]
	delete(KVStore, key)
	KVSmutex.Unlock()

	if !ok {
		return c.JSON(http.StatusNotFound, map[string]string{"error": "Key does not exist"})
	}

	return c.JSON(http.StatusOK, map[string]string{"result": "deleted", "causal-metadata": MY_VECTOR_CLOCK.ReturnVCString(), "shard-id": MY_SHARD_ID})
}