-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpcs.go
123 lines (106 loc) · 4.64 KB
/
rpcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright (c) 2019 Meng Huang ([email protected])
// This package is licensed under a MIT license that can be found in the LICENSE file.
package raft
import (
"context"
"github.com/hslam/rpc"
)
// Settings applied to the client transport and the server listener.
const (
	// maxConnsPerHost is used as the transport's MaxConnsPerHost.
	maxConnsPerHost = 2
	// maxIdleConnsPerHost is used as the transport's MaxIdleConnsPerHost.
	maxIdleConnsPerHost = 2
	// network is the network name handed to the transport options and to
	// the server's Listen call.
	network = "tcp"
	// codec is the codec name handed to the transport options and to the
	// server's Listen call.
	codec = "pb"
)

// Names of the registered service and of each remote method. A method is
// addressed on the wire as serviceName + "." + <method name>.
const (
	serviceName         = "R"
	requestVoteName     = "RequestVote"
	appendEntriesName   = "AppendEntries"
	installSnapshotName = "InstallSnapshot"
	getLeaderName       = "GetLeader"
	addMemberName       = "AddMember"
	removeMemberName    = "RemoveMember"
	setMetaName         = "SetMeta"
	getMetaName         = "GetMeta"
)
// RPCs describes the RPC layer: it can register and serve a Service, be
// closed, and issue one client call per remote method. Every client call
// takes the target address plus a request/response pair and reports failure
// through its error result.
type RPCs interface {
	// Server side: register the service, start serving, shut down.
	Register(s Service) error
	ListenAndServe() error
	Close() error

	// Ping takes only a target address; its exact semantics are defined by
	// the implementation.
	Ping(addr string) error

	// Calls carrying vote, log-entry and snapshot messages.
	RequestVote(ctx context.Context, addr string, req *RequestVoteRequest, res *RequestVoteResponse) error
	AppendEntries(ctx context.Context, addr string, req *AppendEntriesRequest, res *AppendEntriesResponse) error
	InstallSnapshot(ctx context.Context, addr string, req *InstallSnapshotRequest, res *InstallSnapshotResponse) error

	// Calls concerning the leader and the member set.
	GetLeader(ctx context.Context, addr string, req *GetLeaderRequest, res *GetLeaderResponse) error
	AddMember(ctx context.Context, addr string, req *AddMemberRequest, res *AddMemberResponse) error
	RemoveMember(ctx context.Context, addr string, req *RemoveMemberRequest, res *RemoveMemberResponse) error

	// Calls that write and read meta.
	SetMeta(ctx context.Context, addr string, req *SetMetaRequest, res *SetMetaResponse) error
	GetMeta(ctx context.Context, addr string, req *GetMetaRequest, res *GetMetaResponse) error
}
// rpcs pairs an RPC client transport with an RPC server and keeps the
// pre-built, fully qualified name of every remote method it can call.
type rpcs struct {
	// Embedded client transport; its methods are promoted onto rpcs.
	*rpc.Transport

	// server is the RPC server and addr is the address handed to its
	// Listen call.
	server *rpc.Server
	addr   string

	// Qualified method names, one per remote call.
	requestVoteServiceName     string
	appendEntriesServiceName   string
	installSnapshotServiceName string
	getLeaderServiceName       string
	addMemberServiceName       string
	removeMemberServiceName    string
	setMetaServiceName         string
	getMetaServiceName         string
}
// newRPCs returns an rpcs for addr. It configures the client transport from
// the package constants, precomputes the serviceName + "." + method name for
// every remote call, and creates a new rpc.Server. Listen is not called
// here; addr is only stored for later use.
func newRPCs(addr string) *rpcs {
	// qualify prefixes a method name with the service name.
	qualify := func(method string) string {
		return serviceName + "." + method
	}

	transport := &rpc.Transport{
		MaxConnsPerHost:     maxConnsPerHost,
		MaxIdleConnsPerHost: maxIdleConnsPerHost,
		Options:             &rpc.Options{Network: network, Codec: codec},
	}

	return &rpcs{
		Transport:                  transport,
		appendEntriesServiceName:   qualify(appendEntriesName),
		requestVoteServiceName:     qualify(requestVoteName),
		installSnapshotServiceName: qualify(installSnapshotName),
		getLeaderServiceName:       qualify(getLeaderName),
		addMemberServiceName:       qualify(addMemberName),
		removeMemberServiceName:    qualify(removeMemberName),
		setMetaServiceName:         qualify(setMetaName),
		getMetaServiceName:         qualify(getMetaName),
		server:                     rpc.NewServer(),
		addr:                       addr,
	}
}
// Register registers service on the server under serviceName and returns
// the error reported by the server's RegisterName.
func (r *rpcs) Register(service Service) error {
	srv := r.server
	return srv.RegisterName(serviceName, service)
}
// ListenAndServe configures the server and then calls its Listen method with
// the package's network and codec and the address given to newRPCs. The
// error from Listen is returned unchanged.
func (r *rpcs) ListenAndServe() error {
	srv := r.server
	// Log level is set to rpc.OffLogLevel (presumably silences the server's
	// own logging — confirm against the rpc package).
	srv.SetLogLevel(rpc.OffLogLevel)
	// NOTE(review): the effect of SetNoCopy(true) is defined by the rpc
	// package and is not visible from this file.
	srv.SetNoCopy(true)
	return srv.Listen(network, r.addr, codec)
}
// Close closes the client transport first and the server second. Only the
// server's Close error is returned; nothing from the transport's Close is
// inspected.
func (r *rpcs) Close() error {
	transport, srv := r.Transport, r.server
	transport.Close()
	return srv.Close()
}
// RequestVote calls the method named by r.requestVoteServiceName on the peer
// at addr. ctx, req and res are handed to CallWithContext unchanged, and the
// error from that call is returned as-is.
func (r *rpcs) RequestVote(ctx context.Context, addr string, req *RequestVoteRequest, res *RequestVoteResponse) error {
	method := r.requestVoteServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}
// AppendEntries calls the method named by r.appendEntriesServiceName on the
// peer at addr. ctx, req and res are handed to CallWithContext unchanged, and
// the error from that call is returned as-is.
func (r *rpcs) AppendEntries(ctx context.Context, addr string, req *AppendEntriesRequest, res *AppendEntriesResponse) error {
	method := r.appendEntriesServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}
// InstallSnapshot calls the method named by r.installSnapshotServiceName on
// the peer at addr. ctx, req and res are handed to CallWithContext unchanged,
// and the error from that call is returned as-is.
func (r *rpcs) InstallSnapshot(ctx context.Context, addr string, req *InstallSnapshotRequest, res *InstallSnapshotResponse) error {
	method := r.installSnapshotServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}
// GetLeader calls the method named by r.getLeaderServiceName on the peer at
// addr. ctx, req and res are handed to CallWithContext unchanged, and the
// error from that call is returned as-is.
func (r *rpcs) GetLeader(ctx context.Context, addr string, req *GetLeaderRequest, res *GetLeaderResponse) error {
	method := r.getLeaderServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}
// AddMember calls the method named by r.addMemberServiceName on the peer at
// addr. ctx, req and res are handed to CallWithContext unchanged, and the
// error from that call is returned as-is.
func (r *rpcs) AddMember(ctx context.Context, addr string, req *AddMemberRequest, res *AddMemberResponse) error {
	method := r.addMemberServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}
// RemoveMember calls the method named by r.removeMemberServiceName on the
// peer at addr. ctx, req and res are handed to CallWithContext unchanged, and
// the error from that call is returned as-is.
func (r *rpcs) RemoveMember(ctx context.Context, addr string, req *RemoveMemberRequest, res *RemoveMemberResponse) error {
	method := r.removeMemberServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}
// SetMeta calls the method named by r.setMetaServiceName on the peer at addr.
// ctx, req and res are handed to CallWithContext unchanged, and the error
// from that call is returned as-is.
func (r *rpcs) SetMeta(ctx context.Context, addr string, req *SetMetaRequest, res *SetMetaResponse) error {
	method := r.setMetaServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}
// GetMeta calls the method named by r.getMetaServiceName on the peer at addr.
// ctx, req and res are handed to CallWithContext unchanged, and the error
// from that call is returned as-is.
func (r *rpcs) GetMeta(ctx context.Context, addr string, req *GetMetaRequest, res *GetMetaResponse) error {
	method := r.getMetaServiceName
	return r.CallWithContext(ctx, addr, method, req, res)
}