-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
executable file
·154 lines (134 loc) · 4.26 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package main
import (
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"github.com/DistributedClocks/GoVector/govec/vclock"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
// Package-level state shared by the request handlers of this node.
var (
	// CURRENT_VIEW is the global view of replicas. main fills it from the
	// comma-separated VIEW environment variable.
	CURRENT_VIEW []string

	// MY_VECTOR_CLOCK is the vector clock used for enforcing causal consistency.
	MY_VECTOR_CLOCK vclock.VClock

	// SOCKET_ADDRESS is the socket address of this node. main reads it from the
	// SOCKET_ADDRESS environment variable and starts the server on it.
	SOCKET_ADDRESS string

	// MY_SHARD_ID is the ID of the shard that the current node belongs to.
	MY_SHARD_ID string

	// viewMutex protects access to CURRENT_VIEW.
	viewMutex sync.Mutex

	// KVStore is the in-memory key-value store.
	KVStore = make(map[string]Value)
)
// Value is a single entry of the key-value store: the stored data together
// with a string naming its original type.
type Value struct {
	// Data is the stored payload; it may hold any Go value.
	Data interface{}
	// Type names the original type of Data.
	// NOTE(review): the set of strings used here is decided by the writers of
	// KVStore, which are outside this block; confirm against them.
	Type string
}
// Sync_Data is the JSON body returned by GET /sync. Every field is a string;
// syncHandler fills the fields with serialized copies of this node's state.
type Sync_Data struct {
	// KvsSync is the JSON encoding of KVStore, sent under the key "kvsCopy".
	KvsSync string `json:"kvsCopy"`
	// VectorClockStr is the string form of the vector clock, sent under the
	// key "vectorClock".
	VectorClockStr string `json:"vectorClock"`
	// ShardsString is the JSON encoding of SHARDS, sent under the key "shard".
	ShardsString string `json:"shard"`
}
// syncHandler serves GET /sync.
//
// It replies to the requesting replica with a Sync_Data body holding this
// node's key-value store and shard map (each JSON-encoded and then carried as
// a string) together with the string form of the vector clock.
//
// If either JSON encoding fails, it responds with 500 and a short message.
func syncHandler(c echo.Context) error {
	// Serialize the key-value store first; bail out before touching anything else.
	kvsJSON, err := json.Marshal(KVStore)
	if err != nil {
		return c.JSON(http.StatusInternalServerError, "Failed to convert kvs to string")
	}

	// Serialize the shard map.
	shardsJSON, err := json.Marshal(SHARDS)
	if err != nil {
		return c.JSON(http.StatusInternalServerError, "Failed to convert Shard Map to string")
	}

	// Assemble the reply: store, vector clock and shard map, all as strings.
	return c.JSON(http.StatusOK, Sync_Data{
		KvsSync:        string(kvsJSON),
		VectorClockStr: MY_VECTOR_CLOCK.ReturnVCString(),
		ShardsString:   string(shardsJSON),
	})
}
// main boots this node.
//
// It reads SOCKET_ADDRESS, VIEW and (optionally) SHARD_COUNT from the
// environment, sets up sharding when a shard count is given, registers the
// request logger and all HTTP routes, announces this node to the replicas in
// the view with a PUT to their view endpoint, starts the heartbeat goroutine
// and finally serves HTTP on SOCKET_ADDRESS until the server stops.
func main() {
	// Read environment variables.
	SOCKET_ADDRESS = os.Getenv("SOCKET_ADDRESS")
	// NOTE(review): strings.Split returns a one-element slice containing ""
	// when VIEW is unset or empty, so CURRENT_VIEW then holds a single empty
	// address. Confirm that broadcast/heartbeat tolerate that.
	CURRENT_VIEW = strings.Split(os.Getenv("VIEW"), ",")

	// Sharding is optional: it is only set up when SHARD_COUNT is provided.
	if rawShardCount := os.Getenv("SHARD_COUNT"); rawShardCount != "" {
		shardCount, err := strconv.Atoi(rawShardCount)
		if err != nil {
			// A malformed value used to be indistinguishable from an unset
			// one; keep skipping the shard setup but make the reason visible.
			fmt.Fprintf(os.Stderr, "ignoring invalid SHARD_COUNT %q: %v\n", rawShardCount, err)
		} else {
			syncMyself(shardCount)
			// Store my shard id.
			updateMyShardID()
			// Create a hash ring to represent the distribution of shards.
			HASH_RING = createHashRing()
		}
	}

	// Define new Echo instance.
	e := echo.New()
	fmt.Printf("\nMy ShardID: %s\n", MY_SHARD_ID)

	// Define Logger to display requests. Code from Echo Documentation.
	e.Use(middleware.RequestLoggerWithConfig(middleware.RequestLoggerConfig{
		LogStatus:    true,
		LogURI:       true,
		LogMethod:    true,
		LogUserAgent: true,
		LogRemoteIP:  true,
		// Stash the vector clock string in the request context under "vclock"
		// before the handler runs. Nothing in this function reads it back.
		BeforeNextFunc: func(c echo.Context) {
			c.Set("vclock", MY_VECTOR_CLOCK.ReturnVCString())
		},
		LogValuesFunc: func(c echo.Context, v middleware.RequestLoggerValues) error {
			// Requests to /view are not logged.
			if v.URI == "/view" {
				return nil
			}
			fmt.Printf("%v %s %v status: %v shard: %s \n shardMap: %v\n view: %v\n\n", v.RemoteIP, v.Method, v.URI, v.Status, MY_SHARD_ID, SHARDS, CURRENT_VIEW)
			return nil
		},
	}))

	// Define /kvs GET endpoints.
	e.GET("/kvs", getKey)
	e.GET("/kvs/", getKey)
	e.GET("/kvs/:key", getKey)

	// Define /kvs PUT endpoints.
	e.PUT("/kvs", putKey)
	e.PUT("/kvs/", putKey)
	e.PUT("/kvs/:key", putKey)

	// Define /kvs DELETE endpoints.
	e.DELETE("/kvs", deleteKey)
	e.DELETE("/kvs/", deleteKey)
	e.DELETE("/kvs/:key", deleteKey)

	// Define /view endpoints.
	e.PUT("/view", putReplicaView)
	e.GET("/view", getView)
	e.DELETE("/view", deleteReplicaView)

	// Define /shard endpoints.
	e.GET("/shard/ids", getAllShardIds)
	e.GET("/shard/node-shard-id", getMyShardId)
	e.GET("/shard/members/:id", getMembersOfShard)
	e.GET("/shard/key-count/:id", getShardKeyCount)
	e.PUT("/shard/add-member/:id", addNodeToShard)
	// Leading slash added so this route is spelled like every other one.
	e.PUT("/shard/reshard", reshard)
	e.PUT("/shard/kvs-update/:key", updateKvsForResharding)

	// Define /sync endpoint for syncing new nodes.
	e.GET("/sync", syncHandler)

	// Build the JSON body to be sent: {"socket-address":"<IP:PORT>"}.
	payload := map[string]string{"socket-address": SOCKET_ADDRESS}
	jsonPayload, err := json.Marshal(payload)
	if err != nil {
		// Not expected for a map[string]string; report it and skip the
		// announcement rather than broadcasting an empty body.
		fmt.Fprintf(os.Stderr, "encoding view announcement: %v\n", err)
	} else {
		// Broadcast the PUT view message to all replicas in the system.
		broadcast("PUT", "view", jsonPayload, CURRENT_VIEW)
	}

	// Start heartbeat checker.
	go heartbeat()

	// Start Echo server; Fatal logs the error returned by Start and exits.
	e.Logger.Fatal(e.Start(SOCKET_ADDRESS))
}