Skip to content

Commit 55e9fdd

Browse files
committed
Added buggy implementation of wildcard subscription #, removed useless functions
1 parent 56caa02 commit 55e9fdd

File tree

8 files changed

+142
-79
lines changed

8 files changed

+142
-79
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ features expected from a MQTT broker.
4040
- [X] QoS 0, 1, 2 are handled
4141
- [X] Trie as underlying structure to handle topic hierarchies
4242
- [X] Periodic tasks like stats publishing
43-
- [ ] Wildcards on subscriptions (though simple to implement, will be added soon)
43+
- [X] Wildcards on subscriptions (though simple to implement, will be added soon)
4444
- [ ] QoS 1 and 2 tracking of pending clients and re-send
4545
- [ ] Session present check and handling
4646
- [ ] Authentication

src/network.c

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -210,44 +210,6 @@ int accept_connection(int serversock) {
210210
return clientsock;
211211
}
212212

213-
/*
214-
* Create a socket and use it to connect to the specified host and port
215-
*/
216-
int open_connection(const char *host, int port) {
217-
218-
struct sockaddr_in serveraddr;
219-
struct hostent *server;
220-
221-
/* socket: create the socket */
222-
int sfd = socket(AF_INET, SOCK_STREAM, 0);
223-
if (sfd < 0)
224-
goto err;
225-
226-
/* gethostbyname: get the server's DNS entry */
227-
server = gethostbyname(host);
228-
if (server == NULL)
229-
goto err;
230-
231-
/* build the server's address */
232-
bzero((char *) &serveraddr, sizeof(serveraddr));
233-
serveraddr.sin_family = AF_INET;
234-
bcopy((char *) server->h_addr,
235-
(char *) &serveraddr.sin_addr.s_addr, server->h_length);
236-
serveraddr.sin_port = htons(port);
237-
238-
/* connect: create a connection with the server */
239-
if (connect(sfd, (const struct sockaddr *) &serveraddr,
240-
sizeof(serveraddr)) < 0)
241-
goto err;
242-
243-
return sfd;
244-
245-
err:
246-
247-
perror("open_connection");
248-
return -1;
249-
}
250-
251213
/* Send all bytes contained in buf, updating sent bytes counter */
252214
ssize_t send_bytes(int fd, const unsigned char *buf, size_t len) {
253215

src/network.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,6 @@ int make_listen(const char *, const char *, int);
100100
/* Accept a connection and add it to the right epollfd */
101101
int accept_connection(int);
102102

103-
/* Open a connection with a target host:port */
104-
int open_connection(const char *, int);
105-
106103
struct evloop *evloop_create(int, int);
107104

108105
void evloop_init(struct evloop *, int, int);

src/server.c

Lines changed: 85 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,19 @@ static struct sol sol;
5454
* Statistics topics, published every N seconds defined by configuration
5555
* interval
5656
*/
57-
static const char *sys_topics[7] = {
58-
"$SOL/broker/uptime",
59-
"$SOL/broker/clients/connected",
60-
"$SOL/broker/clients/disconnected",
61-
"$SOL/broker/bytes/sent",
62-
"$SOL/broker/bytes/received",
63-
"$SOL/broker/messages/sent",
64-
"$SOL/broker/messages/received"
57+
static const char *sys_topics[12] = {
58+
"$SOL/",
59+
"$SOL/broker/",
60+
"$SOL/broker/clients/",
61+
"$SOL/broker/bytes/",
62+
"$SOL/broker/messages/",
63+
"$SOL/broker/uptime/",
64+
"$SOL/broker/clients/connected/",
65+
"$SOL/broker/clients/disconnected/",
66+
"$SOL/broker/bytes/sent/",
67+
"$SOL/broker/bytes/received/",
68+
"$SOL/broker/messages/sent/",
69+
"$SOL/broker/messages/received/"
6570
};
6671

6772

@@ -280,9 +285,28 @@ static int disconnect_handler(struct closure *cb, union mqtt_packet *pkt) {
280285
}
281286

282287

288+
static void recursive_subscription(struct trie_node *node, void *arg) {
289+
290+
if (!node || !node->data)
291+
return;
292+
293+
struct list_node *child = node->children->head;
294+
for (; child; child = child->next)
295+
recursive_subscription(child->data, arg);
296+
297+
struct topic *t = node->data;
298+
299+
struct subscriber *s = arg;
300+
301+
t->subscribers = list_push(t->subscribers, s);
302+
}
303+
304+
283305
static int subscribe_handler(struct closure *cb, union mqtt_packet *pkt) {
284306

285307
struct sol_client *c = cb->obj;
308+
bool wildcard = false;
309+
bool alloced = false;
286310

287311
/* Subscribe packets contains a list of topics and QoS tuples */
288312
for (unsigned i = 0; i < pkt->subscribe.tuples_len; i++) {
@@ -293,19 +317,43 @@ static int subscribe_handler(struct closure *cb, union mqtt_packet *pkt) {
293317
* Check if the topic exists already or in case create it and store in
294318
* the global map
295319
*/
296-
const char *topic_name = (const char *) pkt->subscribe.tuples[i].topic;
297-
struct topic *t = sol_topic_get(&sol, topic_name);
320+
char *topic = (char *) pkt->subscribe.tuples[i].topic;
321+
322+
sol_debug("\t%s (QoS %i)", topic, pkt->subscribe.tuples[i].qos);
323+
324+
/* Recursive subscribe to all children topics if the topic ends with "/#" */
325+
if (topic[pkt->subscribe.tuples[i].topic_len - 1] == '#' &&
326+
topic[pkt->subscribe.tuples[i].topic_len - 2] == '/') {
327+
topic = remove_occur(topic, '#');
328+
wildcard = true;
329+
} else if (topic[pkt->subscribe.tuples[i].topic_len - 1] != '/') {
330+
topic = sol_malloc(pkt->subscribe.tuples[i].topic_len + 2);
331+
memcpy(topic, pkt->subscribe.tuples[i].topic,
332+
pkt->subscribe.tuples[i].topic_len);
333+
topic[pkt->subscribe.tuples[i].topic_len] = '/';
334+
topic[pkt->subscribe.tuples[i].topic_len + 1] = '\0';
335+
alloced = true;
336+
}
298337

299-
sol_debug("\t%s (QoS %i)", topic_name, pkt->subscribe.tuples[i].qos);
338+
struct topic *t = sol_topic_get(&sol, topic);
300339

301340
// TODO check for callback correctly set to obj
302341

303342
if (!t) {
304-
t = topic_create(sol_strdup(topic_name));
343+
t = topic_create(sol_strdup(topic));
305344
sol_topic_put(&sol, t);
345+
} else if (wildcard == true) {
346+
struct subscriber *sub = sol_malloc(sizeof(*sub));
347+
sub->client = cb->obj;
348+
sub->qos = pkt->subscribe.tuples[i].qos;
349+
trie_prefix_map_tuple(&sol.topics, topic,
350+
recursive_subscription, sub);
306351
}
307352

308353
topic_add_subscriber(t, cb->obj, pkt->subscribe.tuples[i].qos);
354+
355+
if (alloced)
356+
sol_free(topic);
309357
}
310358

311359
/*
@@ -374,17 +422,31 @@ static int publish_handler(struct closure *cb, union mqtt_packet *pkt) {
374422

375423
info.messages_recv++;
376424

425+
char *topic = (char *) pkt->publish.topic;
426+
bool alloced = false;
427+
428+
if (topic[pkt->publish.topiclen - 1] != '/') {
429+
topic = sol_malloc(pkt->publish.topiclen + 2);
430+
memcpy(topic, pkt->publish.topic, pkt->publish.topiclen);
431+
topic[pkt->publish.topiclen] = '/';
432+
topic[pkt->publish.topiclen + 1] = '\0';
433+
alloced = true;
434+
}
435+
377436
/*
378437
* Retrieve the topic from the global map, if it wasn't created before,
379438
* create a new one with the name selected
380439
*/
381-
struct topic *t = sol_topic_get(&sol, (const char *) pkt->publish.topic);
440+
struct topic *t = sol_topic_get(&sol, topic);
382441

383442
if (!t) {
384-
t = topic_create(sol_strdup((const char *) pkt->publish.topic));
443+
t = topic_create(sol_strdup(topic));
385444
sol_topic_put(&sol, t);
386445
}
387446

447+
if (alloced == true)
448+
sol_free(topic);
449+
388450
// TODO Check for QoS of subscriber, it should override the PUBLISH one
389451

390452
unsigned char *pub = pack_mqtt_packet(pkt, PUBLISH_TYPE);
@@ -822,16 +884,16 @@ static void publish_stats(struct evloop *loop, void *args) {
822884
unsigned char *pmrecv = (unsigned char *) &mrecv[0];
823885
unsigned char *putime = (unsigned char *) &utime[0];
824886

825-
publish_message(0, strlen(sys_topics[0]),
826-
sys_topics[0], strlen(utime), putime);
827-
publish_message(0, strlen(sys_topics[1]),
828-
sys_topics[1], strlen(cclients), pcclients);
829-
publish_message(0, strlen(sys_topics[3]),
830-
sys_topics[3], strlen(bsent), pbsent);
831887
publish_message(0, strlen(sys_topics[5]),
832-
sys_topics[5], strlen(msent), pmsent);
888+
sys_topics[5], strlen(utime), putime);
833889
publish_message(0, strlen(sys_topics[6]),
834-
sys_topics[6], strlen(mrecv), pmrecv);
890+
sys_topics[6], strlen(cclients), pcclients);
891+
publish_message(0, strlen(sys_topics[8]),
892+
sys_topics[8], strlen(bsent), pbsent);
893+
publish_message(0, strlen(sys_topics[10]),
894+
sys_topics[10], strlen(msent), pmsent);
895+
publish_message(0, strlen(sys_topics[11]),
896+
sys_topics[11], strlen(mrecv), pmrecv);
835897

836898
}
837899

@@ -891,7 +953,7 @@ int start_server(const char *addr, const char *port) {
891953
generate_uuid(server_closure.closure_id);
892954

893955
/* Generate stats topics */
894-
for (int i = 0; i < 7; i++)
956+
for (int i = 0; i < 12; i++)
895957
sol_topic_put(&sol, topic_create(sol_strdup(sys_topics[i])));
896958

897959
struct evloop *event_loop = evloop_create(EPOLL_MAX_EVENTS, EPOLL_TIMEOUT);

src/trie.c

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,24 @@ static void trie_prefix_map_func(struct trie_node *node,
480480

481481
}
482482

483+
/* Iterate through children of each node starting from a given node, applying
484+
a defined function which take a struct trie_node as argument */
485+
static void trie_prefix_map_func2(struct trie_node *node,
486+
void (*mapfunc)(struct trie_node *, void *), void *arg) {
487+
488+
if (trie_is_free_node(node)) {
489+
mapfunc(node, arg);
490+
return;
491+
}
492+
493+
struct list_node *child = node->children->head;
494+
for (; child; child = child->next)
495+
trie_prefix_map_func2(child->data, mapfunc, arg);
496+
497+
mapfunc(node, arg);
498+
499+
}
500+
483501
/*
484502
* Apply a function to every key below a given prefix, if prefix is null the
485503
* function will be applied to all the trie
@@ -505,6 +523,30 @@ void trie_prefix_map(Trie *trie, const char *prefix,
505523
}
506524
}
507525

526+
/* Apply a function to every key below a given prefix, if prefix is null the
527+
function will be applied to all the trie */
528+
void trie_prefix_map_tuple(Trie *trie, const char *prefix,
529+
void (*mapfunc)(struct trie_node *, void *), void *arg) {
530+
531+
assert(trie);
532+
533+
if (!prefix) {
534+
trie_prefix_map_func2(trie->root, mapfunc, arg);
535+
} else {
536+
537+
// Walk the trie till the end of the key
538+
struct trie_node *node = trie_node_find(trie->root, prefix);
539+
540+
// No complete key found
541+
if (!node)
542+
return;
543+
544+
// Check all possible sub-paths and add to count where there is a leaf
545+
trie_prefix_map_func2(node, mapfunc, arg);
546+
}
547+
}
548+
549+
508550
/* Release memory of a node while updating size of the trie */
509551
void trie_node_free(struct trie_node *node, size_t *size) {
510552

src/trie.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,7 @@ List *trie_prefix_find(const Trie *, const char *);
109109
void trie_prefix_map(Trie *, const char *, void (*mapfunc)(struct trie_node *));
110110

111111

112+
void trie_prefix_map_tuple(Trie *, const char *,
113+
void (*mapfunc)(struct trie_node *, void *), void *);
114+
112115
#endif

src/util.c

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -117,19 +117,16 @@ int parse_int(const char *string) {
117117
}
118118

119119

120-
char *update_integer_string(char *str, int num) {
121-
122-
int n = parse_int(str);
123-
n += num;
124-
/*
125-
* Check for realloc if the new value is "larger" then
126-
* previous
127-
*/
128-
char tmp[number_len(n) + 1]; // max size in bytes
129-
sprintf(tmp, "%d", n); // XXX Unsafe
130-
size_t len = strlen(tmp);
131-
str = sol_realloc(str, len + 1);
132-
strncpy(str, tmp, len + 1);
120+
char *remove_occur(char *str, char c) {
121+
char *p = str;
122+
char *pp = str;
123+
124+
while (*p) {
125+
*pp = *p++;
126+
pp += (*pp != c);
127+
}
128+
129+
*pp = '\0';
133130

134131
return str;
135132
}

src/util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void *sol_realloc(void *, size_t);
5959
size_t malloc_size(void *);
6060
void sol_free(void *);
6161
char *sol_strdup(const char *);
62-
char *update_integer_string(char *, int);
62+
char *remove_occur(char *str, char c);
6363

6464
size_t memory_used(void);
6565

0 commit comments

Comments
 (0)