diff --git a/CHANGELOG.md b/CHANGELOG.md index 8961be2..ffe5450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,36 @@ # Changelog +## 0.20.0 - 2024-07-09 + +This release adds support for encoding DBN within the C++ client. +It also improves historical symbology support with the new `TsSymbolMap` class that +handles mapping historical records to a text symbol. To support this class, several types +for date fields were changed from strings or ints to `date::year_month_day`. + +### Enhancements +- Added `TsSymbolMap` to support historical symbology where mappings change between days +- Added `DbnEncoder` class for encoding DBN data +- Added blocking API similar to `LiveBlocking` to `DbnFileStore` with new `GetMetadata` + and `NextRecord` methods +- Added `BboMsg` record struct for future `bbo-1m` and `bbo-1s` schemas +- Added `PitSymbol` map constructor from `Metadata` and a `date::year_month_day` +- Added `Metadata::CreateSymbolMap` and `Metadata::CreateSymbolMapForDate` methods for + creating symbology maps from historical metadata +- Added blocking API similar to `LiveBlocking` to `DbnFileStore` +- Added `SymbologyResolution::CreateSymbolMap` method for creating a symbology map from + a symbology resolution response +- Added `InFileStream` and `OutFileStream` helper classes for reading and writing binary + output respectively + +### Breaking changes +- Added new dependency on [Howard Hinnant's date library](https://howardhinnant.github.io/date/date.html) +- Added `ILogReceiver*` parameter to all `DbnDecoder` constructors and one `DbnFileStore` constructor +- Removed type `StrMappingInterval`. `MappingInterval` is now also used in `SymbologyResolution`. +- Changed `Bbo1sMsg` and `Bbo1mMsg` to be aliases for `BboMsg` +- Changed type of `start_date` and `end_date` in `MappingInterval` to `date::year_month_day` +- Added `stype_in` and `stype_out` fields to `SymbologyResolution` to support creating + a `TsSymbolMap` + ## 0.19.1 - 2024-06-25 ### Enhancements diff --git a/CMakeLists.txt b/CMakeLists.txt index b7aa75d..e96ecc0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ cmake_minimum_required(VERSION 3.14) # Project details # -project("databento" VERSION 0.19.1 LANGUAGES CXX) +project("databento" VERSION 0.20.0 LANGUAGES CXX) string(TOUPPER ${PROJECT_NAME} PROJECT_NAME_UPPERCASE) # @@ -181,6 +181,38 @@ else() # Ignore compiler warnings in headers add_system_include_property(httplib) endif() +# date +if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_DATE) + # Check if date target already exists + if(TARGET date::date) + get_target_property(DATE_SOURCE_DIR date::date SOURCE_DIR) + message(STATUS "date::date already available as a target at ${DATE_SOURCE_DIR}") + get_target_property(DATE_INCLUDE_DIRS date::date INTERFACE_INCLUDE_DIRECTORIES) + if(DATE_INCLUDE_DIRS) + message(STATUS "date::date include directories: ${DATE_INCLUDE_DIRS}") + endif() + else() + find_package(date REQUIRED) + endif() +else() + set(date_version 3.0.1) + if(CMAKE_VERSION VERSION_LESS 3.24) + FetchContent_Declare( + date_src + URL https://github.com/HowardHinnant/date/archive/refs/tags/v${date_version}.tar.gz + ) + else() + # DOWNLOAD_EXTRACT_TIMESTAMP added in 3.24 + FetchContent_Declare( + date_src + URL https://github.com/HowardHinnant/date/archive/refs/tags/v${date_version}.tar.gz + DOWNLOAD_EXTRACT_TIMESTAMP TRUE + ) + endif() + FetchContent_MakeAvailable(date_src) + # Ignore compiler warnings in headers + add_system_include_property(date) +endif() # openSSL find_package(OpenSSL REQUIRED) if(OPENSSL_FOUND) @@ -204,6 +236,7 @@ endif() target_link_libraries( ${PROJECT_NAME} PUBLIC + date::date httplib::httplib nlohmann_json::nlohmann_json OpenSSL::Crypto diff --git a/README.md b/README.md index 5169928..f9450b9 100644 --- a/README.md +++ b/README.md @@ -64,11 +64,12 @@ You'll need to ensure the following dependencies are installed: - [Zstandard (zstd)](https://github.com/facebook/zstd) - [nlohmann\_json (header-only)](https://github.com/nlohmann/json) - [cpp-httplib (header-only)](https://github.com/yhirose/cpp-httplib) +- [date (header-only)](https://github.com/HowardHinnant/date) - [dirent (Windows-only, header-only)](https://github.com/tronkko/dirent) -By default, cpp-httplib and nlohmann\_json are downloaded by CMake as part of the build process. +By default, date, cpp-httplib and nlohmann\_json are downloaded by CMake as part of the build process. If you would like to use a local version of these libraries, enable the CMake flag -`DATABENTO_ENABLE_EXTERNAL_HTTPLIB` or `DATABENTO_ENABLE_EXTERNAL_JSON`. +`DATABENTO_ENABLE_EXTERNAL_DATE`, `DATABENTO_ENABLE_EXTERNAL_HTTPLIB`, or `DATABENTO_ENABLE_EXTERNAL_JSON` respectively. #### Ubuntu @@ -126,26 +127,32 @@ To run this program, set the `DATABENTO_API_KEY` environment variable with an ac ### Historical -Here is a simple program that fetches 10 minutes worth of historical trades for the entire CME Globex market: +Here is a simple program that fetches 10 minutes worth of historical trades for two CME futures: ```cpp -#include +#include #include +#include #include using namespace databento; int main() { - auto client = HistoricalBuilder{}.SetKeyFromEnv().Build(); - auto print_trades = [](const Record& record) { + auto client = HistoricalBuilder{}.SetKey("$YOUR_API_KEY").Build(); + TsSymbolMap symbol_map; + auto decode_symbols = [&symbol_map](const Metadata& metadata) { + symbol_map = metadata.CreateSymbolMap(); + }; + auto print_trades = [&symbol_map](const Record& record) { const auto& trade_msg = record.Get(); - std::cout << trade_msg << '\n'; + std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " + << trade_msg << '\n'; return KeepGoing::Continue; }; - client.TimeseriesGetRange("GLBX.MDP3", - {"2022-06-10T14:30", "2022-06-10T14:40"}, - kAllSymbols, Schema::Trades, SType::RawSymbol, - SType::InstrumentId, {}, {}, print_trades); + client.TimeseriesGetRange( + "GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, kAllSymbols, + Schema::Trades, SType::RawSymbol, SType::InstrumentId, {}, decode_symbols, + print_trades); } ``` @@ -156,7 +163,7 @@ These examples can be compiled by enabling the cmake option `DATABENTO_ENABLE_EX ## Documentation -You can find more detailed examples and the full API documentation on the [Databento doc site](https://databento.com/docs/getting-started?historical=cpp&live=cpp). +You can find more detailed examples and the full API documentation on the [Databento doc site](https://databento.com/docs/quickstart?historical=cpp&live=cpp). ## License diff --git a/cmake/SourcesAndHeaders.cmake b/cmake/SourcesAndHeaders.cmake index 0c8d686..4436e03 100644 --- a/cmake/SourcesAndHeaders.cmake +++ b/cmake/SourcesAndHeaders.cmake @@ -5,8 +5,8 @@ set(headers include/databento/datetime.hpp include/databento/dbn.hpp include/databento/dbn_decoder.hpp + include/databento/dbn_encoder.hpp include/databento/dbn_file_store.hpp - include/databento/detail/file_stream.hpp include/databento/detail/http_client.hpp include/databento/detail/json_helpers.hpp include/databento/detail/scoped_fd.hpp @@ -16,6 +16,7 @@ set(headers include/databento/detail/zstd_stream.hpp include/databento/enums.hpp include/databento/exceptions.hpp + include/databento/file_stream.hpp include/databento/fixed_price.hpp include/databento/flag_set.hpp include/databento/historical.hpp @@ -39,9 +40,10 @@ set(sources src/compat.cpp src/datetime.cpp src/dbn.cpp + src/dbn_constants.hpp src/dbn_decoder.cpp + src/dbn_encoder.cpp src/dbn_file_store.cpp - src/detail/file_stream.cpp src/detail/http_client.cpp src/detail/json_helpers.cpp src/detail/scoped_fd.cpp @@ -50,6 +52,7 @@ set(sources src/detail/zstd_stream.cpp src/enums.cpp src/exceptions.cpp + src/file_stream.cpp src/fixed_price.cpp src/flag_set.cpp src/historical.cpp diff --git a/example/historical/readme.cpp b/example/historical/readme.cpp index ac8538a..fc6bc91 100644 --- a/example/historical/readme.cpp +++ b/example/historical/readme.cpp @@ -1,22 +1,28 @@ // Duplicate of the example usage code from the README.md to ensure // it compiles and to be able to clang-format it. // NOLINTBEGIN(google-build-using-namespace) -#include +#include #include +#include #include using namespace databento; int main() { auto client = HistoricalBuilder{}.SetKey("$YOUR_API_KEY").Build(); - auto print_trades = [](const Record& record) { + TsSymbolMap symbol_map; + auto decode_symbols = [&symbol_map](const Metadata& metadata) { + symbol_map = metadata.CreateSymbolMap(); + }; + auto print_trades = [&symbol_map](const Record& record) { const auto& trade_msg = record.Get(); - std::cout << trade_msg << '\n'; + std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " + << trade_msg << '\n'; return KeepGoing::Continue; }; - client.TimeseriesGetRange("GLBX.MDP3", - {"2022-06-10T14:30", "2022-06-10T14:40"}, - kAllSymbols, Schema::Trades, SType::RawSymbol, - SType::InstrumentId, {}, {}, print_trades); + client.TimeseriesGetRange( + "GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, {"ESM2", "NQZ2"}, + Schema::Trades, SType::RawSymbol, SType::InstrumentId, {}, decode_symbols, + print_trades); } // NOLINTEND(google-build-using-namespace) diff --git a/example/historical/symbology_resolve.cpp b/example/historical/symbology_resolve.cpp index 339f781..3b6c259 100644 --- a/example/historical/symbology_resolve.cpp +++ b/example/historical/symbology_resolve.cpp @@ -16,7 +16,7 @@ int main(int argc, char* argv[]) { const auto stype_out = databento::FromString(argv[3]); std::vector symbols; - for (int i = 6; i < argc; ++i) { + for (int i = 5; i < argc; ++i) { symbols.emplace_back(argv[i]); } diff --git a/include/databento/dbn.hpp b/include/databento/dbn.hpp index 811375f..129320a 100644 --- a/include/databento/dbn.hpp +++ b/include/databento/dbn.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -9,13 +11,15 @@ #include "databento/enums.hpp" namespace databento { +// Forward declare +class PitSymbolMap; +class TsSymbolMap; + struct MappingInterval { - // The start date of the interval (inclusive) as - // YYYYMMDD e.g. 2022-10-08 is represented as 20221008 - std::uint32_t start_date; - // The end date of the interval (exclusive) as - // YYYYMMDD e.g. 2022-10-08 is represented as 20221008 - std::uint32_t end_date; + // The start date of the interval (inclusive). + date::year_month_day start_date; + // The end date of the interval (exclusive). + date::year_month_day end_date; std::string symbol; }; @@ -69,6 +73,16 @@ struct Metadata { // Symbol mappings containing a native symbol and its mapping intervals. std::vector mappings; + // Creates a symbology mapping from instrument ID to text symbol for the given + // date. + // + // This method is useful when working with a historical request over a single + // day or in other situations where you're sure the mappings don't change + // during the time range of the request. Otherwise, `SymbolMap()` is + // recommmended. + PitSymbolMap CreateSymbolMapForDate(date::year_month_day date) const; + // Creates a symbology mapping from instrument ID and date to text symbol. + TsSymbolMap CreateSymbolMap() const; // Upgrades the metadata according to `upgrade_policy` if necessary. void Upgrade(VersionUpgradePolicy upgrade_policy); }; diff --git a/include/databento/dbn_decoder.hpp b/include/databento/dbn_decoder.hpp index 5170e64..422a28a 100644 --- a/include/databento/dbn_decoder.hpp +++ b/include/databento/dbn_decoder.hpp @@ -6,10 +6,11 @@ #include #include "databento/dbn.hpp" -#include "databento/detail/file_stream.hpp" #include "databento/detail/shared_channel.hpp" #include "databento/enums.hpp" // Upgrade Policy +#include "databento/file_stream.hpp" #include "databento/ireadable.hpp" +#include "databento/log.hpp" #include "databento/record.hpp" // Record, RecordHeader namespace databento { @@ -17,14 +18,12 @@ namespace databento { // handled. Defaults to upgrading DBNv1 data to version 2 (the current version). class DbnDecoder { public: - explicit DbnDecoder(detail::SharedChannel channel); - explicit DbnDecoder(detail::FileStream file_stream); - explicit DbnDecoder(std::unique_ptr input); - DbnDecoder(std::unique_ptr input, + DbnDecoder(ILogReceiver* log_receiver, detail::SharedChannel channel); + DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream); + DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr input); + DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr input, VersionUpgradePolicy upgrade_policy); - // Decode metadata from the given buffer. - static Metadata DecodeMetadata(const std::vector& buffer); static std::pair DecodeMetadataVersionAndSize( const std::uint8_t* buffer, std::size_t size); static Metadata DecodeMetadataFields(std::uint8_t version, @@ -60,8 +59,10 @@ class DbnDecoder { std::vector::const_iterator buffer_end_it); bool DetectCompression(); std::size_t FillBuffer(); + std::size_t GetReadBufferSize() const; RecordHeader* BufferRecordHeader(); + ILogReceiver* log_receiver_; std::uint8_t version_{}; VersionUpgradePolicy upgrade_policy_; bool ts_out_{}; diff --git a/include/databento/dbn_encoder.hpp b/include/databento/dbn_encoder.hpp new file mode 100644 index 0000000..efd93db --- /dev/null +++ b/include/databento/dbn_encoder.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include // uint32_t + +#include "databento/dbn.hpp" // Metadata +#include "databento/iwritable.hpp" +#include "databento/record.hpp" +#include "databento/with_ts_out.hpp" + +namespace databento { +class DbnEncoder { + public: + explicit DbnEncoder(const Metadata& metadata, IWritable* output); + + static void EncodeMetadata(const Metadata& metadata, IWritable* output); + static void EncodeRecord(const Record& record, IWritable* output); + + template + void EncodeRecord(const R& record) { + static_assert( + has_header_v, + "must be a DBN record struct with an `hd` RecordHeader field"); + EncodeRecord(Record{&record.hd}); + } + template + void EncodeRecord(const WithTsOut record) { + static_assert( + has_header_v, + "must be a DBN record struct with an `hd` RecordHeader field"); + EncodeRecord(Record{&record.rec.hd}); + } + void EncodeRecord(const Record& record); + + private: + static std::uint32_t CalcLength(const Metadata& metadata); + + IWritable* output_; +}; +} // namespace databento diff --git a/include/databento/dbn_file_store.hpp b/include/databento/dbn_file_store.hpp index b64315b..cbdb906 100644 --- a/include/databento/dbn_file_store.hpp +++ b/include/databento/dbn_file_store.hpp @@ -2,23 +2,39 @@ #include +#include "databento/dbn.hpp" // DecodeMetadata #include "databento/dbn_decoder.hpp" // DbnDecoder #include "databento/enums.hpp" // VersionUpgradePolicy -#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback +#include "databento/log.hpp" +#include "databento/record.hpp" +#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback namespace databento { -// A reader for DBN files. +// A reader for DBN files. This class provides both a callback API similar to +// TimeseriesGetRange in historical data and LiveThreaded for live data as well +// as a blocking API similar to that of LiveBlocking. Only one API should be +// used on a given instance. class DbnFileStore { public: explicit DbnFileStore(const std::string& file_path); - DbnFileStore(const std::string& file_path, + DbnFileStore(ILogReceiver* log_receiver, const std::string& file_path, VersionUpgradePolicy upgrade_policy); + // Callback API: calling Replay consumes the input. void Replay(const MetadataCallback& metadata_callback, const RecordCallback& record_callback); void Replay(const RecordCallback& record_callback); + // Blocking API + const Metadata& GetMetadata(); + // Returns the next record or `nullptr` if there are no remaining records. + const Record* NextRecord(); + private: - DbnDecoder parser_; + void MaybeDecodeMetadata(); + + DbnDecoder decoder_; + Metadata metadata_{}; + bool has_decoded_metadata_{false}; }; } // namespace databento diff --git a/include/databento/detail/json_helpers.hpp b/include/databento/detail/json_helpers.hpp index 8e61b0e..17b5348 100644 --- a/include/databento/detail/json_helpers.hpp +++ b/include/databento/detail/json_helpers.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include // multimap @@ -89,6 +90,10 @@ template <> std::vector ParseAt(const std::string& endpoint, const nlohmann::json& json, const std::string& key); +template <> +date::year_month_day ParseAt(const std::string& endpoint, + const nlohmann::json& json, + const std::string& key); } // namespace detail } // namespace databento diff --git a/include/databento/detail/zstd_stream.hpp b/include/databento/detail/zstd_stream.hpp index 540bbba..67490cc 100644 --- a/include/databento/detail/zstd_stream.hpp +++ b/include/databento/detail/zstd_stream.hpp @@ -8,20 +8,22 @@ #include #include "databento/ireadable.hpp" +#include "databento/iwritable.hpp" +#include "databento/log.hpp" namespace databento { namespace detail { -class ZstdStream : public IReadable { +class ZstdDecodeStream : public IReadable { public: - explicit ZstdStream(std::unique_ptr input); - ZstdStream(std::unique_ptr input, - std::vector&& in_buffer); + explicit ZstdDecodeStream(std::unique_ptr input); + ZstdDecodeStream(std::unique_ptr input, + std::vector&& in_buffer); // Read exactly `length` bytes into `buffer`. void ReadExact(std::uint8_t* buffer, std::size_t length) override; // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. - size_t ReadSome(std::uint8_t* buffer, std::size_t max_length) override; + std::size_t ReadSome(std::uint8_t* buffer, std::size_t max_length) override; private: std::unique_ptr input_; @@ -30,5 +32,27 @@ class ZstdStream : public IReadable { std::vector in_buffer_; ZSTD_inBuffer z_in_buffer_; }; + +class ZstdCompressStream : public IWritable { + public: + explicit ZstdCompressStream(IWritable* output); + ZstdCompressStream(ILogReceiver* log_receiver, IWritable* output); + ZstdCompressStream(const ZstdCompressStream&) = delete; + ZstdCompressStream& operator=(const ZstdCompressStream&) = delete; + ZstdCompressStream(ZstdCompressStream&&) = delete; + ZstdCompressStream& operator=(ZstdCompressStream&&) = delete; + ~ZstdCompressStream() override; + + void WriteAll(const std::uint8_t* buffer, std::size_t length) override; + + private: + ILogReceiver* log_receiver_; + IWritable* output_; + std::unique_ptr z_cstream_; + std::vector in_buffer_; + ZSTD_inBuffer z_in_buffer_; + std::size_t in_size_; + std::vector out_buffer_; +}; } // namespace detail } // namespace databento diff --git a/include/databento/detail/file_stream.hpp b/include/databento/file_stream.hpp similarity index 58% rename from include/databento/detail/file_stream.hpp rename to include/databento/file_stream.hpp index e37b0f2..422ff00 100644 --- a/include/databento/detail/file_stream.hpp +++ b/include/databento/file_stream.hpp @@ -2,16 +2,16 @@ #include // size_t #include // uint8_t -#include // ifstream +#include // ifstream, ofstream #include #include "databento/ireadable.hpp" +#include "databento/iwritable.hpp" namespace databento { -namespace detail { -class FileStream : public IReadable { +class InFileStream : public IReadable { public: - explicit FileStream(const std::string& file_path); + explicit InFileStream(const std::string& file_path); // Read exactly `length` bytes into `buffer`. void ReadExact(std::uint8_t* buffer, std::size_t length) override; @@ -22,5 +22,14 @@ class FileStream : public IReadable { private: std::ifstream stream_; }; -} // namespace detail + +class OutFileStream : public IWritable { + public: + explicit OutFileStream(const std::string& file_path); + + void WriteAll(const std::uint8_t* buffer, std::size_t length) override; + + private: + std::ofstream stream_; +}; } // namespace databento diff --git a/include/databento/historical.hpp b/include/databento/historical.hpp index 23b80aa..3e9ab6f 100644 --- a/include/databento/historical.hpp +++ b/include/databento/historical.hpp @@ -241,6 +241,7 @@ class Historical { DbnFileStore TimeseriesGetRangeToFile(const HttplibParams& params, const std::string& file_path); + ILogReceiver* log_receiver_; const std::string key_; const std::string gateway_; detail::HttpClient client_; diff --git a/include/databento/iwritable.hpp b/include/databento/iwritable.hpp new file mode 100644 index 0000000..9f51805 --- /dev/null +++ b/include/databento/iwritable.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include // size_t +#include // uint8_t + +namespace databento { +// An abstract class for writable objects to allow for runtime polymorphism +// around DBN encoding. +class IWritable { + public: + virtual ~IWritable() = default; + + virtual void WriteAll(const std::uint8_t* buffer, std::size_t length) = 0; +}; +} // namespace databento diff --git a/include/databento/record.hpp b/include/databento/record.hpp index 4f49bca..5a3eab0 100644 --- a/include/databento/record.hpp +++ b/include/databento/record.hpp @@ -6,6 +6,7 @@ #include // strncmp #include #include // tie +#include #include "databento/constants.hpp" // kSymbolCstrLen #include "databento/datetime.hpp" // UnixNanos @@ -40,6 +41,20 @@ struct RecordHeader { } }; +// Type trait helper for templated functions accepting DBN records. +namespace detail { +// std::void_t added in C++17 +template +using void_t = void; +} // namespace detail +template > +struct has_header : std::false_type {}; +template +struct has_header().hd)>> + : std::is_same().hd), RecordHeader> {}; +template +constexpr bool has_header_v = has_header::value; + class Record { public: explicit Record(RecordHeader* record) : record_{record} {} @@ -162,9 +177,7 @@ static_assert(alignof(TradeMsg) == 8, "Must have 8-byte alignment"); struct Mbp1Msg { static bool HasRType(RType rtype) { switch (rtype) { - case RType::Mbp1: // fallthrough - case RType::Bbo1M: // fallthrough - case RType::Bbo1S: + case RType::Mbp1: return true; default: return false; @@ -187,8 +200,9 @@ struct Mbp1Msg { std::array levels; }; using TbboMsg = Mbp1Msg; -using Bbo1SMsg = Mbp1Msg; -using Bbo1MMsg = Mbp1Msg; +static_assert(alignof(Mbp1Msg) == 8, "Must have 8-byte alignment"); +static_assert(sizeof(Mbp1Msg) == sizeof(TradeMsg) + sizeof(BidAskPair), + "Mbp1Msg size must match Rust"); struct Mbp10Msg { static bool HasRType(RType rtype) { return rtype == rtype::Mbp10; } @@ -208,14 +222,40 @@ struct Mbp10Msg { std::uint32_t sequence; std::array levels; }; - -static_assert(alignof(Mbp1Msg) == 8, "Must have 8-byte alignment"); static_assert(alignof(Mbp10Msg) == 8, "Must have 8-byte alignment"); -static_assert(sizeof(Mbp1Msg) == sizeof(TradeMsg) + sizeof(BidAskPair), - "Mbp1Msg size must match Rust"); static_assert(sizeof(Mbp10Msg) == sizeof(TradeMsg) + sizeof(BidAskPair) * 10, "Mbp10Msg size must match Rust"); +struct BboMsg { + static bool HasRType(RType rtype) { + switch (rtype) { + case RType::Bbo1S: // fallthrough + case RType::Bbo1M: + return true; + default: + return false; + }; + } + + UnixNanos IndexTs() const { return ts_recv; } + + RecordHeader hd; + std::int64_t price; + std::uint32_t size; + char reserved1; + Side side; + FlagSet flags; + char reserved2; + UnixNanos ts_recv; + std::array reserved3; + std::uint32_t sequence; + std::array levels; +}; +using Bbo1SMsg = BboMsg; +using Bbo1MMsg = BboMsg; +static_assert(alignof(BboMsg) == 8, "Must have 8-byte alignment"); +static_assert(sizeof(BboMsg) == sizeof(Mbp1Msg), "BboMsg size must match Rust"); + struct CbboMsg { static bool HasRType(RType rtype) { switch (rtype) { @@ -237,7 +277,7 @@ struct CbboMsg { Action action; Side side; FlagSet flags; - std::array reserved; + char reserved; UnixNanos ts_recv; TimeDeltaNanos ts_in_delta; std::uint32_t sequence; @@ -556,6 +596,16 @@ inline bool operator!=(const Mbp10Msg& lhs, const Mbp10Msg& rhs) { return !(lhs == rhs); } +inline bool operator==(const BboMsg& lhs, const BboMsg& rhs) { + return std::tie(lhs.hd, lhs.price, lhs.size, lhs.side, lhs.flags, lhs.ts_recv, + lhs.sequence, lhs.levels) == + std::tie(rhs.hd, rhs.price, rhs.size, rhs.side, rhs.flags, rhs.ts_recv, + rhs.sequence, rhs.levels); +} +inline bool operator!=(const BboMsg& lhs, const BboMsg& rhs) { + return !(lhs == rhs); +} + inline bool operator==(const CbboMsg& lhs, const CbboMsg& rhs) { return lhs.hd == rhs.hd && lhs.price == rhs.price && lhs.size == rhs.size && lhs.action == rhs.action && lhs.side == rhs.side && @@ -665,8 +715,10 @@ std::string ToString(const Mbp1Msg& mbp_msg); std::ostream& operator<<(std::ostream& stream, const Mbp1Msg& mbp_msg); std::string ToString(const Mbp10Msg& mbp_msg); std::ostream& operator<<(std::ostream& stream, const Mbp10Msg& mbp_msg); -std::string ToString(const CbboMsg& mbp_msg); -std::ostream& operator<<(std::ostream& stream, const CbboMsg& mbp_msg); +std::string ToString(const BboMsg& bbo_msg); +std::ostream& operator<<(std::ostream& stream, const BboMsg& bbo_msg); +std::string ToString(const CbboMsg& cbbo_msg); +std::ostream& operator<<(std::ostream& stream, const CbboMsg& cbbo_msg); std::string ToString(const TradeMsg& trade_msg); std::ostream& operator<<(std::ostream& stream, const TradeMsg& trade_msg); std::string ToString(const OhlcvMsg& ohlcv_msg); diff --git a/include/databento/symbol_map.hpp b/include/databento/symbol_map.hpp index 4d210ee..0998f55 100644 --- a/include/databento/symbol_map.hpp +++ b/include/databento/symbol_map.hpp @@ -1,30 +1,101 @@ #pragma once +#include #include +#include +#include #include #include +#include #include "databento/compat.hpp" #include "databento/record.hpp" namespace databento { -// A point-in-time symbol map. Useful for working with live symbology or -// a historical request over a single day or other situations where the -// symbol mappings are known not to change. +// Forward declare +struct Metadata; + +/// A timeseries symbol map. Useful for working with historical +/// data. +class TsSymbolMap { + public: + // UTC date and instrument ID to text symbol. + using Store = std::map, + std::shared_ptr>; + + TsSymbolMap() = default; + explicit TsSymbolMap(const Metadata& metadata); + + bool IsEmpty() const { return map_.empty(); } + std::size_t Size() const { return map_.size(); } + const Store& Map() const { return map_; } + Store& Map() { return map_; } + Store::const_iterator Find(date::year_month_day date, + std::uint32_t instrument_id) const { + return map_.find(std::make_pair(date, instrument_id)); + } + template + Store::const_iterator Find(const R& rec) const { + static_assert( + has_header_v, + "must be a DBN record struct with an `hd` RecordHeader field"); + date::year_month_day index_date{ + date::sys_days{date::floor(rec.IndexTs())}}; + return map_.find(std::make_pair(index_date, rec.hd.instrument_id)); + } + const std::string& At(date::year_month_day date, + std::uint32_t instrument_id) const { + return *map_.at(std::make_pair(date, instrument_id)); + } + template + const std::string& At(const R& rec) const { + static_assert( + has_header_v, + "must be a DBN record struct with an `hd` RecordHeader field"); + date::year_month_day index_date{ + date::sys_days{date::floor(rec.IndexTs())}}; + return *map_.at(std::make_pair(index_date, rec.hd.instrument_id)); + } + void Insert(std::uint32_t instrument_id, date::year_month_day start_date, + date::year_month_day end_date, + const std::shared_ptr& symbol); + + private: + Store map_; +}; + +// A point-in-time symbol map. Useful for working with live +// symbology or a historical request over a single day or other +// situations where the symbol mappings are known not to change. class PitSymbolMap { public: + // Instrument ID to text symbol using Store = std::unordered_map; PitSymbolMap() = default; + PitSymbolMap(const Metadata& metadata, date::year_month_day date); bool IsEmpty() const { return map_.empty(); } std::size_t Size() const { return map_.size(); } const Store& Map() const { return map_; } Store& Map() { return map_; } + Store::const_iterator Find(const Record& rec) const { + return map_.find(rec.Header().instrument_id); + } Store::const_iterator Find(std::uint32_t instrument_id) const { return map_.find(instrument_id); } - std::string& operator[](std::uint32_t instrument_id) { + template + const std::string& At(const R& rec) const { + static_assert( + has_header_v, + "must be a DBN record struct with an `hd` RecordHeader field"); + return map_.at(rec.hd.instrument_id); + } + const std::string& At(const Record& rec) const { + return map_.at(rec.Header().instrument_id); + } + const std::string& operator[](std::uint32_t instrument_id) { return map_[instrument_id]; } void OnRecord(const Record& rec); diff --git a/include/databento/symbology.hpp b/include/databento/symbology.hpp index bb49296..93234cd 100644 --- a/include/databento/symbology.hpp +++ b/include/databento/symbology.hpp @@ -5,22 +5,22 @@ #include #include +#include "databento/dbn.hpp" +#include "databento/enums.hpp" +#include "databento/symbol_map.hpp" + namespace databento { // Sentinel value for requesting all symbols static const std::vector kAllSymbols{"ALL_SYMBOLS"}; -struct StrMappingInterval { - // YYYY-MM-DD - std::string start_date; - // YYYY-MM-DD - std::string end_date; - std::string symbol; -}; - struct SymbologyResolution { - std::unordered_map> mappings; + std::unordered_map> mappings; std::vector partial; std::vector not_found; + SType stype_in; + SType stype_out; + + TsSymbolMap CreateSymbolMap() const; }; // Converts a vector of symbols to a comma-delineated string for sending to @@ -34,10 +34,7 @@ std::string JoinSymbolStrings( std::vector::const_iterator symbols_end); std::string JoinSymbolStrings(const std::string& method_name, const std::vector& symbols); -std::string ToString(const StrMappingInterval& mapping_interval); std::string ToString(const SymbologyResolution& sym_res); -std::ostream& operator<<(std::ostream& stream, - const StrMappingInterval& mapping_interval); std::ostream& operator<<(std::ostream& stream, const SymbologyResolution& sym_res); } // namespace databento diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index a530273..b0d95cd 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,13 +1,13 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.19.1 +pkgver=0.20.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') url="https://github.com/databento/databento-cpp" license=('Apache-2.0') -depends=('cpp-httplib' 'openssl' 'nlohmann-json' 'zstd') +depends=('cpp-httplib' 'chrono-date' 'openssl' 'nlohmann-json' 'zstd') makedepends=('cmake' 'gtest') source=("${_pkgname}::git+${url}.git") sha256sums=('SKIP') @@ -20,13 +20,14 @@ prepare() { cmake -S "${_pkgname}" -B build \ -DCMAKE_BUILD_TYPE=Release \ -DCMAKE_INSTALL_PREFIX='/usr' \ - -DDATABENTO_USE_EXTERNAL_JSON=ON \ + -DDATABENTO_USE_EXTERNAL_DATE=ON \ -DDATABENTO_USE_EXTERNAL_HTTPLIB=ON \ + -DDATABENTO_USE_EXTERNAL_JSON=ON \ -Wno-dev } build() { - cmake --build build + cmake --build build --parallel } check() { diff --git a/src/dbn.cpp b/src/dbn.cpp index 1e7ac26..ad7af28 100644 --- a/src/dbn.cpp +++ b/src/dbn.cpp @@ -4,10 +4,17 @@ #include // ostringstream #include "databento/constants.hpp" +#include "databento/symbol_map.hpp" #include "stream_op_helper.hpp" namespace databento { +PitSymbolMap Metadata::CreateSymbolMapForDate(date::year_month_day date) const { + return PitSymbolMap{*this, date}; +} + +TsSymbolMap Metadata::CreateSymbolMap() const { return TsSymbolMap{*this}; } + void Metadata::Upgrade(VersionUpgradePolicy upgrade_policy) { if (version < kDbnVersion && upgrade_policy == VersionUpgradePolicy::Upgrade) { diff --git a/src/dbn_constants.hpp b/src/dbn_constants.hpp new file mode 100644 index 0000000..9908953 --- /dev/null +++ b/src/dbn_constants.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include +#include +#include + +namespace databento { +constexpr std::size_t kMagicSize = 4; +constexpr std::size_t kMetadataPreludeSize = 8; +constexpr std::uint32_t kZstdMagicNumber = 0xFD2FB528; +constexpr auto kDbnPrefix = "DBN"; +constexpr std::size_t kFixedMetadataLen = 100; +constexpr std::size_t kDatasetCstrLen = 16; +constexpr std::size_t kMetadataReservedLen = 53; +constexpr std::size_t kMetadataReservedLenV1 = 47; +constexpr std::size_t kBufferCapacity = 8UL * 1024; +constexpr std::uint16_t kNullSchema = std::numeric_limits::max(); +constexpr std::uint8_t kNullSType = std::numeric_limits::max(); +constexpr std::uint64_t kNullRecordCount = + std::numeric_limits::max(); +} // namespace databento diff --git a/src/dbn_decoder.cpp b/src/dbn_decoder.cpp index 33e1993..27a2021 100644 --- a/src/dbn_decoder.cpp +++ b/src/dbn_decoder.cpp @@ -1,9 +1,9 @@ #include "databento/dbn_decoder.hpp" +#include + #include // copy -#include -#include // strncmp -#include +#include // strncmp #include #include "databento/compat.hpp" @@ -14,19 +14,11 @@ #include "databento/exceptions.hpp" #include "databento/record.hpp" #include "databento/with_ts_out.hpp" +#include "dbn_constants.hpp" using databento::DbnDecoder; namespace { -constexpr std::size_t kMagicSize = 4; -constexpr std::uint32_t kZstdMagicNumber = 0xFD2FB528; -constexpr auto kDbnPrefix = "DBN"; -constexpr std::size_t kFixedMetadataLen = 100; -constexpr std::size_t kDatasetCstrLen = 16; -constexpr std::size_t kReservedLen = 53; -constexpr std::size_t kReservedLenV1 = 47; -constexpr std::size_t kBufferCapacity = 8UL * 1024; - template T Consume(std::vector::const_iterator& byte_it) { const auto res = *reinterpret_cast(&*byte_it); @@ -59,26 +51,43 @@ std::string Consume(std::vector::const_iterator& byte_it, } return std::string{cstr, static_cast(str_len)}; } + +date::year_month_day DecodeIso8601Date(std::uint32_t yyyymmdd_int) { + const auto year = yyyymmdd_int / 10000; + const auto remaining = yyyymmdd_int % 10000; + const auto month = remaining / 100; + const auto day = remaining % 100; + return {date::year{static_cast(year)}, date::month{month}, + date::day{day}}; +} } // namespace -DbnDecoder::DbnDecoder(detail::SharedChannel channel) - : DbnDecoder(std::unique_ptr{ - new detail::SharedChannel{std::move(channel)}}) {} +DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, + detail::SharedChannel channel) + : DbnDecoder(log_receiver, + std::unique_ptr{ + new detail::SharedChannel{std::move(channel)}}) {} -DbnDecoder::DbnDecoder(detail::FileStream file_stream) - : DbnDecoder(std::unique_ptr{ - new detail::FileStream{std::move(file_stream)}}) {} +DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream) + : DbnDecoder(log_receiver, std::unique_ptr{ + new InFileStream{std::move(file_stream)}}) {} -DbnDecoder::DbnDecoder(std::unique_ptr input) - : DbnDecoder(std::move(input), VersionUpgradePolicy::Upgrade) {} +DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, + std::unique_ptr input) + : DbnDecoder(log_receiver, std::move(input), + VersionUpgradePolicy::Upgrade) {} -DbnDecoder::DbnDecoder(std::unique_ptr input, +DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, + std::unique_ptr input, VersionUpgradePolicy upgrade_policy) - : upgrade_policy_{upgrade_policy}, input_{std::move(input)} { + : log_receiver_{log_receiver}, + upgrade_policy_{upgrade_policy}, + input_{std::move(input)} { read_buffer_.reserve(kBufferCapacity); if (DetectCompression()) { - input_ = std::unique_ptr( - new detail::ZstdStream(std::move(input_), std::move(read_buffer_))); + input_ = + std::unique_ptr(new detail::ZstdDecodeStream( + std::move(input_), std::move(read_buffer_))); // Reinitialize buffer and get it into the same state as uncompressed input read_buffer_ = std::vector(); read_buffer_.reserve(kBufferCapacity); @@ -101,7 +110,7 @@ std::pair DbnDecoder::DecodeMetadataVersionAndSize( } const auto version = buffer[3]; const auto frame_size = *reinterpret_cast(&buffer[4]); - if (frame_size < ::kFixedMetadataLen) { + if (frame_size < kFixedMetadataLen) { throw DbnResponseError{ "Frame length cannot be shorter than the fixed metadata size"}; } @@ -121,7 +130,7 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( auto read_buffer_it = buffer.cbegin(); res.dataset = Consume(read_buffer_it, kDatasetCstrLen, "dataset"); const auto raw_schema = Consume(read_buffer_it); - if (raw_schema == std::numeric_limits::max()) { + if (raw_schema == kNullSchema) { res.has_mixed_schema = true; // must initialize res.schema = Schema::Mbo; @@ -139,7 +148,7 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( read_buffer_it += 8; } const auto raw_stype_in = Consume(read_buffer_it); - if (raw_stype_in == std::numeric_limits::max()) { + if (raw_stype_in == kNullSType) { res.has_mixed_stype_in = true; // must initialize res.stype_in = SType::InstrumentId; @@ -157,9 +166,9 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( } // skip reserved if (version == 1) { - read_buffer_it += ::kReservedLenV1; + read_buffer_it += kMetadataReservedLenV1; } else { - read_buffer_it += ::kReservedLen; + read_buffer_it += kMetadataReservedLen; } const auto schema_definition_length = Consume(read_buffer_it); @@ -181,10 +190,10 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( databento::Metadata DbnDecoder::DecodeMetadata() { // already read first 4 bytes detecting compression - read_buffer_.resize(8); + read_buffer_.resize(kMetadataPreludeSize); input_->ReadExact(&read_buffer_[4], 4); - const auto version_and_size = - DbnDecoder::DecodeMetadataVersionAndSize(read_buffer_.data(), 8); + const auto version_and_size = DbnDecoder::DecodeMetadataVersionAndSize( + read_buffer_.data(), kMetadataPreludeSize); version_ = version_and_size.first; read_buffer_.resize(version_and_size.second); input_->ReadExact(read_buffer_.data(), read_buffer_.size()); @@ -236,18 +245,23 @@ databento::Record DbnDecoder::DecodeRecordCompat( return rec; } -// assumes ParseMetadata has been called +// assumes DecodeMetadata has been called const databento::Record* DbnDecoder::DecodeRecord() { // need some unread unread_bytes - const auto unread_bytes = read_buffer_.size() - buffer_idx_; - if (unread_bytes == 0) { + if (GetReadBufferSize() == 0) { if (FillBuffer() == 0) { return nullptr; } } // check length - while (read_buffer_.size() - buffer_idx_ < BufferRecordHeader()->Size()) { + while (GetReadBufferSize() < BufferRecordHeader()->Size()) { if (FillBuffer() == 0) { + if (GetReadBufferSize() > 0) { + log_receiver_->Receive( + LogLevel::Warning, + "Unexpected partial record remaining in stream: " + + std::to_string(GetReadBufferSize()) + " bytes"); + } return nullptr; } } @@ -271,6 +285,10 @@ size_t DbnDecoder::FillBuffer() { return fill_size; } +std::size_t DbnDecoder::GetReadBufferSize() const { + return read_buffer_.size() - buffer_idx_; +} + databento::RecordHeader* DbnDecoder::BufferRecordHeader() { return reinterpret_cast(&read_buffer_[buffer_idx_]); } @@ -374,8 +392,10 @@ databento::SymbolMapping DbnDecoder::DecodeSymbolMapping( res.intervals.reserve(interval_count); for (std::size_t i = 0; i < interval_count; ++i) { MappingInterval interval; - interval.start_date = Consume(read_buffer_it); - interval.end_date = Consume(read_buffer_it); + auto raw_start_date = Consume(read_buffer_it); + interval.start_date = DecodeIso8601Date(raw_start_date); + auto raw_end_date = Consume(read_buffer_it); + interval.end_date = DecodeIso8601Date(raw_end_date); interval.symbol = DecodeSymbol(symbol_cstr_len, read_buffer_it); res.intervals.emplace_back(std::move(interval)); } diff --git a/src/dbn_encoder.cpp b/src/dbn_encoder.cpp new file mode 100644 index 0000000..e3cd5ca --- /dev/null +++ b/src/dbn_encoder.cpp @@ -0,0 +1,165 @@ +#include "databento/dbn_encoder.hpp" + +#include + +#include +#include +#include // accumulate +#include +#include + +#include "databento/constants.hpp" +#include "databento/dbn.hpp" +#include "databento/exceptions.hpp" +#include "databento/iwritable.hpp" +#include "dbn_constants.hpp" + +using databento::DbnEncoder; + +namespace { +void EncodeChars(const char* bytes, std::size_t length, + databento::IWritable* output) { + output->WriteAll(reinterpret_cast(bytes), length); +} + +void EncodeFixedLenCStr(std::size_t cstr_len, const std::string& str, + databento::IWritable* output) { + // >= to ensure space for null padding + if (str.size() >= cstr_len) { + throw databento::InvalidArgumentError{ + "EncodeFixedLenCStr", "str", + std::string{"String is too long to encode, maximum length of "} + + std::to_string(cstr_len - 1)}; + } + output->WriteAll(reinterpret_cast(str.data()), + str.length()); + // Null padding + std::vector filler(cstr_len - str.length()); + output->WriteAll(filler.data(), filler.size()); +} + +template +void EncodeAsBytes(T bytes, databento::IWritable* output) { + output->WriteAll(reinterpret_cast(&bytes), + sizeof(bytes)); +} + +void EncodeDate(date::year_month_day date, databento::IWritable* output) { + auto date_int = static_cast(std::int32_t{date.year()}) * 10000; + date_int += std::uint32_t{date.month()} * 100; + date_int += std::uint32_t{date.day()}; + EncodeAsBytes(date_int, output); +} + +void EncodeRepeatedSymbolCStr(std::size_t cstr_len, + const std::vector& symbols, + databento::IWritable* output) { + const auto length = static_cast(symbols.size()); + EncodeAsBytes(length, output); + for (const auto& symbol : symbols) { + EncodeFixedLenCStr(cstr_len, symbol, output); + } +} + +void EncodeSymbolMappings( + std::size_t cstr_len, + const std::vector& symbol_mappings, + databento::IWritable* output) { + const auto mappings_length = + static_cast(symbol_mappings.size()); + EncodeAsBytes(mappings_length, output); + for (const auto& symbol_mapping : symbol_mappings) { + EncodeFixedLenCStr(cstr_len, symbol_mapping.raw_symbol, output); + const auto interval_length = + static_cast(symbol_mapping.intervals.size()); + EncodeAsBytes(interval_length, output); + for (const auto& interval : symbol_mapping.intervals) { + EncodeDate(interval.start_date, output); + EncodeDate(interval.end_date, output); + EncodeFixedLenCStr(cstr_len, interval.symbol, output); + } + } +} +} // namespace + +DbnEncoder::DbnEncoder(const Metadata& metadata, IWritable* output) + : output_{output} { + EncodeMetadata(metadata, output_); +} + +void DbnEncoder::EncodeMetadata(const Metadata& metadata, IWritable* output) { + const auto version = std::min( + std::max(1, metadata.version), kDbnVersion); + EncodeChars(kDbnPrefix, kMagicSize - 1, output); + EncodeAsBytes(version, output); + const std::uint32_t length = CalcLength(metadata); + EncodeAsBytes(length, output); + EncodeFixedLenCStr(kDatasetCstrLen, metadata.dataset, output); + if (metadata.has_mixed_schema) { + EncodeAsBytes(kNullSchema, output); + } else { + EncodeAsBytes(metadata.schema, output); + } + EncodeAsBytes(metadata.start, output); + EncodeAsBytes(metadata.end, output); + EncodeAsBytes(metadata.limit, output); + if (version == 1) { + // backwards compatibility for record_count + EncodeAsBytes(kNullRecordCount, output); + } + if (metadata.has_mixed_stype_in) { + EncodeAsBytes(kNullSType, output); + } else { + EncodeAsBytes(metadata.stype_in, output); + } + EncodeAsBytes(metadata.stype_out, output); + EncodeAsBytes(static_cast(metadata.ts_out), output); + if (version > 1) { + const auto symbol_cstr_len = + static_cast(metadata.symbol_cstr_len); + EncodeAsBytes(symbol_cstr_len, output); + } + // padding + schema definition length + auto reserved_length = + version == 1 ? kMetadataReservedLenV1 : kMetadataReservedLen; + const std::vector padding(reserved_length + + sizeof(std::uint32_t)); + output->WriteAll(padding.data(), padding.size()); + + // variable-length data + EncodeRepeatedSymbolCStr(metadata.symbol_cstr_len, metadata.symbols, output); + EncodeRepeatedSymbolCStr(metadata.symbol_cstr_len, metadata.partial, output); + EncodeRepeatedSymbolCStr(metadata.symbol_cstr_len, metadata.not_found, + output); + EncodeSymbolMappings(metadata.symbol_cstr_len, metadata.mappings, output); +} + +void DbnEncoder::EncodeRecord(const Record& record, IWritable* output) { + output->WriteAll(reinterpret_cast(&record.Header()), + record.Size()); +} + +void DbnEncoder::EncodeRecord(const Record& record) { + EncodeRecord(record, output_); +} + +std::uint32_t DbnEncoder::CalcLength(const Metadata& metadata) { + const auto symbol_cstr_len = metadata.symbol_cstr_len; + const auto mapping_interval_len = sizeof(std::uint32_t) * 2 + symbol_cstr_len; + // schema_definition_length, symbols_count, partial_count, not_found_count, + // mappings_count + const auto var_len_counts_size = sizeof(std::uint32_t) * 5; + + const auto c_str_count = metadata.symbols.size() + metadata.partial.size() + + metadata.not_found.size(); + const auto mappings_len = std::accumulate( + metadata.mappings.begin(), metadata.mappings.end(), std::size_t{0}, + [symbol_cstr_len, mapping_interval_len](std::size_t acc, + const SymbolMapping& m) { + return acc + symbol_cstr_len + sizeof(std::uint32_t) + + m.intervals.size() * mapping_interval_len; + }); + return static_cast(kFixedMetadataLen + var_len_counts_size + + c_str_count * symbol_cstr_len + + mappings_len); +} diff --git a/src/dbn_file_store.cpp b/src/dbn_file_store.cpp index e6b9ee5..a166ac3 100644 --- a/src/dbn_file_store.cpp +++ b/src/dbn_file_store.cpp @@ -3,27 +3,30 @@ #include // unique_ptr #include // move -#include "databento/detail/file_stream.hpp" +#include "databento/file_stream.hpp" #include "databento/ireadable.hpp" +#include "databento/record.hpp" using databento::DbnFileStore; DbnFileStore::DbnFileStore(const std::string& file_path) - : parser_{detail::FileStream{file_path}} {} + : decoder_{ILogReceiver::Default(), InFileStream{file_path}} {} -DbnFileStore::DbnFileStore(const std::string& file_path, +DbnFileStore::DbnFileStore(ILogReceiver* log_receiver, + const std::string& file_path, VersionUpgradePolicy upgrade_policy) - : parser_{std::unique_ptr{new detail::FileStream{file_path}}, - upgrade_policy} {} + : decoder_{log_receiver, + std::unique_ptr{new InFileStream{file_path}}, + upgrade_policy} {} void DbnFileStore::Replay(const MetadataCallback& metadata_callback, const RecordCallback& record_callback) { - auto metadata = parser_.DecodeMetadata(); + auto metadata = decoder_.DecodeMetadata(); if (metadata_callback) { metadata_callback(std::move(metadata)); } const databento::Record* record; - while ((record = parser_.DecodeRecord()) != nullptr) { + while ((record = decoder_.DecodeRecord()) != nullptr) { if (record_callback(*record) == KeepGoing::Stop) { break; } @@ -33,3 +36,20 @@ void DbnFileStore::Replay(const MetadataCallback& metadata_callback, void DbnFileStore::Replay(const RecordCallback& record_callback) { Replay({}, record_callback); } + +const databento::Metadata& DbnFileStore::GetMetadata() { + MaybeDecodeMetadata(); + return metadata_; +} + +const databento::Record* DbnFileStore::NextRecord() { + MaybeDecodeMetadata(); + return decoder_.DecodeRecord(); +} + +void DbnFileStore::MaybeDecodeMetadata() { + if (!has_decoded_metadata_) { + metadata_ = decoder_.DecodeMetadata(); + has_decoded_metadata_ = true; + } +} diff --git a/src/detail/file_stream.cpp b/src/detail/file_stream.cpp deleted file mode 100644 index e40fa74..0000000 --- a/src/detail/file_stream.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "databento/detail/file_stream.hpp" - -#include // ios, streamsize -#include - -#include "databento/exceptions.hpp" - -using databento::detail::FileStream; - -FileStream::FileStream(const std::string& file_path) - : stream_{file_path, std::ios::binary} { - if (stream_.fail()) { - throw InvalidArgumentError{"FileStream", "file_path", - "Non-existent or invalid file"}; - } -} - -void FileStream::ReadExact(std::uint8_t* buffer, std::size_t length) { - const auto size = ReadSome(buffer, length); - if (size != length) { - std::ostringstream err_msg; - err_msg << "Unexpected end of file, expected " << length << " bytes, got " - << size; - throw DbnResponseError{err_msg.str()}; - } -} - -std::size_t FileStream::ReadSome(std::uint8_t* buffer, std::size_t max_length) { - stream_.read(reinterpret_cast(buffer), - static_cast(max_length)); - return static_cast(stream_.gcount()); -} diff --git a/src/detail/json_helpers.cpp b/src/detail/json_helpers.cpp index 82a4e96..7521902 100644 --- a/src/detail/json_helpers.cpp +++ b/src/detail/json_helpers.cpp @@ -1,6 +1,7 @@ #include "databento/detail/json_helpers.hpp" #include // accumulate +#include // istringstream namespace databento { namespace detail { @@ -113,5 +114,19 @@ std::vector ParseAt(const std::string& endpoint, return {symbols_json.begin(), symbols_json.end()}; } +template <> +date::year_month_day ParseAt(const std::string& endpoint, + const nlohmann::json& json, + const std::string& key) { + std::string raw_start = detail::CheckedAt(endpoint, json, key); + std::istringstream start_stream{raw_start}; + date::year_month_day start; + start_stream >> date::parse("%F", start); + if (start_stream.fail()) { + throw JsonResponseError::TypeMismatch(endpoint, "YYYY-MM-DD date string", + raw_start); + } + return start; +} } // namespace detail } // namespace databento diff --git a/src/detail/shared_channel.cpp b/src/detail/shared_channel.cpp index 7a01393..d874671 100644 --- a/src/detail/shared_channel.cpp +++ b/src/detail/shared_channel.cpp @@ -4,7 +4,6 @@ #include #include #include // stringstream -#include #include "databento/exceptions.hpp" // DbnResponseError diff --git a/src/detail/zstd_stream.cpp b/src/detail/zstd_stream.cpp index f6df24b..7021d75 100644 --- a/src/detail/zstd_stream.cpp +++ b/src/detail/zstd_stream.cpp @@ -1,24 +1,26 @@ #include "databento/detail/zstd_stream.hpp" +#include #include #include // move #include "databento/exceptions.hpp" +#include "databento/log.hpp" -using databento::detail::ZstdStream; +using databento::detail::ZstdDecodeStream; -ZstdStream::ZstdStream(std::unique_ptr input) - : ZstdStream{std::move(input), {}} {} +ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input) + : ZstdDecodeStream{std::move(input), {}} {} -ZstdStream::ZstdStream(std::unique_ptr input, - std::vector&& in_buffer) +ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input, + std::vector&& in_buffer) : input_{std::move(input)}, z_dstream_{::ZSTD_createDStream(), ::ZSTD_freeDStream}, read_suggestion_{::ZSTD_initDStream(z_dstream_.get())}, in_buffer_{std::move(in_buffer)}, z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} {} -void ZstdStream::ReadExact(std::uint8_t* buffer, std::size_t length) { +void ZstdDecodeStream::ReadExact(std::uint8_t* buffer, std::size_t length) { std::size_t size{}; do { size += ReadSome(&buffer[size], length - size); @@ -32,7 +34,8 @@ void ZstdStream::ReadExact(std::uint8_t* buffer, std::size_t length) { } } -size_t ZstdStream::ReadSome(std::uint8_t* buffer, std::size_t max_length) { +std::size_t ZstdDecodeStream::ReadSome(std::uint8_t* buffer, + std::size_t max_length) { ZSTD_outBuffer z_out_buffer{buffer, max_length, 0}; std::size_t read_size = 0; do { @@ -64,3 +67,73 @@ size_t ZstdStream::ReadSome(std::uint8_t* buffer, std::size_t max_length) { } while (z_out_buffer.pos == 0 && read_size > 0); return z_out_buffer.pos; } + +using databento::detail::ZstdCompressStream; + +ZstdCompressStream::ZstdCompressStream(IWritable* output) + : ZstdCompressStream{ILogReceiver::Default(), output} {} +ZstdCompressStream::ZstdCompressStream(ILogReceiver* log_receiver, + IWritable* output) + : log_receiver_{log_receiver}, + output_{output}, + z_cstream_{::ZSTD_createCStream(), ::ZSTD_freeCStream}, + in_buffer_{}, + z_in_buffer_{in_buffer_.data(), 0, 0}, + in_size_{::ZSTD_CStreamInSize()}, + out_buffer_(::ZSTD_CStreamOutSize()) { + in_buffer_.reserve(in_size_); + z_in_buffer_.src = in_buffer_.data(); + // enable checksums + ::ZSTD_CCtx_setParameter(z_cstream_.get(), ZSTD_c_checksumFlag, 1); +} + +ZstdCompressStream::~ZstdCompressStream() { + ZSTD_outBuffer z_out_buffer{out_buffer_.data(), out_buffer_.size(), 0}; + while (true) { + const std::size_t remaining = ::ZSTD_compressStream2( + z_cstream_.get(), &z_out_buffer, &z_in_buffer_, ::ZSTD_e_end); + if (remaining == 0) { + break; + } + if (::ZSTD_isError(remaining) && log_receiver_) { + log_receiver_->Receive( + LogLevel::Error, + std::string{"Zstd error compressing end of stream: "} + + ::ZSTD_getErrorName(remaining)); + break; + } + } + assert(z_in_buffer_.pos == z_in_buffer_.size); + // Forward compressed output + if (z_out_buffer.pos > 0) { + output_->WriteAll(out_buffer_.data(), z_out_buffer.pos); + } +} + +void ZstdCompressStream::WriteAll(const std::uint8_t* buffer, + std::size_t length) { + in_buffer_.insert(in_buffer_.end(), buffer, buffer + length); + z_in_buffer_ = {in_buffer_.data(), in_buffer_.size(), 0}; + // Wait for sufficient data before compressing + if (in_buffer_.size() >= in_size_) { + ZSTD_outBuffer z_out_buffer{out_buffer_.data(), out_buffer_.size(), 0}; + const std::size_t remaining = ::ZSTD_compressStream2( + z_cstream_.get(), &z_out_buffer, &z_in_buffer_, ::ZSTD_e_continue); + if (::ZSTD_isError(remaining)) { + throw DbnResponseError{std::string{"Zstd error compressing: "} + + ::ZSTD_getErrorName(remaining)}; + } + // Shift unread input to front + const auto unread_input = z_in_buffer_.size - z_in_buffer_.pos; + if (unread_input > 0) { + std::copy( + in_buffer_.cbegin() + static_cast(z_in_buffer_.pos), + in_buffer_.cend(), in_buffer_.begin()); + } + in_buffer_.resize(unread_input); + if (z_out_buffer.pos > 0) { + // Forward compressed output + output_->WriteAll(out_buffer_.data(), z_out_buffer.pos); + } + } +} diff --git a/src/file_stream.cpp b/src/file_stream.cpp new file mode 100644 index 0000000..042ef9a --- /dev/null +++ b/src/file_stream.cpp @@ -0,0 +1,48 @@ +#include "databento/file_stream.hpp" + +#include // ios, streamsize +#include + +#include "databento/exceptions.hpp" + +using databento::InFileStream; + +InFileStream::InFileStream(const std::string& file_path) + : stream_{file_path, std::ios::binary} { + if (stream_.fail()) { + throw InvalidArgumentError{"InFileStream", "file_path", + "Non-existent or invalid file"}; + } +} + +void InFileStream::ReadExact(std::uint8_t* buffer, std::size_t length) { + const auto size = ReadSome(buffer, length); + if (size != length) { + std::ostringstream err_msg; + err_msg << "Unexpected end of file, expected " << length << " bytes, got " + << size; + throw DbnResponseError{err_msg.str()}; + } +} + +std::size_t InFileStream::ReadSome(std::uint8_t* buffer, + std::size_t max_length) { + stream_.read(reinterpret_cast(buffer), + static_cast(max_length)); + return static_cast(stream_.gcount()); +} + +using databento::OutFileStream; + +OutFileStream::OutFileStream(const std::string& file_path) + : stream_{file_path, std::ios::binary} { + if (stream_.fail()) { + throw InvalidArgumentError{"OutFileStream", "file_path", + "Non-existent or invalid file"}; + } +} + +void OutFileStream::WriteAll(const std::uint8_t* buffer, std::size_t length) { + stream_.write(reinterpret_cast(buffer), + static_cast(length)); +} diff --git a/src/flag_set.cpp b/src/flag_set.cpp index b988a62..64dc1f7 100644 --- a/src/flag_set.cpp +++ b/src/flag_set.cpp @@ -5,13 +5,6 @@ #include "stream_op_helper.hpp" namespace databento { -constexpr FlagSet::Repr FlagSet::kLast; -constexpr FlagSet::Repr FlagSet::kTob; -constexpr FlagSet::Repr FlagSet::kSnapshot; -constexpr FlagSet::Repr FlagSet::kMbp; -constexpr FlagSet::Repr FlagSet::kBadTsRecv; -constexpr FlagSet::Repr FlagSet::kMaybeBadBook; - std::ostream& operator<<(std::ostream& stream, FlagSet flag_set) { const std::array, 6> kFlagsAndNames = {{ diff --git a/src/historical.cpp b/src/historical.cpp index cd3b9e2..62bdb58 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -15,6 +15,8 @@ #include // unique_ptr #include #include // move + +#include "databento/file_stream.hpp" #ifdef _WIN32 #include // _mkdir #endif @@ -148,13 +150,15 @@ std::string PathJoin(const std::string& dir, const std::string& path) { Historical::Historical(ILogReceiver* log_receiver, std::string key, HistoricalGateway gateway) - : key_{std::move(key)}, + : log_receiver_{log_receiver}, + key_{std::move(key)}, gateway_{UrlFromGateway(gateway)}, client_{log_receiver, key_, gateway_} {} Historical::Historical(ILogReceiver* log_receiver, std::string key, std::string gateway, std::uint16_t port) - : key_{std::move(key)}, + : log_receiver_{log_receiver}, + key_{std::move(key)}, gateway_{std::move(gateway)}, client_{log_receiver, key_, gateway_, port} {} @@ -344,14 +348,10 @@ std::string Historical::BatchDownload(const std::string& output_dir, void Historical::StreamToFile(const std::string& url_path, const HttplibParams& params, const std::string& file_path) { - std::ofstream out_file{file_path, std::ios::binary}; - if (out_file.fail()) { - throw InvalidArgumentError{"Historical::StreamToFile", "file_path", - "Failed to open file"}; - } + OutFileStream out_file{file_path}; this->client_.GetRawStream( url_path, params, [&out_file](const char* data, std::size_t length) { - out_file.write(data, static_cast(length)); + out_file.WriteAll(reinterpret_cast(data), length); return true; }); } @@ -755,7 +755,7 @@ databento::SymbologyResolution Historical::SymbologyResolve( const auto& mappings_json = detail::CheckedAt(kEndpoint, json, "result"); const auto& partial_json = detail::CheckedAt(kEndpoint, json, "partial"); const auto& not_found_json = detail::CheckedAt(kEndpoint, json, "not_found"); - SymbologyResolution res{}; + SymbologyResolution res{{}, {}, {}, stype_in, stype_out}; if (!mappings_json.is_object()) { throw JsonResponseError::TypeMismatch(kEndpoint, "mappings object", mappings_json); @@ -767,13 +767,15 @@ databento::SymbologyResolution Historical::SymbologyResolve( throw JsonResponseError::TypeMismatch(kEndpoint, "array", mapping.key(), mapping_json); } - std::vector mapping_intervals; + std::vector mapping_intervals; std::transform(mapping_json.begin(), mapping_json.end(), std::back_inserter(mapping_intervals), [](const nlohmann::json& interval_json) { - return StrMappingInterval{ - detail::CheckedAt(kEndpoint, interval_json, "d0"), - detail::CheckedAt(kEndpoint, interval_json, "d1"), + return MappingInterval{ + detail::ParseAt( + kEndpoint, interval_json, "d0"), + detail::ParseAt( + kEndpoint, interval_json, "d1"), detail::CheckedAt(kEndpoint, interval_json, "s"), }; }); @@ -893,7 +895,7 @@ void Historical::TimeseriesGetRange(const HttplibParams& params, } }}; try { - DbnDecoder dbn_decoder{channel}; + DbnDecoder dbn_decoder{log_receiver_, channel}; Metadata metadata = dbn_decoder.DecodeMetadata(); if (metadata_callback) { metadata_callback(std::move(metadata)); @@ -979,7 +981,7 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile( databento::DbnFileStore Historical::TimeseriesGetRangeToFile( const HttplibParams& params, const std::string& file_path) { StreamToFile(kTimeseriesGetRangePath, params, file_path); - return DbnFileStore{file_path}; + return DbnFileStore{log_receiver_, file_path, VersionUpgradePolicy::Upgrade}; } using databento::HistoricalBuilder; diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 60dd237..bfbc624 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -18,6 +18,7 @@ #include "databento/record.hpp" // Record #include "databento/symbology.hpp" // JoinSymbolStrings #include "databento/version.hpp" // DATABENTO_VERSION +#include "dbn_constants.hpp" // kMetadataPreludeSize using databento::LiveBlocking; @@ -118,7 +119,6 @@ void LiveBlocking::Subscribe(const std::string& sub_msg, } databento::Metadata LiveBlocking::Start() { - constexpr auto kMetadataPreludeSize = 8; client_.WriteAll("start_session\n"); client_.ReadExact(read_buffer_.data(), kMetadataPreludeSize); const auto version_and_size = DbnDecoder::DecodeMetadataVersionAndSize( diff --git a/src/record.cpp b/src/record.cpp index 60fddd6..3d41b03 100644 --- a/src/record.cpp +++ b/src/record.cpp @@ -306,6 +306,22 @@ std::ostream& operator<<(std::ostream& stream, const Mbp10Msg& mbp_msg) { static_cast(levels_helper.Finish())) .Finish(); } +std::string ToString(const BboMsg& bbo_msg) { return MakeString(bbo_msg); } +std::ostream& operator<<(std::ostream& stream, const BboMsg& bbo_msg) { + return StreamOpBuilder{stream} + .SetTypeName("BboMsg") + .SetSpacer("\n ") + .Build() + .AddField("hd", bbo_msg.hd) + .AddField("price", FixPx{bbo_msg.price}) + .AddField("size", bbo_msg.size) + .AddField("side", bbo_msg.side) + .AddField("flags", bbo_msg.flags) + .AddField("ts_recv", bbo_msg.ts_recv) + .AddField("sequence", bbo_msg.sequence) + .AddField("levels", std::get<0>(bbo_msg.levels)) + .Finish(); +} std::string ToString(const CbboMsg& cbbo_msg) { return MakeString(cbbo_msg); } std::ostream& operator<<(std::ostream& stream, const CbboMsg& cbbo_msg) { return StreamOpBuilder{stream} diff --git a/src/symbol_map.cpp b/src/symbol_map.cpp index 0014c5c..d1afbbb 100644 --- a/src/symbol_map.cpp +++ b/src/symbol_map.cpp @@ -1,9 +1,115 @@ #include "databento/symbol_map.hpp" -#include "databento/compat.hpp" +#include + +#include +#include + +#include "databento/datetime.hpp" +#include "databento/dbn.hpp" +#include "databento/enums.hpp" +#include "databento/exceptions.hpp" + +using databento::TsSymbolMap; + +namespace { +bool IsInverse(const databento::Metadata& metadata) { + if (!metadata.has_mixed_stype_in) { + if (metadata.stype_in == databento::SType::InstrumentId) { + return true; + } + if (metadata.stype_out == databento::SType::InstrumentId) { + return false; + } + } + throw databento::InvalidArgumentError{ + "SymbolMap", "metadata", + "Can only create symbol maps from metadata where InstrumentId is one " + "of the stypes"}; +} +} // namespace + +TsSymbolMap::TsSymbolMap(const Metadata& metadata) { + if (::IsInverse(metadata)) { + for (const auto& mapping : metadata.mappings) { + const auto iid = + static_cast(std::stoul(mapping.raw_symbol)); + for (const auto& interval : mapping.intervals) { + // Handle old symbology format + if (interval.symbol.empty()) { + continue; + } + Insert(iid, interval.start_date, interval.end_date, + std::make_shared(interval.symbol)); + } + } + } else { + for (const auto& mapping : metadata.mappings) { + const auto symbol = std::make_shared(mapping.raw_symbol); + for (const auto& interval : mapping.intervals) { + // Handle old symbology format + if (interval.symbol.empty()) { + continue; + } + const auto iid = + static_cast(std::stoul(interval.symbol)); + Insert(iid, interval.start_date, interval.end_date, symbol); + } + } + } +} + +void TsSymbolMap::Insert(std::uint32_t instrument_id, + date::year_month_day start_date, + date::year_month_day end_date, + const std::shared_ptr& symbol) { + if (start_date > end_date) { + throw InvalidArgumentError{"TsSymbolMap::Insert", "end_date", + "can't be before start_date"}; + } + if (start_date == end_date) { + // Ignore + return; + } + for (date::sys_days day = start_date; day < end_date; day += date::days{1}) { + map_.emplace(std::make_pair(date::year_month_day{day}, instrument_id), + symbol); + } +} using databento::PitSymbolMap; +PitSymbolMap::PitSymbolMap(const Metadata& metadata, + date::year_month_day date) { + if (date::sys_days{date} < date::floor(metadata.start) || + // need to compare with `end` as datetime to handle midnight case + UnixNanos{date::sys_days{date}} >= metadata.end) { + throw InvalidArgumentError{"PitSymbolMap::PitSymbolMap", "date", + "Outside query range"}; + } + const auto is_inverse = IsInverse(metadata); + for (const auto& mapping : metadata.mappings) { + const auto interval_it = std::find_if( + mapping.intervals.begin(), mapping.intervals.end(), + [date](const MappingInterval& interval) { + return date >= interval.start_date && date < interval.end_date; + }); + // Empty symbols in old symbology format + if (interval_it == mapping.intervals.end() || interval_it->symbol.empty()) { + continue; + } + if (is_inverse) { + const auto iid = + static_cast(std::stoul(mapping.raw_symbol)); + map_.emplace(iid, interval_it->symbol); + } else { + const auto iid = + static_cast(std::stoul(interval_it->symbol)); + map_.emplace(iid, mapping.raw_symbol); + } + } +} + template void PitSymbolMap::OnSymbolMapping(const SymbolMappingRec& symbol_mapping) { const auto it = map_.find(symbol_mapping.hd.instrument_id); diff --git a/src/symbology.cpp b/src/symbology.cpp index 98a7150..2defa99 100644 --- a/src/symbology.cpp +++ b/src/symbology.cpp @@ -1,12 +1,38 @@ #include "databento/symbology.hpp" +#include +#include #include // accumulate #include +#include #include "databento/exceptions.hpp" // InvalidArgumentError #include "stream_op_helper.hpp" // StreamOpBuilder namespace databento { +TsSymbolMap SymbologyResolution::CreateSymbolMap() const { + TsSymbolMap res; + if (stype_in == SType::InstrumentId) { + for (const auto& mapping : mappings) { + const auto iid = static_cast(std::stoul(mapping.first)); + for (const auto& interval : mapping.second) { + res.Insert(iid, interval.start_date, interval.end_date, + std::make_shared(interval.symbol)); + } + } + } else { + for (const auto& mapping : mappings) { + auto symbol = std::make_shared(mapping.first); + for (const auto& interval : mapping.second) { + const auto iid = + static_cast(std::stoul(interval.symbol)); + res.Insert(iid, interval.start_date, interval.end_date, symbol); + } + } + } + return res; +} + std::string JoinSymbolStrings( const std::string& method_name, std::vector::const_iterator symbols_begin, @@ -29,26 +55,10 @@ std::string JoinSymbolStrings(const std::string& method_name, return JoinSymbolStrings(method_name, symbols.begin(), symbols.end()); } -std::string ToString(const StrMappingInterval& mapping_interval) { - return MakeString(mapping_interval); -} - std::string ToString(const SymbologyResolution& sym_res) { return MakeString(sym_res); } -std::ostream& operator<<(std::ostream& stream, - const StrMappingInterval& mapping_interval) { - return StreamOpBuilder{stream} - .SetSpacer(" ") - .SetTypeName("StrMappingInterval") - .Build() - .AddField("start_date", mapping_interval.start_date) - .AddField("end_date", mapping_interval.end_date) - .AddField("symbol", mapping_interval.symbol) - .Finish(); -} - std::ostream& operator<<(std::ostream& stream, const SymbologyResolution& sym_res) { auto stream_helper = StreamOpBuilder{stream} @@ -92,9 +102,11 @@ std::ostream& operator<<(std::ostream& stream, for (const auto& symbol : sym_res.not_found) { not_found_helper.AddItem(symbol); } - stream_helper.AddField( - "not_found", static_cast(not_found_helper.Finish())); - - return stream_helper.Finish(); + return stream_helper + .AddField("not_found", + static_cast(not_found_helper.Finish())) + .AddField("stype_in", sym_res.stype_in) + .AddField("stype_out", sym_res.stype_out) + .Finish(); } } // namespace databento diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0fa3442..babdb4f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,6 +18,7 @@ verbose_message("Adding tests under ${CMAKE_PROJECT_NAME}Tests...") set( test_headers include/mock/mock_http_server.hpp + include/mock/mock_io.hpp include/mock/mock_lsg_server.hpp include/mock/mock_tcp_server.hpp include/temp_file.hpp @@ -28,6 +29,7 @@ set( src/batch_tests.cpp src/datetime_tests.cpp src/dbn_decoder_tests.cpp + src/dbn_encoder_tests.cpp src/dbn_tests.cpp src/file_stream_tests.cpp src/flag_set_tests.cpp @@ -39,6 +41,7 @@ set( src/log_tests.cpp src/metadata_tests.cpp src/mock_http_server.cpp + src/mock_io.cpp src/mock_lsg_server.cpp src/mock_tcp_server.cpp src/record_tests.cpp diff --git a/test/include/mock/mock_io.hpp b/test/include/mock/mock_io.hpp new file mode 100644 index 0000000..89c006f --- /dev/null +++ b/test/include/mock/mock_io.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include + +#include "databento/ireadable.hpp" +#include "databento/iwritable.hpp" + +namespace databento { +namespace test { +namespace mock { +class MockIo : public databento::IWritable, public databento::IReadable { + public: + void WriteAll(const std::uint8_t* buffer, std::size_t length); + + void ReadExact(std::uint8_t* buffer, std::size_t length); + + std::size_t ReadSome(std::uint8_t* buffer, std::size_t max_length); + + const std::vector& GetContents() const { return contents_; } + + private: + std::vector contents_; + std::ptrdiff_t read_idx_{0}; +}; +} // namespace mock +} // namespace test +} // namespace databento diff --git a/test/include/mock/mock_lsg_server.hpp b/test/include/mock/mock_lsg_server.hpp index c31b450..ab51451 100644 --- a/test/include/mock/mock_lsg_server.hpp +++ b/test/include/mock/mock_lsg_server.hpp @@ -1,6 +1,7 @@ #pragma once #include // EXPECT_EQ + #ifdef _WIN32 #include // SSIZE_T #include // send @@ -20,11 +21,24 @@ using ssize_t = SSIZE_T; #include "databento/detail/scoped_fd.hpp" // ScopedFd #include "databento/detail/scoped_thread.hpp" // ScopedThread #include "databento/enums.hpp" // Schema, SType -#include "databento/record.hpp" // RecordHeader +#include "databento/iwritable.hpp" +#include "databento/record.hpp" // RecordHeader namespace databento { namespace test { namespace mock { +class SocketStream : public databento::IWritable { + public: + explicit SocketStream(detail::Socket socket) : socket_{socket} {} + + void WriteAll(const std::uint8_t* buffer, std::size_t length) override; + ::ssize_t LastWriteSize() const { return last_write_size_; } + + private: + detail::Socket socket_; + ::ssize_t last_write_size_{}; +}; + class MockLsgServer { public: MockLsgServer(std::string dataset, bool ts_out, @@ -47,7 +61,7 @@ class MockLsgServer { std::size_t Send(const std::string& msg); ::ssize_t UncheckedSend(const std::string& msg); template - void SendRecord(Rec rec) { + void SendRecord(const Rec& rec) { const std::string rec_str{reinterpret_cast(&rec), sizeof(rec)}; Send(rec_str); } diff --git a/test/src/dbn_decoder_tests.cpp b/test/src/dbn_decoder_tests.cpp index 63fb178..13d90ff 100644 --- a/test/src/dbn_decoder_tests.cpp +++ b/test/src/dbn_decoder_tests.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -8,20 +9,27 @@ #include // streamsize, ios::binary, ios::ate #include #include +#include +#include #include "databento/compat.hpp" #include "databento/constants.hpp" #include "databento/datetime.hpp" #include "databento/dbn.hpp" #include "databento/dbn_decoder.hpp" -#include "databento/detail/file_stream.hpp" +#include "databento/dbn_encoder.hpp" #include "databento/detail/scoped_thread.hpp" #include "databento/detail/shared_channel.hpp" +#include "databento/detail/zstd_stream.hpp" #include "databento/enums.hpp" #include "databento/exceptions.hpp" +#include "databento/file_stream.hpp" #include "databento/ireadable.hpp" +#include "databento/iwritable.hpp" +#include "databento/log.hpp" #include "databento/record.hpp" #include "databento/with_ts_out.hpp" +#include "mock/mock_io.hpp" namespace databento { namespace test { @@ -31,6 +39,7 @@ class DbnDecoderTests : public testing::Test { std::unique_ptr file_target_; std::unique_ptr channel_target_; detail::ScopedThread write_thread_; + std::unique_ptr logger_{new NullLogReceiver}; void ReadFromFile(const std::string& schema_str, const std::string& extension, std::uint8_t version) { @@ -56,11 +65,12 @@ class DbnDecoderTests : public testing::Test { channel_.Finish(); }}; channel_target_.reset(new DbnDecoder{ + logger_.get(), std::unique_ptr{new detail::SharedChannel{channel_}}, upgrade_policy}); // File setup file_target_.reset(new DbnDecoder{ - std::unique_ptr{new detail::FileStream{file_path}}, + logger_.get(), std::unique_ptr{new InFileStream{file_path}}, upgrade_policy}); } @@ -71,8 +81,8 @@ class DbnDecoderTests : public testing::Test { ASSERT_EQ(mapping.intervals.size(), 1); const auto& interval = mapping.intervals.at(0); EXPECT_EQ(interval.symbol, "5482"); - EXPECT_EQ(interval.start_date, 20201228); - EXPECT_EQ(interval.end_date, 20201229); + EXPECT_EQ(interval.start_date, date::year{2020} / 12 / 28); + EXPECT_EQ(interval.end_date, date::year{2020} / 12 / 29); } }; @@ -126,8 +136,8 @@ TEST_F(DbnDecoderTests, TestDecodeDefinitionUpgrade) { ASSERT_EQ(mapping.intervals.size(), 62); const auto& interval = mapping.intervals.at(0); EXPECT_EQ(interval.symbol, "6819"); - EXPECT_EQ(interval.start_date, 20211004); - EXPECT_EQ(interval.end_date, 20211005); + EXPECT_EQ(interval.start_date, date::year{2021} / 10 / 4); + EXPECT_EQ(interval.end_date, date::year{2021} / 10 / 5); const auto ch_record1 = channel_target_->DecodeRecord(); const auto f_record1 = file_target_->DecodeRecord(); @@ -931,8 +941,8 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) { ASSERT_EQ(mapping.intervals.size(), 62); const auto& interval = mapping.intervals.at(0); EXPECT_EQ(interval.symbol, "6819"); - EXPECT_EQ(interval.start_date, 20211004); - EXPECT_EQ(interval.end_date, 20211005); + EXPECT_EQ(interval.start_date, date::year{2021} / 10 / 4); + EXPECT_EQ(interval.end_date, date::year{2021} / 10 / 5); const auto ch_record1 = channel_target_->DecodeRecord(); const auto f_record1 = file_target_->DecodeRecord(); @@ -1041,5 +1051,145 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeStatistics) { EXPECT_EQ(ch_stat2.stat_type, StatType::TradingSessionHighPrice); EXPECT_EQ(ch_stat2.price, 100 * kFixedPriceScale); } + +class DbnIdentityTests : public testing::TestWithParam< + std::tuple> { + protected: + std::unique_ptr logger_{new NullLogReceiver}; +}; + +INSTANTIATE_TEST_SUITE_P( + TestFiles, DbnIdentityTests, + testing::Values(std::make_tuple(1, Schema::Mbo, Compression::None), + std::make_tuple(1, Schema::Trades, Compression::None), + std::make_tuple(1, Schema::Mbp1, Compression::None), + std::make_tuple(1, Schema::Tbbo, Compression::None), + std::make_tuple(1, Schema::Mbp10, Compression::None), + std::make_tuple(1, Schema::Ohlcv1D, Compression::None), + std::make_tuple(1, Schema::Ohlcv1H, Compression::None), + std::make_tuple(1, Schema::Ohlcv1M, Compression::None), + std::make_tuple(1, Schema::Ohlcv1S, Compression::None), + std::make_tuple(1, Schema::Definition, Compression::None), + std::make_tuple(1, Schema::Imbalance, Compression::None), + std::make_tuple(1, Schema::Statistics, Compression::None), + std::make_tuple(1, Schema::Mbo, Compression::Zstd), + std::make_tuple(1, Schema::Trades, Compression::Zstd), + std::make_tuple(1, Schema::Mbp1, Compression::Zstd), + std::make_tuple(1, Schema::Tbbo, Compression::Zstd), + std::make_tuple(1, Schema::Mbp10, Compression::Zstd), + std::make_tuple(1, Schema::Ohlcv1D, Compression::Zstd), + std::make_tuple(1, Schema::Ohlcv1H, Compression::Zstd), + std::make_tuple(1, Schema::Ohlcv1M, Compression::Zstd), + std::make_tuple(1, Schema::Ohlcv1S, Compression::Zstd), + std::make_tuple(1, Schema::Definition, Compression::Zstd), + std::make_tuple(1, Schema::Imbalance, Compression::Zstd), + std::make_tuple(1, Schema::Statistics, Compression::Zstd), + std::make_tuple(2, Schema::Mbo, Compression::None), + std::make_tuple(2, Schema::Trades, Compression::None), + std::make_tuple(2, Schema::Tbbo, Compression::None), + std::make_tuple(2, Schema::Mbp1, Compression::None), + std::make_tuple(2, Schema::Mbp10, Compression::None), + std::make_tuple(2, Schema::Ohlcv1D, Compression::None), + std::make_tuple(2, Schema::Ohlcv1H, Compression::None), + std::make_tuple(2, Schema::Ohlcv1M, Compression::None), + std::make_tuple(2, Schema::Ohlcv1S, Compression::None), + std::make_tuple(2, Schema::Definition, Compression::None), + std::make_tuple(2, Schema::Imbalance, Compression::None), + std::make_tuple(2, Schema::Statistics, Compression::None), + std::make_tuple(2, Schema::Mbo, Compression::Zstd), + std::make_tuple(2, Schema::Trades, Compression::Zstd), + std::make_tuple(2, Schema::Tbbo, Compression::Zstd), + std::make_tuple(2, Schema::Mbp1, Compression::Zstd), + std::make_tuple(2, Schema::Mbp10, Compression::Zstd), + std::make_tuple(2, Schema::Ohlcv1D, Compression::Zstd), + std::make_tuple(2, Schema::Ohlcv1H, Compression::Zstd), + std::make_tuple(2, Schema::Ohlcv1M, Compression::Zstd), + std::make_tuple(2, Schema::Ohlcv1S, Compression::Zstd), + std::make_tuple(2, Schema::Definition, Compression::Zstd), + std::make_tuple(2, Schema::Imbalance, Compression::Zstd), + std::make_tuple(2, Schema::Statistics, Compression::Zstd)), + [](const testing::TestParamInfo< + std::tuple>& test_info) { + const auto version = std::get<0>(test_info.param); + const auto schema = std::get<1>(test_info.param); + const auto compression = std::get<2>(test_info.param); + std::string schema_str = ToString(schema); + for (auto& c : schema_str) { + if (c == '-') { + c = '_'; + } + } + return schema_str + "_" + ToString(compression) + "_DBNv" + + std::to_string(version); + }); + +TEST_P(DbnIdentityTests, TestIdentity) { + const auto version = std::get<0>(GetParam()); + const auto schema = std::get<1>(GetParam()); + const auto compression = std::get<2>(GetParam()); + const auto file_name = + std::string{TEST_BUILD_DIR "/data/test_data."} + ToString(schema) + + (version == 1 ? ".v1" : "") + + (compression == Compression::Zstd ? ".dbn.zst" : ".dbn"); + DbnDecoder file_decoder{ + logger_.get(), std::unique_ptr{new InFileStream{file_name}}, + VersionUpgradePolicy::AsIs}; + const Metadata file_metadata = file_decoder.DecodeMetadata(); + + mock::MockIo buf_io; + { + std::unique_ptr zstd_io; + if (compression == Compression::Zstd) { + zstd_io.reset(new detail::ZstdCompressStream{&buf_io}); + } + DbnEncoder encoder{ + file_metadata, + zstd_io ? static_cast(zstd_io.get()) : &buf_io}; + while (auto* record = file_decoder.DecodeRecord()) { + encoder.EncodeRecord(*record); + } + // Free zstd_io and flush + } + + file_decoder = {logger_.get(), + std::unique_ptr{new InFileStream{file_name}}, + VersionUpgradePolicy::AsIs}; + file_decoder.DecodeMetadata(); + + std::unique_ptr input{new mock::MockIo{std::move(buf_io)}}; + DbnDecoder buf_decoder{logger_.get(), std::move(input), + VersionUpgradePolicy::AsIs}; + const auto buf_metadata = buf_decoder.DecodeMetadata(); + EXPECT_EQ(file_metadata, buf_metadata); + while (auto* buf_record = buf_decoder.DecodeRecord()) { + auto* file_record = file_decoder.DecodeRecord(); + ASSERT_NE(file_record, nullptr); + if (auto* mbo = buf_record->GetIf()) { + EXPECT_EQ(*mbo, file_record->Get()); + } else if (auto* trade = buf_record->GetIf()) { + EXPECT_EQ(*trade, file_record->Get()); + } else if (auto* mbp1 = buf_record->GetIf()) { + EXPECT_EQ(*mbp1, file_record->Get()); + } else if (auto* mbp10 = buf_record->GetIf()) { + EXPECT_EQ(*mbp10, file_record->Get()); + } else if (auto* ohlcv = buf_record->GetIf()) { + EXPECT_EQ(*ohlcv, file_record->Get()); + } else if (auto* trade = buf_record->GetIf()) { + EXPECT_EQ(*trade, file_record->Get()); + } else if (auto* imbalance = buf_record->GetIf()) { + EXPECT_EQ(*imbalance, file_record->Get()); + } else if (auto* def = buf_record->GetIf()) { + EXPECT_EQ(*def, file_record->Get()); + } else if (auto* stats = buf_record->GetIf()) { + EXPECT_EQ(*stats, file_record->Get()); + } else { + FAIL() << "Unexpected rtype " + << static_cast(file_record->Header().rtype); + } + } + ASSERT_EQ(file_decoder.DecodeRecord(), nullptr); +} + +TEST_F(DbnDecoderTests, TestDbnIdentityWithTsOut) {} } // namespace test } // namespace databento diff --git a/test/src/dbn_encoder_tests.cpp b/test/src/dbn_encoder_tests.cpp new file mode 100644 index 0000000..eb024c6 --- /dev/null +++ b/test/src/dbn_encoder_tests.cpp @@ -0,0 +1,48 @@ +#include +#include + +#include +#include + +#include "databento/constants.hpp" +#include "databento/datetime.hpp" +#include "databento/dbn.hpp" +#include "databento/dbn_decoder.hpp" +#include "databento/dbn_encoder.hpp" +#include "databento/log.hpp" +#include "mock/mock_io.hpp" + +namespace databento { +namespace test { +TEST(DbnEncoderTests, TestEncodeDecodeMetadataIdentity) { + std::unique_ptr logger{new NullLogReceiver}; + const Metadata metadata{ + kDbnVersion, + dataset::kGlbxMdp3, + false, + Schema::Mbp10, + UnixNanos{std::chrono::nanoseconds{1657230820000000000}}, + UnixNanos{std::chrono::nanoseconds{1658960170000000000}}, + 0, + false, + SType::RawSymbol, + SType::InstrumentId, + true, + kSymbolCstrLen, + {"ES", "NG"}, + {"ESM2"}, + {"QQQQQ"}, + {{"ES.0", + {{date::year{2022} / 7 / 26, date::year{2022} / 9 / 1, "ESU2"}}}, + {"NG.0", + {{date::year{2022} / 7 / 26, date::year{2022} / 8 / 29, "NGU2"}, + {date::year{2022} / 8 / 29, date::year{2022} / 9 / 1, "NGV2"}}}}}; + mock::MockIo io{}; + DbnEncoder::EncodeMetadata(metadata, &io); + DbnDecoder decoder{logger.get(), std::unique_ptr( + new mock::MockIo{std::move(io)})}; + const auto res = decoder.DecodeMetadata(); + ASSERT_EQ(res, metadata); +} +} // namespace test +} // namespace databento diff --git a/test/src/dbn_tests.cpp b/test/src/dbn_tests.cpp index 8e52150..e05dcb6 100644 --- a/test/src/dbn_tests.cpp +++ b/test/src/dbn_tests.cpp @@ -9,23 +9,24 @@ namespace databento { namespace test { TEST(DbnTests, TestMetadataToString) { - const Metadata target{kDbnVersion, - dataset::kGlbxMdp3, - false, - Schema::Ohlcv1D, - UnixNanos{std::chrono::seconds{1696959347}}, - UnixNanos{std::chrono::seconds{1696950000}}, - {}, - false, - SType::RawSymbol, - SType::InstrumentId, - false, - kSymbolCstrLen, - {"NGG3", "NGQ4"}, - {"ng"}, - {"nf"}, - {{"NGG3", {{20220601, 20220701, "3"}}}, - {"NGQ4", {{20220601, 20220701, "4"}}}}}; + const Metadata target{ + kDbnVersion, + dataset::kGlbxMdp3, + false, + Schema::Ohlcv1D, + UnixNanos{std::chrono::seconds{1696959347}}, + UnixNanos{std::chrono::seconds{1696950000}}, + {}, + false, + SType::RawSymbol, + SType::InstrumentId, + false, + kSymbolCstrLen, + {"NGG3", "NGQ4"}, + {"ng"}, + {"nf"}, + {{"NGG3", {{date::year{2022} / 6 / 1, date::year{2022} / 7 / 1, "3"}}}, + {"NGQ4", {{date::year{2022} / 6 / 1, date::year{2022} / 7 / 1, "4"}}}}}; const auto res = ToString(target); ASSERT_EQ(res, R"(Metadata { version = 2, @@ -44,8 +45,8 @@ TEST(DbnTests, TestMetadataToString) { partial = { "ng" }, not_found = { "nf" }, mappings = { - SymbolMapping { raw_symbol = "NGG3", intervals = { MappingInterval { start_date = 20220601, end_date = 20220701, symbol = "3" } } }, - SymbolMapping { raw_symbol = "NGQ4", intervals = { MappingInterval { start_date = 20220601, end_date = 20220701, symbol = "4" } } } + SymbolMapping { raw_symbol = "NGG3", intervals = { MappingInterval { start_date = 2022-06-01, end_date = 2022-07-01, symbol = "3" } } }, + SymbolMapping { raw_symbol = "NGQ4", intervals = { MappingInterval { start_date = 2022-06-01, end_date = 2022-07-01, symbol = "4" } } } } })"); } diff --git a/test/src/file_stream_tests.cpp b/test/src/file_stream_tests.cpp index 224de3e..a720706 100644 --- a/test/src/file_stream_tests.cpp +++ b/test/src/file_stream_tests.cpp @@ -1,15 +1,17 @@ #include -#include "databento/detail/file_stream.hpp" +#include + #include "databento/exceptions.hpp" +#include "databento/file_stream.hpp" +#include "temp_file.hpp" namespace databento { -namespace detail { namespace test { -TEST(FileStreamTests, TestReadExactInsufficient) { +TEST(InFileStreamTests, TestReadExactInsufficient) { const std::string file_path = TEST_BUILD_DIR "/data/test_data.ohlcv-1d.v1.dbn"; - databento::detail::FileStream target{file_path}; + InFileStream target{file_path}; std::vector buffer(1024); // File is less than 1KiB try { target.ReadExact(buffer.data(), buffer.size()); @@ -20,16 +22,30 @@ TEST(FileStreamTests, TestReadExactInsufficient) { } } -TEST(FileStreamTests, TestReadSomeLessThanMax) { +TEST(InFileStreamTests, TestReadSomeLessThanMax) { const std::string file_path = TEST_BUILD_DIR "/data/test_data.ohlcv-1d.v1.dbn"; - databento::detail::FileStream target{file_path}; + InFileStream target{file_path}; std::vector buffer(1024); // File is less than 1KiB const auto read_size = target.ReadSome(buffer.data(), buffer.size()); ASSERT_GT(read_size, 0); ASSERT_TRUE(std::any_of(buffer.cbegin(), buffer.cend(), [](std::uint8_t byte) { return byte != 0; })); } + +TEST(OutFileStreamTests, TestWriteAllCanBeRead) { + constexpr auto data = "abcdefgh"; + TempFile temp_file{"out"}; + ASSERT_FALSE(temp_file.Exists()); + { + OutFileStream target{temp_file.Path()}; + target.WriteAll(reinterpret_cast(data), 8); + } + ASSERT_TRUE(temp_file.Exists()); + InFileStream input{temp_file.Path()}; + std::vector buf(9); + input.ReadExact(reinterpret_cast(buf.data()), 8); + ASSERT_STREQ(buf.data(), data); +} } // namespace test -} // namespace detail } // namespace databento diff --git a/test/src/historical_tests.cpp b/test/src/historical_tests.cpp index da7e438..acbd2b6 100644 --- a/test/src/historical_tests.cpp +++ b/test/src/historical_tests.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -602,8 +603,8 @@ TEST_F(HistoricalTests, TestSymbologyResolve) { const auto& esm2_mappings = res.mappings.at("ESM2"); ASSERT_EQ(esm2_mappings.size(), 1); const auto& esm2_mapping = esm2_mappings.at(0); - EXPECT_EQ(esm2_mapping.start_date, "2022-06-06"); - EXPECT_EQ(esm2_mapping.end_date, "2022-06-10"); + EXPECT_EQ(esm2_mapping.start_date, date::year{2022} / 6 / 6); + EXPECT_EQ(esm2_mapping.end_date, date::year{2022} / 6 / 10); EXPECT_EQ(esm2_mapping.symbol, "3403"); } diff --git a/test/src/mock_io.cpp b/test/src/mock_io.cpp new file mode 100644 index 0000000..75cfde4 --- /dev/null +++ b/test/src/mock_io.cpp @@ -0,0 +1,33 @@ +#include "mock/mock_io.hpp" + +#include +#include +#include + +using databento::test::mock::MockIo; + +void MockIo::WriteAll(const std::uint8_t* buffer, std::size_t length) { + contents_.insert(contents_.end(), buffer, buffer + length); +} + +void MockIo::ReadExact(std::uint8_t* buffer, std::size_t length) { + const auto remaining_bytes = contents_.size() - read_idx_; + if (remaining_bytes < length) { + throw std::runtime_error{"Not enough bytes remaining: expected " + + std::to_string(length) + " got " + + std::to_string(remaining_bytes)}; + } + auto s_length = static_cast(length); + std::copy(contents_.cbegin() + read_idx_, + contents_.cbegin() + read_idx_ + s_length, buffer); + read_idx_ += s_length; +} + +std::size_t MockIo::ReadSome(std::uint8_t* buffer, std::size_t max_length) { + auto read_size = static_cast( + std::min(max_length, contents_.size() - read_idx_)); + std::copy(contents_.cbegin() + read_idx_, + contents_.cbegin() + read_idx_ + read_size, buffer); + read_idx_ += read_size; + return read_size; +} diff --git a/test/src/mock_lsg_server.cpp b/test/src/mock_lsg_server.cpp index 3803e3f..779cd6a 100644 --- a/test/src/mock_lsg_server.cpp +++ b/test/src/mock_lsg_server.cpp @@ -8,14 +8,30 @@ #endif #include // SHA256_DIGEST_LENGTH +#include #include -#include +#include "databento/compat.hpp" +#include "databento/constants.hpp" +#include "databento/datetime.hpp" +#include "databento/dbn_encoder.hpp" #include "databento/enums.hpp" #include "databento/exceptions.hpp" #include "databento/symbology.hpp" // JoinSymbolStrings #include "mock/mock_tcp_server.hpp" // InitSocket +using databento::test::mock::SocketStream; + +void SocketStream::WriteAll(const std::uint8_t* buffer, std::size_t length) { +// MSG_NOSIGNAL doesn't exist on Windows, but also isn't necessary +#ifdef _WIN32 + constexpr int MSG_NOSIGNAL = {}; +#endif + // Don't send a SIGPIPE if the connection is closed + last_write_size_ = ::send(socket_, reinterpret_cast(buffer), + length, MSG_NOSIGNAL); +} + using databento::test::mock::MockLsgServer; MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, @@ -146,33 +162,17 @@ void MockLsgServer::Subscribe(const std::vector& symbols, void MockLsgServer::Start() { const auto received = Receive(); EXPECT_EQ(received, "start_session\n"); - Send("DBN\1"); - // frame length: fixed size + length of schema definition, symbols, partial, - // not_found, and mappings - constexpr std::uint32_t kFrameLen = 100 + sizeof(std::uint32_t) * 5; - SendBytes(kFrameLen); - std::size_t bytes_written{}; - // dataset - bytes_written += Send(dataset_); - bytes_written += Send(std::string(16 - dataset_.length(), '\0')); - // mixed schema - bytes_written += SendBytes(std::numeric_limits::max()); - // start - bytes_written += SendBytes(std::uint64_t{0}); - // end - bytes_written += SendBytes(std::numeric_limits::max()); - // limit - bytes_written += SendBytes(std::uint64_t{0}); - // record_count - bytes_written += SendBytes(std::numeric_limits::max()); - // stype_in - bytes_written += SendBytes(SType::RawSymbol); - // stype_out - bytes_written += SendBytes(SType::InstrumentId); - // padding - bytes_written += Send(std::string(48 + sizeof(std::uint32_t) * 5, '\0')); - - ASSERT_EQ(bytes_written, kFrameLen); + + SocketStream writable{conn_fd_.Get()}; + Metadata metadata{1, dataset_, + true, {}, + {}, UnixNanos{std::chrono::nanoseconds{kUndefTimestamp}}, + 0, true, + {}, SType::InstrumentId, + false, kSymbolCstrLenV1, + {}, {}, + {}, {}}; + DbnEncoder::EncodeMetadata(metadata, &writable); } void MockLsgServer::Close() { conn_fd_.Close(); } diff --git a/test/src/symbol_map_tests.cpp b/test/src/symbol_map_tests.cpp index 5e3297e..eae5006 100644 --- a/test/src/symbol_map_tests.cpp +++ b/test/src/symbol_map_tests.cpp @@ -1,29 +1,257 @@ +#include #include +#include #include #include #include #include #include "databento/compat.hpp" +#include "databento/constants.hpp" +#include "databento/datetime.hpp" +#include "databento/dbn.hpp" +#include "databento/exceptions.hpp" +#include "databento/publishers.hpp" #include "databento/record.hpp" #include "databento/symbol_map.hpp" namespace databento { namespace test { +Metadata GenMetadata() { + Metadata metadata{ + kDbnVersion, + ToString(Dataset::XnasItch), + false, + Schema::Trades, + {date::sys_days{date::year{2023} / 7 / 1}}, + {date::sys_days{date::year{2023} / 8 / 1}}, + {}, + false, + SType::RawSymbol, + SType::InstrumentId, + false, + kSymbolCstrLen, + {}, + {}, + {}, + {{"AAPL", {{date::year{2023} / 7 / 1, date::year{2023} / 8 / 1, "32"}}}, + {"TSLA", + {{date::year{2023} / 7 / 1, date::year{2023} / 7 / 3, "10221"}, + {date::year{2023} / 7 / 3, date::year{2023} / 7 / 5, "10213"}, + {date::year{2023} / 7 / 5, date::year{2023} / 7 / 6, "10209"}, + {date::year{2023} / 7 / 6, date::year{2023} / 7 / 7, "10206"}, + {date::year{2023} / 7 / 7, date::year{2023} / 7 / 10, "10201"}, + {date::year{2023} / 7 / 10, date::year{2023} / 7 / 11, "10193"}, + {date::year{2023} / 7 / 11, date::year{2023} / 7 / 12, "10192"}, + {date::year{2023} / 7 / 12, date::year{2023} / 7 / 13, "10189"}, + {date::year{2023} / 7 / 13, date::year{2023} / 7 / 14, "10191"}, + {date::year{2023} / 7 / 14, date::year{2023} / 7 / 17, "10188"}, + {date::year{2023} / 7 / 17, date::year{2023} / 7 / 20, "10186"}, + {date::year{2023} / 7 / 20, date::year{2023} / 7 / 21, "10184"}, + {date::year{2023} / 7 / 21, date::year{2023} / 7 / 24, "10181"}, + {date::year{2023} / 7 / 24, date::year{2023} / 7 / 25, "10174"}, + {date::year{2023} / 7 / 25, date::year{2023} / 7 / 26, "10172"}, + {date::year{2023} / 7 / 26, date::year{2023} / 7 / 27, "10169"}, + {date::year{2023} / 7 / 27, date::year{2023} / 7 / 28, "10168"}, + {date::year{2023} / 7 / 28, date::year{2023} / 7 / 31, "10164"}, + {date::year{2023} / 7 / 31, date::year{2023} / 8 / 1, "10163"}}}, + {"MSFT", + { + {date::year{2023} / 7 / 1, date::year{2023} / 7 / 3, "6854"}, + {date::year{2023} / 7 / 3, date::year{2023} / 7 / 5, "6849"}, + {date::year{2023} / 7 / 5, date::year{2023} / 7 / 6, "6846"}, + {date::year{2023} / 7 / 6, date::year{2023} / 7 / 7, "6843"}, + {date::year{2023} / 7 / 7, date::year{2023} / 7 / 10, "6840"}, + {date::year{2023} / 7 / 10, date::year{2023} / 7 / 11, "6833"}, + {date::year{2023} / 7 / 11, date::year{2023} / 7 / 12, "6830"}, + {date::year{2023} / 7 / 12, date::year{2023} / 7 / 13, "6826"}, + {date::year{2023} / 7 / 13, date::year{2023} / 7 / 17, "6827"}, + {date::year{2023} / 7 / 17, date::year{2023} / 7 / 18, "6824"}, + {date::year{2023} / 7 / 18, date::year{2023} / 7 / 19, "6823"}, + {date::year{2023} / 7 / 19, date::year{2023} / 7 / 20, "6822"}, + {date::year{2023} / 7 / 20, date::year{2023} / 7 / 21, "6818"}, + {date::year{2023} / 7 / 21, date::year{2023} / 7 / 24, "6815"}, + {date::year{2023} / 7 / 24, date::year{2023} / 7 / 25, "6814"}, + {date::year{2023} / 7 / 25, date::year{2023} / 7 / 26, "6812"}, + {date::year{2023} / 7 / 26, date::year{2023} / 7 / 27, "6810"}, + {date::year{2023} / 7 / 27, date::year{2023} / 7 / 28, "6808"}, + {date::year{2023} / 7 / 28, date::year{2023} / 7 / 31, "6805"}, + {date::year{2023} / 7 / 31, date::year{2023} / 8 / 1, "6803"}, + }}, + {"NVDA", + {{date::year{2023} / 7 / 1, date::year{2023} / 7 / 3, "7348"}, + {date::year{2023} / 7 / 3, date::year{2023} / 7 / 5, "7343"}, + {date::year{2023} / 7 / 5, date::year{2023} / 7 / 6, "7340"}, + {date::year{2023} / 7 / 6, date::year{2023} / 7 / 7, "7337"}, + {date::year{2023} / 7 / 7, date::year{2023} / 7 / 10, "7335"}, + {date::year{2023} / 7 / 10, date::year{2023} / 7 / 11, "7328"}, + {date::year{2023} / 7 / 11, date::year{2023} / 7 / 12, "7325"}, + {date::year{2023} / 7 / 12, date::year{2023} / 7 / 13, "7321"}, + {date::year{2023} / 7 / 13, date::year{2023} / 7 / 17, "7322"}, + {date::year{2023} / 7 / 17, date::year{2023} / 7 / 18, "7320"}, + {date::year{2023} / 7 / 18, date::year{2023} / 7 / 19, "7319"}, + {date::year{2023} / 7 / 19, date::year{2023} / 7 / 20, "7318"}, + {date::year{2023} / 7 / 20, date::year{2023} / 7 / 21, "7314"}, + {date::year{2023} / 7 / 21, date::year{2023} / 7 / 24, "7311"}, + {date::year{2023} / 7 / 24, date::year{2023} / 7 / 25, "7310"}, + {date::year{2023} / 7 / 25, date::year{2023} / 7 / 26, "7308"}, + {date::year{2023} / 7 / 26, date::year{2023} / 7 / 27, "7303"}, + {date::year{2023} / 7 / 27, date::year{2023} / 7 / 28, "7301"}, + {date::year{2023} / 7 / 28, date::year{2023} / 7 / 31, "7298"}, + {date::year{2023} / 7 / 31, date::year{2023} / 8 / 1, "7295"}}}, + {"PLTR", + {{date::year{2023} / 7 / 1, date::year{2023} / 7 / 3, "8043"}, + {date::year{2023} / 7 / 3, date::year{2023} / 7 / 5, "8038"}, + {date::year{2023} / 7 / 5, date::year{2023} / 7 / 6, "8035"}, + {date::year{2023} / 7 / 6, date::year{2023} / 7 / 7, "8032"}, + {date::year{2023} / 7 / 7, date::year{2023} / 7 / 10, "8029"}, + {date::year{2023} / 7 / 10, date::year{2023} / 7 / 11, "8022"}, + {date::year{2023} / 7 / 11, date::year{2023} / 7 / 12, "8019"}, + {date::year{2023} / 7 / 12, date::year{2023} / 7 / 13, "8015"}, + {date::year{2023} / 7 / 13, date::year{2023} / 7 / 17, "8016"}, + {date::year{2023} / 7 / 17, date::year{2023} / 7 / 19, "8014"}, + {date::year{2023} / 7 / 19, date::year{2023} / 7 / 20, "8013"}, + {date::year{2023} / 7 / 20, date::year{2023} / 7 / 21, "8009"}, + {date::year{2023} / 7 / 21, date::year{2023} / 7 / 24, "8006"}, + {date::year{2023} / 7 / 24, date::year{2023} / 7 / 25, "8005"}, + {date::year{2023} / 7 / 25, date::year{2023} / 7 / 26, "8003"}, + {date::year{2023} / 7 / 26, date::year{2023} / 7 / 27, "7999"}, + {date::year{2023} / 7 / 27, date::year{2023} / 7 / 28, "7997"}, + {date::year{2023} / 7 / 28, date::year{2023} / 7 / 31, "7994"}, + // Test old format + {date::year{2023} / 7 / 31, date::year{2023} / 8 / 1, ""}}}}}; + return metadata; +} + +Metadata GenInverseMetadata() { + auto metadata = GenMetadata(); + metadata.stype_in = SType::InstrumentId; + metadata.stype_out = SType::RawSymbol; + std::vector new_mappings; + for (const auto& mapping : metadata.mappings) { + for (const auto& interval : mapping.intervals) { + if (interval.symbol.empty()) { + continue; + } + new_mappings.push_back( + SymbolMapping{interval.symbol, + {MappingInterval{interval.start_date, interval.end_date, + mapping.raw_symbol}}}); + } + } + metadata.mappings = new_mappings; + return metadata; +} + template SM GenMapping(std::uint32_t instrument_id, const char* stype_out_symbol) { - SM res = {}; - res.hd = RecordHeader{sizeof(SM) / RecordHeader::kLengthMultiplier, - RType::SymbolMapping, - 1, - instrument_id, - {}}; + SM res = {sizeof(SM) / RecordHeader::kLengthMultiplier, + RType::SymbolMapping, + 1, + instrument_id, + {}}; std::strncpy(res.stype_out_symbol.data(), stype_out_symbol, res.stype_out_symbol.size()); return res; } +TEST(TsSymbolMapTests, TestBasic) { + auto metadata = GenMetadata(); + TsSymbolMap target{metadata}; + EXPECT_EQ(target.At(date::year{2023} / 7 / 2, 32), "AAPL"); + EXPECT_EQ(target.At(date::year{2023} / 7 / 30, 32), "AAPL"); + EXPECT_EQ(target.At(date::year{2023} / 7 / 31, 32), "AAPL"); + EXPECT_EQ(target.Find(date::year{2023} / 8 / 1, 32), target.Map().end()); + EXPECT_EQ(target.At(date::year{2023} / 7 / 8, 8029), "PLTR"); + EXPECT_EQ(target.Find(date::year{2023} / 7 / 10, 8029), target.Map().end()); + EXPECT_EQ(target.At(date::year{2023} / 7 / 10, 8022), "PLTR"); + EXPECT_EQ(target.At(date::year{2023} / 7 / 20, 10184), "TSLA"); + EXPECT_EQ(target.At(date::year{2023} / 7 / 21, 10181), "TSLA"); + EXPECT_EQ(target.At(date::year{2023} / 7 / 24, 10174), "TSLA"); + EXPECT_EQ(target.At(date::year{2023} / 7 / 25, 10172), "TSLA"); + MboMsg record{ + RecordHeader{{}, + RType::Mbo, + 0, + 10172, + UnixNanos{date::sys_days{date::year{2023} / 7 / 24}} + + std::chrono::hours{23}}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + UnixNanos{date::sys_days{date::year{2023} / 7 / 25}} + + std::chrono::minutes{155}, + {}, + {}}; + EXPECT_EQ(target.At(record), "TSLA"); + auto it = target.Find(record); + ASSERT_NE(it, target.Map().end()); + EXPECT_EQ(*it->second, "TSLA"); + EXPECT_EQ(target.At(date::year{2023} / 7 / 25, 10172), "TSLA"); + + auto inverse_metadata = GenInverseMetadata(); + TsSymbolMap inverse_target{inverse_metadata}; + ASSERT_EQ(inverse_target.Size(), target.Size()); + for (const auto& kv : target.Map()) { + EXPECT_EQ(*kv.second, inverse_target.At(kv.first.first, kv.first.second)); + } +} + +TEST(TsSymbolMapTests, TestSTypeError) { + auto metadata = GenMetadata(); + metadata.stype_out = SType::RawSymbol; + ASSERT_THROW(TsSymbolMap{metadata}, InvalidArgumentError); +} + +TEST(TsSymbolMapTests, TestInsertStartEndDateSame) { + TsSymbolMap target; + ASSERT_TRUE(target.Map().empty()); + target.Insert(1, date::year{2023} / 12 / 3, date::year{2023} / 12 / 3, + std::make_shared("test")); + ASSERT_TRUE(target.Map().empty()); +} + +TEST(PitSymbolMapTests, TestFromMetadata) { + auto metadata = GenMetadata(); + auto target = metadata.CreateSymbolMapForDate(date::year{2023} / 7 / 31); + EXPECT_EQ(target.Size(), 4); + EXPECT_EQ(target[32], "AAPL"); + EXPECT_EQ(target[7295], "NVDA"); + // NVDA from previous day + EXPECT_EQ(target.Find(7298), target.Map().end()); + EXPECT_EQ(target[10163], "TSLA"); + EXPECT_EQ(target[6803], "MSFT"); + auto inverse_target = + GenMetadata().CreateSymbolMapForDate(date::year{2023} / 7 / 31); + EXPECT_EQ(inverse_target.Map(), target.Map()); +} + +TEST(PitSymbolMapTests, TestFromMetadataOutOfRange) { + auto metadata = GenMetadata(); + ASSERT_EQ(metadata.start, UnixNanos{std::chrono::seconds{1688169600}}); + ASSERT_EQ(metadata.end, UnixNanos{std::chrono::seconds{1690848000}}); + ASSERT_THROW(PitSymbolMap(metadata, date::year{2023} / 8 / 1), + InvalidArgumentError); + ASSERT_THROW(PitSymbolMap(metadata, date::year{2023} / 6 / 30), + InvalidArgumentError); + metadata.end = UnixNanos{date::sys_days{date::year{2023} / 7 / 1}} + + std::chrono::hours{8}; + ASSERT_NE(metadata.end, UnixNanos{date::sys_days{date::year{2023} / 7 / 1}}); + ASSERT_NO_THROW(PitSymbolMap(metadata, date::year{2023} / 7 / 1)); + ASSERT_THROW(PitSymbolMap(metadata, date::year{2023} / 7 / 2), + InvalidArgumentError); + metadata.end = UnixNanos{date::sys_days{date::year{2023} / 7 / 2}}; + ASSERT_THROW(PitSymbolMap(metadata, date::year{2023} / 7 / 2), + InvalidArgumentError); + metadata.end += std::chrono::nanoseconds{1}; + ASSERT_NO_THROW(PitSymbolMap(metadata, date::year{2023} / 7 / 2)); +} + TEST(PitSymbolMapTests, TestOnSymbolMapping) { PitSymbolMap target; target.OnSymbolMapping(GenMapping(1, "AAPL")); diff --git a/test/src/symbology_tests.cpp b/test/src/symbology_tests.cpp index a2144ab..07928f1 100644 --- a/test/src/symbology_tests.cpp +++ b/test/src/symbology_tests.cpp @@ -1,5 +1,7 @@ +#include #include +#include "databento/enums.hpp" #include "databento/symbology.hpp" namespace databento { @@ -7,28 +9,35 @@ namespace test { TEST(SymbologyTests, TestSymbologyResolutionToString) { const SymbologyResolution target{ { - {"ESM2", {{"2022-06-01", "2022-06-17", "12344"}}}, - {"ESU2", {{"2022-06-01", "2022-07-01", "12345"}}}, + {"ESM2", + {{date::year{2022} / 6 / 1, date::year{2022} / 6 / 17, "12344"}}}, + {"ESU2", + {{date::year{2022} / 6 / 1, date::year{2022} / 7 / 1, "12345"}}}, }, {"ESM2"}, {"EEES"}, - }; + SType::RawSymbol, + SType::InstrumentId}; const auto res = ToString(target); // Try both orders because it depends on hash implementation if (res != R"(SymbologyResolution { mappings = { - { "ESU2", { StrMappingInterval { start_date = "2022-06-01", end_date = "2022-07-01", symbol = "12345" } } }, - { "ESM2", { StrMappingInterval { start_date = "2022-06-01", end_date = "2022-06-17", symbol = "12344" } } } + { "ESU2", { MappingInterval { start_date = 2022-06-01, end_date = 2022-07-01, symbol = "12345" } } }, + { "ESM2", { MappingInterval { start_date = 2022-06-01, end_date = 2022-06-17, symbol = "12344" } } } }, partial = { "ESM2" }, - not_found = { "EEES" } + not_found = { "EEES" }, + stype_in = raw_symbol, + stype_out = instrument_id })" && res != R"(SymbologyResolution { mappings = { - { "ESM2", { StrMappingInterval { start_date = "2022-06-01", end_date = "2022-06-17", symbol = "12344" } } }, - { "ESU2", { StrMappingInterval { start_date = "2022-06-01", end_date = "2022-07-01", symbol = "12345" } } } + { "ESM2", { MappingInterval { start_date = 2022-06-01, end_date = 2022-06-17, symbol = "12344" } } }, + { "ESU2", { MappingInterval { start_date = 2022-06-01, end_date = 2022-07-01, symbol = "12345" } } } }, partial = { "ESM2" }, - not_found = { "EEES" } + not_found = { "EEES" }, + stype_in = raw_symbol, + stype_out = instrument_id })") { FAIL() << res; } diff --git a/test/src/zstd_stream_tests.cpp b/test/src/zstd_stream_tests.cpp index cbf4d7a..9ffd04e 100644 --- a/test/src/zstd_stream_tests.cpp +++ b/test/src/zstd_stream_tests.cpp @@ -3,12 +3,14 @@ #include #include #include +#include #include "databento/compat.hpp" -#include "databento/detail/file_stream.hpp" #include "databento/detail/zstd_stream.hpp" #include "databento/enums.hpp" +#include "databento/file_stream.hpp" #include "databento/ireadable.hpp" +#include "mock/mock_io.hpp" namespace databento { namespace detail { @@ -18,8 +20,9 @@ TEST(ZstdStreamTests, TestMultiFrameFiles) { const std::string file_path = TEST_BUILD_DIR "/data/multi-frame.definition.v1.dbn.zst"; - databento::detail::ZstdStream target{std::unique_ptr{ - new databento::detail::FileStream{file_path}}}; + databento::detail::ZstdDecodeStream target{ + std::unique_ptr{ + new databento::InFileStream{file_path}}}; for (std::size_t i = 0; i < kRecordCount; ++i) { databento::InstrumentDefMsgV1 def_msg; target.ReadExact(reinterpret_cast(&def_msg), @@ -27,6 +30,26 @@ TEST(ZstdStreamTests, TestMultiFrameFiles) { EXPECT_EQ(def_msg.hd.rtype, databento::rtype::InstrumentDef); } } + +TEST(ZstdStreamTests, TestIdentity) { + std::vector source_data; + for (std::int64_t i = 0; i < 100000; ++i) { + source_data.emplace_back(i); + } + auto size = source_data.size() * sizeof(std::int64_t); + databento::test::mock::MockIo mock_io; + { + ZstdCompressStream compressor{&mock_io}; + for (auto it = source_data.begin(); it != source_data.end(); it += 100) { + compressor.WriteAll(reinterpret_cast(&*it), + 100 * sizeof(std::int64_t)); + } + } + std::vector res(size); + ZstdDecodeStream decode{std::unique_ptr{ + new databento::test::mock::MockIo{std::move(mock_io)}}}; + decode.ReadExact(res.data(), size); +} } // namespace test } // namespace detail } // namespace databento