Skip to content

Commit eaf5ebc

Browse files
committed
Add basic queue for not sent bytes
1 parent e1e1345 commit eaf5ebc

File tree

5 files changed

+84
-3
lines changed

5 files changed

+84
-3
lines changed

src/core.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
#include "trie.h"
3232
#include "list.h"
33+
#include "network.h"
3334
#include "hashtable.h"
3435

3536

@@ -62,6 +63,7 @@ struct sol_client {
6263
char *client_id;
6364
int fd;
6465
struct session session;
66+
struct buffer buf;
6567
};
6668

6769

src/network.c

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,41 @@
4949
#define EVLOOP_INITIAL_SIZE 4
5050

5151

52+
static const int INITIAL_BUFLEN = 4;
53+
54+
55+
void buffer_create(struct buffer *buf) {
56+
buf->bytes = sol_malloc(INITIAL_BUFLEN);
57+
buf->size = INITIAL_BUFLEN;
58+
buf->start = buf->end = 0;
59+
}
60+
61+
62+
void buffer_release(struct buffer *buf) {
63+
sol_free(buf->bytes);
64+
buf->bytes = NULL;
65+
buf->start = buf->end = 0;
66+
}
67+
68+
69+
void buffer_push_back(struct buffer *buf, unsigned char *bytes, size_t len) {
70+
71+
// Re-size buffer in case of len surpassing buffer size
72+
if (len > buf->size / 2) {
73+
buf->size *= 2;
74+
buf->bytes = sol_realloc(buf->bytes, buf->size);
75+
}
76+
77+
memcpy(buf->bytes + buf->end, bytes, len);
78+
buf->end += len;
79+
80+
}
81+
82+
83+
int buffer_empty(const struct buffer *buf) {
84+
return buf->end == buf->start;
85+
}
86+
5287
/* Set non-blocking socket */
5388
int set_nonblocking(int fd) {
5489
int flags, result;
@@ -122,7 +157,7 @@ static int create_and_bind_tcp(const char *host, const char *port) {
122157
if (sfd == -1) continue;
123158

124159
/* set SO_REUSEADDR so the socket will be reusable after process kill */
125-
if (setsockopt(sfd, SOL_SOCKET, (SO_REUSEPORT | SO_REUSEADDR),
160+
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
126161
&(int) { 1 }, sizeof(int)) < 0)
127162
perror("SO_REUSEADDR");
128163

src/network.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,36 @@
3131

3232
#include <stdio.h>
3333
#include <stdint.h>
34+
#include <sys/types.h>
3435
#include "util.h"
3536

3637

3738
// Socket families
3839
#define UNIX 0
3940
#define INET 1
4041

42+
/*
43+
* Buffer queue, mainly for remaining bytes to send, for EWOULDBLOCK or EAGAIN
44+
* on non-blocking socket
45+
*/
46+
struct buffer {
47+
size_t size;
48+
size_t start;
49+
size_t end;
50+
unsigned char *bytes;
51+
};
52+
53+
// Buffer build functions
54+
void buffer_create(struct buffer *);
55+
56+
void buffer_release(struct buffer *);
57+
58+
/* Copy an array of bytes into the buffer */
59+
void buffer_push_back(struct buffer *, unsigned char *, size_t);
60+
61+
/* Check if the buffer is empty by comparing start and end cursors */
62+
int buffer_empty(const struct buffer *);
63+
4164
/* Event loop wrapper structure, define an EPOLL loop and his status. The
4265
* EPOLL instance use EPOLLONESHOT for each event and must be re-armed
4366
* manually, in order to allow future uses on a multithreaded architecture.
@@ -78,7 +101,6 @@ struct closure {
78101
callback *call;
79102
};
80103

81-
82104
/* Set non-blocking socket */
83105
int set_nonblocking(int);
84106

src/server.c

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ static int connect_handler(struct closure *cb, union mqtt_packet *pkt) {
256256
new_client->fd = cb->fd;
257257
const char *cid = (const char *) pkt->connect.payload.client_id;
258258
new_client->client_id = sol_strdup(cid);
259+
buffer_create(&new_client->buf);
259260
hashtable_put(sol.clients, cid, new_client);
260261

261262
/* Substitute fd on callback with closure */
@@ -653,12 +654,31 @@ static int pingreq_handler(struct closure *cb, union mqtt_packet *pkt) {
653654
static void on_write(struct evloop *loop, void *arg) {
654655

655656
struct closure *cb = arg;
657+
struct sol_client *c = cb->obj;
656658

657659
ssize_t sent;
660+
661+
if (buffer_empty(&c->buf) == 0) {
662+
663+
/* Check if there's still some remaning bytes to send out */
664+
if ((sent = send_bytes(cb->fd, c->buf.bytes + c->buf.start, c->buf.end)) < 0)
665+
sol_error("Error writing on socket to client %s: %s",
666+
((struct sol_client *) cb->obj)->client_id, strerror(errno));
667+
668+
/* Update buffer pointers */
669+
c->buf.start += sent;
670+
671+
}
672+
658673
if ((sent = send_bytes(cb->fd, cb->payload->data, cb->payload->size)) < 0)
659674
sol_error("Error writing on socket to client %s: %s",
660675
((struct sol_client *) cb->obj)->client_id, strerror(errno));
661676

677+
/* Update client buffer for remaining bytes to send */
678+
if (sent < cb->payload->size)
679+
buffer_push_back(&c->buf, cb->payload->data + sent,
680+
cb->payload->size - sent);
681+
662682
// Update information stats
663683
info.bytes_sent += sent;
664684
bytestring_release(cb->payload);
@@ -977,6 +997,8 @@ static int client_destructor(struct hashtable_entry *entry) {
977997
if (client->client_id)
978998
sol_free(client->client_id);
979999

1000+
buffer_release(&client->buf);
1001+
9801002
sol_free(client);
9811003

9821004
return 0;

src/util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void *sol_realloc(void *, size_t);
5959
size_t malloc_size(void *);
6060
void sol_free(void *);
6161
char *sol_strdup(const char *);
62-
char *remove_occur(char *str, char c);
62+
char *remove_occur(char *, char);
6363
char *append_string(char *, char *, size_t);
6464

6565
size_t memory_used(void);

0 commit comments

Comments
 (0)