Skip to content

Commit 1ad6d87

Browse files
client: encapsulate free functions on ConnectionImpl and Connection
Currently, we have a bunch of free functions on ConnectionImpl and Connection, such as `hasSentBytes`, `hasNotRecvBytes`, `hasDataToSend`, `hasDataToDecode`, etc. All of them have a `ConnectionImpl` or `Connection` as an argument. This is an anti-pattern, and these functions need to be encapsulated in the corresponding classes. Closes #152
1 parent a956eeb commit 1ad6d87

File tree

4 files changed

+146
-135
lines changed

4 files changed

+146
-135
lines changed

src/Client/Connection.hpp

Lines changed: 131 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ struct ConnectionImpl
9393
void prepare_auth(std::string_view user, std::string_view passwd);
9494
void commit_auth(std::string_view user, std::string_view passwd);
9595

96+
void hasSentBytes(size_t bytes);
97+
void hasNotRecvBytes(size_t bytes);
98+
bool hasDataToSend();
99+
bool hasDataToDecode();
100+
101+
DecodeStatus processResponse(int req_sync, Response<BUFFER> *result);
102+
int decodeGreeting();
103+
104+
void inputBufGC();
105+
96106
Connector<BUFFER, NetProvider> &connector;
97107
BUFFER inBuf;
98108
static constexpr size_t GC_STEP_CNT = 100;
@@ -199,6 +209,123 @@ ConnectionImpl<BUFFER, NetProvider>::commit_auth(std::string_view user, std::str
199209
connector.readyToSend(this);
200210
}
201211

212+
template <class BUFFER, class NetProvider>
213+
void
214+
ConnectionImpl<BUFFER, NetProvider>::hasSentBytes(size_t bytes)
215+
{
216+
// dropBack()/dropFront() interfaces require number of bytes be greater
217+
// than zero so let's check it first.
218+
if (bytes > 0)
219+
getOutBuf().dropFront(bytes);
220+
}
221+
222+
template <class BUFFER, class NetProvider>
223+
void
224+
ConnectionImpl<BUFFER, NetProvider>::hasNotRecvBytes(size_t bytes)
225+
{
226+
if (bytes > 0)
227+
getInBuf().dropBack(bytes);
228+
}
229+
230+
template <class BUFFER, class NetProvider>
231+
bool
232+
ConnectionImpl<BUFFER, NetProvider>::hasDataToSend()
233+
{
234+
// We drop content of input buffer once it has been sent. So to detect
235+
// if there's any data to send it's enough to check buffer's emptiness.
236+
return !getOutBuf().empty();
237+
}
238+
239+
template <class BUFFER, class NetProvider>
240+
bool
241+
ConnectionImpl<BUFFER, NetProvider>::hasDataToDecode()
242+
{
243+
assert(endDecoded < getInBuf().end() || endDecoded == getInBuf().end());
244+
return endDecoded != getInBuf().end();
245+
}
246+
247+
template <class BUFFER, class NetProvider>
248+
void
249+
ConnectionImpl<BUFFER, NetProvider>::inputBufGC()
250+
{
251+
if (gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
252+
LOG_DEBUG("Flushed input buffer of the connection %p", this);
253+
getInBuf().flush();
254+
}
255+
}
256+
257+
template <class BUFFER, class NetProvider>
258+
DecodeStatus
259+
ConnectionImpl<BUFFER, NetProvider>::processResponse(int req_sync, Response<BUFFER> *result)
260+
{
261+
// Decode response. In case of success - fill in feature map
262+
// and adjust end-of-decoded data pointer. Call GC if needed.
263+
if (!getInBuf().has(endDecoded, MP_RESPONSE_SIZE))
264+
return DECODE_NEEDMORE;
265+
266+
Response<BUFFER> response;
267+
response.size = dec.decodeResponseSize();
268+
if (response.size < 0) {
269+
LOG_ERROR("Failed to decode response size");
270+
// In case of corrupted response size all other data in the buffer
271+
// is likely to be decoded in the wrong way (since we don't
272+
// know how much bytes should be skipped). So let's simply
273+
// terminate here.
274+
std::abort();
275+
}
276+
response.size += MP_RESPONSE_SIZE;
277+
if (!getInBuf().has(endDecoded, response.size)) {
278+
// Response was received only partially. Reset decoder position
279+
// to the start of response to make this function re-entered.
280+
dec.reset(endDecoded);
281+
return DECODE_NEEDMORE;
282+
}
283+
if (dec.decodeResponse(response) != 0) {
284+
setError("Failed to decode response, skipping bytes..");
285+
endDecoded += response.size;
286+
return DECODE_ERR;
287+
}
288+
LOG_DEBUG("Header: sync=", response.header.sync, ", code=", response.header.code,
289+
", schema=", response.header.schema_id);
290+
if (result != nullptr && response.header.sync == req_sync) {
291+
*result = std::move(response);
292+
} else {
293+
futures.insert({response.header.sync, std::move(response)});
294+
}
295+
endDecoded += response.size;
296+
inputBufGC();
297+
return DECODE_SUCC;
298+
}
299+
300+
template <class BUFFER, class NetProvider>
301+
int
302+
ConnectionImpl<BUFFER, NetProvider>::decodeGreeting()
303+
{
304+
// TODO: that's not zero-copy, should be rewritten in that pattern.
305+
assert(getInBuf().has(endDecoded, Iproto::GREETING_SIZE));
306+
char greeting_buf[Iproto::GREETING_SIZE];
307+
endDecoded.read({greeting_buf, sizeof(greeting_buf)});
308+
dec.reset(endDecoded);
309+
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, greeting) != 0)
310+
return -1;
311+
is_greeting_received = true;
312+
LOG_DEBUG("Version: ", greeting.version_id);
313+
314+
#ifndef NDEBUG
315+
// print salt in hex format.
316+
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
317+
const char *hex = "0123456789abcdef";
318+
for (size_t i = 0; i < greeting.salt_size; i++) {
319+
uint8_t u = greeting.salt[i];
320+
hex_salt[i * 2] = hex[u / 16];
321+
hex_salt[i * 2 + 1] = hex[u % 16];
322+
}
323+
hex_salt[greeting.salt_size * 2] = 0;
324+
LOG_DEBUG("Salt: ", hex_salt);
325+
#endif
326+
return 0;
327+
}
328+
202329
/** Each connection is supposed to be bound to a single socket. */
203330
template<class BUFFER, class NetProvider>
204331
class Connection
@@ -240,6 +367,8 @@ class Connection
240367
void flush();
241368
size_t getFutureCount() const;
242369

370+
bool hasDataToDecode();
371+
243372
template <class T>
244373
rid_t call(const std::string &func, const T &args);
245374
rid_t ping();
@@ -514,129 +643,11 @@ Connection<BUFFER, NetProvider>::getOutBuf()
514643
return impl->getOutBuf();
515644
}
516645

517-
template <class BUFFER, class NetProvider>
518-
void
519-
hasSentBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
520-
{
521-
//dropBack()/dropFront() interfaces require number of bytes be greater
522-
//than zero so let's check it first.
523-
if (bytes > 0)
524-
conn->getOutBuf().dropFront(bytes);
525-
}
526-
527-
template <class BUFFER, class NetProvider>
528-
void
529-
hasNotRecvBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
530-
{
531-
if (bytes > 0)
532-
conn->getInBuf().dropBack(bytes);
533-
}
534-
535646
template <class BUFFER, class NetProvider>
536647
bool
537-
hasDataToSend(ConnectionImpl<BUFFER, NetProvider> *conn)
648+
Connection<BUFFER, NetProvider>::hasDataToDecode()
538649
{
539-
//We drop content of input buffer once it has been sent. So to detect
540-
//if there's any data to send it's enough to check buffer's emptiness.
541-
return !conn->getOutBuf().empty();
542-
}
543-
544-
template <class BUFFER, class NetProvider>
545-
bool
546-
hasDataToDecode(Connection<BUFFER, NetProvider> &conn)
547-
{
548-
return hasDataToDecode(conn.getImpl());
549-
}
550-
551-
template <class BUFFER, class NetProvider>
552-
bool
553-
hasDataToDecode(ConnectionImpl<BUFFER, NetProvider> *conn)
554-
{
555-
assert(conn->endDecoded < conn->getInBuf().end() || conn->endDecoded == conn->getInBuf().end());
556-
return conn->endDecoded != conn->getInBuf().end();
557-
}
558-
559-
template <class BUFFER, class NetProvider>
560-
static void
561-
inputBufGC(ConnectionImpl<BUFFER, NetProvider> *conn)
562-
{
563-
if (conn->gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
564-
LOG_DEBUG("Flushed input buffer of the connection %p", conn);
565-
conn->getInBuf().flush();
566-
}
567-
}
568-
569-
template <class BUFFER, class NetProvider>
570-
DecodeStatus
571-
processResponse(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync, Response<BUFFER> *result)
572-
{
573-
//Decode response. In case of success - fill in feature map
574-
//and adjust end-of-decoded data pointer. Call GC if needed.
575-
if (!conn->getInBuf().has(conn->endDecoded, MP_RESPONSE_SIZE))
576-
return DECODE_NEEDMORE;
577-
578-
Response<BUFFER> response;
579-
response.size = conn->dec.decodeResponseSize();
580-
if (response.size < 0) {
581-
LOG_ERROR("Failed to decode response size");
582-
//In case of corrupted response size all other data in the buffer
583-
//is likely to be decoded in the wrong way (since we don't
584-
// know how much bytes should be skipped). So let's simply
585-
//terminate here.
586-
std::abort();
587-
588-
}
589-
response.size += MP_RESPONSE_SIZE;
590-
if (!conn->getInBuf().has(conn->endDecoded, response.size)) {
591-
//Response was received only partially. Reset decoder position
592-
//to the start of response to make this function re-entered.
593-
conn->dec.reset(conn->endDecoded);
594-
return DECODE_NEEDMORE;
595-
}
596-
if (conn->dec.decodeResponse(response) != 0) {
597-
conn->setError("Failed to decode response, skipping bytes..");
598-
conn->endDecoded += response.size;
599-
return DECODE_ERR;
600-
}
601-
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
602-
response.header.code, ", schema=", response.header.schema_id);
603-
if (result != nullptr && response.header.sync == req_sync) {
604-
*result = std::move(response);
605-
} else {
606-
conn->futures.insert({response.header.sync, std::move(response)});
607-
}
608-
conn->endDecoded += response.size;
609-
inputBufGC(conn);
610-
return DECODE_SUCC;
611-
}
612-
613-
template <class BUFFER, class NetProvider>
614-
int
615-
decodeGreeting(ConnectionImpl<BUFFER, NetProvider> *conn)
616-
{
617-
//TODO: that's not zero-copy, should be rewritten in that pattern.
618-
assert(conn->getInBuf().has(conn->endDecoded, Iproto::GREETING_SIZE));
619-
char greeting_buf[Iproto::GREETING_SIZE];
620-
conn->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
621-
conn->dec.reset(conn->endDecoded);
622-
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, conn->greeting) != 0)
623-
return -1;
624-
conn->is_greeting_received = true;
625-
LOG_DEBUG("Version: ", conn->greeting.version_id);
626-
627-
#ifndef NDEBUG
628-
//print salt in hex format.
629-
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
630-
const char *hex = "0123456789abcdef";
631-
for (size_t i = 0; i < conn->greeting.salt_size; i++) {
632-
uint8_t u = conn->greeting.salt[i];
633-
hex_salt[i * 2] = hex[u / 16];
634-
hex_salt[i * 2 + 1] = hex[u % 16];
635-
}
636-
hex_salt[conn->greeting.salt_size * 2] = 0;
637-
LOG_DEBUG("Salt: ", hex_salt);
638-
#endif
639-
return 0;
650+
return hasDataToDecode(getImpl());
640651
}
641652

642653
////////////////////////////BOX-like interface functions////////////////////////

src/Client/Connector.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,15 +206,15 @@ int
206206
Connector<BUFFER, NetProvider>::connectionDecodeResponses(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync,
207207
Response<BUFFER> *result)
208208
{
209-
if (!hasDataToDecode(conn))
209+
if (!conn->hasDataToDecode())
210210
return 0;
211211

212212
/* Ready to decode connection must be in the corresponding set. */
213213
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
214214

215215
int rc = 0;
216-
while (hasDataToDecode(conn)) {
217-
DecodeStatus status = processResponse(conn, req_sync, result);
216+
while (conn->hasDataToDecode()) {
217+
DecodeStatus status = conn->processResponse(req_sync, result);
218218
if (status == DECODE_ERR) {
219219
rc = -1;
220220
break;
@@ -229,7 +229,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(ConnectionImpl<BUFFER,
229229
assert(status == DECODE_SUCC);
230230
}
231231
/* A connection that has no data to decode must not be left in the set. */
232-
if (!hasDataToDecode(conn))
232+
if (!conn->hasDataToDecode())
233233
m_ReadyToDecode.erase(conn);
234234
return rc;
235235
}
@@ -396,7 +396,7 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
396396
return std::nullopt;
397397
}
398398
auto *conn = *m_ReadyToDecode.begin();
399-
assert(hasDataToDecode(conn));
399+
assert(conn->hasDataToDecode());
400400
if (connectionDecodeResponses(conn) != 0)
401401
return std::nullopt;
402402
return conn;

src/Client/EpollNetProvider.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ EpollNetProvider<BUFFER, Stream>::recv(ConnImpl_t *conn)
189189
size_t iov_cnt = buf.getIOV(itr, iov, IOVEC_MAX_SIZE);
190190

191191
ssize_t rcvd = conn->get_strm().recv(iov, iov_cnt);
192-
hasNotRecvBytes(conn, CONN_READAHEAD - (rcvd < 0 ? 0 : rcvd));
192+
conn->hasNotRecvBytes(CONN_READAHEAD - (rcvd < 0 ? 0 : rcvd));
193193
if (rcvd < 0) {
194194
conn->setError(std::string("Failed to receive response: ") + strerror(errno), errno);
195195
return -1;
@@ -206,7 +206,7 @@ EpollNetProvider<BUFFER, Stream>::recv(ConnImpl_t *conn)
206206
return 0;
207207
/* Receive and decode greetings. */
208208
LOG_DEBUG("Greetings are received, read bytes ", rcvd);
209-
if (decodeGreeting(conn) != 0) {
209+
if (conn->decodeGreeting() != 0) {
210210
conn->setError("Failed to decode greetings");
211211
return -1;
212212
}
@@ -225,7 +225,7 @@ template <class BUFFER, class Stream>
225225
int
226226
EpollNetProvider<BUFFER, Stream>::send(ConnImpl_t *conn)
227227
{
228-
while (hasDataToSend(conn)) {
228+
while (conn->hasDataToSend()) {
229229
struct iovec iov[IOVEC_MAX_SIZE];
230230
auto &buf = conn->getOutBuf();
231231
size_t iov_cnt = buf.getIOV(buf.template begin<true>(),
@@ -241,7 +241,7 @@ EpollNetProvider<BUFFER, Stream>::send(ConnImpl_t *conn)
241241
setPollSetting(conn, EPOLLIN | EPOLLOUT);
242242
return 1;
243243
} else {
244-
hasSentBytes(conn, sent);
244+
conn->hasSentBytes(sent);
245245
}
246246
}
247247
/* All data from connection has been successfully written. */
@@ -289,7 +289,7 @@ EpollNetProvider<BUFFER, Stream>::wait(int timeout)
289289
int rc = recv(conn);
290290
if (rc < 0)
291291
return -1;
292-
if (hasDataToDecode(conn))
292+
if (conn->hasDataToDecode())
293293
m_Connector.readyToDecode(conn);
294294
}
295295

0 commit comments

Comments
 (0)