From 8744a420d537960f47cec075a94b5e4701d404c1 Mon Sep 17 00:00:00 2001 From: steve-chavez Date: Mon, 7 Oct 2024 18:28:22 -0500 Subject: [PATCH 1/2] temporarily remove timerfd This is so the translation to pg WaitEvent API is easier. --- src/core.c | 32 ------------------------------- src/worker.c | 53 +++++++++++++++++----------------------------------- 2 files changed, 17 insertions(+), 68 deletions(-) diff --git a/src/core.c b/src/core.c index 365f4f6..19f3c12 100644 --- a/src/core.c +++ b/src/core.c @@ -24,7 +24,6 @@ #include #include -#include #include #include #include @@ -48,35 +47,6 @@ body_cb(void *contents, size_t size, size_t nmemb, void *userp) return realsize; } -static int multi_timer_cb(CURLM *multi, long timeout_ms, LoopState *lstate) { - elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms); - - itimerspec its = - timeout_ms > 0 ? - // assign the timeout normally - (itimerspec){ - .it_value.tv_sec = timeout_ms / 1000, - .it_value.tv_nsec = (timeout_ms % 1000) * 1000 * 1000, - }: - timeout_ms == 0 ? - /* libcurl wants us to timeout now, however setting both fields of - * new_value.it_value to zero disarms the timer. The closest we can - * do is to schedule the timer to fire in 1 ns. */ - (itimerspec){ - .it_value.tv_sec = 0, - .it_value.tv_nsec = 1, - }: - // libcurl passes a -1 to indicate the timer should be deleted - (itimerspec){}; - - int no_flags = 0; - if (timerfd_settime(lstate->timerfd, no_flags, &its, NULL) < 0) { - ereport(ERROR, errmsg("timerfd_settime failed")); - } - - return 0; -} - static int multi_socket_cb(CURL *easy, curl_socket_t sockfd, int what, LoopState *lstate, void *socketp) { static char *whatstrs[] = { "NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", "CURL_POLL_REMOVE" }; elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]); @@ -194,8 +164,6 @@ static void init_curl_handle(CURLM *curl_mhandle, MemoryContext curl_memctx, int void set_curl_mhandle(CURLM *curl_mhandle, LoopState *lstate){ CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_SOCKETFUNCTION, multi_socket_cb); CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_SOCKETDATA, lstate); - CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_TIMERFUNCTION, multi_timer_cb); - CURL_MULTI_SETOPT(curl_mhandle, CURLMOPT_TIMERDATA, lstate); } void delete_expired_responses(char *ttl, int batch_size){ diff --git a/src/worker.c b/src/worker.c index c320479..97674af 100644 --- a/src/worker.c +++ b/src/worker.c @@ -24,7 +24,6 @@ #include #include -#include #include #include #include @@ -101,7 +100,6 @@ void pg_net_worker(Datum main_arg) { LoopState lstate = { .epfd = epoll_create1(0), - .timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC), .curl_mhandle = curl_multi_init(), }; @@ -109,19 +107,11 @@ void pg_net_worker(Datum main_arg) { ereport(ERROR, errmsg("Failed to create epoll file descriptor")); } - if (lstate.timerfd < 0) { - ereport(ERROR, errmsg("Failed to create timerfd")); - } - if(!lstate.curl_mhandle) ereport(ERROR, errmsg("curl_multi_init()")); set_curl_mhandle(lstate.curl_mhandle, &lstate); - timerfd_settime(lstate.timerfd, 0, &(itimerspec){}, NULL); - - epoll_ctl(lstate.epfd, EPOLL_CTL_ADD, lstate.timerfd, &(epoll_event){.events = EPOLLIN, .data.fd = lstate.timerfd}); - while (!got_sigterm) { WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -146,11 +136,14 @@ void pg_net_worker(Datum main_arg) { consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext); int running_handles = 0; - int maxevents = guc_batch_size + 1; // 1 extra for the timer - epoll_event *events = palloc0(sizeof(epoll_event) * maxevents); + epoll_event *events = palloc0(sizeof(epoll_event) * guc_batch_size); + + EREPORT_MULTI( + curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles) + ); do { - int nfds = epoll_wait(lstate.epfd, events, maxevents, /*timeout=*/1000); + int nfds = epoll_wait(lstate.epfd, events, guc_batch_size, /*timeout=*/0); if (nfds < 0) { int save_errno = errno; if(save_errno == EINTR) { // can happen when the epoll is interrupted, for example when running under GDB. Just continue in this case. @@ -163,28 +156,17 @@ void pg_net_worker(Datum main_arg) { } for (int i = 0; i < nfds; i++) { - if (events[i].data.fd == lstate.timerfd) { - EREPORT_MULTI( - curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles) - ); - } else { - int ev_bitmask = - events[i].events & EPOLLIN ? CURL_CSELECT_IN: - events[i].events & EPOLLOUT ? CURL_CSELECT_OUT: - CURL_CSELECT_ERR; - - EREPORT_MULTI( - curl_multi_socket_action( - lstate.curl_mhandle, events[i].data.fd, - ev_bitmask, - &running_handles) - ); - - if(running_handles <= 0) { - elog(DEBUG2, "last transfer done, kill timeout"); - timerfd_settime(lstate.timerfd, 0, &(itimerspec){0}, NULL); - } - } + int ev_bitmask = + events[i].events & EPOLLIN ? CURL_CSELECT_IN: + events[i].events & EPOLLOUT ? CURL_CSELECT_OUT: + CURL_CSELECT_ERR; + + EREPORT_MULTI( + curl_multi_socket_action( + lstate.curl_mhandle, events[i].data.fd, + ev_bitmask, + &running_handles) + ); insert_curl_responses(&lstate, CurlMemContext); } @@ -197,7 +179,6 @@ void pg_net_worker(Datum main_arg) { } close(lstate.epfd); - close(lstate.timerfd); curl_multi_cleanup(lstate.curl_mhandle); curl_global_cleanup(); From ced8259f40cf41e08580a2140654807c98ee3422 Mon Sep 17 00:00:00 2001 From: steve-chavez Date: Tue, 8 Oct 2024 22:33:52 -0500 Subject: [PATCH 2/2] refactor: use pg WaitEvent API instead of epoll This allows compiling on other platforms such as MacOS. It mostly works at this stage but with some limitations: - [ ] Doing 10K requests fail with "Bad file descriptor" + I believe this is because we're not deleting file descriptors since the WaitEvent API doesn't provide EPOLL_CTL_DEL. It only provides EPOLL_CTL_ADD/MOD - [ ] timerfd is still Linux specific and pg provides no abstraction over it. --- src/core.c | 46 ++++++++++++++++++---------------------------- src/core.h | 6 +----- src/worker.c | 34 +++++++++------------------------- 3 files changed, 28 insertions(+), 58 deletions(-) diff --git a/src/core.c b/src/core.c index 19f3c12..c20245a 100644 --- a/src/core.c +++ b/src/core.c @@ -23,7 +23,6 @@ #include #include -#include #include #include #include @@ -47,39 +46,30 @@ body_cb(void *contents, size_t size, size_t nmemb, void *userp) return realsize; } -static int multi_socket_cb(CURL *easy, curl_socket_t sockfd, int what, LoopState *lstate, void *socketp) { +typedef struct { + int pos; +} marker; + +static int multi_socket_cb(CURL *easy, curl_socket_t sockfd, int what, LoopState *lstate, marker *mark) { static char *whatstrs[] = { "NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", "CURL_POLL_REMOVE" }; elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]); - int epoll_op; - if(!socketp){ - epoll_op = EPOLL_CTL_ADD; - bool *socket_exists = palloc(sizeof(bool)); - curl_multi_assign(lstate->curl_mhandle, sockfd, socket_exists); + int ev = + (what & CURL_POLL_IN) ? + WL_SOCKET_READABLE: + (what & CURL_POLL_OUT) ? + WL_SOCKET_WRITEABLE: + 0; // no event is assigned since here we get CURL_POLL_REMOVE and the sockfd will be removed + + if(!mark){ + marker *new_marker = palloc(sizeof(marker)); + new_marker->pos = AddWaitEventToSet(lstate->event_set, ev, sockfd, NULL, NULL); + curl_multi_assign(lstate->curl_mhandle, sockfd, new_marker); } else if (what == CURL_POLL_REMOVE){ - epoll_op = EPOLL_CTL_DEL; - pfree(socketp); + pfree(mark); curl_multi_assign(lstate->curl_mhandle, sockfd, NULL); } else { - epoll_op = EPOLL_CTL_MOD; - } - - epoll_event ev = { - .data.fd = sockfd, - .events = - (what & CURL_POLL_IN) ? - EPOLLIN: - (what & CURL_POLL_OUT) ? - EPOLLOUT: - 0, // no event is assigned since here we get CURL_POLL_REMOVE and the sockfd will be removed - }; - - // epoll_ctl will copy ev, so there's no need to do palloc for the epoll_event - // https://github.com/torvalds/linux/blob/e32cde8d2bd7d251a8f9b434143977ddf13dcec6/fs/eventpoll.c#L2408-L2418 - if (epoll_ctl(lstate->epfd, epoll_op, sockfd, &ev) < 0) { - int e = errno; - static char *opstrs[] = { "NONE", "EPOLL_CTL_ADD", "EPOLL_CTL_DEL", "EPOLL_CTL_MOD" }; - ereport(ERROR, errmsg("epoll_ctl with %s failed when receiving %s for sockfd %d: %s", whatstrs[what], opstrs[epoll_op], sockfd, strerror(e))); + ModifyWaitEvent(lstate->event_set, mark->pos, ev, NULL); } return 0; diff --git a/src/core.h b/src/core.h index 1c98fc1..be8abdf 100644 --- a/src/core.h +++ b/src/core.h @@ -1,13 +1,9 @@ #ifndef CORE_H #define CORE_H -typedef struct itimerspec itimerspec; -typedef struct epoll_event epoll_event; - typedef struct { - int epfd; - int timerfd; CURLM *curl_mhandle; + WaitEventSet *event_set; } LoopState; void delete_expired_responses(char *ttl, int batch_size); diff --git a/src/worker.c b/src/worker.c index 97674af..74ec205 100644 --- a/src/worker.c +++ b/src/worker.c @@ -23,7 +23,6 @@ #include #include -#include #include #include #include @@ -99,14 +98,9 @@ void pg_net_worker(Datum main_arg) { ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret))); LoopState lstate = { - .epfd = epoll_create1(0), .curl_mhandle = curl_multi_init(), }; - if (lstate.epfd < 0) { - ereport(ERROR, errmsg("Failed to create epoll file descriptor")); - } - if(!lstate.curl_mhandle) ereport(ERROR, errmsg("curl_multi_init()")); @@ -136,34 +130,26 @@ void pg_net_worker(Datum main_arg) { consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext); int running_handles = 0; - epoll_event *events = palloc0(sizeof(epoll_event) * guc_batch_size); + + lstate.event_set = CreateWaitEventSet(CurlMemContext, guc_batch_size); EREPORT_MULTI( curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles) ); do { - int nfds = epoll_wait(lstate.epfd, events, guc_batch_size, /*timeout=*/0); - if (nfds < 0) { - int save_errno = errno; - if(save_errno == EINTR) { // can happen when the epoll is interrupted, for example when running under GDB. Just continue in this case. - continue; - } - else { - ereport(ERROR, errmsg("epoll_wait() failed: %s", strerror(save_errno))); - break; - } - } + WaitEvent *events = palloc(sizeof(WaitEvent)*guc_batch_size); + int num_events_ocurred = WaitEventSetWait(lstate.event_set, /*timeout=*/1000, events, guc_batch_size, /*pending=*/ 1); - for (int i = 0; i < nfds; i++) { + for (size_t i = 0; i < num_events_ocurred; i++) { int ev_bitmask = - events[i].events & EPOLLIN ? CURL_CSELECT_IN: - events[i].events & EPOLLOUT ? CURL_CSELECT_OUT: + events[i].events & WL_SOCKET_READABLE ? CURL_CSELECT_IN: + events[i].events & WL_SOCKET_WRITEABLE ? CURL_CSELECT_OUT: CURL_CSELECT_ERR; EREPORT_MULTI( curl_multi_socket_action( - lstate.curl_mhandle, events[i].data.fd, + lstate.curl_mhandle, events[i].fd, ev_bitmask, &running_handles) ); @@ -173,13 +159,11 @@ void pg_net_worker(Datum main_arg) { } while (running_handles > 0); // run again while there are curl handles, this will prevent waiting for the latch_timeout (which will cause the cause the curl timeouts to be wrong) - pfree(events); + FreeWaitEventSet(lstate.event_set); MemoryContextReset(CurlMemContext); } - close(lstate.epfd); - curl_multi_cleanup(lstate.curl_mhandle); curl_global_cleanup();