-
Notifications
You must be signed in to change notification settings - Fork 46
/
rct_core.h
228 lines (185 loc) · 6.46 KB
/
rct_core.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#ifndef _RCT_CORE_H_
#define _RCT_CORE_H_
#include "config.h"
#ifdef HAVE_DEBUG_LOG
# define RCT_DEBUG_LOG 1
#endif
#ifdef HAVE_ASSERT_PANIC
# define RCT_ASSERT_PANIC 1
#endif
#ifdef HAVE_ASSERT_LOG
# define RCT_ASSERT_LOG 1
#endif
#ifdef HAVE_STATS
# define RCT_STATS 1
#else
# define RCT_STATS 0
#endif
#ifdef HAVE_LITTLE_ENDIAN
# define RCT_LITTLE_ENDIAN 1
#endif
#ifdef HAVE_BACKTRACE
# define RCT_HAVE_BACKTRACE 1
#endif
#define RCT_OK 0
#define RCT_ERROR -1
#define RCT_EAGAIN -2
#define RCT_ENOMEM -3
/* reserved fds for std streams, log etc. */
#define RESERVED_FDS 32
typedef int r_status; /* return type */
typedef int err_t; /* error type */
#define RCT_REDIS_ROLE_NULL 0
#define RCT_REDIS_ROLE_ALL 1
#define RCT_REDIS_ROLE_MASTER 2
#define RCT_REDIS_ROLE_SLAVE 3
#define RCT_REDIS_ROLE_NAME_NODE "node"
#define RCT_REDIS_ROLE_NAME_MASTER "master"
#define RCT_REDIS_ROLE_NAME_SLAVE "slave"
#include <stdio.h>
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <ctype.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <inttypes.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <getopt.h>
#include <pthread.h>
#include <stdlib.h>
#include <hircluster.h>
#include <hiarray.h>
#include <sds.h>
#include <dict.c>
#include <adlist.h>
#include "rct_util.h"
#include "rct_option.h"
#include "rct_log.h"
#include "rct_command.h"
#include "rct_mttlist.h"
#include "rct_locklist.h"
#include "rct_conf.h"
struct async_command;
struct instance {
int log_level; /* log level */
char *log_filename; /* log filename */
char *conf_filename; /* configuration filename */
int interval; /* stats aggregation interval */
char *addr; /* stats monitoring addr */
char hostname[RCT_MAXHOSTNAMELEN]; /* hostname */
pid_t pid; /* process id */
char *pid_filename; /* pid filename */
unsigned pidfile:1; /* pid file created? */
int show_help;
int show_version;
int daemonize;
char *command;
char *role;
uint64_t start;
uint64_t end;
int simple;
int thread_count;
uint64_t buffer_size;
int commands_limit_per_second;
};
typedef struct rctContext {
redisClusterContext *cc;
dict *commands; /* Command table */
sds address;
char *cmd;
uint8_t redis_role;
uint8_t simple;
int thread_count;
uint64_t buffer_size;
int commands_limit_per_second;
rct_conf *cf;
struct hiarray args; /* sds[] */
struct async_command *acmd;
void *private_data;
}rctContext;
rctContext *create_context(struct instance *nci);
void destroy_context(rctContext *rct_ctx);
void nodes_get_state(rctContext *ctx, int type);
void slots_state(rctContext *ctx, int type);
void show_nodes_hold_slot_num(rctContext *ctx, int type);
void show_new_nodes_name(rctContext *ctx, int type);
void show_nodes_list(rctContext *ctx, int type);
void cluster_rebalance(rctContext *ctx, int type);
void do_command(rctContext *ctx, int type);
void do_command_node_by_node(rctContext *ctx, int type);
void cluster_del_keys(rctContext *ctx, int type);
struct async_command;
struct aeEventLoop;
typedef int (async_callback_reply)(struct async_command*);
typedef struct async_command{
rctContext *ctx;
struct aeEventLoop *loop;
redisClusterAsyncContext *acc; /* handler to the redis cluster */
dict *nodes; /* all the nodes in the redis cluster */
sds command; /* command need to run */
struct hiarray *parameters; /* sds[], parameters for the command */
int role; /* target role to run the command */
int nodes_count; /* node count that need do the command in one step */
int finished_count; /* finished node count in one step */
struct hiarray results; //type: cluster_node
async_callback_reply *callback;
int step; /* the command step count */
hilist *black_nodes;
}async_command;
typedef struct async_callback_data{
async_command *acmd;
struct cluster_node *node;
}async_callback_data;
int async_command_init(async_command *acmd, rctContext *ctx, char *addrs, int flags);
void async_command_deinit(async_command *acmd);
int cluster_async_call(rctContext *ctx, char *command, struct hiarray *parameters, int role, async_callback_reply *callback);
redisReply *redis_reply_clone(redisReply *r);
int async_reply_status(async_command *acmd);
int async_reply_string(async_command *acmd);
int async_reply_display(async_command *acmd);
int async_reply_display_check(async_command *acmd);
int async_reply_maxmemory(async_command *acmd);
int async_reply_info_memory(async_command *acmd);
int async_reply_info_keynum(async_command *acmd);
int async_reply_info_display(async_command *acmd);
int async_reply_info_display_check(async_command *acmd);
int async_reply_check_cluster(async_command *acmd);
int async_reply_destroy_cluster(async_command *acmd);
int async_reply_cluster_create(async_command *acmd);
int async_reply_delete_all_slaves(async_command *acmd);
int async_reply_dump_conf_file(async_command *acmd);
typedef struct slots_region
{
uint32_t start;
uint32_t end;
} slots_region;
typedef struct redis_instance{
sds name;
sds addr;
sds host;
int port;
redisContext *con;
uint8_t role;
hilist *slaves;
struct hiarray *slots; /* struct slots_region */
int slots_count;
int slots_weight;
int slots_to_import; /* Positive number means this node need import slots from other nodes, minus number means this node need move slots to other nodes. */
}redis_instance;
int redis_instance_init(redis_instance *node, const char *addr, int role);
void redis_instance_deinit(redis_instance *node);
redis_instance *redis_instance_create(const char *addr, int role);
void redis_instance_destroy(redis_instance *node);
void rct_redis_instance_array_debug_show(struct hiarray *nodes);
redisContext *cxt_get_by_redis_instance(redis_instance *node);
int redis_instance_array_assign_master_slots(struct hiarray *nodes);
struct hiarray *redis_instance_array_create_from_cluster(dict *cluster_nodes);
int redis_instance_array_addr_cmp(const void *t1, const void *t2);
#endif