Skip to content

Commit

Permalink
Update poller behavior when 'accept' failed.
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim committed Feb 29, 2024
1 parent 468fb02 commit 341f66c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 64 deletions.
1 change: 0 additions & 1 deletion src/kernel/Communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,6 @@ void *Communicator::accept(const struct sockaddr *addr, socklen_t addrlen,
delete target;
}

close(sockfd);
return NULL;
}

Expand Down
113 changes: 58 additions & 55 deletions src/kernel/poller.c
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,10 @@ static void __poller_handle_listen(struct __poller_node *node,

result = node->data.accept(addr, addrlen, sockfd, node->data.context);
if (!result)
{
close(sockfd);
break;
}

res->data = node->data;
res->data.result = result;
Expand Down Expand Up @@ -651,6 +654,55 @@ static void __poller_handle_connect(struct __poller_node *node,
poller->callback((struct poller_result *)node, poller->context);
}

static void __poller_handle_recvfrom(struct __poller_node *node,
poller_t *poller)
{
struct __poller_node *res = node->res;
struct sockaddr_storage ss;
struct sockaddr *addr = (struct sockaddr *)&ss;
socklen_t addrlen;
void *result;
ssize_t n;

while (1)
{
addrlen = sizeof (struct sockaddr_storage);
n = recvfrom(node->data.fd, poller->buf, POLLER_BUFSIZE, 0,
addr, &addrlen);
if (n < 0)
{
if (errno == EAGAIN)
return;
else
break;
}

result = node->data.recvfrom(addr, addrlen, poller->buf, n,
node->data.context);
if (!result)
break;

res->data = node->data;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->callback((struct poller_result *)res, poller->context);

res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
if (!res)
break;
}

if (__poller_remove_node(node, poller))
return;

node->error = errno;
node->state = PR_ST_ERROR;
free(node->res);
poller->callback((struct poller_result *)node, poller->context);
}

static void __poller_handle_ssl_accept(struct __poller_node *node,
poller_t *poller)
{
Expand Down Expand Up @@ -849,55 +901,6 @@ static void __poller_handle_notify(struct __poller_node *node,
poller->callback((struct poller_result *)node, poller->context);
}

static void __poller_handle_recvfrom(struct __poller_node *node,
poller_t *poller)
{
struct __poller_node *res = node->res;
struct sockaddr_storage ss;
struct sockaddr *addr = (struct sockaddr *)&ss;
socklen_t addrlen;
void *result;
ssize_t n;

while (1)
{
addrlen = sizeof (struct sockaddr_storage);
n = recvfrom(node->data.fd, poller->buf, POLLER_BUFSIZE, 0,
addr, &addrlen);
if (n < 0)
{
if (errno == EAGAIN)
return;
else
break;
}

result = node->data.recvfrom(addr, addrlen, poller->buf, n,
node->data.context);
if (!result)
break;

res->data = node->data;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->callback((struct poller_result *)res, poller->context);

res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
if (!res)
break;
}

if (__poller_remove_node(node, poller))
return;

node->error = errno;
node->state = PR_ST_ERROR;
free(node->res);
poller->callback((struct poller_result *)node, poller->context);
}

static int __poller_handle_pipe(poller_t *poller)
{
struct __poller_node **node = (struct __poller_node **)poller->buf;
Expand Down Expand Up @@ -1055,6 +1058,9 @@ static void *__poller_thread_routine(void *arg)
case PD_OP_CONNECT:
__poller_handle_connect(node, poller);
break;
case PD_OP_RECVFROM:
__poller_handle_recvfrom(node, poller);
break;
case PD_OP_SSL_ACCEPT:
__poller_handle_ssl_accept(node, poller);
break;
Expand All @@ -1070,9 +1076,6 @@ static void *__poller_thread_routine(void *arg)
case PD_OP_NOTIFY:
__poller_handle_notify(node, poller);
break;
case PD_OP_RECVFROM:
__poller_handle_recvfrom(node, poller);
break;
}
}

Expand Down Expand Up @@ -1282,6 +1285,9 @@ static int __poller_data_get_event(int *event, const struct poller_data *data)
case PD_OP_CONNECT:
*event = EPOLLOUT | EPOLLET;
return 0;
case PD_OP_RECVFROM:
*event = EPOLLIN | EPOLLET;
return 1;
case PD_OP_SSL_ACCEPT:
*event = EPOLLIN | EPOLLET;
return 0;
Expand All @@ -1297,9 +1303,6 @@ static int __poller_data_get_event(int *event, const struct poller_data *data)
case PD_OP_NOTIFY:
*event = EPOLLIN | EPOLLET;
return 1;
case PD_OP_RECVFROM:
*event = EPOLLIN | EPOLLET;
return 1;
default:
errno = EINVAL;
return -1;
Expand Down
16 changes: 8 additions & 8 deletions src/kernel/poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ struct poller_data
#define PD_OP_WRITE 2
#define PD_OP_LISTEN 3
#define PD_OP_CONNECT 4
#define PD_OP_RECVFROM 5
#define PD_OP_SSL_READ PD_OP_READ
#define PD_OP_SSL_WRITE PD_OP_WRITE
#define PD_OP_SSL_ACCEPT 5
#define PD_OP_SSL_CONNECT 6
#define PD_OP_SSL_SHUTDOWN 7
#define PD_OP_EVENT 8
#define PD_OP_NOTIFY 9
#define PD_OP_RECVFROM 10
#define PD_OP_SSL_ACCEPT 6
#define PD_OP_SSL_CONNECT 7
#define PD_OP_SSL_SHUTDOWN 8
#define PD_OP_EVENT 9
#define PD_OP_NOTIFY 10
short operation;
unsigned short iovcnt;
int fd;
Expand All @@ -57,10 +57,10 @@ struct poller_data
poller_message_t *(*create_message)(void *);
int (*partial_written)(size_t, void *);
void *(*accept)(const struct sockaddr *, socklen_t, int, void *);
void *(*event)(void *);
void *(*notify)(void *, void *);
void *(*recvfrom)(const struct sockaddr *, socklen_t,
const void *, size_t, void *);
void *(*event)(void *);
void *(*notify)(void *, void *);
};
void *context;
union
Expand Down

0 comments on commit 341f66c

Please sign in to comment.