Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RONDB-863: RDRS1 Optimizations. Remove base64 from C layer and multi … #652

Open
wants to merge 4 commits into
base: 22.10-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022, 2023 Hopsworks AB
* Copyright (C) 2022, 2023, 2025 Hopsworks AB
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
Expand All @@ -18,6 +18,7 @@
*/

#include "src/db-operations/pk/common.hpp"
#include <cstring>
#include <decimal_utils.hpp>
#include <NdbError.hpp>
#include <my_time.h>
Expand Down Expand Up @@ -835,13 +836,8 @@ RS_Status WriteColToRespBuff(std::shared_ptr<ColRec> colRec, PKRResponse *respon
return RS_CLIENT_ERROR(ERROR_019);
} else {
require(attrBytes <= MAX_TUPLE_SIZE_IN_BYTES);
char buffer[MAX_TUPLE_SIZE_IN_BYTES_ENCODED];

size_t outlen = 0;
base64_encode(dataStart, attrBytes, (char *)&buffer[0], &outlen, 0);

return response->Append_string(colRec->ndbRec->getColumn()->getName(),
std::string(buffer, outlen), RDRS_BINARY_DATATYPE);
return response->Append_bin(colRec->ndbRec->getColumn()->getName(),
dataStart, attrBytes, RDRS_BINARY_DATATYPE);
}
}
case NdbDictionary::Column::Datetime: {
Expand All @@ -861,35 +857,26 @@ RS_Status WriteColToRespBuff(std::shared_ptr<ColRec> colRec, PKRResponse *respon
case NdbDictionary::Column::Blob: {
///< Binary large object (see NdbBlob)
/// Treat it as binary data
Uint64 length = 0;
if (colRec->blob->getLength(length) == -1) {
Uint32 resp_buff_col_no = -1;
Uint64 col_length = 0;
if (colRec->blob->getLength(col_length) == -1) {
return RS_SERVER_ERROR(ERROR_037 + std::string(" Reading column length failed.") +
std::string(" Column: ") + std::string(col->getName()) +
" Type: " + std::to_string(col->getType()));
}

// check for max length
// (4 * ceil(input_size / 3))
const size_t maxEncodedSize = length / 3 + (length % 3 != 0) * 4;
if (response->GetRemainingCapacity() < maxEncodedSize) {
return RS_SERVER_ERROR(ERROR_016 + std::string(" Buffer Remaining Capacity: ") +
std::to_string(response->GetRemainingCapacity()) +
" Required: " + std::to_string(maxEncodedSize));
}

Uint64 chunk = 0;
Uint64 total_read = 0;
char buffer[BLOB_MAX_FETCH_SIZE];

struct base64_state state;
size_t encodeOutlen = 0;
base64_stream_encode_init(&state, 0);

for (chunk = 0; chunk < (length / (BLOB_MAX_FETCH_SIZE)) + 1; chunk++) {
for (chunk = 0; chunk < (col_length / (BLOB_MAX_FETCH_SIZE)) + 1; chunk++) {
Uint64 pos = chunk * BLOB_MAX_FETCH_SIZE;
Uint32 bytes = BLOB_MAX_FETCH_SIZE; // NOTE this is bytes to read and also bytes read.
if (pos + bytes > length) {
bytes = length - pos;
if (pos + bytes > col_length) {
bytes = col_length - pos;
}

if (bytes != 0) {
Expand All @@ -911,32 +898,26 @@ RS_Status WriteColToRespBuff(std::shared_ptr<ColRec> colRec, PKRResponse *respon
if (bytes > 0) {
total_read += bytes;
if (chunk == 0) {
response->Append_string(colRec->ndbRec->getColumn()->getName(), "",
response->Append_bin(colRec->ndbRec->getColumn()->getName(), buffer, 0,
RDRS_BINARY_DATATYPE);
// This adds a column to the response buffer. Right now the last byte of the
// response buffer is '\0'. Remove the last byte and start appending the
// base64 data
response->AdvanceWritePointer(-1);
// This adds a column to the response buffer.
resp_buff_col_no = response->GetCurrentColNumber();
}

base64_stream_encode(&state, (const char *)buffer, bytes,
(char *)response->GetWritePointer(), &encodeOutlen);
response->AdvanceWritePointer(encodeOutlen);
memcpy((void *)response->GetWritePointer(), (void *)buffer, bytes);
response->AdvanceWritePointer(bytes);
}
}
}

if (total_read != length) {
if (total_read != col_length) {
return RS_RONDB_SERVER_ERROR(colRec->blob->getNdbError(),
ERROR_037 + std::string(" Not all of the data was read.") +
std::string(" Column: ") + std::string(col->getName()) +
" Expected to read: " + std::to_string(length) +
" Expected to read: " + std::to_string(col_length) +
" bytes. Read: " + std::to_string(total_read));
}
base64_stream_encode_final(&state, (char *)response->GetWritePointer(), &encodeOutlen);
response->AdvanceWritePointer(encodeOutlen);
(response->GetResponseBuffer())[response->GetWriteHeader()] = '\0';
response->AdvanceWritePointer(1);
response->SetColumnLength(resp_buff_col_no, col_length);

return RS_OK;
}
Expand Down Expand Up @@ -1017,14 +998,8 @@ RS_Status WriteColToRespBuff(std::shared_ptr<ColRec> colRec, PKRResponse *respon
for (int j = words - 1; j >= 0; j--) {
reversed[i++] = colRec->ndbRec->aRef()[j];
}

char buffer[BIT_MAX_SIZE_IN_BYTES_ENCODED];

size_t outlen = 0;
base64_encode(reversed, words, (char *)&buffer[0], &outlen, 0);

return response->Append_string(colRec->ndbRec->getColumn()->getName(),
std::string(buffer, outlen), RDRS_BIT_DATATYPE);
return response->Append_bin(colRec->ndbRec->getColumn()->getName(),
reversed, words, RDRS_BIT_DATATYPE);
}
case NdbDictionary::Column::Time: {
///< Time without date
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022 Hopsworks AB
* Copyright (C) 2022, 2025 Hopsworks AB
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -29,6 +29,7 @@
#include "NdbOperation.hpp"
#include "NdbRecAttr.hpp"
#include "NdbTransaction.hpp"
#include "my_compiler.h"
#include "src/db-operations/pk/common.hpp"
#include "src/db-operations/pk/pkr-request.hpp"
#include "src/db-operations/pk/pkr-response.hpp"
Expand All @@ -47,6 +48,7 @@ PKROperation::PKROperation(RS_Buffer *reqBuff, RS_Buffer *respBuff, Ndb *ndbObje
pkOpTuple.tableDict = nullptr;
pkOpTuple.primaryKeysCols = nullptr;
pkOpTuple.primaryKeySizes = nullptr;
pkOpTuple.transaction = nullptr;

this->subOpTuples.push_back(pkOpTuple);

Expand All @@ -68,9 +70,14 @@ PKROperation::PKROperation(Uint32 noOps, RS_Buffer *reqBuffs, RS_Buffer *respBuf
this->subOpTuples.push_back(pkOpTuple);
}

this->ndbObject = ndbObject;
this->noOps = noOps;
this->isBatch = true;
this->ndbObject = ndbObject;
this->noOps = noOps;
this->isBatch = true;

#ifdef MULTI_TX_BATCH
this->singleTransaction = false;
this->numOpsSent = 0;
#endif
}

PKROperation::~PKROperation() {
Expand Down Expand Up @@ -110,11 +117,35 @@ PKROperation::~PKROperation() {
*/

RS_Status PKROperation::SetupTransaction() {
const NdbDictionary::Table *table_dict = subOpTuples[0].tableDict;
transaction = ndbObject->startTransaction(table_dict);
if (transaction == nullptr) {
return RS_RONDB_SERVER_ERROR(ndbObject->getNdbError(), ERROR_005);
#ifdef MULTI_TX_BATCH
if (unlikely(singleTransaction)) {
#endif

SubOpTuple *subOp = &subOpTuples[0];
subOp->transaction = ndbObject->startTransaction(subOp->tableDict);
if (unlikely(subOp->transaction == nullptr)) {
return RS_RONDB_SERVER_ERROR(ndbObject->getNdbError(), ERROR_005);
}

#ifdef MULTI_TX_BATCH
} else {

for (size_t i = 0; i < noOps; i++) {
PKRRequest *req = subOpTuples[i].pkRequest;
if (req->IsInvalidOp()) {
// this sub-operation was previously marked invalid.
continue;
}

const NdbDictionary::Table *table_dict = subOpTuples[i].tableDict;
subOpTuples[i].transaction = ndbObject->startTransaction(table_dict);
if (subOpTuples[i].transaction == nullptr) {
return RS_RONDB_SERVER_ERROR(ndbObject->getNdbError(), ERROR_005);
}
}
}
#endif

return RS_OK;
}

Expand All @@ -125,15 +156,27 @@ RS_Status PKROperation::SetupTransaction() {
*/
RS_Status PKROperation::SetupReadOperation() {

#ifdef MULTI_TX_BATCH
numOpsSent = 0;
#endif

size_t opIdx = 0;
start:
for (size_t opIdx = 0; opIdx < noOps; opIdx++) {
for (; opIdx < noOps; opIdx++) {
if (subOpTuples[opIdx].pkRequest->IsInvalidOp()) { // this sub operation can not be processed
continue;
}

PKRRequest *req = subOpTuples[opIdx].pkRequest;
const NdbDictionary::Table *tableDict = subOpTuples[opIdx].tableDict;
std::vector<std::shared_ptr<ColRec>> *recs = &subOpTuples[opIdx].recs;
NdbTransaction *transaction = subOpTuples[0].transaction;

#ifdef MULTI_TX_BATCH
if (likely(!singleTransaction)) {
transaction = subOpTuples[opIdx].transaction;
}
#endif

// cleaned by destrctor
Int8 **primaryKeysCols = (Int8 **)malloc(req->PKColumnsCount() * sizeof(Int8 *));
Expand All @@ -149,6 +192,7 @@ RS_Status PKROperation::SetupReadOperation() {
if (status.http_code != SUCCESS) {
if (isBatch) {
subOpTuples[opIdx].pkRequest->MarkInvalidOp(status);
opIdx++;
goto start;
} else {
return status;
Expand Down Expand Up @@ -194,6 +238,14 @@ RS_Status PKROperation::SetupReadOperation() {
it++;
}
}

#ifdef MULTI_TX_BATCH
if (likely(!singleTransaction)) {
transaction->executeAsynchPrepare(NdbTransaction::Commit, nullptr, nullptr);
}
numOpsSent++;
#endif

}

return RS_OK;
Expand Down Expand Up @@ -225,9 +277,22 @@ RS_Status PKROperation::GetColValue(const NdbDictionary::Table *tableDict,
}

RS_Status PKROperation::Execute() {
if (transaction->execute(NdbTransaction::NoCommit) != 0) {
return RS_RONDB_SERVER_ERROR(transaction->getNdbError(), ERROR_009);
#ifdef MULTI_TX_BATCH
if (unlikely(singleTransaction)) {
#endif

NdbTransaction *trans = subOpTuples[0].transaction;
if (unlikely(trans->execute(NdbTransaction::NoCommit) != 0)) {
return RS_RONDB_SERVER_ERROR(trans->getNdbError(), ERROR_009);
}

#ifdef MULTI_TX_BATCH
} else {
if (ndbObject->sendPollNdb(WAITFOR_RESPONSE_TIMEOUT, numOpsSent) < numOpsSent) {
return RS_RONDB_SERVER_ERROR(ndbObject->getNdbError(), ERROR_009);
}
}
#endif

return RS_OK;
}
Expand Down Expand Up @@ -419,6 +484,17 @@ RS_Status PKROperation::ValidateRequest() {

return error;
}

#ifdef MULTI_TX_BATCH
const NdbDictionary::Table *table_dict = subOpTuples[i].tableDict;
if (table_dict->getColumn(req->ReadColumnName(i))->getType() ==
NdbDictionary::Column::Blob ||
table_dict->getColumn(req->ReadColumnName(i))->getType() ==
NdbDictionary::Column::Text) {
singleTransaction = true;
}
#endif

}
}
}
Expand All @@ -427,7 +503,23 @@ RS_Status PKROperation::ValidateRequest() {
}

void PKROperation::CloseTransaction() {
ndbObject->closeTransaction(transaction);
#ifdef MULTI_TX_BATCH
if (unlikely(singleTransaction)) {
#endif

if (subOpTuples[0].transaction != nullptr) {
ndbObject->closeTransaction(subOpTuples[0].transaction);
}

#ifdef MULTI_TX_BATCH
} else {
for (size_t i = 0; i < noOps; i++) {
if (subOpTuples[i].transaction != nullptr){
ndbObject->closeTransaction(subOpTuples[i].transaction);
}
}
}
#endif
}

RS_Status PKROperation::PerformOperation() {
Expand Down Expand Up @@ -472,12 +564,15 @@ RS_Status PKROperation::PerformOperation() {
}

RS_Status PKROperation::Abort() {
if (transaction != nullptr) {
NdbTransaction::CommitStatusType status = transaction->commitStatus();
if (status == NdbTransaction::CommitStatusType::Started) {
transaction->execute(NdbTransaction::Rollback);
for (size_t i = 0; i < noOps; i++) {
NdbTransaction *transaction = subOpTuples[i].transaction;
if (transaction != nullptr) {
NdbTransaction::CommitStatusType status = transaction->commitStatus();
if (status == NdbTransaction::CommitStatusType::Started) {
transaction->execute(NdbTransaction::Rollback);
}
ndbObject->closeTransaction(transaction);
}
ndbObject->closeTransaction(transaction);
}

return RS_OK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
#include "src/db-operations/pk/common.hpp"
#include "src/rdrs-dal.h"

//#define MULTI_TX_BATCH

typedef struct SubOpTuple {
PKRRequest *pkRequest;
PKRResponse *pkResponse;
NdbTransaction *transaction;
NdbOperation *ndbOperation;
const NdbDictionary::Table *tableDict;
std::vector<std::shared_ptr<ColRec>> recs;
Expand All @@ -45,9 +48,14 @@ typedef struct SubOpTuple {
class PKROperation {
private:
Uint32 noOps;
NdbTransaction *transaction = nullptr;
Ndb *ndbObject = nullptr;
bool isBatch = false;
Ndb *ndbObject = nullptr;
bool isBatch = false;

#ifdef MULTI_TX_BATCH
int numOpsSent = 0;
bool singleTransaction = false;
#endif

std::vector<SubOpTuple> subOpTuples;

public:
Expand Down
Loading