Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build cpplint image
run: pip install cpplint
run: pip install "cpplint<2"
- name: Check code in /src
run: find src/ -name "*.cc" -o -name "*.h" | xargs cpplint
- name: Check code in /tests
Expand Down Expand Up @@ -56,7 +56,7 @@ jobs:
- reductstore_version: "main"
exclude_api_version_tag: ""
- reductstore_version: "latest"
exclude_api_version_tag: "~[1_12]"
exclude_api_version_tag: "~[1_13]"
- license_file: ""
exclude_license_tag: "~[license]"

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- RS-543: Support conditional query, [PR-79](https://github.com/reductstore/reduct-cpp/pull/79)

## [1.12.0] - 2024-10-04

### Added
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ in C++20. It allows developers to easily interact with the database from their C
## Features

* Written in C++20
* Support ReductStore [HTTP API v1.12](https://www.reduct.store/docs/next/http-api)
* Support ReductStore [HTTP API v1.13](https://www.reduct.store/docs/next/http-api)
* Support HTTP and HTTPS protocols
* Support Linux AMD64 and Windows

Expand Down
88 changes: 64 additions & 24 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
namespace reduct {

using internal::IHttpClient;
using internal::QueryOptionsToJsonString;

class Bucket : public IBucket {
public:
Expand Down Expand Up @@ -174,8 +175,8 @@ class Bucket : public IBucket {
path.append(fmt::format("?ts={}", ToMicroseconds(*ts)));
}

auto record_err = ReadRecord(std::move(path), false, false, callback);
return record_err;
auto record_err = ReadRecord(std::move(path), ReadType::kSingle, false, callback);
return record_err.error;
}

Error Head(std::string_view entry_name, std::optional<Time> ts, ReadRecordCallback callback) const noexcept override {
Expand All @@ -184,31 +185,46 @@ class Bucket : public IBucket {
path.append(fmt::format("?ts={}", ToMicroseconds(*ts)));
}

auto record_err = ReadRecord(std::move(path), false, true, callback);
return record_err;
auto record_err = ReadRecord(std::move(path), ReadType::kSingle, true, callback);
return record_err.error;
}

Error Query(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop, QueryOptions options,
ReadRecordCallback callback) const noexcept override {
std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [body, err] = client_->Get(url);
if (err) {
return err;
std::string body;
if (options.when) {
auto [json_payload, json_err] = QueryOptionsToJsonString("QUERY", start, stop, options);
if (json_err) {
return json_err;
}

auto [resp, resp_err] = client_->PostWithResponse(fmt::format("{}/{}/q", path_, entry_name), json_payload.dump());
if (resp_err) {
return resp_err;
}

body = std::move(resp);
} else {
std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [resp, err] = client_->Get(url);
if (err) {
return err;
}

body = std::move(resp);
}

uint64_t id;
try {
auto data = nlohmann::json::parse(body);
id = data.at("id");
id = data["id"];
} catch (const std::exception& ex) {
return Error{.code = -1, .message = ex.what()};
}

while (true) {
bool batched = internal::IsCompatible("1.5", client_->api_version());
auto [stopped, record_err] =
ReadRecord(fmt::format("{}/{}{}?q={}", path_, entry_name, batched ? "/batch" : "", id), batched,
options.head_only, callback);
auto [stopped, record_err] = ReadRecord(fmt::format("{}/{}/batch?q={}", path_, entry_name, id),
ReadType::kBatched, options.head_only, callback);

if (stopped) {
break;
Expand All @@ -232,14 +248,31 @@ class Bucket : public IBucket {

Result<uint64_t> RemoveQuery(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop,
QueryOptions options) const noexcept override {
std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [resp, err] = client_->Delete(url);
if (err) {
return {0, std::move(err)};
std::string body;
if (options.when) {
auto [json_payload, json_err] = QueryOptionsToJsonString("REMOVE", start, stop, options);
if (json_err) {
return {0, std::move(json_err)};
}

auto [resp, resp_err] = client_->PostWithResponse(fmt::format("{}/{}/q", path_, entry_name), json_payload.dump());
if (resp_err) {
return {0, std::move(resp_err)};
}

body = std::move(resp);
} else {
std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [resp, err] = client_->Delete(url);
if (err) {
return {0, std::move(err)};
}

body = std::get<0>(resp);
}

try {
auto data = nlohmann::json::parse(std::get<0>(resp));
auto data = nlohmann::json::parse(body);
return {data.at("removed_records"), Error::kOk};
} catch (const std::exception& ex) {
return {0, Error{.code = -1, .message = ex.what()}};
Expand Down Expand Up @@ -306,22 +339,26 @@ class Bucket : public IBucket {
return url;
}

Result<bool> ReadRecord(std::string&& path, bool batched, bool head,
enum class ReadType {
kSingle,
kBatched,
};

Result<bool> ReadRecord(std::string&& path, ReadType type, bool head,
const ReadRecordCallback& callback) const noexcept {
std::deque<std::optional<std::string>> data;
std::mutex data_mutex;
std::future<void> future;
bool stopped = false;

auto parse_headers_and_receive_data = [&stopped, &data, &data_mutex, &callback, &future, batched, head,
auto parse_headers_and_receive_data = [&type, &stopped, &data, &data_mutex, &callback, &future, head,
this](IHttpClient::Headers&& headers) {
std::vector<ReadableRecord> records;
if (batched) {
if (type == ReadType::kBatched) {
records = ParseAndBuildBatchedRecords(&data, &data_mutex, head, std::move(headers));
} else {
records.push_back(ParseAndBuildSingleRecord(&data, &data_mutex, head, std::move(headers)));
records.emplace_back(ParseAndBuildSingleRecord(&data, &data_mutex, head, std::move(headers)));
}

for (auto& record : records) {
Task task([record = std::move(record), &callback, &stopped] {
if (stopped) {
Expand Down Expand Up @@ -362,7 +399,10 @@ class Bucket : public IBucket {
std::lock_guard lock(data_mutex);
data.emplace_back(std::nullopt);
}
future.wait();

if (future.valid()) {
future.wait();
}
}

return {stopped, err};
Expand Down
9 changes: 6 additions & 3 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,16 @@ class IBucket {
*/
[[nodiscard]] virtual Result<BatchErrors> UpdateBatch(std::string_view entry_name,
BatchCallback callback) const noexcept = 0;

/**
* Query options
*/
struct QueryOptions {
LabelMap include; ///< include labels
LabelMap exclude; ///< exclude labels
[[deprecated("Use when instead")]] LabelMap include; ///< include labels
[[deprecated("Use when instead")]] LabelMap exclude; ///< exclude labels

std::optional<std::string> when; ///< query condition
std::optional<bool> strict; ///< strict mode

std::optional<double> each_s; ///< return one record each S seconds
std::optional<size_t> each_n; ///< return each N-th record
std::optional<size_t> limit; ///< limit number of records
Expand Down
57 changes: 57 additions & 0 deletions src/reduct/internal/serialisation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,61 @@ Result<IClient::FullReplicationInfo> ParseFullReplicationInfo(const nlohmann::js
return {info, Error::kOk};
}

Result<nlohmann::json> QueryOptionsToJsonString(std::string_view type, std::optional<IBucket::Time> start,
                                                std::optional<IBucket::Time> stop,
                                                const IBucket::QueryOptions& options) {
  // Builds the JSON request body for a query endpoint from the given options.
  // Returns an error only if `options.when` contains a string that is not
  // parseable as JSON.
  nlohmann::json payload;
  payload["query_type"] = type;

  // Timestamps are transmitted as microseconds since the Unix epoch.
  const auto to_microseconds = [](const IBucket::Time& t) {
    return std::chrono::duration_cast<std::chrono::microseconds>(t.time_since_epoch()).count();
  };

  if (start) {
    payload["start"] = to_microseconds(*start);
  }

  if (stop) {
    payload["stop"] = to_microseconds(*stop);
  }

  // Legacy label filters (superseded by the `when` condition).
  for (const auto& [label, value] : options.include) {
    payload["include"][label] = value;
  }

  for (const auto& [label, value] : options.exclude) {
    payload["exclude"][label] = value;
  }

  if (options.each_s) {
    payload["each_s"] = *options.each_s;
  }

  if (options.each_n) {
    payload["each_n"] = *options.each_n;
  }

  if (options.limit) {
    payload["limit"] = *options.limit;
  }

  if (options.ttl) {
    // NOTE(review): divides the tick count by 1000 — presumably ms -> s; confirm the unit of ttl.
    payload["ttl"] = options.ttl->count() / 1000;
  }

  if (options.continuous) {
    payload["continuous"] = true;
  }

  if (options.when) {
    // The condition is user-supplied JSON text; reject it with an error
    // instead of letting the parse exception escape.
    try {
      payload["when"] = nlohmann::json::parse(*options.when);
    } catch (const std::exception& ex) {
      return {{}, Error{.code = -1, .message = ex.what()}};
    }
  }

  if (options.strict) {
    payload["strict"] = *options.strict;
  }

  return {payload, Error::kOk};
}

} // namespace reduct::internal
8 changes: 5 additions & 3 deletions src/reduct/internal/serialisation.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

#include <nlohmann/json.hpp>

#include "reduct/client.h"
#include "reduct/bucket.h"
#include "reduct/client.h"

namespace reduct::internal {

Expand All @@ -30,15 +30,13 @@ Result<IBucket::Settings> ParseBucketSettings(const nlohmann::json& json);
*/
Result<IClient::FullTokenInfo> ParseTokenInfo(const nlohmann::json& json);


/**
* @brief Parse list of replication info from JSON string
* @param json
* @return
*/
Result<std::vector<IClient::ReplicationInfo>> ParseReplicationList(const nlohmann::json& data);


/**
* @brief Serialize replication settings
* @param settings to serialize
Expand All @@ -53,6 +51,10 @@ nlohmann::json ReplicationSettingsToJsonString(IClient::ReplicationSettings sett
*/
Result<IClient::FullReplicationInfo> ParseFullReplicationInfo(const nlohmann::json& data);

/**
 * @brief Serialize query options into a JSON request body for query endpoints
 * @param type query type, e.g. "QUERY" or "REMOVE"
 * @param start optional start point of the time interval
 * @param stop optional stop point of the time interval
 * @param options query options to serialize
 * @return JSON document, or an error if the `when` condition is not valid JSON
 */
Result<nlohmann::json> QueryOptionsToJsonString(std::string_view type, std::optional<IBucket::Time> start,
                                                std::optional<IBucket::Time> stop,
                                                const IBucket::QueryOptions& options);

}; // namespace reduct::internal

#endif // REDUCTCPP_SERIALISATION_H
Loading
Loading