Skip to content

Commit 995d5ec

Browse files
committed
client: return requested future with result argument of wait
Currently, the argument returns any decoded future - that is inconvenient and completely unusable. Let's return only the requested future, or nothing, if it's not ready. Along the way, reformat argument list of modified functions to make them conform clang-format. Closes #112
1 parent aeb43ee commit 995d5ec

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

src/Client/Connection.hpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ class Connection
230230

231231
template<class B, class N>
232232
friend
233-
enum DecodeStatus processResponse(Connection<B, N> &conn,
234-
Response<B> *result);
233+
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);
235234

236235
template<class B, class N>
237236
friend
@@ -530,8 +529,7 @@ inputBufGC(Connection<BUFFER, NetProvider> &conn)
530529

531530
template<class BUFFER, class NetProvider>
532531
DecodeStatus
533-
processResponse(Connection<BUFFER, NetProvider> &conn,
534-
Response<BUFFER> *result)
532+
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
535533
{
536534
//Decode response. In case of success - fill in feature map
537535
//and adjust end-of-decoded data pointer. Call GC if needed.
@@ -563,7 +561,7 @@ processResponse(Connection<BUFFER, NetProvider> &conn,
563561
}
564562
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
565563
response.header.code, ", schema=", response.header.schema_id);
566-
if (result != nullptr) {
564+
if (result != nullptr && response.header.sync == req_sync) {
567565
*result = std::move(response);
568566
} else {
569567
conn.impl->futures.insert({response.header.sync,

src/Client/Connector.hpp

+11-9
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ class Connector
9494
/**
9595
* A helper to decode responses of a connection.
9696
* Can be called when the connection is not ready to decode - it's just no-op.
97+
* If `result` is not `nullptr`, it is used to return response for a request with
98+
* `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored.
9799
* Returns -1 in the case of any error, 0 on success.
98100
*/
99-
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
100-
Response<BUFFER> *result);
101+
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result);
101102

102103
private:
103104
NetProvider m_NetProvider;
@@ -172,7 +173,7 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
172173

173174
template<class BUFFER, class NetProvider>
174175
int
175-
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
176+
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync,
176177
Response<BUFFER> *result)
177178
{
178179
if (!hasDataToDecode(conn))
@@ -183,7 +184,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
183184

184185
int rc = 0;
185186
while (hasDataToDecode(conn)) {
186-
DecodeStatus status = processResponse(conn, result);
187+
DecodeStatus status = processResponse(conn, req_sync, result);
187188
if (status == DECODE_ERR) {
188189
rc = -1;
189190
break;
@@ -213,9 +214,10 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
213214
Timer timer{timeout};
214215
timer.start();
215216
static constexpr int INVALID_SYNC = -1;
217+
int req_sync = static_cast<int>(future);
216218
if (result != NULL)
217219
result->header.sync = INVALID_SYNC;
218-
if (connectionDecodeResponses(conn, result) != 0)
220+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
219221
return -1;
220222
if (result != NULL && result->header.sync != INVALID_SYNC) {
221223
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -227,7 +229,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
227229
strerror(errno), errno);
228230
return -1;
229231
}
230-
if (connectionDecodeResponses(conn, result) != 0)
232+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
231233
return -1;
232234
if (result != NULL && result->header.sync != INVALID_SYNC) {
233235
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -266,7 +268,7 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
266268
strerror(errno), errno);
267269
return -1;
268270
}
269-
if (connectionDecodeResponses(conn, nullptr) != 0)
271+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
270272
return -1;
271273
bool finish = true;
272274
for (size_t i = last_not_ready; i < futures.size(); ++i) {
@@ -306,7 +308,7 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
306308
}
307309
Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin();
308310
assert(hasDataToDecode(conn));
309-
if (connectionDecodeResponses(conn, nullptr) != 0)
311+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
310312
return std::nullopt;
311313
return conn;
312314
}
@@ -325,7 +327,7 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
325327
strerror(errno), errno);
326328
return -1;
327329
}
328-
if (connectionDecodeResponses(conn, nullptr) != 0)
330+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
329331
return -1;
330332
if ((conn.getFutureCount() - ready_futures) >= future_count)
331333
return 0;

test/ClientTest.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,27 @@ test_wait(Connector<BUFFER, NetProvider> &client)
13031303
fail_unless(result.header.sync == static_cast<int>(f));
13041304
fail_unless(result.header.code == 0);
13051305

1306+
TEST_CASE("wait with argument result - several requests");
1307+
/* Obtain in direct order. */
1308+
f1 = conn.ping();
1309+
f2 = conn.ping();
1310+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1311+
fail_unless(result.header.sync == static_cast<int>(f1));
1312+
fail_unless(result.header.code == 0);
1313+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1314+
fail_unless(result.header.sync == static_cast<int>(f2));
1315+
fail_unless(result.header.code == 0);
1316+
1317+
/* Obtain in reversed order. */
1318+
f1 = conn.ping();
1319+
f2 = conn.ping();
1320+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1321+
fail_unless(result.header.sync == static_cast<int>(f2));
1322+
fail_unless(result.header.code == 0);
1323+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1324+
fail_unless(result.header.sync == static_cast<int>(f1));
1325+
fail_unless(result.header.code == 0);
1326+
13061327
client.close(conn);
13071328
}
13081329

0 commit comments

Comments
 (0)