diff --git a/extracted/plugins/FilePlugin/src/common/sqFilePluginBasicPrims.c b/extracted/plugins/FilePlugin/src/common/sqFilePluginBasicPrims.c index 1373a49b4a..0ad7522df8 100644 --- a/extracted/plugins/FilePlugin/src/common/sqFilePluginBasicPrims.c +++ b/extracted/plugins/FilePlugin/src/common/sqFilePluginBasicPrims.c @@ -830,7 +830,7 @@ sqFileThisSession() { } void -signalOnDataArrival(int fd, void *clientData, int flag){ +signalOnDataArrival(sqInt fd, void *clientData, int flag){ interpreterProxy->signalSemaphoreWithIndex((sqInt)clientData); aioDisable(fd); } diff --git a/extracted/plugins/SocketPlugin/src/common/SocketPluginImpl.c b/extracted/plugins/SocketPlugin/src/common/SocketPluginImpl.c index 5c9ec96fd9..9e7b8787be 100644 --- a/extracted/plugins/SocketPlugin/src/common/SocketPluginImpl.c +++ b/extracted/plugins/SocketPlugin/src/common/SocketPluginImpl.c @@ -231,6 +231,7 @@ typedef struct privateSocketStruct int multiListen; /* whether to listen for multiple connections */ int acceptedSock; /* a connection that has been accepted */ int socketType; + int waitingToSend; } privateSocketStruct; #define CONN_NOTIFY (1<<0) @@ -261,7 +262,7 @@ typedef struct privateSocketStruct #define SOCKETERROR(S) (PSP(S)->sockError) #define SOCKETPEER(S) (PSP(S)->peer) #define SOCKETPEERSIZE(S) (PSP(S)->peerSize) - +#define SOCKET_WAITINGTOSEND(S) (PSP(S)->waitingToSend) /*** Resolver state ***/ @@ -304,6 +305,7 @@ char *socketHandlerName(aioHandler h) if (h == connectHandler) return "connectHandler"; if (h == dataHandler) return "dataHandler"; if (h == closeHandler) return "closeHandler"; + if (h == sendHandler) return "sendHandler"; return "***unknownHandler***"; } #endif @@ -475,7 +477,8 @@ static void acceptHandler(sqInt fd, void *data, int flags) pss->sockError= socketError(fd); pss->sockState= Invalid; pss->s= -1; - closesocket(fd); + pss->waitingToSend = false; + closesocket(fd); logTrace("acceptHandler: aborting server %d pss=%p\n", fd, pss); } else /* (flags & AIO_R) -- accept() is ready */ @@ -517,7 +520,8 @@ static void acceptHandler(sqInt fd, void *data, int flags) aioDisable(fd); closesocket(fd); pss->s= newSock; - aioEnable(newSock, pss, 0); + pss->waitingToSend = false; + aioEnable(newSock, pss, 0); } } } @@ -574,7 +578,25 @@ static void connectHandler(sqInt fd, void *data, int flags) } -/* read or write data transfer is now possible for the socket. */ +/* send data transfer is now possible for the socket. */ + +static void sendHandler(sqInt fd, void *data, int flags) +{ + privateSocketStruct *pss= (privateSocketStruct *)data; + logTrace("sendHandler(%d=%d, %p, %d)\n", fd, pss->s, data, flags); + + if (pss == NULL) + { + logTrace("sendHandler: pss is NULL fd=%d data=%p flags=0x%x\n", fd, data, flags); + return; + } + + pss->waitingToSend = false; + + notify(pss, WRITE_NOTIFY); +} + +/* read data transfer is now possible for the socket. */ static void dataHandler(sqInt fd, void *data, int flags) { @@ -614,8 +636,8 @@ static void dataHandler(sqInt fd, void *data, int flags) int n= recv(fd, (void *)buf, 1, MSG_OOB); if (n == 1) logTrace("socket: received OOB data: %02x\n", buf[0]); } - if (flags & AIO_R) notify(pss, READ_NOTIFY); - if (flags & AIO_W) notify(pss, WRITE_NOTIFY); + + if (flags & AIO_R) notify(pss, READ_NOTIFY); } @@ -634,6 +656,8 @@ static void closeHandler(sqInt fd, void *data, int flags) } pss->sockState= Unconnected; pss->s= -1; + pss->waitingToSend = false; + notify(pss, READ_NOTIFY | CONN_NOTIFY); } @@ -730,6 +754,7 @@ void sqSocketCreateNetTypeSocketTypeRecvBytesSendBytesSemaIDReadSemaIDWriteSemaI return; } pss->s= newSocket; + pss->waitingToSend = false; pss->connSema= semaIndex; pss->readSema= readSemaIndex; pss->writeSema= writeSemaIndex; @@ -785,6 +810,7 @@ void sqSocketCreateRawProtoTypeRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID(Soc return; } pss->s= newSocket; + pss->waitingToSend = false; pss->connSema= semaIndex; pss->readSema= readSemaIndex; pss->writeSema= writeSemaIndex; @@ -991,6 +1017,7 @@ void sqSocketAcceptFromRecvBytesSendBytesSemaIDReadSemaIDWriteSemaID(SocketPtr s _PSP(s)= pss; pss->s= PSP(serverSocket)->acceptedSock; + pss->waitingToSend = false; PSP(serverSocket)->acceptedSock= -1; SOCKETSTATE(serverSocket)= WaitingForConnection; aioHandle(SOCKET(serverSocket), acceptHandler, AIO_RX); @@ -1224,9 +1251,8 @@ sqInt sqSocketSendDone(SocketPtr s) if (!socketValid(s)) return false; - // If the socket is connected we just return true. Then the send/sendto might block, but we will use the event system if(SOCKETSTATE(s) == Connected) - return true; + return !SOCKET_WAITINGTOSEND(s); return false; } @@ -1319,30 +1345,34 @@ sqInt sqSocketSendDataBufCount(SocketPtr s, char *buf, sqInt bufSize) { /* --- TCP --- */ logTrace( "TCP sendData(%d, %ld)\n", SOCKET(s), bufSize); - if ((nsent= send(SOCKET(s), buf, bufSize, 0)) <= 0) - { - lastError = getLastSocketError(); - if ((nsent == -1) && (lastError == ERROR_WOULD_BLOCK)) - { - logTrace( "TCP sendData(%d, %ld) -> %d [blocked]", - SOCKET(s), bufSize, nsent); - return 0; - } - else - { - /* error: most likely "connection closed by peer" */ - SOCKETSTATE(s)= OtherEndClosed; - SOCKETERROR(s)= lastError; - logWarn("errno %d\n", lastError); - logWarnFromErrno("write"); + if ((nsent= send(SOCKET(s), buf, bufSize, 0)) <= 0){ + lastError = getLastSocketError(); + if ((nsent == -1) && (lastError == ERROR_WOULD_BLOCK)) + { + logTrace( "TCP sendData(%d, %ld) -> %d [blocked]", SOCKET(s), bufSize, nsent); + SOCKET_WAITINGTOSEND(s) = true; + aioHandle(SOCKET(s), sendHandler, AIO_WX); + return 0; + } + else + { + /* error: most likely "connection closed by peer" */ + SOCKETSTATE(s)= OtherEndClosed; + SOCKETERROR(s)= lastError; + SOCKET_WAITINGTOSEND(s) = false; - return 0; - } - } + logWarn("errno %d\n", lastError); + logWarnFromErrno("write"); + SOCKET_WAITINGTOSEND(s) = false; + + return 0; + } + } } - /* write completed synchronously */ - logTrace( "sendData(%d) done = %d\n", SOCKET(s), nsent); - return nsent; + /* write completed synchronously */ + logTrace( "sendData(%d) done = %d\n", SOCKET(s), nsent); + SOCKET_WAITINGTOSEND(s) = false; + return nsent; } @@ -1395,13 +1425,21 @@ sqInt sqSockettoHostportSendDataBufCount(SocketPtr s, sqInt address, sqInt port, saddr.sin_addr.s_addr= htonl(address); { int nsent= sendto(SOCKET(s), buf, bufSize, 0, (struct sockaddr *)&saddr, sizeof(saddr)); - if (nsent >= 0) - return nsent; + if (nsent >= 0){ + SOCKET_WAITINGTOSEND(s) = false; + return nsent; + } int lastError = getLastSocketError(); - if (lastError == ERROR_WOULD_BLOCK) /* asynchronous write in progress */ - return 0; + if (lastError == ERROR_WOULD_BLOCK) { + SOCKET_WAITINGTOSEND(s) = true; + aioHandle(SOCKET(s), sendHandler, AIO_WX); + + /* asynchronous write in progress */ + return 0; + } + logTrace( "UDP send failed\n"); SOCKETERROR(s)= lastError; }