1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Parsing server URLs with an additional path, [PR-102](https://github.com/reductstore/reduct-cpp/pull/102)
- Fix multi-entry API implementation, [PR-108](https://github.com/reductstore/reduct-cpp/pull/108)

## [1.17.1] - 2025-11-17

198 changes: 124 additions & 74 deletions src/reduct/bucket.cc
@@ -1,4 +1,4 @@
// Copyright 2022-2024 ReductSoftware UG
// Copyright 2022-2026 ReductSoftware UG

#include "reduct/bucket.h"
#define FMT_HEADER_ONLY 1
@@ -23,6 +23,7 @@

#include "reduct/internal/batch_v1.h"
#include "reduct/internal/batch_v2.h"
#include "reduct/internal/headers.h"
#include "reduct/internal/http_client.h"
#include "reduct/internal/serialisation.h"

@@ -165,15 +166,27 @@ class Bucket : public IBucket {
}

Result<BatchErrors> WriteBatch(std::string_view entry_name, BatchCallback callback) const noexcept override {
return ProcessBatch(entry_name, std::move(callback), BatchType::kWrite);
return ProcessBatchV1(entry_name, std::move(callback), BatchType::kWrite);
}

Result<BatchRecordErrors> WriteBatch(BatchCallback callback) const noexcept override {
return ProcessBatchV2(std::move(callback), BatchType::kWrite);
}

Result<BatchErrors> UpdateBatch(std::string_view entry_name, BatchCallback callback) const noexcept override {
return ProcessBatch(entry_name, std::move(callback), BatchType::kUpdate);
return ProcessBatchV1(entry_name, std::move(callback), BatchType::kUpdate);
}

Result<BatchRecordErrors> UpdateBatch(BatchCallback callback) const noexcept override {
return ProcessBatchV2(std::move(callback), BatchType::kUpdate);
}

Result<BatchErrors> RemoveBatch(std::string_view entry_name, BatchCallback callback) const noexcept override {
return ProcessBatch(entry_name, std::move(callback), BatchType::kRemove);
return ProcessBatchV1(entry_name, std::move(callback), BatchType::kRemove);
}

Result<BatchRecordErrors> RemoveBatch(BatchCallback callback) const noexcept override {
return ProcessBatchV2(std::move(callback), BatchType::kRemove);
}

Error Update(std::string_view entry_name, const WriteOptions& options) const noexcept override {
@@ -209,16 +222,30 @@ class Bucket : public IBucket {
Error Query(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop, QueryOptions options,
ReadRecordCallback callback) const noexcept override {
if (SupportsBatchProtocolV2()) {
return QueryV2(entry_name, start, stop, options, callback);
const auto entries = std::vector{std::string(entry_name)};
return QueryV2(entries, start, stop, options, callback);
}

return QueryV1(entry_name, start, stop, options, callback);
}

Error Query(const std::vector<std::string>& entry_names, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options, ReadRecordCallback callback) const noexcept override {
if (!SupportsBatchProtocolV2()) {
return Error{.code = -1, .message = "Batch protocol v2 not supported"};
}

if (entry_names.empty()) {
return Error{.code = -1, .message = "No entry names provided"};
}

return QueryV2(entry_names, start, stop, options, callback);
}

Error QueryV1(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop, QueryOptions options,
const ReadRecordCallback& callback) const {
std::string body;
auto [json_payload, json_err] = QueryOptionsToJsonString("QUERY", start, stop, options);
auto [json_payload, json_err] = QueryOptionsToJsonString("QUERY", {}, start, stop, options);
if (json_err) {
return json_err;
}
@@ -262,21 +289,12 @@
return Error::kOk;
}

Error QueryV2(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop, QueryOptions options,
const ReadRecordCallback& callback) const {
auto [json_payload, json_err] = QueryOptionsToJsonString("QUERY", start, stop, options);
Error QueryV2(const std::vector<std::string>& entries, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options, const ReadRecordCallback& callback) const {
auto [json_payload, json_err] = QueryOptionsToJsonString("QUERY", entries, start, stop, options);
if (json_err) {
return json_err;
}
auto entries = ParseEntryList(entry_name);
if (entries.empty()) {
return Error{.code = 400, .message = "Entry name is required"};
}

json_payload["entries"] = nlohmann::ordered_json::array();
for (const auto& entry : entries) {
json_payload["entries"].push_back(entry);
}

auto [resp, resp_err] = client_->PostWithResponse(fmt::format("{}/q", io_path_), json_payload.dump());
if (resp_err) {
@@ -317,16 +335,30 @@
Result<uint64_t> RemoveQuery(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options) const noexcept override {
if (SupportsBatchProtocolV2()) {
return RemoveQueryV2(entry_name, start, stop, options);
const auto entries = std::vector<std::string>{std::string(entry_name)};
return RemoveQueryV2(entries, start, stop, options);
}

return RemoveQueryV1(entry_name, start, stop, options);
}

Result<uint64_t> RemoveQuery(std::vector<std::string> entry_names, std::optional<Time> start,
std::optional<Time> stop, QueryOptions options) const noexcept override {
if (!SupportsBatchProtocolV2()) {
return {0, Error{.code = -1, .message = "Batch protocol v2 not supported"}};
}

if (entry_names.empty()) {
return {0, Error{.code = -1, .message = "No entry names provided"}};
}

return RemoveQueryV2(entry_names, start, stop, options);
}

Result<uint64_t> RemoveQueryV1(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options) const {
std::string body;
auto [json_payload, json_err] = QueryOptionsToJsonString("REMOVE", start, stop, options);
auto [json_payload, json_err] = QueryOptionsToJsonString("REMOVE", {}, start, stop, options);
if (json_err) {
return {0, std::move(json_err)};
}
@@ -346,19 +378,14 @@
}
}

Result<uint64_t> RemoveQueryV2(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options) const {
auto [json_payload, json_err] = QueryOptionsToJsonString("REMOVE", start, stop, options);
Result<uint64_t> RemoveQueryV2(const std::vector<std::string>& entries, std::optional<Time> start,
std::optional<Time> stop, QueryOptions options) const {
auto [json_payload, json_err] = QueryOptionsToJsonString("REMOVE", entries, start, stop, options);
if (json_err) {
return {0, std::move(json_err)};
}
auto entries = ParseEntryList(entry_name);
if (entries.empty()) {
return {0, Error{.code = 400, .message = "Entry name is required"}};
}
json_payload["entries"] = nlohmann::ordered_json::array();
for (const auto& entry : entries) {
json_payload["entries"].push_back(entry);
return {0, Error{.code = -1, .message = "At least one entry name is required"}};
}

auto [resp, resp_err] = client_->PostWithResponse(fmt::format("{}/q", io_path_), json_payload.dump());
@@ -392,8 +419,8 @@
return Error::kOk;
}

Result<std::string> CreateQueryLink(std::string entry_name, QueryLinkOptions options) const noexcept override {
auto [json_payload, json_err] = internal::QueryLinkOptionsToJsonString(name_, entry_name, options);
Result<std::string> CreateQueryLink(std::string_view entry_name, QueryLinkOptions options) const noexcept override {
auto [json_payload, json_err] = internal::QueryLinkOptionsToJsonString(name_, {std::string(entry_name)}, options);

auto file_name =
options.file_name ? *options.file_name : fmt::format("{}_{}.bin", entry_name, options.record_index);
@@ -410,6 +437,31 @@
}
}

Result<std::string> CreateQueryLink(const std::vector<std::string>& entries,
QueryLinkOptions options) const noexcept override {
if (!SupportsBatchProtocolV2()) {
return {{}, Error{.code = -1, .message = "Batch protocol v2 not supported"}};
}
if (entries.empty()) {
return {{}, Error{.code = -1, .message = "At least one entry name is required"}};
}

auto [json_payload, json_err] = internal::QueryLinkOptionsToJsonString(name_, entries, options);

auto file_name = options.file_name ? *options.file_name : fmt::format("{}_{}.bin", name_, options.record_index);
auto [body, err] = client_->PostWithResponse(fmt::format("/links/{}", file_name), json_payload.dump());
if (err) {
return {{}, std::move(err)};
}

try {
auto data = nlohmann::json::parse(body);
return {data.at("link").get<std::string>(), Error::kOk};
} catch (const std::exception& ex) {
return {{}, Error{.code = -1, .message = ex.what()}};
}
}

private:
enum class ReadType {
kSingle,
@@ -487,7 +539,7 @@ class Bucket : public IBucket {
bool stopped = false;

IHttpClient::Headers request_headers;
request_headers.emplace("x-reduct-query-id", std::to_string(query_id));
request_headers.emplace(std::string(internal::kHeaderQueryId), std::to_string(query_id));

auto parse_headers_and_receive_data = [&stopped, &data, &data_mutex, &callback, &future, head,
this](IHttpClient::Headers&& headers) {
@@ -545,14 +597,14 @@ class Bucket : public IBucket {
bool head, IHttpClient::Headers&& headers) {
ReadableRecord record;

record.timestamp = internal::FromMicroseconds(headers["x-reduct-time"]);
record.timestamp = internal::FromMicroseconds(headers[std::string(internal::kHeaderTime)]);
record.size = std::stoi(headers["content-length"]);
record.content_type = headers["content-type"];
record.last = headers["x-reduct-last"] == "1";
record.last = headers[std::string(internal::kHeaderLast)] == "1";

for (const auto& [key, value] : headers) {
if (key.starts_with("x-reduct-label-")) {
record.labels.emplace(key.substr(15), value);
if (key.starts_with(internal::kHeaderLabelPrefix)) {
record.labels.emplace(std::string(key.substr(internal::kHeaderLabelPrefix.size())), value);
}
}

@@ -593,7 +645,7 @@ class Bucket : public IBucket {
IHttpClient::Headers MakeHeadersFromLabels(const WriteOptions& options) const {
IHttpClient::Headers headers;
for (const auto& [key, value] : options.labels) {
headers.emplace(fmt::format("x-reduct-label-{}", key), value);
headers.emplace(fmt::format("{}{}", internal::kHeaderLabelPrefix, key), value);
}
return headers;
}
@@ -603,52 +655,50 @@ class Bucket : public IBucket {
return api_version && internal::IsCompatible("1.18", *api_version);
}

static std::string TrimWhitespace(std::string_view value) {
auto start = value.find_first_not_of(" \t");
if (start == std::string_view::npos) {
return "";
}
auto end = value.find_last_not_of(" \t");
return std::string(value.substr(start, end - start + 1));
}

static std::vector<std::string> ParseEntryList(std::string_view entries_raw) {
std::vector<std::string> entries;
size_t start = 0;
while (start < entries_raw.size()) {
auto comma = entries_raw.find(',', start);
auto slice =
entries_raw.substr(start, comma == std::string_view::npos ? entries_raw.size() - start : comma - start);
auto trimmed = TrimWhitespace(slice);
if (!trimmed.empty()) {
entries.emplace_back(std::move(trimmed));
}
if (comma == std::string_view::npos) {
break;
}
start = comma + 1;
}
Result<BatchErrors> ProcessBatchV1(std::string_view entry_name, BatchCallback callback,
BatchType type) const noexcept {
Batch batch;
callback(&batch);

if (entries.empty() && !entries_raw.empty()) {
auto trimmed = TrimWhitespace(entries_raw);
if (!trimmed.empty()) {
entries.emplace_back(std::move(trimmed));
}
if (SupportsBatchProtocolV2()) {
return internal::ProcessBatchV2(client_.get(), io_path_, entry_name, std::move(batch), type);
}

return entries;
return internal::ProcessBatchV1(client_.get(), path_, entry_name, std::move(batch), type);
}

Result<WriteBatchErrors> ProcessBatch(std::string_view entry_name, BatchCallback callback,
BatchType type) const noexcept {
Result<BatchRecordErrors> ProcessBatchV2(BatchCallback callback, BatchType type) const noexcept {
Batch batch;
callback(&batch);

if (batch.records().empty()) {
return {BatchRecordErrors{}, Error::kOk};
}

if (SupportsBatchProtocolV2()) {
return internal::ProcessBatchV2(client_.get(), io_path_, entry_name, std::move(batch), type);
return internal::ProcessBatchV2Records(client_.get(), io_path_, std::move(batch), type);
}

return internal::ProcessBatchV1(client_.get(), path_, entry_name, std::move(batch), type);
std::string entry_name;
for (const auto& record : batch.records()) {
if (!record.entry.empty()) {
entry_name = record.entry;
break;
}
}

if (entry_name.empty()) {
return {{}, Error{.code = 400, .message = "Entry name is required"}};
}

auto [errors, err] = internal::ProcessBatchV1(client_.get(), path_, entry_name, std::move(batch), type);
if (err) {
return {{}, err};
}

BatchRecordErrors record_errors;
record_errors.emplace(std::move(entry_name), std::move(errors));
return {record_errors, Error::kOk};
}

std::unique_ptr<internal::IHttpClient> client_;
Expand All @@ -667,8 +717,8 @@ std::unique_ptr<IBucket> IBucket::Build(std::string_view server_url, std::string
return std::make_unique<Bucket>(server_url, name, options);
}

std::unique_ptr<IBucket> IBucket::Build(std::string_view server_url, std::string_view name,
const HttpOptions& options, std::optional<std::string> api_version) noexcept {
std::unique_ptr<IBucket> IBucket::Build(std::string_view server_url, std::string_view name, const HttpOptions& options,
std::optional<std::string> api_version) noexcept {
return std::make_unique<Bucket>(server_url, name, options, std::move(api_version));
}

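For context on the multi-entry API this PR fixes, the sketch below shows how the new overloads in `bucket.cc` might be used. It is based only on the signatures visible in the diff above; the server URL, bucket name, and entry names are placeholders, and the `IClient::Build`/`GetBucket` calls plus the assumption that the read callback returns `true` to continue iterating follow the existing single-entry reduct-cpp examples rather than anything confirmed by this PR. The multi-entry overloads require a server with batch protocol v2 (API 1.18 or newer); older servers get an error instead.

```cpp
// Hypothetical usage of the multi-entry Query/RemoveQuery overloads from this PR.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

#include "reduct/client.h"  // assumed public header exposing IClient and IBucket

int main() {
  using reduct::IClient;

  // Placeholder server and bucket: any ReductStore >= 1.18 with a bucket
  // "demo" containing entries "sensor-1" and "sensor-2" would do.
  auto client = IClient::Build("http://127.0.0.1:8383");
  auto [bucket, get_err] = client->GetBucket("demo");
  if (get_err) {
    std::cerr << get_err.message << std::endl;
    return 1;
  }

  // New in this PR: one query over several entries instead of one Query() per entry.
  const std::vector<std::string> entries = {"sensor-1", "sensor-2"};
  auto query_err = bucket->Query(entries, std::nullopt, std::nullopt, {},
                                 [](auto&& record) {
                                   std::cout << record.size << " bytes, "
                                             << record.content_type << std::endl;
                                   return true;  // assumed: true keeps iterating
                                 });
  if (query_err) {
    std::cerr << query_err.message << std::endl;
    return 1;
  }

  // Also new: remove records from several entries with a single request.
  auto [removed, remove_err] = bucket->RemoveQuery(entries, std::nullopt, std::nullopt, {});
  if (remove_err) {
    std::cerr << remove_err.message << std::endl;
    return 1;
  }
  std::cout << "removed " << removed << " records" << std::endl;
  return 0;
}
```

Both calls go through the v2 `{io_path_}/q` endpoint shown in the diff, so a single request covers all listed entries.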