diff --git a/src/Client/Connection.hpp b/src/Client/Connection.hpp index ef5cd35e3..d9626f0ba 100644 --- a/src/Client/Connection.hpp +++ b/src/Client/Connection.hpp @@ -230,8 +230,7 @@ class Connection template friend - enum DecodeStatus processResponse(Connection &conn, - Response *result); + enum DecodeStatus processResponse(Connection &conn, int req_sync, Response *result); template friend @@ -530,8 +529,7 @@ inputBufGC(Connection &conn) template DecodeStatus -processResponse(Connection &conn, - Response *result) +processResponse(Connection &conn, int req_sync, Response *result) { //Decode response. In case of success - fill in feature map //and adjust end-of-decoded data pointer. Call GC if needed. @@ -563,7 +561,7 @@ processResponse(Connection &conn, } LOG_DEBUG("Header: sync=", response.header.sync, ", code=", response.header.code, ", schema=", response.header.schema_id); - if (result != nullptr) { + if (result != nullptr && response.header.sync == req_sync) { *result = std::move(response); } else { conn.impl->futures.insert({response.header.sync, diff --git a/src/Client/Connector.hpp b/src/Client/Connector.hpp index b0fd96c8b..863bd289c 100644 --- a/src/Client/Connector.hpp +++ b/src/Client/Connector.hpp @@ -89,8 +89,24 @@ class Connector std::set> m_ReadyToSend; void close(Connection &conn); void close(ConnectionImpl &conn); + +private: + /** + * A helper to decode responses of a connection. + * Can be called when the connection is not ready to decode - it's just no-op. + * If `result` is not `nullptr`, it is used to return response for a request with + * `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored. + * Returns -1 in the case of any error, 0 on success. + */ + int connectionDecodeResponses(Connection &conn, int req_sync, Response *result); + private: NetProvider m_NetProvider; + /** + * Set of connections that are ready to decode. + * Shouldn't be modified directly - is managed by methods `readyToDecode` + * and `connectionDecodeResponses`. + */ std::set> m_ReadyToDecode; }; @@ -157,21 +173,35 @@ Connector::close(ConnectionImpl &conn) template int -connectionDecodeResponses(Connection &conn, - Response *result) +Connector::connectionDecodeResponses(Connection &conn, int req_sync, + Response *result) { + if (!hasDataToDecode(conn)) + return 0; + + /* Ready to decode connection must be in the corresponding set. */ + assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end()); + + int rc = 0; while (hasDataToDecode(conn)) { - DecodeStatus rc = processResponse(conn, result); - if (rc == DECODE_ERR) - return -1; + DecodeStatus status = processResponse(conn, req_sync, result); + if (status == DECODE_ERR) { + rc = -1; + break; + } //In case we've received only a part of response //we should wait until the rest arrives - otherwise //we can't properly decode response. */ - if (rc == DECODE_NEEDMORE) - return 0; - assert(rc == DECODE_SUCC); + if (status == DECODE_NEEDMORE) { + rc = 0; + break; + } + assert(status == DECODE_SUCC); } - return 0; + /* A connection that has no data to decode must not be left in the set. */ + if (!hasDataToDecode(conn)) + m_ReadyToDecode.erase(conn); + return rc; } template @@ -183,24 +213,27 @@ Connector::wait(Connection &conn, LOG_DEBUG("Waiting for the future ", future, " with timeout ", timeout); Timer timer{timeout}; timer.start(); - if (connectionDecodeResponses(conn, result) != 0) + static constexpr int INVALID_SYNC = -1; + int req_sync = static_cast(future); + if (result != NULL) + result->header.sync = INVALID_SYNC; + if (connectionDecodeResponses(conn, req_sync, result) != 0) return -1; + if (result != NULL && result->header.sync != INVALID_SYNC) { + LOG_DEBUG("Future ", future, " is ready and decoded"); + return 0; + } while (!conn.hasError() && !conn.futureIsReady(future)) { if (m_NetProvider.wait(timer.timeLeft()) != 0) { conn.setError(std::string("Failed to poll: ") + strerror(errno), errno); return -1; } - if (hasDataToDecode(conn)) { - assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end()); - if (connectionDecodeResponses(conn, result) != 0) - return -1; - /* - * In case we've handled whole data in input buffer - - * mark connection as completed. - */ - if (!hasDataToDecode(conn)) - m_ReadyToDecode.erase(conn); + if (connectionDecodeResponses(conn, req_sync, result) != 0) + return -1; + if (result != NULL && result->header.sync != INVALID_SYNC) { + LOG_DEBUG("Future ", future, " is ready and decoded"); + return 0; } if (timer.isExpired()) break; @@ -213,6 +246,8 @@ Connector::wait(Connection &conn, LOG_DEBUG("Connection has been timed out: future ", future, " is not ready"); return -1; + } else if (result != NULL) { + *result = std::move(conn.getResponse(future)); } LOG_DEBUG("Feature ", future, " is ready and decoded"); return 0; @@ -233,13 +268,8 @@ Connector::waitAll(Connection &conn, strerror(errno), errno); return -1; } - if (hasDataToDecode(conn)) { - assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end()); - if (connectionDecodeResponses(conn, static_cast*>(nullptr)) != 0) - return -1; - if (!hasDataToDecode(conn)) - m_ReadyToDecode.erase(conn); - } + if (connectionDecodeResponses(conn, 0, nullptr) != 0) + return -1; bool finish = true; for (size_t i = last_not_ready; i < futures.size(); ++i) { if (!conn.futureIsReady(futures[i])) { @@ -278,10 +308,8 @@ Connector::waitAny(int timeout) } Connection conn = *m_ReadyToDecode.begin(); assert(hasDataToDecode(conn)); - if (connectionDecodeResponses(conn, static_cast*>(nullptr)) != 0) + if (connectionDecodeResponses(conn, 0, nullptr) != 0) return std::nullopt; - if (!hasDataToDecode(conn)) - m_ReadyToDecode.erase(conn); return conn; } @@ -299,13 +327,8 @@ Connector::waitCount(Connection &conn, strerror(errno), errno); return -1; } - if (hasDataToDecode(conn)) { - assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end()); - if (connectionDecodeResponses(conn, static_cast*>(nullptr)) != 0) - return -1; - if (!hasDataToDecode(conn)) - m_ReadyToDecode.erase(conn); - } + if (connectionDecodeResponses(conn, 0, nullptr) != 0) + return -1; if ((conn.getFutureCount() - ready_futures) >= future_count) return 0; if (timer.isExpired()) diff --git a/test/ClientTest.cpp b/test/ClientTest.cpp index a3fde6599..b42ff747e 100644 --- a/test/ClientTest.cpp +++ b/test/ClientTest.cpp @@ -1239,6 +1239,91 @@ test_wait(Connector &client) response = conn.getResponse(f); fail_unless(response.has_value()); + TEST_CASE("waitAny after several waits (gh-124)"); + Connection conn1(client); + Connection conn2(client); + Connection conn3(client); + rc = test_connect(client, conn1, localhost, port); + fail_unless(rc == 0); + rc = test_connect(client, conn2, localhost, port); + fail_unless(rc == 0); + rc = test_connect(client, conn3, localhost, port); + fail_unless(rc == 0); + rid_t f1 = conn1.ping(); + rid_t f2 = conn2.ping(); + rid_t f3 = conn3.ping(); + + /* Wait for all connections. */ + fail_unless(client.wait(conn1, f1, WAIT_TIMEOUT) == 0); + fail_unless(conn1.futureIsReady(f1)); + fail_unless(conn1.getResponse(f1).header.code == 0); + + fail_unless(client.wait(conn2, f2, WAIT_TIMEOUT) == 0); + fail_unless(conn2.futureIsReady(f2)); + fail_unless(conn2.getResponse(f2).header.code == 0); + + fail_unless(client.wait(conn3, f3, WAIT_TIMEOUT) == 0); + fail_unless(conn3.futureIsReady(f3)); + fail_unless(conn3.getResponse(f3).header.code == 0); + + /* + * Wait any - we shouldn't get any of the connections here since we've + * received all the responses. + * Note that the connector used to crash here (gh-124) because some of the + * connnections still could appear in `m_ReadyToDecode` set. + */ + std::optional> conn_opt = client.waitAny(WAIT_TIMEOUT); + fail_if(conn_opt.has_value()); + + /* Close all connections used only by the case. */ + client.close(conn1); + client.close(conn2); + client.close(conn3); + + TEST_CASE("wait with argument result"); + f = conn.ping(); + fail_unless(!conn.futureIsReady(f)); + Response result; + fail_unless(client.wait(conn, f, WAIT_TIMEOUT, &result) == 0); + /* The result was consumed, so the future is not ready. */ + fail_unless(!conn.futureIsReady(f)); + /* The future is actually request sync - check if the result is valid. */ + fail_unless(result.header.sync == static_cast(f)); + fail_unless(result.header.code == 0); + + TEST_CASE("wait with argument result for decoded future"); + f = conn.ping(); + fail_unless(!conn.futureIsReady(f)); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0); + fail_unless(conn.futureIsReady(f)); + fail_unless(client.wait(conn, f, WAIT_TIMEOUT, &result) == 0); + /* The result was consumed, so the future is not ready. */ + fail_unless(!conn.futureIsReady(f)); + /* The future is actually request sync - check if the result is valid. */ + fail_unless(result.header.sync == static_cast(f)); + fail_unless(result.header.code == 0); + + TEST_CASE("wait with argument result - several requests"); + /* Obtain in direct order. */ + f1 = conn.ping(); + f2 = conn.ping(); + fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0); + fail_unless(result.header.sync == static_cast(f1)); + fail_unless(result.header.code == 0); + fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0); + fail_unless(result.header.sync == static_cast(f2)); + fail_unless(result.header.code == 0); + + /* Obtain in reversed order. */ + f1 = conn.ping(); + f2 = conn.ping(); + fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0); + fail_unless(result.header.sync == static_cast(f2)); + fail_unless(result.header.code == 0); + fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0); + fail_unless(result.header.sync == static_cast(f1)); + fail_unless(result.header.code == 0); + client.close(conn); }