From 5d4769b9640e42aee4f6c1c80e266d601176f1a4 Mon Sep 17 00:00:00 2001 From: Max Date: Thu, 1 Aug 2024 21:21:47 -0600 Subject: [PATCH 01/13] Parquet works. Very Messy --- CMakeLists.txt | 17 +- src/CMakeLists.txt | 6 +- src/config.h.cmake | 3 + src/microsim/output/MSVTKExport.cpp | 4 +- src/utils/common/FileHelpers.cpp | 9 + src/utils/common/FileHelpers.h | 7 + src/utils/iodevices/CMakeLists.txt | 8 +- src/utils/iodevices/OutputDevice.cpp | 41 +- src/utils/iodevices/OutputDevice.h | 110 ++++- src/utils/iodevices/OutputDevice_CERR.cpp | 5 +- src/utils/iodevices/OutputDevice_CERR.h | 5 +- src/utils/iodevices/OutputDevice_COUT.cpp | 8 +- src/utils/iodevices/OutputDevice_COUT.h | 5 +- src/utils/iodevices/OutputDevice_File.cpp | 16 +- src/utils/iodevices/OutputDevice_File.h | 6 +- src/utils/iodevices/OutputDevice_Network.cpp | 11 +- src/utils/iodevices/OutputDevice_Network.h | 15 - src/utils/iodevices/OutputDevice_Parquet.cpp | 81 ++++ src/utils/iodevices/OutputDevice_Parquet.h | 118 +++++ src/utils/iodevices/OutputDevice_String.cpp | 11 +- src/utils/iodevices/OutputDevice_String.h | 15 +- src/utils/iodevices/OutputFormatter.h | 19 +- src/utils/iodevices/ParquetFormatter.h | 453 +++++++++++++++++++ src/utils/iodevices/PlainXMLFormatter.cpp | 17 +- src/utils/iodevices/PlainXMLFormatter.h | 69 ++- src/utils/iodevices/StreamDevices.h | 314 +++++++++++++ 26 files changed, 1220 insertions(+), 153 deletions(-) create mode 100644 src/utils/iodevices/OutputDevice_Parquet.cpp create mode 100644 src/utils/iodevices/OutputDevice_Parquet.h create mode 100644 src/utils/iodevices/ParquetFormatter.h create mode 100644 src/utils/iodevices/StreamDevices.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 61f39c7ff6e3..7e2f24fe9e1b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,7 +94,7 @@ endif () # C++14 is needed by Google Test >= 1.13, for all the other parts C++11 should be enough. # This will silently fall back to C++11 if 14 is not supported by the compiler. -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) # compiler specific flags if (GNU_COMPILER OR CLANG_COMPILER) @@ -541,6 +541,21 @@ if (CHECK_OPTIONAL_LIBS) set(CMAKE_PREFIX_PATH "${CMAKE_PREFIX_PATH};${GDAL_PATH};${FFMPEG_PATH};${OSG_PATH};${GL2PS_PATH};${GEOS_PATH}") file(GLOB SUMO_OPTIONAL_LIBRARIES_DLL "${GDAL_PATH}/bin/*.dll" "${FFMPEG_PATH}/bin/*.dll" "${OSG_PATH}/bin/*.dll" "${GL2PS_PATH}/bin/*.dll" "${JUPEDSIM_CUSTOMDIR}/bin/*.dll") + find_package(Arrow) + find_package(Parquet) + if (Arrow_FOUND AND Parquet_FOUND) + set(HAVE_PARQUET 1) + set(ENABLED_FEATURES "${ENABLED_FEATURES} PARQUET") + set(PARQUET_LIBRARY Parquet::parquet_shared) + if (GTEST_FOUND) + add_definitions("GTest_SOURCE=Bundled") + endif() + # this is for gtest compatibility + else() + set(PARQUET_LIBRARY "") + set(HAVE_PARQUET 0) + endif() + # GDAL (for geopositioning) find_package(GDAL) if (GDAL_FOUND) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1a8c2e361ffd..fe67ec9e89dc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,9 +1,12 @@ set(netconvertlibs netwrite netimport netbuild foreign_eulerspiral ${GDAL_LIBRARY} netimport_vissim netimport_vissim_typeloader netimport_vissim_tempstructs ${commonlibs} ${TCMALLOC_LIBRARY}) +FIND_LIBRARY(PARQUET_LIBRARY NAMES parquet) + set(sumolibs traciserver netload microsim_cfmodels microsim_engine microsim_lcmodels microsim_devices microsim_trigger microsim_output microsim_transportables microsim_actions - microsim_traffic_lights microsim mesosim ${commonvehiclelibs} ${GEOS_LIBRARY}) + microsim_traffic_lights microsim mesosim ${commonvehiclelibs} ${GEOS_LIBRARY} ${PARQUET_LIBRARY}) + if (OPENSCENEGRAPH_FOUND) set(osgviewlib osgview) endif () @@ -57,6 +60,7 @@ add_executable(sumo sumo_main.cpp) set_target_properties(sumo PROPERTIES OUTPUT_NAME sumo${BINARY_SUFFIX}) set_target_properties(sumo PROPERTIES OUTPUT_NAME_DEBUG sumo${BINARY_SUFFIX}D) target_link_libraries(sumo microsim traciserver libsumostatic ${sumolibs} ${TCMALLOC_LIBRARY}) + add_dependencies(sumo generate-version-h install_dll) if (FOX_FOUND) diff --git a/src/config.h.cmake b/src/config.h.cmake index 33f5e304801c..85b1db7c932c 100644 --- a/src/config.h.cmake +++ b/src/config.h.cmake @@ -184,6 +184,9 @@ /* defined if GDAL is available */ #cmakedefine HAVE_GDAL +/* defined if PARQUET is available */ +#cmakedefine HAVE_PARQUET + /* defined if GL2PS is available */ #cmakedefine HAVE_GL2PS diff --git a/src/microsim/output/MSVTKExport.cpp b/src/microsim/output/MSVTKExport.cpp index 6887a643ccee..00c429d4a1fb 100644 --- a/src/microsim/output/MSVTKExport.cpp +++ b/src/microsim/output/MSVTKExport.cpp @@ -46,7 +46,7 @@ MSVTKExport::write(OutputDevice& of, SUMOTime /* timestep */) { of << "\n"; of << "\n"; of << "\n"; - of << " \n"; + of << " \n"; of << "\n"; of << " " << List2String(getSpeed()) << "\n"; of << "\n"; @@ -56,7 +56,7 @@ MSVTKExport::write(OutputDevice& of, SUMOTime /* timestep */) { of << "\n"; of << "\n"; of << " " << getOffset((int) speed.size()) << "\n"; - of << " " << speed.size() << "\n"; + of << " " << toString(speed.size()) << "\n"; of << "\n"; of << "\n"; of << " \n"; diff --git a/src/utils/common/FileHelpers.cpp b/src/utils/common/FileHelpers.cpp index dc6a7fccd535..7be7f0423aa9 100644 --- a/src/utils/common/FileHelpers.cpp +++ b/src/utils/common/FileHelpers.cpp @@ -135,6 +135,15 @@ FileHelpers::addExtension(const std::string& path, const std::string& extension) } } +std::string +FileHelpers::getExtension(const std::string& path) { + const auto beg = path.find_last_of("."); + if (beg == std::string::npos) { + return ""; + } + return path.substr(beg, path.size()); +} + std::string FileHelpers::getConfigurationRelative(const std::string& configPath, const std::string& path) { diff --git a/src/utils/common/FileHelpers.h b/src/utils/common/FileHelpers.h index afe23e3340b2..a10e88d1bf27 100644 --- a/src/utils/common/FileHelpers.h +++ b/src/utils/common/FileHelpers.h @@ -80,6 +80,13 @@ class FileHelpers { */ static std::string addExtension(const std::string& path, const std::string& extension); + /** @brief Get the file extension from the given file path + * + * @param[in] path The path to the file + * @return the file extension (with dot, example: '.xml') + */ + static std::string getExtension(const std::string& path); + /** @brief Returns the second path as a relative path to the first file * * Given the position of the configuration file, and the information where a second diff --git a/src/utils/iodevices/CMakeLists.txt b/src/utils/iodevices/CMakeLists.txt index 5dd2ba85699f..3fc9620219bc 100644 --- a/src/utils/iodevices/CMakeLists.txt +++ b/src/utils/iodevices/CMakeLists.txt @@ -9,11 +9,15 @@ set(utils_iodevices_STAT_SRCS OutputDevice_File.h OutputDevice_String.cpp OutputDevice_String.h - OutputDevice_Network.cpp - OutputDevice_Network.h + # OutputDevice_Network.cpp + # OutputDevice_Network.h + OutputDevice_Parquet.cpp + OutputDevice_Parquet.h OutputFormatter.h PlainXMLFormatter.cpp PlainXMLFormatter.h + StreamDevices.h + ) add_library(utils_iodevices STATIC ${utils_iodevices_STAT_SRCS}) diff --git a/src/utils/iodevices/OutputDevice.cpp b/src/utils/iodevices/OutputDevice.cpp index 638bfaf05887..64ef2348a60d 100644 --- a/src/utils/iodevices/OutputDevice.cpp +++ b/src/utils/iodevices/OutputDevice.cpp @@ -36,6 +36,7 @@ #include "OutputDevice_COUT.h" #include "OutputDevice_CERR.h" #include "OutputDevice_Network.h" +#include "OutputDevice_Parquet.h" #include "PlainXMLFormatter.h" #include #include @@ -77,17 +78,18 @@ OutputDevice::getDevice(const std::string& name, bool usePrefix) { } else if (name == "stderr") { dev = OutputDevice_CERR::getDevice(); } else if (FileHelpers::isSocket(name)) { - try { - const bool ipv6 = name[0] == '['; // IPv6 adresses may be written like '[::1]:8000' - const size_t sepIndex = name.find(":", ipv6 ? name.find("]") : 0); - const int port = StringUtils::toInt(name.substr(sepIndex + 1)); - dev = new OutputDevice_Network(ipv6 ? name.substr(1, sepIndex - 2) : name.substr(0, sepIndex), port); - } catch (NumberFormatException&) { - throw IOError("Given port number '" + name.substr(name.find(":") + 1) + "' is not numeric."); - } catch (EmptyData&) { - throw IOError(TL("No port number given.")); - } - } else { + // try { + // const bool ipv6 = name[0] == '['; // IPv6 adresses may be written like '[::1]:8000' + // const size_t sepIndex = name.find(":", ipv6 ? name.find("]") : 0); + // const int port = StringUtils::toInt(name.substr(sepIndex + 1)); + // dev = new OutputDevice_Network(ipv6 ? name.substr(1, sepIndex - 2) : name.substr(0, sepIndex), port); + // } catch (NumberFormatException&) { + // throw IOError("Given port number '" + name.substr(name.find(":") + 1) + "' is not numeric."); + // } catch (EmptyData&) { + throw IOError(TL("No port number given.")); + // } + } + else { std::string name2 = (name == "nul" || name == "NUL") ? "/dev/null" : name; if (usePrefix && OptionsCont::getOptions().isSet("output-prefix") && name2 != "/dev/null") { std::string prefix = OptionsCont::getOptions().getString("output-prefix"); @@ -102,11 +104,19 @@ OutputDevice::getDevice(const std::string& name, bool usePrefix) { name2 = FileHelpers::prependToLastPathComponent(prefix, name); } name2 = StringUtils::substituteEnvironment(name2, &OptionsIO::getLoadTime()); + // check the file extension + const auto file_ext = FileHelpers::getExtension(name); const int len = (int)name.length(); - dev = new OutputDevice_File(name2, len > 3 && name.substr(len - 3) == ".gz"); + if (file_ext == ".parquet" || file_ext == ".prq") { + dev = new OutputDevice_Parquet(name2); + } + else { + dev = new OutputDevice_File(name2, len > 3 && FileHelpers::getExtension(name) == ".gz"); + } } + // todo: extract this to a class method? (b.c. Parquet doesn't have an iostream) dev->setPrecision(); - dev->getOStream() << std::setiosflags(std::ios::fixed); + dev->setOSFlags(std::ios::fixed); myOutputDevices[name] = dev; return *dev; } @@ -207,6 +217,11 @@ OutputDevice::OutputDevice(const int defaultIndentation, const std::string& file myFilename(filename), myFormatter(new PlainXMLFormatter(defaultIndentation)) { } +OutputDevice::OutputDevice(const std::string& filename, OutputFormatter* formatter) : + myFilename(filename), myFormatter(formatter) { +} + + OutputDevice::~OutputDevice() { delete myFormatter; diff --git a/src/utils/iodevices/OutputDevice.h b/src/utils/iodevices/OutputDevice.h index 6029f27731d8..d3e80e986064 100644 --- a/src/utils/iodevices/OutputDevice.h +++ b/src/utils/iodevices/OutputDevice.h @@ -22,6 +22,7 @@ /****************************************************************************/ #pragma once #include +#include #include #include @@ -29,16 +30,24 @@ #include #include #include "PlainXMLFormatter.h" +#include "ParquetFormatter.h" +#include "StreamDevices.h" // =========================================================================== // class definitions // =========================================================================== +// Create an ENUM of the different writers (XML and Parquet) +enum class OutputWriterType { + XML, + PARQUET +}; + /** * @class OutputDevice * @brief Static storage of an output device and its base (abstract) implementation * - * OutputDevices are basically a capsule around an std::ostream, which give a + * OutputDevices are basically a capsule around an StreamDevice, which give a * unified access to sockets, files and stdout. * * Usually, an application builds as many output devices as needed. Each @@ -95,8 +104,8 @@ class OutputDevice { * @exception IOError If the output could not be built for any reason (error message is supplied) */ static bool createDeviceByOption(const std::string& optionName, - const std::string& rootElement = "", - const std::string& schemaFile = ""); + const std::string& rootElement = "", + const std::string& schemaFile = ""); /** @brief Returns the device described by the option @@ -139,6 +148,9 @@ class OutputDevice { /// @brief Constructor OutputDevice(const int defaultIndentation = 0, const std::string& filename = ""); + /// @brief Constructor + OutputDevice(const std::string& filename, OutputFormatter* formatter); + /// @brief Destructor virtual ~OutputDevice(); @@ -167,7 +179,7 @@ class OutputDevice { /** @brief Sets the precision or resets it to default * @param[in] precision The accuracy (number of digits behind '.') to set */ - void setPrecision(int precision = gPrecision); + virtual void setPrecision(int precision = gPrecision); /// @brief return precision set on the device int precision(); @@ -190,14 +202,24 @@ class OutputDevice { * @todo Describe what is saved */ bool writeXMLHeader(const std::string& rootElement, - const std::string& schemaFile, - std::map attrs = std::map(), - bool includeConfig = true); + const std::string& schemaFile, + std::map attrs = std::map(), + bool includeConfig = true); template bool writeHeader(const SumoXMLTag& rootElement) { - return static_cast(myFormatter)->writeHeader(getOStream(), rootElement); + switch (this->getType()) + { + case OutputWriterType::XML: + // cast the writer to the correct type + return getFormatter()->writeHeader(getOStream(), rootElement); + case OutputWriterType::PARQUET: + // cast the writer to the correct type + return getFormatter()->writeHeader(getOStream(), rootElement); + default: + break; + } } @@ -210,7 +232,7 @@ class OutputDevice { * @param[in] xmlElement Name of element to open * @return The OutputDevice for further processing */ - OutputDevice& openTag(const std::string& xmlElement); + virtual OutputDevice& openTag(const std::string& xmlElement); /** @brief Opens an XML tag @@ -220,7 +242,7 @@ class OutputDevice { * @param[in] xmlElement Id of the element to open * @return The OutputDevice for further processing */ - OutputDevice& openTag(const SumoXMLTag& xmlElement); + virtual OutputDevice& openTag(const SumoXMLTag& xmlElement); /** @brief Closes the most recently opened tag and optionally adds a comment @@ -233,7 +255,7 @@ class OutputDevice { * @return Whether a further element existed in the stack and could be closed * @todo it is not verified that the topmost element was closed */ - bool closeTag(const std::string& comment = ""); + virtual bool closeTag(const std::string& comment = ""); @@ -252,8 +274,7 @@ class OutputDevice { */ template OutputDevice& writeAttr(const SumoXMLAttr attr, const T& val) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); - return *this; + return writeAttr(toString(attr), val); } inline bool useAttribute(const SumoXMLAttr attr, SumoXMLAttrMask attributeMask) const { @@ -271,7 +292,7 @@ class OutputDevice { OutputDevice& writeOptionalAttr(const SumoXMLAttr attr, const T& val, long long int attributeMask) { assert((int)attr <= 63); if (attributeMask == 0 || useAttribute(attr, attributeMask)) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); + writeAttr(attr, val); } return *this; } @@ -279,7 +300,7 @@ class OutputDevice { OutputDevice& writeOptionalAttr(const SumoXMLAttr attr, const T& val, SumoXMLAttrMask attributeMask) { assert((int)attr <= (int)attributeMask.size()); if (attributeMask.none() || useAttribute(attr, attributeMask)) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); + writeAttr(attr, val); } return *this; } @@ -293,7 +314,20 @@ class OutputDevice { */ template OutputDevice& writeAttr(const std::string& attr, const T& val) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); + switch (this->getType()) + { + case OutputWriterType::XML: + // cast the writer to the correct type + getFormatter()->writeAttr(getOStream(), attr, val); + break; + case OutputWriterType::PARQUET: + // cast the writer to the correct type + getFormatter()->writeAttr(getOStream(), attr, val); + break; + default: + throw IOError("Unknown output writer type"); + break; + } return *this; } @@ -341,7 +375,14 @@ class OutputDevice { */ template OutputDevice& operator<<(const T& t) { - getOStream() << t; + // getOStream() << t; + // get the correct formatter + if (this->getOStream().allowRaw()) { + this->getOStream() << t; + } + else { + throw IOError("Raw output is not allowed for this output device"); + } postWriteHook(); return *this; } @@ -354,17 +395,39 @@ class OutputDevice { return myFormatter->wroteHeader(); } + /// @todo should move to the formatter + /// @brief Returns the type of the output device + virtual void setOSFlags(std::ios_base::fmtflags flags) { + getOStream() << std::setiosflags(flags); + } + protected: /// @brief Returns the associated ostream - virtual std::ostream& getOStream() = 0; + virtual StreamDevice& getOStream() { + return *myStreamDevice; + }; + /// @brief Returns the associated ostream + template + T* getStreamDevice() { + return static_cast(myStreamDevice); + } + /// @brief Returns whether the output device is a parquet + virtual OutputWriterType getType() const { + return OutputWriterType::XML; + } /** @brief Called after every write access. * * Default implementation does nothing. */ virtual void postWriteHook(); + /// @brief Returns the formatter + OutputFormatter* getFormatter() { + return myFormatter; + } + private: /// @brief map from names to output devices @@ -376,6 +439,16 @@ class OutputDevice { protected: const std::string myFilename; + /// @brief the stream device + StreamDevice* myStreamDevice; + + /// @brief return a type casted formatter + template + T* getFormatter() { + return static_cast(myFormatter); + } + + private: /// @brief The formatter for XML OutputFormatter* const myFormatter; @@ -386,5 +459,4 @@ class OutputDevice { /// @brief Invalidated assignment operator. OutputDevice& operator=(const OutputDevice&) = delete; - }; diff --git a/src/utils/iodevices/OutputDevice_CERR.cpp b/src/utils/iodevices/OutputDevice_CERR.cpp index 7174694e5003..b688b2822dbc 100644 --- a/src/utils/iodevices/OutputDevice_CERR.cpp +++ b/src/utils/iodevices/OutputDevice_CERR.cpp @@ -47,6 +47,7 @@ OutputDevice_CERR::getDevice() { // method definitions // =========================================================================== OutputDevice_CERR::OutputDevice_CERR() : OutputDevice(0, "CERR") { + myStreamDevice = new FileDevice(&std::cerr); } @@ -55,9 +56,9 @@ OutputDevice_CERR::~OutputDevice_CERR() { } -std::ostream& +StreamDevice& OutputDevice_CERR::getOStream() { - return std::cerr; + return *myStreamDevice; } diff --git a/src/utils/iodevices/OutputDevice_CERR.h b/src/utils/iodevices/OutputDevice_CERR.h index 4d5e2062c0d9..7b51105689d7 100644 --- a/src/utils/iodevices/OutputDevice_CERR.h +++ b/src/utils/iodevices/OutputDevice_CERR.h @@ -47,14 +47,14 @@ class OutputDevice_CERR : public OutputDevice { /** @brief Returns the associated ostream * @return cerr */ - std::ostream& getOStream(); + StreamDevice& getOStream() override; /** @brief Called after every write access. * * Calls flush on stderr. */ - virtual void postWriteHook(); + virtual void postWriteHook() override; /// @} @@ -73,5 +73,4 @@ class OutputDevice_CERR : public OutputDevice { /// @brief my singular instance static OutputDevice* myInstance; - }; diff --git a/src/utils/iodevices/OutputDevice_COUT.cpp b/src/utils/iodevices/OutputDevice_COUT.cpp index a6ff12d5f26d..1679e7fafbdd 100644 --- a/src/utils/iodevices/OutputDevice_COUT.cpp +++ b/src/utils/iodevices/OutputDevice_COUT.cpp @@ -47,6 +47,7 @@ OutputDevice_COUT::getDevice() { // method definitions // =========================================================================== OutputDevice_COUT::OutputDevice_COUT() : OutputDevice(0, "COUT") { + myStreamDevice = new FileDevice(&std::cout); } @@ -55,15 +56,14 @@ OutputDevice_COUT::~OutputDevice_COUT() { } -std::ostream& +StreamDevice& OutputDevice_COUT::getOStream() { - return std::cout; + return *myStreamDevice; } - void OutputDevice_COUT::postWriteHook() { - std::cout.flush(); + myStreamDevice->flush(); } diff --git a/src/utils/iodevices/OutputDevice_COUT.h b/src/utils/iodevices/OutputDevice_COUT.h index 1bf862d877e9..cc615ded177f 100644 --- a/src/utils/iodevices/OutputDevice_COUT.h +++ b/src/utils/iodevices/OutputDevice_COUT.h @@ -47,14 +47,14 @@ class OutputDevice_COUT : public OutputDevice { /** @brief Returns the associated ostream * @return cout */ - std::ostream& getOStream(); + StreamDevice& getOStream() override; /** @brief Called after every write access. * * Calls flush on stdout. */ - virtual void postWriteHook(); + virtual void postWriteHook() override; /// @} @@ -73,5 +73,4 @@ class OutputDevice_COUT : public OutputDevice { /// @brief my singular instance static OutputDevice* myInstance; - }; diff --git a/src/utils/iodevices/OutputDevice_File.cpp b/src/utils/iodevices/OutputDevice_File.cpp index 7318dfdc80f4..a0668816430c 100644 --- a/src/utils/iodevices/OutputDevice_File.cpp +++ b/src/utils/iodevices/OutputDevice_File.cpp @@ -40,7 +40,7 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com if (fullName == "/dev/null") { myAmNull = true; #ifdef WIN32 - myFileStream = new std::ofstream("NUL"); + myStreamDevice = new FileDevice(new std::ofstream("NUL")); if (!myFileStream->good()) { delete myFileStream; throw IOError(TLF("Could not redirect to NUL device (%).", std::string(std::strerror(errno)))); @@ -52,34 +52,34 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com #ifdef HAVE_ZLIB if (compressed) { try { - myFileStream = new zstr::ofstream(localName.c_str(), std::ios_base::out); + myStreamDevice = new FileDevice(new zstr::ofstream(localName.c_str(), std::ios_base::out)); } catch (strict_fstream::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } catch (zstr::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } } else { - myFileStream = new std::ofstream(localName.c_str(), std::ios_base::out); + myStreamDevice = new FileDevice(new std::ofstream(localName.c_str(), std::ios_base::out)); } #else UNUSED_PARAMETER(compressed); myFileStream = new std::ofstream(localName.c_str(), std::ios_base::out); #endif - if (!myFileStream->good()) { - delete myFileStream; + if (!myStreamDevice->good()) { + delete myStreamDevice; throw IOError("Could not build output file '" + fullName + "' (" + std::strerror(errno) + ")."); } } OutputDevice_File::~OutputDevice_File() { - delete myFileStream; + delete myStreamDevice; } -std::ostream& +StreamDevice& OutputDevice_File::getOStream() { - return *myFileStream; + return *myStreamDevice; } diff --git a/src/utils/iodevices/OutputDevice_File.h b/src/utils/iodevices/OutputDevice_File.h index cc3ec46cfea4..741c66eb04b6 100644 --- a/src/utils/iodevices/OutputDevice_File.h +++ b/src/utils/iodevices/OutputDevice_File.h @@ -23,6 +23,7 @@ #include #include "OutputDevice.h" +#include "StreamDevices.h" // =========================================================================== @@ -63,14 +64,11 @@ class OutputDevice_File : public OutputDevice { /** @brief Returns the associated ostream * @return The used stream */ - std::ostream& getOStream() override; + StreamDevice& getOStream() override; /// @} private: - /// The wrapped ofstream - std::ostream* myFileStream = nullptr; - /// am I redirecting to /dev/null bool myAmNull = false; diff --git a/src/utils/iodevices/OutputDevice_Network.cpp b/src/utils/iodevices/OutputDevice_Network.cpp index 22227f969b0d..65987aac267a 100644 --- a/src/utils/iodevices/OutputDevice_Network.cpp +++ b/src/utils/iodevices/OutputDevice_Network.cpp @@ -51,6 +51,7 @@ OutputDevice_Network::OutputDevice_Network(const std::string& host, std::this_thread::sleep_for(std::chrono::seconds(wait)); } } + myStreamDevice = new StringStream(); } @@ -60,16 +61,10 @@ OutputDevice_Network::~OutputDevice_Network() { } -std::ostream& -OutputDevice_Network::getOStream() { - return myMessage; -} - - void OutputDevice_Network::postWriteHook() { - const std::string toSend = myMessage.str(); - myMessage.str(""); + const std::string toSend = myStreamDevice->str(); + myStreamDevice->str(""); if (toSend.empty() || !mySocket->has_client_connection()) { return; } diff --git a/src/utils/iodevices/OutputDevice_Network.h b/src/utils/iodevices/OutputDevice_Network.h index d7c7ba9feb06..a75253aeaede 100644 --- a/src/utils/iodevices/OutputDevice_Network.h +++ b/src/utils/iodevices/OutputDevice_Network.h @@ -63,18 +63,6 @@ class OutputDevice_Network : public OutputDevice { protected: /// @name Methods that override/implement OutputDevice-methods /// @{ - - /** @brief Returns the associated ostream - * - * The stream is an ostringstream, actually, into which the message - * is written. It is sent when postWriteHook is called. - * - * @return The used stream - * @see postWriteHook - */ - std::ostream& getOStream(); - - /** @brief Sends the data which was written to the string stream over the socket. * * Converts the stored message into a vector of chars and sends them via to @@ -84,9 +72,6 @@ class OutputDevice_Network : public OutputDevice { /// @} private: - /// @brief packet buffer - std::ostringstream myMessage; - /// @brief the socket to transfer the data tcpip::Socket* mySocket; diff --git a/src/utils/iodevices/OutputDevice_Parquet.cpp b/src/utils/iodevices/OutputDevice_Parquet.cpp new file mode 100644 index 000000000000..d035b2e4a201 --- /dev/null +++ b/src/utils/iodevices/OutputDevice_Parquet.cpp @@ -0,0 +1,81 @@ +/****************************************************************************/ +// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo +// Copyright (C) 2004-2024 German Aerospace Center (DLR) and others. +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0/ +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License 2.0 are satisfied: GNU General Public License, version 2 +// or later which is available at +// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later +/****************************************************************************/ +/// @file OutputDevice_File.cpp +/// @author Daniel Krajzewicz +/// @author Michael Behrisch +/// @author Jakob Erdmann +/// @date 2004 +/// +// An output device that encapsulates an ofstream +/****************************************************************************/ +#include + +#include +#include +#include +#include +#include + +#include "OutputDevice_Parquet.h" + +#include +#include + +#include +#include +#include +#include + + +// =========================================================================== +// method definitions +// =========================================================================== +OutputDevice_Parquet::OutputDevice_Parquet(const std::string& fullName) + : OutputDevice(fullName, new ParquetFormatter()) { + // set the type of compression. TODO this should be based on the build options + builder.compression(parquet::Compression::ZSTD); +} + + +bool OutputDevice_Parquet::closeTag(const std::string& comment) { + // open the file for writing + if (myFile == nullptr) { + auto formatter = dynamic_cast(this->getFormatter()); + if (formatter == nullptr) { + throw IOError("Formatter is not a ParquetFormatter"); + } + // Create a Parquet file + PARQUET_ASSIGN_OR_THROW( + this->myFile, arrow::io::FileOutputStream::Open(this->myFilename)); + + this->myStreamDevice = new ParquetStream(parquet::ParquetFileWriter::Open(this->myFile, std::static_pointer_cast( + parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, formatter->getNodeVector()) + ), this->builder.build())); + + // check if the file was opened correctly + if (this->myFile->closed()) { + throw IOError("Could not build output file '" + this->myFullName + "' (" + std::strerror(errno) + ")."); + } + } + // now actually write the data + return getFormatter()->closeTag(getOStream()); +} + + +OutputDevice_Parquet::~OutputDevice_Parquet() { + delete myStreamDevice; + arrow::Status status = this->myFile->Close(); + (void)status; // Suppress unused variable warning +} +/****************************************************************************/ diff --git a/src/utils/iodevices/OutputDevice_Parquet.h b/src/utils/iodevices/OutputDevice_Parquet.h new file mode 100644 index 000000000000..651d62c35a4e --- /dev/null +++ b/src/utils/iodevices/OutputDevice_Parquet.h @@ -0,0 +1,118 @@ +/****************************************************************************/ +// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo +// Copyright (C) 2004-2024 German Aerospace Center (DLR) and others. +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0/ +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License 2.0 are satisfied: GNU General Public License, version 2 +// or later which is available at +// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later +/****************************************************************************/ +/// @file OutputDevice_File.h +/// @author Daniel Krajzewicz +/// @author Michael Behrisch +/// @date 2004 +/// +// An output device that encapsulates an ofstream +/****************************************************************************/ +#pragma once + +#include + +#ifdef HAVE_PARQUET + +#include +#include "OutputDevice.h" +#include "ParquetFormatter.h" + +#include "arrow/io/file.h" +#include "arrow/util/config.h" +#include "parquet/exception.h" +#include "parquet/stream_reader.h" +#include "parquet/stream_writer.h" + + + +/** + * @class OutputDevice_File + * @brief An output device that encapsulates an ofstream + * + * Please note that the device gots responsible for the stream and deletes + * it (it should not be deleted elsewhere). + */ +class OutputDevice_Parquet : public OutputDevice { +public: + /** @brief Constructor + * @param[in] fullName The name of the output file to use + * @param[in] compressed whether to apply gzip compression + * @exception IOError Should not be thrown by this implementation + */ + OutputDevice_Parquet(const std::string& fullName); + + + /// @brief Destructor + ~OutputDevice_Parquet(); + + /** @brief returns the information whether the device will discard all output + * @return Whether the device redirects to /dev/null + */ + bool isNull() override { + return myAmNull; + } + + /** @brief implements the close tag logic. This is where the file is first opened and the schema is created. + * This exploits the fact that for *most* SUMO files, all the fields are present at the first close tag event. + */ + bool closeTag(const std::string& comment) override; + + template + OutputDevice& writeAttr(const std::string& attr, const T& val) { + this->getFormatter()->writeAttr(this->getOStream(), attr, val); + // check if the attribute is the field table + + return *this; + } + + + /** @brief writes a line feed if applicable. overriden from the base class to do nothing + */ + void lf() {}; + + // null the setPrecision method + void setPrecision(int precision) override {}; + + void setOSFlags(std::ios_base::fmtflags flags) override { + + }; + +protected: + + /// @brief Returns whether the output device is a parquet + OutputWriterType getType() const override { + return OutputWriterType::PARQUET; + } + +private: + /// The wrapped ofstream + std::shared_ptr myFile = nullptr; + // the builder for the writer properties + parquet::WriterProperties::Builder builder; + // the schema + std::shared_ptr schema; + + /// am I redirecting to /dev/null + bool myAmNull = false; + + /// do I allow optional attributes + bool allowOptionalAttributes = false; + + std::string myFullName; + + parquet::schema::NodeVector myNodeVector; + +}; + +#endif // HAVE_PARQUET \ No newline at end of file diff --git a/src/utils/iodevices/OutputDevice_String.cpp b/src/utils/iodevices/OutputDevice_String.cpp index a4f1e3cb5d4d..4355214d0580 100644 --- a/src/utils/iodevices/OutputDevice_String.cpp +++ b/src/utils/iodevices/OutputDevice_String.cpp @@ -29,8 +29,10 @@ // =========================================================================== OutputDevice_String::OutputDevice_String(const int defaultIndentation) : OutputDevice(defaultIndentation) { + auto stream = new std::ostringstream(); + (*stream) << std::setiosflags(std::ios::fixed); + myStreamDevice = new StringStream(); setPrecision(); - myStream << std::setiosflags(std::ios::fixed); } @@ -40,13 +42,12 @@ OutputDevice_String::~OutputDevice_String() { std::string OutputDevice_String::getString() const { - return myStream.str(); + return myStreamDevice->str(); } - -std::ostream& +StreamDevice& OutputDevice_String::getOStream() { - return myStream; + return *myStreamDevice; } diff --git a/src/utils/iodevices/OutputDevice_String.h b/src/utils/iodevices/OutputDevice_String.h index ee7fef776a86..9bd97bfeee48 100644 --- a/src/utils/iodevices/OutputDevice_String.h +++ b/src/utils/iodevices/OutputDevice_String.h @@ -51,19 +51,6 @@ class OutputDevice_String : public OutputDevice { */ std::string getString() const; -protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * @return The used stream - */ - std::ostream& getOStream(); - /// @} - - -private: - /// The wrapped ofstream - std::ostringstream myStream; + StreamDevice& getOStream(); }; diff --git a/src/utils/iodevices/OutputFormatter.h b/src/utils/iodevices/OutputFormatter.h index a0c896513790..703cd8ef43dc 100644 --- a/src/utils/iodevices/OutputFormatter.h +++ b/src/utils/iodevices/OutputFormatter.h @@ -24,6 +24,7 @@ #include #include #include +#include "StreamDevices.h" // =========================================================================== @@ -63,7 +64,7 @@ class OutputFormatter { * @todo Check which parameter is used herein * @todo Describe what is saved */ - virtual bool writeXMLHeader(std::ostream& into, const std::string& rootElement, + virtual bool writeXMLHeader(StreamDevice& into, const std::string& rootElement, const std::map& attrs, bool includeConfig = true) = 0; @@ -78,7 +79,7 @@ class OutputFormatter { * @param[in] xmlElement Name of element to open * @return The OutputDevice for further processing */ - virtual void openTag(std::ostream& into, const std::string& xmlElement) = 0; + virtual void openTag(StreamDevice& into, const std::string& xmlElement) = 0; /** @brief Opens an XML tag @@ -88,7 +89,7 @@ class OutputFormatter { * @param[in] into The output stream to use * @param[in] xmlElement Id of the element to open */ - virtual void openTag(std::ostream& into, const SumoXMLTag& xmlElement) = 0; + virtual void openTag(StreamDevice& into, const SumoXMLTag& xmlElement) = 0; /** @brief Closes the most recently opened tag and optinally add a comment @@ -97,11 +98,17 @@ class OutputFormatter { * @return Whether a further element existed in the stack and could be closed * @todo it is not verified that the topmost element was closed */ - virtual bool closeTag(std::ostream& into, const std::string& comment = "") = 0; + virtual bool closeTag(StreamDevice& into, const std::string& comment = "") = 0; - virtual void writePreformattedTag(std::ostream& into, const std::string& val) = 0; + virtual void writePreformattedTag(StreamDevice& into, const std::string& val) = 0; - virtual void writePadding(std::ostream& into, const std::string& val) = 0; + virtual void writePadding(StreamDevice& into, const std::string& val) = 0; virtual bool wroteHeader() const = 0; + + template + void writeRaw(StreamDevice& into, const T& val){ + + into << val; + }; }; diff --git a/src/utils/iodevices/ParquetFormatter.h b/src/utils/iodevices/ParquetFormatter.h new file mode 100644 index 000000000000..238ec3388932 --- /dev/null +++ b/src/utils/iodevices/ParquetFormatter.h @@ -0,0 +1,453 @@ +/****************************************************************************/ +// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo +// Copyright (C) 2012-2024 German Aerospace Center (DLR) and others. +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0/ +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License 2.0 are satisfied: GNU General Public License, version 2 +// or later which is available at +// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later +/****************************************************************************/ +/// @file PlainXMLFormatter.h +/// @author Daniel Krajzewicz +/// @author Michael Behrisch +/// @date 2012 +/// +// Output formatter for plain XML output +/****************************************************************************/ +#pragma once +#include + +#ifdef HAVE_PARQUET +// parquet-cpp +#include +#include + +#include +#include +#include + +#include "OutputFormatter.h" +#include +#include "StreamDevices.h" + +#define PARQUET_TESTING + + +// Helper function to determine if a type is a fixed-length character array +template +struct is_fixed_char_array : std::false_type {}; + +template +struct is_fixed_char_array : std::true_type {}; + +// Helper template for the static_assert +template +constexpr bool always_false = false; + +// Overloaded function for different types +template +void AppendField(parquet::schema::NodeVector& fields, const T& val, const std::string& field_name) { + + if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY, + parquet::ConvertedType::UTF8)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::FIXED_LEN_BYTE_ARRAY, + parquet::ConvertedType::NONE, 1)); + } + else if constexpr (is_fixed_char_array::value) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::FIXED_LEN_BYTE_ARRAY, + parquet::ConvertedType::NONE, sizeof(T))); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_8)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::UINT_16)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::OPTIONAL, parquet::Type::INT64, + parquet::ConvertedType::UINT_64)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, + parquet::ConvertedType::NONE)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT64, + parquet::ConvertedType::TIMESTAMP_MICROS)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT64, + parquet::ConvertedType::TIMESTAMP_MILLIS)); + } + else { + // warn + fmt::print("Unsupported type for AppendField\n"); + } + // else { + // static_assert(always_false, "Unsupported type for AppendField"); + // } +} + +// =========================================================================== +// class definitions +// =========================================================================== +/** + * @class TypedAttribute + * @brief A class to represent an attribute with a specific type + * + * This class is used to represent an attribute of an XML XMLElement with a specific type. + */ + // Base class +class AttributeBase { +public: + AttributeBase(std::string name) : name_(std::move(name)) {} + virtual ~AttributeBase() = default; + + const std::string& getName() const { return name_; } + + // Pure virtual function for printing + virtual void print(StreamDevice& os) const = 0; + +private: + std::string name_; +}; + +// Helper function to convert various types to Parquet-compatible types +template +auto convertToParquetType(const T& value) { + if constexpr (std::is_same_v) { + if constexpr (sizeof(unsigned long) <= sizeof(uint32_t)) { + return static_cast(value); + } else { + return static_cast(value); + } + } else if constexpr (std::is_same_v) { + return value; + } else if constexpr (std::is_integral_v) { + if constexpr (std::is_signed_v) { + if constexpr (sizeof(T) <= 1) return static_cast(value); + else if constexpr (sizeof(T) <= 2) return static_cast(value); + else if constexpr (sizeof(T) <= 4) return static_cast(value); + else return static_cast(value); + } else { + if constexpr (sizeof(T) <= 1) return static_cast(value); + else if constexpr (sizeof(T) <= 2) return static_cast(value); + else if constexpr (sizeof(T) <= 4) return static_cast(value); + else return static_cast(value); + } + } else if constexpr (std::is_floating_point_v) { + if constexpr (sizeof(T) <= 4) return static_cast(value); + else return static_cast(value); + } else if constexpr (std::is_same_v || + std::is_same_v) { + return value; + } else if constexpr (std::is_same_v) { + return value; + } else if constexpr (std::is_array_v) { + // try the toString function + return toString(value); + } else if constexpr (std::is_same_v || + std::is_same_v || + std::is_same_v) { + return std::string_view(value); + } else { + // For any other type, convert to string + return toString(value); + } +} + +template +class Attribute : public AttributeBase { +public: + Attribute(const std::string& name, const T& value) + : AttributeBase(name), value_(convertToParquetType(value)) {} + + void print(StreamDevice& os) const override { + os << value_; + } + +private: + decltype(convertToParquetType(std::declval())) value_; +}; + +// Specialization for numeric types +template +class Attribute>> : public AttributeBase { +public: + Attribute(const std::string& name, const T& value) + : AttributeBase(name), value_(convertToParquetType(value)) {} + + void print(StreamDevice& os) const override { + os << value_; + } + +private: + decltype(convertToParquetType(std::declval())) value_; +}; + + +class XMLElement { +public: + /// @brief Constructor + explicit XMLElement(std::string name) : myName(std::move(name)), beenWritten(false) {} + + /// @brief Destructor + virtual ~XMLElement() = default; + + /// @brief Move constructor + XMLElement(XMLElement&& other) noexcept = default; + + + /// @brief Move assignment operator + XMLElement& operator=(XMLElement&& other) noexcept = default; + + /// @brief Add an attribute to the XMLElement + /// @param attr The attribute to add + void addAttribute(std::unique_ptr attr) { + myAttributes.push_back(std::move(attr)); + } + + // define a comparison operator (just checks the name) + bool operator==(const XMLElement& other) const { + return myName == other.myName; + } + + // define a comparison operator (just checks the name) + bool operator==(const std::string& other) const { + return myName == other; + } + + /// @brief a method to write the XMLElement to a stream using the << operator + friend StreamDevice& operator<<(StreamDevice& into, const XMLElement& elem) { + for (const auto& attr : elem.myAttributes) { + attr->print(into); + } + return into; + } + + /// @brief a method to check whether the XMLElement has been written + bool written() const { + return beenWritten; + } + + /// @brief a method to set the XMLElement as written + void setWritten() { + beenWritten = true; + } + + /// @brief get the attributes + const std::vector>& getAttributes() const { + return myAttributes; + } + + /// @brief get the name of the element + const std::string& getName() const { + return myName; + } + +protected: + /// @brief The name of the XMLElement + std::string myName; + + /// @brief stores whether the XMLElement has been written + bool beenWritten; + + /// @brief a store for the attributes + std::vector> myAttributes; +}; + +/** + * @class PlainXMLFormatter + * @brief Output formatter for plain XML output + * + * PlainXMLFormatter format XML like output into the output stream. + */ +class ParquetFormatter : public OutputFormatter { +public: + /// @brief Constructor + ParquetFormatter() {}; + + /// @brief Destructor + virtual ~ParquetFormatter() { } + + /** @brief Writes an XML header with optional configuration + * + * If something has been written (myXMLStack is not empty), nothing + * is written and false returned. + * + * @param[in] into The output stream to use + * @param[in] rootXMLElement The root XMLElement to use + * @param[in] attrs Additional attributes to save within the rootXMLElement + * @todo Describe what is saved + */ + // turn off the warning for unused parameters + [[maybe_unused]] + bool writeXMLHeader(StreamDevice& into, const std::string& rootXMLElement, + const std::map& attrs, + bool includeConfig = true) override { + return 0; + }; + + + /** @brief Opens an XML tag + * + * An indentation, depending on the current xml-XMLElement-stack size, is written followed + * by the given xml XMLElement ("<" + xmlXMLElement) + * The xml XMLElement is added to the stack, then. + * + * @param[in] into The output stream to use + * @param[in] xmlXMLElement Name of XMLElement to open + * @return The OutputDevice for further processing + */ + void openTag(StreamDevice& into, const std::string& xmlXMLElement) override { +#ifdef PARQUET_TESTING + // assert that the stack does not contain the XMLElement + assert(std::find(myXMLStack.begin(), myXMLStack.end(), xmlXMLElement) == myXMLStack.end()); +#endif + myXMLStack.push_back(XMLElement(xmlXMLElement)); + } + + /** @brief Opens an XML tag + * + * Helper method which finds the correct string before calling openTag. + * + * @param[in] into The output stream to use + * @param[in] xmlXMLElement Id of the XMLElement to open + */ + inline void openTag(StreamDevice& into, const SumoXMLTag& xmlXMLElement) override { + openTag(into, toString(xmlXMLElement)); + }; + + + /** @brief Closes the most recently opened tag + * + * @param[in] into The output stream to use + * @return Whether a further XMLElement existed in the stack and could be closed + * @todo it is not verified that the topmost XMLElement was closed + */ + inline bool closeTag(StreamDevice& into, const std::string& comment = "") override { + if (myXMLStack.empty()) { + return false; + } + + // only check the last XMLElement + if (!myXMLStack.back().written()) { + for (auto& elem : myXMLStack) { + into << elem; + elem.setWritten(); + } + // close the row + into.endLine(); + } + // pop the last XMLElement and remove from memory + myXMLStack.pop_back(); + return false; + }; + + + /** @brief writes a preformatted tag to the device but ensures that any + * pending tags are closed + * @param[in] into The output stream to use + * @param[in] val The preformatted data + */ + void writePreformattedTag(StreamDevice& into, const std::string& val) override { + // don't take any action + return; + }; + + /** @brief writes arbitrary padding + */ + inline void writePadding(StreamDevice& into, const std::string& val) override {}; + + + /** @brief writes an arbitrary attribute + * + * @param[in] into The output stream to use + * @param[in] attr The attribute (name) + * @param[in] val The attribute value + */ + template + void writeAttr(StreamDevice& into, const std::string& attr, const T& val) { + std::unique_ptr typed_attr = std::make_unique>(attr, val); + this->myXMLStack.back().addAttribute(std::move(typed_attr)); + if (!sharedNodeVector && this->fields.find(attr) == this->fields.end()) { + // add the field to the schema + AppendField(myNodeVector, val, attr); + this->fields.insert(attr); + } + } + + /** @brief returns the node vector + * @return const parquet::schema::NodeVector& + */ + inline const parquet::schema::NodeVector& getNodeVector() { + sharedNodeVector = true; + return myNodeVector; + } + + bool wroteHeader() const override { + return !myXMLStack.empty(); + } + + /** + * @brief Get the Stack object + * + * @return std::vector<_Tag *>& + */ + inline std::vector& getStack() { + return myXMLStack; + } + + /** + * @brief Write the header. (This has no effect for the ParquetFormatter) + * + * @param Return success + */ + inline bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) { return true; }; + + + template + void writeRaw(StreamDevice& into, T& val) { + throw std::runtime_error("writeRaw not implemented for ParquetFormatter"); + } + + +private: + /// @brief The stack of begun xml XMLElements. + /// We don't need to store the full XMLElement, just the value + std::vector myXMLStack; + + /// @brief The parquet node vector + parquet::schema::NodeVector myNodeVector; + + /// @brief flag to determin if we have shared NodeVector + bool sharedNodeVector{false}; + + // @brief the set of unique fields + std::set fields; +}; +// =========================================================================== +#endif // HAVE_PARQUET \ No newline at end of file diff --git a/src/utils/iodevices/PlainXMLFormatter.cpp b/src/utils/iodevices/PlainXMLFormatter.cpp index 561b9c6ce81d..df07a136074e 100644 --- a/src/utils/iodevices/PlainXMLFormatter.cpp +++ b/src/utils/iodevices/PlainXMLFormatter.cpp @@ -23,6 +23,7 @@ #include #include #include "PlainXMLFormatter.h" +#include "StreamDevices.h" // =========================================================================== @@ -34,7 +35,7 @@ PlainXMLFormatter::PlainXMLFormatter(const int defaultIndentation) bool -PlainXMLFormatter::writeHeader(std::ostream& into, const SumoXMLTag& rootElement) { +PlainXMLFormatter::writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) { if (myXMLStack.empty()) { OptionsCont::getOptions().writeXMLHeader(into); openTag(into, rootElement); @@ -45,13 +46,13 @@ PlainXMLFormatter::writeHeader(std::ostream& into, const SumoXMLTag& rootElement bool -PlainXMLFormatter::writeXMLHeader(std::ostream& into, const std::string& rootElement, +PlainXMLFormatter::writeXMLHeader(StreamDevice& into, const std::string& rootElement, const std::map& attrs, bool includeConfig) { if (myXMLStack.empty()) { OptionsCont::getOptions().writeXMLHeader(into, includeConfig); openTag(into, rootElement); for (std::map::const_iterator it = attrs.begin(); it != attrs.end(); ++it) { - writeAttr(into, it->first, it->second); + writeAttr(into, toString(it->first), it->second); } into << ">\n"; myHavePendingOpener = false; @@ -62,7 +63,7 @@ PlainXMLFormatter::writeXMLHeader(std::ostream& into, const std::string& rootEle void -PlainXMLFormatter::openTag(std::ostream& into, const std::string& xmlElement) { +PlainXMLFormatter::openTag(StreamDevice& into, const std::string& xmlElement) { if (myHavePendingOpener) { into << ">\n"; } @@ -73,13 +74,13 @@ PlainXMLFormatter::openTag(std::ostream& into, const std::string& xmlElement) { void -PlainXMLFormatter::openTag(std::ostream& into, const SumoXMLTag& xmlElement) { +PlainXMLFormatter::openTag(StreamDevice& into, const SumoXMLTag& xmlElement) { openTag(into, toString(xmlElement)); } bool -PlainXMLFormatter::closeTag(std::ostream& into, const std::string& comment) { +PlainXMLFormatter::closeTag(StreamDevice& into, const std::string& comment) { if (!myXMLStack.empty()) { if (myHavePendingOpener) { into << "/>" << comment << "\n"; @@ -96,7 +97,7 @@ PlainXMLFormatter::closeTag(std::ostream& into, const std::string& comment) { void -PlainXMLFormatter::writePreformattedTag(std::ostream& into, const std::string& val) { +PlainXMLFormatter::writePreformattedTag(StreamDevice& into, const std::string& val) { if (myHavePendingOpener) { into << ">\n"; myHavePendingOpener = false; @@ -105,7 +106,7 @@ PlainXMLFormatter::writePreformattedTag(std::ostream& into, const std::string& v } void -PlainXMLFormatter::writePadding(std::ostream& into, const std::string& val) { +PlainXMLFormatter::writePadding(StreamDevice& into, const std::string& val) { into << val; } diff --git a/src/utils/iodevices/PlainXMLFormatter.h b/src/utils/iodevices/PlainXMLFormatter.h index 885058a1cd52..c437fab0e011 100644 --- a/src/utils/iodevices/PlainXMLFormatter.h +++ b/src/utils/iodevices/PlainXMLFormatter.h @@ -26,6 +26,7 @@ #endif #include "OutputFormatter.h" +#include "StreamDevices.h" // =========================================================================== @@ -57,7 +58,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] attrs Additional attributes to save within the rootElement * @todo Describe what is saved */ - bool writeXMLHeader(std::ostream& into, const std::string& rootElement, + bool writeXMLHeader(StreamDevice& into, const std::string& rootElement, const std::map& attrs, bool includeConfig = true); @@ -70,7 +71,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] into The output stream to use * @param[in] rootElement The root element to use */ - bool writeHeader(std::ostream& into, const SumoXMLTag& rootElement); + bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement); /** @brief Opens an XML tag @@ -83,7 +84,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] xmlElement Name of element to open * @return The OutputDevice for further processing */ - void openTag(std::ostream& into, const std::string& xmlElement); + void openTag(StreamDevice& into, const std::string& xmlElement); /** @brief Opens an XML tag @@ -93,7 +94,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] into The output stream to use * @param[in] xmlElement Id of the element to open */ - void openTag(std::ostream& into, const SumoXMLTag& xmlElement); + void openTag(StreamDevice& into, const SumoXMLTag& xmlElement); /** @brief Closes the most recently opened tag @@ -102,7 +103,7 @@ class PlainXMLFormatter : public OutputFormatter { * @return Whether a further element existed in the stack and could be closed * @todo it is not verified that the topmost element was closed */ - bool closeTag(std::ostream& into, const std::string& comment = ""); + bool closeTag(StreamDevice& into, const std::string& comment = ""); /** @brief writes a preformatted tag to the device but ensures that any @@ -110,40 +111,38 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] into The output stream to use * @param[in] val The preformatted data */ - void writePreformattedTag(std::ostream& into, const std::string& val); + void writePreformattedTag(StreamDevice& into, const std::string& val); /** @brief writes arbitrary padding */ - void writePadding(std::ostream& into, const std::string& val); + void writePadding(StreamDevice& into, const std::string& val); - - /** @brief writes an arbitrary attribute + /** @brief writes a named attribute * * @param[in] into The output stream to use * @param[in] attr The attribute (name) * @param[in] val The attribute value */ template - static void writeAttr(std::ostream& into, const std::string& attr, const T& val) { + void writeAttr(StreamDevice& into, const std::string& attr, const T& val) { into << " " << attr << "=\"" << toString(val, into.precision()) << "\""; } - - /** @brief writes a named attribute - * - * @param[in] into The output stream to use - * @param[in] attr The attribute (name) - * @param[in] val The attribute value - */ - template - static void writeAttr(std::ostream& into, const SumoXMLAttr attr, const T& val) { - into << " " << toString(attr) << "=\"" << toString(val, into.precision()) << "\""; + template<> + void writeAttr(StreamDevice& into, const std::string& attr, const double& val){ +#ifdef HAVE_FMT + fmt::print(into, " {}=\"{:.{}f}\"", attr, val, into.precision()); +#else + into << " " << attr << "=\"" << val << "\""; +#endif } bool wroteHeader() const { return !myXMLStack.empty(); } + + private: /// @brief The stack of begun xml elements std::vector myXMLStack; @@ -156,20 +155,20 @@ class PlainXMLFormatter : public OutputFormatter { }; -// =========================================================================== -// specialized template implementations (for speedup) -// =========================================================================== -template <> -inline void PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val) { -#ifdef HAVE_FMT - fmt::print(into, " {}=\"{:.{}f}\"", toString(attr), val, into.precision()); -#else - into << " " << toString(attr) << "=\"" << val << "\""; -#endif -} +// // =========================================================================== +// // specialized template implementations (for speedup) +// // =========================================================================== +// template <> +// inline void PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val) { +// #ifdef HAVE_FMT +// fmt::print(into, " {}=\"{:.{}f}\"", toString(attr), val, into.precision()); +// #else +// into << " " << toString(attr) << "=\"" << val << "\""; +// #endif +// } -template <> -inline void PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const std::string& val) { - into << " " << toString(attr) << "=\"" << val << "\""; -} +// template <> +// inline void PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const std::string& val) { +// into << " " << toString(attr) << "=\"" << val << "\""; +// } diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h new file mode 100644 index 000000000000..da7ec9f8dc4f --- /dev/null +++ b/src/utils/iodevices/StreamDevices.h @@ -0,0 +1,314 @@ + +#pragma once +#include +#include + +#include + +#ifdef HAVE_PARQUET +#include +#include + +#include +#include +#include +#endif + +class StreamDevice { +public: + + enum Type { + FILE, + STRING, + PARQUET + }; + + virtual ~StreamDevice() = default; + + // implement a stream interface + virtual bool ok() = 0; + + virtual StreamDevice& flush() = 0; + + virtual void close() = 0; + + virtual bool good() = 0; + + virtual std::string str() = 0; + + // set precision + virtual int precision() = 0; + + // implement a stream operator + virtual operator std::ostream& () = 0; + + virtual void str(const std::string& s) {}; + + virtual StreamDevice& endLine() = 0; + + // get the type of the stream + virtual Type type() const = 0; + + virtual bool allowRaw() const { + return false; + } +}; + + +class FileDevice : public StreamDevice { +public: + + // write a constructor that takes a std::ofstream + FileDevice(std::ofstream* stream) : myStream(stream) {}; + FileDevice(std::ostream* stream) : myStream(stream) {}; + // constructor that takes a ofstream by value + FileDevice(std::ofstream stream) : myStream(new std::ofstream(std::move(stream))) {}; + + virtual ~FileDevice() = default; + + bool ok() override { + return myStream->good(); + } + + StreamDevice& flush() override { + myStream->flush(); + return *this; + } + + void close() override { + return; + } + + template + StreamDevice& print(const T& t) { + (*myStream) << t; + return *this; + } + + int precision() override { + return (int)myStream->precision(); + } + + bool good() override { + return myStream->good(); + } + + std::string str() override { + return ""; + } + + operator std::ostream& () override { + return *myStream; + } + + StreamDevice& endLine() override { + (*myStream) << std::endl; + return *this; + } + + // get the type of the stream + Type type() const override { + return Type::FILE; + } + + bool allowRaw() const override { + return true; + } + + + +private: + const std::unique_ptr myStream; +}; + + +// create a class for std::ostringstream +class StringStream : public StreamDevice { +public: + + StringStream() = default; + + virtual ~StringStream() = default; + + bool ok() override { + return myStream->good(); + } + + StreamDevice& flush() override { + // do nothing + myStream->flush(); + return *this; + } + + void close() override { + return; + } + + template + StreamDevice& print(const T& t) { + (*myStream) << t; + return *this; + } + + int precision() override { + return (int)myStream->precision(); + } + + bool good() override { + return true; + } + + std::string str() override { + return myStream->str(); + } + + operator std::ostream& () override { + return *myStream; + } + + void str(const std::string& s) override { + myStream->str(s); + }; + + StreamDevice& endLine() override { + (*myStream) << std::endl; + return *this; + } + + // get the type of the stream + Type type() const override { + return Type::STRING; + } + + bool allowRaw() const override { + return true; + } + +private: + std::unique_ptr myStream; +}; + + +class ParquetStream : public StreamDevice { + +#ifdef HAVE_PARQUET + +public: + + ParquetStream(std::unique_ptr file) : myStream(std::move(file)) {}; + + virtual ~ParquetStream() = default; + + bool ok() override { + return true; + } + + bool good() override { + return true; + } + + StreamDevice& flush() override { + // do nothing + return *this; + } + + void close() override { + + } + + // define the << operator for parquet::StreamWriter + template + StreamDevice& log(const T& t) { + myStream << t; + return *this; + } + + int precision() override { + return 0; + } + + std::string str() override { + return ""; + } + + template + void print(const T& t) { + myStream << t; + + } + + void print(const std::__1::__iom_t2& t) { + // do nothing + int i = 0; + } + + void print(const std::__iom_t5& s) {}; + + operator std::ostream& () override { + throw std::runtime_error("Not implemented"); + } + + void str(const std::string& s) override { + throw std::runtime_error("Not implemented"); + }; + + StreamDevice& endLine() override { + myStream.EndRow(); + return *this; + } + + // get the type of the stream + Type type() const override { + return Type::PARQUET; + } + + bool allowRaw() const override { + return false; + } + + bool closeStream() { + myStream.EndRowGroup(); + return true; + } + +private: + parquet::StreamWriter myStream; + +#endif +}; + +// implement a templated stream operator. The base class does nothing +template +StreamDevice& operator<<(StreamDevice& stream, const T& t) { + switch (stream.type()) { + case StreamDevice::Type::FILE: + static_cast(&stream)->print(t); + break; + case StreamDevice::Type::STRING: + static_cast(&stream)->print(t); + break; + case StreamDevice::Type::PARQUET: + static_cast(&stream)->print(t); + break; + default: + throw std::runtime_error("Unknown stream type"); + } + return stream; +} + + +// StreamDevice& operator<<(StreamDevice& stream, const std::__1::__iom_t2& t) { +// switch (stream.type()) { +// case StreamDevice::Type::FILE: +// static_cast(stream).print(t); +// break; +// case StreamDevice::Type::STRING: +// static_cast(stream).print(t); +// break; +// case StreamDevice::Type::PARQUET: +// break; +// default: +// throw std::runtime_error("Unknown stream type"); +// } +// return stream; +// } \ No newline at end of file From e2f973e7056161fa12d5640dc5540fc1d6d0a34f Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 2 Aug 2024 20:51:33 -0600 Subject: [PATCH 02/13] Cleaning up. Build w/o parquet flag works Signed-off-by: Max --- src/utils/iodevices/OutputDevice.cpp | 12 +- src/utils/iodevices/OutputDevice.h | 18 +- src/utils/iodevices/OutputDevice_CERR.cpp | 9 +- src/utils/iodevices/OutputDevice_CERR.h | 11 +- src/utils/iodevices/OutputDevice_COUT.cpp | 9 +- src/utils/iodevices/OutputDevice_COUT.h | 6 - src/utils/iodevices/OutputDevice_File.cpp | 13 +- src/utils/iodevices/OutputDevice_File.h | 12 - src/utils/iodevices/OutputDevice_Parquet.cpp | 17 +- src/utils/iodevices/OutputDevice_Parquet.h | 42 ++-- src/utils/iodevices/OutputDevice_String.cpp | 8 +- src/utils/iodevices/OutputDevice_String.h | 3 - src/utils/iodevices/OutputFormatter.h | 9 + src/utils/iodevices/ParquetFormatter.h | 4 +- src/utils/iodevices/StreamDevices.h | 234 +++++++------------ 15 files changed, 148 insertions(+), 259 deletions(-) diff --git a/src/utils/iodevices/OutputDevice.cpp b/src/utils/iodevices/OutputDevice.cpp index 64ef2348a60d..d564914f8107 100644 --- a/src/utils/iodevices/OutputDevice.cpp +++ b/src/utils/iodevices/OutputDevice.cpp @@ -107,8 +107,12 @@ OutputDevice::getDevice(const std::string& name, bool usePrefix) { // check the file extension const auto file_ext = FileHelpers::getExtension(name); const int len = (int)name.length(); - if (file_ext == ".parquet" || file_ext == ".prq") { - dev = new OutputDevice_Parquet(name2); + if (file_ext == ".parquet" || file_ext == ".prq") { +#ifdef HAVE_PARQUET + dev = new OutputDevice_Parquet(name2); +#else + throw IOError(TL("Parquet output is not supported in this build.")); +#endif } else { dev = new OutputDevice_File(name2, len > 3 && FileHelpers::getExtension(name) == ".gz"); @@ -255,13 +259,13 @@ OutputDevice::close() { void OutputDevice::setPrecision(int precision) { - getOStream() << std::setprecision(precision); + getOStream().setPrecision(precision); } int OutputDevice::precision() { - return (int)getOStream().precision(); + return getOStream().precision(); } diff --git a/src/utils/iodevices/OutputDevice.h b/src/utils/iodevices/OutputDevice.h index d3e80e986064..789e4234147b 100644 --- a/src/utils/iodevices/OutputDevice.h +++ b/src/utils/iodevices/OutputDevice.h @@ -209,17 +209,7 @@ class OutputDevice { template bool writeHeader(const SumoXMLTag& rootElement) { - switch (this->getType()) - { - case OutputWriterType::XML: - // cast the writer to the correct type - return getFormatter()->writeHeader(getOStream(), rootElement); - case OutputWriterType::PARQUET: - // cast the writer to the correct type - return getFormatter()->writeHeader(getOStream(), rootElement); - default: - break; - } + return getFormatter()->writeHeader(getOStream(), rootElement); } @@ -321,8 +311,12 @@ class OutputDevice { getFormatter()->writeAttr(getOStream(), attr, val); break; case OutputWriterType::PARQUET: +#ifdef HAVE_PARQUET // cast the writer to the correct type getFormatter()->writeAttr(getOStream(), attr, val); +#else + throw IOError("Parquet output is not supported in this build. Please recompile with the correct options."); +#endif break; default: throw IOError("Unknown output writer type"); @@ -398,7 +392,7 @@ class OutputDevice { /// @todo should move to the formatter /// @brief Returns the type of the output device virtual void setOSFlags(std::ios_base::fmtflags flags) { - getOStream() << std::setiosflags(flags); + getOStream().setOSFlags(flags); } protected: diff --git a/src/utils/iodevices/OutputDevice_CERR.cpp b/src/utils/iodevices/OutputDevice_CERR.cpp index b688b2822dbc..559bb0e2bb52 100644 --- a/src/utils/iodevices/OutputDevice_CERR.cpp +++ b/src/utils/iodevices/OutputDevice_CERR.cpp @@ -47,7 +47,7 @@ OutputDevice_CERR::getDevice() { // method definitions // =========================================================================== OutputDevice_CERR::OutputDevice_CERR() : OutputDevice(0, "CERR") { - myStreamDevice = new FileDevice(&std::cerr); + myStreamDevice = new OStreamDevice(&std::cerr); } @@ -56,15 +56,10 @@ OutputDevice_CERR::~OutputDevice_CERR() { } -StreamDevice& -OutputDevice_CERR::getOStream() { - return *myStreamDevice; -} - - void OutputDevice_CERR::postWriteHook() { std::cerr.flush(); + getOStream().flush(); } diff --git a/src/utils/iodevices/OutputDevice_CERR.h b/src/utils/iodevices/OutputDevice_CERR.h index 7b51105689d7..28700671fb16 100644 --- a/src/utils/iodevices/OutputDevice_CERR.h +++ b/src/utils/iodevices/OutputDevice_CERR.h @@ -41,15 +41,6 @@ class OutputDevice_CERR : public OutputDevice { protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * @return cerr - */ - StreamDevice& getOStream() override; - - /** @brief Called after every write access. * * Calls flush on stderr. @@ -67,7 +58,7 @@ class OutputDevice_CERR : public OutputDevice { /// @brief Destructor ~OutputDevice_CERR(); - + private: /// @brief my singular instance diff --git a/src/utils/iodevices/OutputDevice_COUT.cpp b/src/utils/iodevices/OutputDevice_COUT.cpp index 1679e7fafbdd..192d3d9d4711 100644 --- a/src/utils/iodevices/OutputDevice_COUT.cpp +++ b/src/utils/iodevices/OutputDevice_COUT.cpp @@ -47,7 +47,7 @@ OutputDevice_COUT::getDevice() { // method definitions // =========================================================================== OutputDevice_COUT::OutputDevice_COUT() : OutputDevice(0, "COUT") { - myStreamDevice = new FileDevice(&std::cout); + myStreamDevice = new OStreamDevice(&std::cout); } @@ -55,15 +55,10 @@ OutputDevice_COUT::~OutputDevice_COUT() { myInstance = nullptr; } - -StreamDevice& -OutputDevice_COUT::getOStream() { - return *myStreamDevice; -} - void OutputDevice_COUT::postWriteHook() { myStreamDevice->flush(); + getOStream().flush(); } diff --git a/src/utils/iodevices/OutputDevice_COUT.h b/src/utils/iodevices/OutputDevice_COUT.h index cc615ded177f..a7230ed20a27 100644 --- a/src/utils/iodevices/OutputDevice_COUT.h +++ b/src/utils/iodevices/OutputDevice_COUT.h @@ -44,12 +44,6 @@ class OutputDevice_COUT : public OutputDevice { /// @name Methods that override/implement OutputDevice-methods /// @{ - /** @brief Returns the associated ostream - * @return cout - */ - StreamDevice& getOStream() override; - - /** @brief Called after every write access. * * Calls flush on stdout. diff --git a/src/utils/iodevices/OutputDevice_File.cpp b/src/utils/iodevices/OutputDevice_File.cpp index a0668816430c..1a9448d72516 100644 --- a/src/utils/iodevices/OutputDevice_File.cpp +++ b/src/utils/iodevices/OutputDevice_File.cpp @@ -40,7 +40,7 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com if (fullName == "/dev/null") { myAmNull = true; #ifdef WIN32 - myStreamDevice = new FileDevice(new std::ofstream("NUL")); + myStreamDevice = new OStreamDevice(new std::ofstream("NUL")); if (!myFileStream->good()) { delete myFileStream; throw IOError(TLF("Could not redirect to NUL device (%).", std::string(std::strerror(errno)))); @@ -52,14 +52,14 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com #ifdef HAVE_ZLIB if (compressed) { try { - myStreamDevice = new FileDevice(new zstr::ofstream(localName.c_str(), std::ios_base::out)); + myStreamDevice = new OStreamDevice(new zstr::ofstream(localName.c_str(), std::ios_base::out)); } catch (strict_fstream::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } catch (zstr::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } } else { - myStreamDevice = new FileDevice(new std::ofstream(localName.c_str(), std::ios_base::out)); + myStreamDevice = new OStreamDevice(new std::ofstream(localName.c_str(), std::ios_base::out)); } #else UNUSED_PARAMETER(compressed); @@ -76,11 +76,4 @@ OutputDevice_File::~OutputDevice_File() { delete myStreamDevice; } - -StreamDevice& -OutputDevice_File::getOStream() { - return *myStreamDevice; -} - - /****************************************************************************/ diff --git a/src/utils/iodevices/OutputDevice_File.h b/src/utils/iodevices/OutputDevice_File.h index 741c66eb04b6..3d4beb2465d3 100644 --- a/src/utils/iodevices/OutputDevice_File.h +++ b/src/utils/iodevices/OutputDevice_File.h @@ -56,18 +56,6 @@ class OutputDevice_File : public OutputDevice { return myAmNull; } - -protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * @return The used stream - */ - StreamDevice& getOStream() override; - /// @} - - private: /// am I redirecting to /dev/null bool myAmNull = false; diff --git a/src/utils/iodevices/OutputDevice_Parquet.cpp b/src/utils/iodevices/OutputDevice_Parquet.cpp index d035b2e4a201..99ea9959391b 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.cpp +++ b/src/utils/iodevices/OutputDevice_Parquet.cpp @@ -11,22 +11,26 @@ // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later /****************************************************************************/ -/// @file OutputDevice_File.cpp +/// @file OutputDevice_Parquet.cpp /// @author Daniel Krajzewicz /// @author Michael Behrisch /// @author Jakob Erdmann -/// @date 2004 +/// @author Max Schrader +/// @date 2024 /// -// An output device that encapsulates an ofstream +// An output device that encapsulates an Parquet file /****************************************************************************/ #include +#ifdef HAVE_PARQUET + #include #include #include #include #include + #include "OutputDevice_Parquet.h" #include @@ -74,8 +78,11 @@ bool OutputDevice_Parquet::closeTag(const std::string& comment) { OutputDevice_Parquet::~OutputDevice_Parquet() { + // have to delete the stream device before the file. This dumps unwritten data to the file delete myStreamDevice; - arrow::Status status = this->myFile->Close(); - (void)status; // Suppress unused variable warning + [[maybe_unused]] arrow::Status status = this->myFile->Close(); + } + +#endif /****************************************************************************/ diff --git a/src/utils/iodevices/OutputDevice_Parquet.h b/src/utils/iodevices/OutputDevice_Parquet.h index 651d62c35a4e..8d993d0f3536 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.h +++ b/src/utils/iodevices/OutputDevice_Parquet.h @@ -11,12 +11,14 @@ // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later /****************************************************************************/ -/// @file OutputDevice_File.h +/// @file OutputDevice_Parquet.cpp /// @author Daniel Krajzewicz /// @author Michael Behrisch -/// @date 2004 +/// @author Jakob Erdmann +/// @author Max Schrader +/// @date 2024 /// -// An output device that encapsulates an ofstream +// An output device that encapsulates an Parquet file /****************************************************************************/ #pragma once @@ -28,11 +30,11 @@ #include "OutputDevice.h" #include "ParquetFormatter.h" -#include "arrow/io/file.h" -#include "arrow/util/config.h" -#include "parquet/exception.h" -#include "parquet/stream_reader.h" -#include "parquet/stream_writer.h" +#include +#include +#include +#include +#include @@ -52,31 +54,14 @@ class OutputDevice_Parquet : public OutputDevice { */ OutputDevice_Parquet(const std::string& fullName); - /// @brief Destructor ~OutputDevice_Parquet(); - /** @brief returns the information whether the device will discard all output - * @return Whether the device redirects to /dev/null - */ - bool isNull() override { - return myAmNull; - } - /** @brief implements the close tag logic. This is where the file is first opened and the schema is created. * This exploits the fact that for *most* SUMO files, all the fields are present at the first close tag event. */ bool closeTag(const std::string& comment) override; - template - OutputDevice& writeAttr(const std::string& attr, const T& val) { - this->getFormatter()->writeAttr(this->getOStream(), attr, val); - // check if the attribute is the field table - - return *this; - } - - /** @brief writes a line feed if applicable. overriden from the base class to do nothing */ void lf() {}; @@ -95,6 +80,9 @@ class OutputDevice_Parquet : public OutputDevice { return OutputWriterType::PARQUET; } + /// do I allow optional attributes + bool allowOptionalAttributes = false; + private: /// The wrapped ofstream std::shared_ptr myFile = nullptr; @@ -106,9 +94,7 @@ class OutputDevice_Parquet : public OutputDevice { /// am I redirecting to /dev/null bool myAmNull = false; - /// do I allow optional attributes - bool allowOptionalAttributes = false; - + /// my full name std::string myFullName; parquet::schema::NodeVector myNodeVector; diff --git a/src/utils/iodevices/OutputDevice_String.cpp b/src/utils/iodevices/OutputDevice_String.cpp index 4355214d0580..0760f6f2a082 100644 --- a/src/utils/iodevices/OutputDevice_String.cpp +++ b/src/utils/iodevices/OutputDevice_String.cpp @@ -31,7 +31,7 @@ OutputDevice_String::OutputDevice_String(const int defaultIndentation) : OutputDevice(defaultIndentation) { auto stream = new std::ostringstream(); (*stream) << std::setiosflags(std::ios::fixed); - myStreamDevice = new StringStream(); + myStreamDevice = new OStreamDevice(stream); setPrecision(); } @@ -45,10 +45,4 @@ OutputDevice_String::getString() const { return myStreamDevice->str(); } -StreamDevice& -OutputDevice_String::getOStream() { - return *myStreamDevice; -} - - /****************************************************************************/ diff --git a/src/utils/iodevices/OutputDevice_String.h b/src/utils/iodevices/OutputDevice_String.h index 9bd97bfeee48..eaeed7242939 100644 --- a/src/utils/iodevices/OutputDevice_String.h +++ b/src/utils/iodevices/OutputDevice_String.h @@ -50,7 +50,4 @@ class OutputDevice_String : public OutputDevice { * @return The content as string */ std::string getString() const; - - StreamDevice& getOStream(); - }; diff --git a/src/utils/iodevices/OutputFormatter.h b/src/utils/iodevices/OutputFormatter.h index 703cd8ef43dc..61bc31d28674 100644 --- a/src/utils/iodevices/OutputFormatter.h +++ b/src/utils/iodevices/OutputFormatter.h @@ -68,6 +68,15 @@ class OutputFormatter { const std::map& attrs, bool includeConfig = true) = 0; + /** @brief Writes an XML header + * + * + * @param[in] into The output stream to use + * @param[in] rootElement The root element to use + * @return Whether the header was written + */ + virtual bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) = 0; + /** @brief Opens an XML tag * diff --git a/src/utils/iodevices/ParquetFormatter.h b/src/utils/iodevices/ParquetFormatter.h index 238ec3388932..b8eb0f1eaf62 100644 --- a/src/utils/iodevices/ParquetFormatter.h +++ b/src/utils/iodevices/ParquetFormatter.h @@ -16,7 +16,7 @@ /// @author Michael Behrisch /// @date 2012 /// -// Output formatter for plain XML output +// Output formatter for Parquet output /****************************************************************************/ #pragma once #include @@ -426,7 +426,7 @@ class ParquetFormatter : public OutputFormatter { * * @param Return success */ - inline bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) { return true; }; + inline bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) override { return true; }; template diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index da7ec9f8dc4f..edfb211f58e6 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -17,54 +17,89 @@ class StreamDevice { public: + /// @brief The type of the stream enum Type { - FILE, - STRING, - PARQUET + OSTREAM, // std::ostream (or std::ofstream) + PARQUET // parquet::StreamWriter }; + // create a constructor that a type and raw write access + StreamDevice(Type type, bool access) : myType(type), rawWriteAccess(access) {}; + // create a default constructor + StreamDevice() = default; + + /// @brief Destructor virtual ~StreamDevice() = default; - // implement a stream interface + /// @brief is the stream ok + /// @return true if the stream is ok virtual bool ok() = 0; + /// @brief flush the stream + /// @return this virtual StreamDevice& flush() = 0; + /// @brief close the stream virtual void close() = 0; + /// @brief is the stream good + /// @return true if the stream is good virtual bool good() = 0; + /// @brief read the stream into a string + /// @return the string virtual std::string str() = 0; - // set precision + /// @brief set the precision + /// @param precision + virtual void setPrecision(const int& precision) = 0; + + /// @brief get the precision + /// @return the precision virtual int precision() = 0; - // implement a stream operator + /// @brief implement a stream operator virtual operator std::ostream& () = 0; + /// @brief write a string to the stream + /// @param s the string to write virtual void str(const std::string& s) {}; + /// @brief write an endline to the stream + /// @return this virtual StreamDevice& endLine() = 0; - // get the type of the stream - virtual Type type() const = 0; + /// @brief set the output stream flags + /// @param flags the flags to set + virtual void setOSFlags(std::ios_base::fmtflags flags) = 0; - virtual bool allowRaw() const { - return false; + /// @brief get the type of the stream + virtual Type type() const { + return myType; + }; + + /// @brief allow raw output + bool allowRaw() const { + return rawWriteAccess; } -}; +protected: + /// @brief allow raw write access + bool rawWriteAccess = false; + /// @brief the type of the stream + Type myType = Type::OSTREAM; + +}; -class FileDevice : public StreamDevice { +class OStreamDevice : public StreamDevice { public: // write a constructor that takes a std::ofstream - FileDevice(std::ofstream* stream) : myStream(stream) {}; - FileDevice(std::ostream* stream) : myStream(stream) {}; - // constructor that takes a ofstream by value - FileDevice(std::ofstream stream) : myStream(new std::ofstream(std::move(stream))) {}; + OStreamDevice(std::ofstream* stream) : myStream(std::move(stream)), StreamDevice(Type::OSTREAM, true) {}; + OStreamDevice(std::ostream* stream) : myStream(std::move(stream)), StreamDevice(Type::OSTREAM, true) {}; + OStreamDevice(std::ofstream stream) : myStream(new std::ofstream(std::move(stream))), StreamDevice(Type::OSTREAM, true) {}; - virtual ~FileDevice() = default; + virtual ~OStreamDevice() = default; bool ok() override { return myStream->good(); @@ -76,7 +111,7 @@ class FileDevice : public StreamDevice { } void close() override { - return; + myStream->flush(); } template @@ -85,69 +120,12 @@ class FileDevice : public StreamDevice { return *this; } - int precision() override { - return (int)myStream->precision(); - } - - bool good() override { - return myStream->good(); - } - - std::string str() override { - return ""; - } - - operator std::ostream& () override { - return *myStream; - } - - StreamDevice& endLine() override { - (*myStream) << std::endl; - return *this; - } - - // get the type of the stream - Type type() const override { - return Type::FILE; - } - - bool allowRaw() const override { - return true; - } - - - -private: - const std::unique_ptr myStream; -}; - - -// create a class for std::ostringstream -class StringStream : public StreamDevice { -public: - - StringStream() = default; - - virtual ~StringStream() = default; - - bool ok() override { - return myStream->good(); - } - - StreamDevice& flush() override { - // do nothing - myStream->flush(); - return *this; - } - - void close() override { - return; + void setPrecision(const int& precision) override { + *myStream << std::setprecision(precision); } - template - StreamDevice& print(const T& t) { - (*myStream) << t; - return *this; + void setOSFlags(std::ios_base::fmtflags flags) override { + myStream->setf(flags); } int precision() override { @@ -155,21 +133,17 @@ class StringStream : public StreamDevice { } bool good() override { - return true; + return myStream->good(); } std::string str() override { - return myStream->str(); + return ""; } operator std::ostream& () override { return *myStream; } - void str(const std::string& s) override { - myStream->str(s); - }; - StreamDevice& endLine() override { (*myStream) << std::endl; return *this; @@ -177,25 +151,22 @@ class StringStream : public StreamDevice { // get the type of the stream Type type() const override { - return Type::STRING; - } - - bool allowRaw() const override { - return true; + return Type::OSTREAM; } private: - std::unique_ptr myStream; + const std::unique_ptr myStream; }; - class ParquetStream : public StreamDevice { #ifdef HAVE_PARQUET public: - ParquetStream(std::unique_ptr file) : myStream(std::move(file)) {}; + ParquetStream(std::unique_ptr file) : StreamDevice(Type::PARQUET, false) { + myStream = std::make_unique(std::move(file)); + }; virtual ~ParquetStream() = default; @@ -204,7 +175,8 @@ class ParquetStream : public StreamDevice { } bool good() override { - return true; + // check that the stream is not null + return myStream != nullptr; } StreamDevice& flush() override { @@ -213,19 +185,11 @@ class ParquetStream : public StreamDevice { } void close() override { - - } - - // define the << operator for parquet::StreamWriter - template - StreamDevice& log(const T& t) { - myStream << t; - return *this; + myStream->EndRowGroup(); + myStream.release(); } - int precision() override { - return 0; - } + void setPrecision(FMT_MAYBE_UNUSED const int& precision) override {} std::string str() override { return ""; @@ -233,27 +197,22 @@ class ParquetStream : public StreamDevice { template void print(const T& t) { - myStream << t; - - } + (*myStream) << t; - void print(const std::__1::__iom_t2& t) { - // do nothing - int i = 0; } - void print(const std::__iom_t5& s) {}; + void setOSFlags(std::ios_base::fmtflags flags) override {UNUSED_PARAMETER(flags);} operator std::ostream& () override { throw std::runtime_error("Not implemented"); } - void str(const std::string& s) override { + void str(FMT_MAYBE_UNUSED const std::string& s) override { throw std::runtime_error("Not implemented"); }; StreamDevice& endLine() override { - myStream.EndRow(); + myStream->EndRow(); return *this; } @@ -262,17 +221,12 @@ class ParquetStream : public StreamDevice { return Type::PARQUET; } - bool allowRaw() const override { - return false; - } - - bool closeStream() { - myStream.EndRowGroup(); - return true; + int precision() override { + return 0; } private: - parquet::StreamWriter myStream; + std::unique_ptr myStream; #endif }; @@ -281,34 +235,22 @@ class ParquetStream : public StreamDevice { template StreamDevice& operator<<(StreamDevice& stream, const T& t) { switch (stream.type()) { - case StreamDevice::Type::FILE: - static_cast(&stream)->print(t); - break; - case StreamDevice::Type::STRING: - static_cast(&stream)->print(t); + case StreamDevice::Type::OSTREAM: + static_cast(&stream)->print(t); break; + // case StreamDevice::Type::STRING: + // static_cast(&stream)->print(t); + // break; case StreamDevice::Type::PARQUET: +#ifdef HAVE_PARQUET static_cast(&stream)->print(t); +#else + throw std::runtime_error("Parquet not supported in this build"); +#endif break; default: - throw std::runtime_error("Unknown stream type"); + // assert that this does not happen + throw std::runtime_error("Unknown stream type in StreamDevice"); } return stream; -} - - -// StreamDevice& operator<<(StreamDevice& stream, const std::__1::__iom_t2& t) { -// switch (stream.type()) { -// case StreamDevice::Type::FILE: -// static_cast(stream).print(t); -// break; -// case StreamDevice::Type::STRING: -// static_cast(stream).print(t); -// break; -// case StreamDevice::Type::PARQUET: -// break; -// default: -// throw std::runtime_error("Unknown stream type"); -// } -// return stream; -// } \ No newline at end of file +} \ No newline at end of file From f444857a835cfbcc9b39587b526919583c6c75c1 Mon Sep 17 00:00:00 2001 From: Max Date: Tue, 6 Aug 2024 19:40:23 -0600 Subject: [PATCH 03/13] Fixing linux build Signed-off-by: Max --- src/config.h.cmake | 2 +- src/utils/common/MsgRetrievingFunction.h | 30 ++++++------------------ src/utils/gui/div/GUIMessageWindow.h | 17 ++++++-------- src/utils/iodevices/OutputDevice.h | 8 +++---- src/utils/iodevices/OutputFormatter.h | 2 +- src/utils/iodevices/PlainXMLFormatter.h | 1 - src/utils/iodevices/StreamDevices.h | 26 +++++++++++++------- 7 files changed, 36 insertions(+), 50 deletions(-) diff --git a/src/config.h.cmake b/src/config.h.cmake index 85b1db7c932c..f2a4fe219063 100644 --- a/src/config.h.cmake +++ b/src/config.h.cmake @@ -185,7 +185,7 @@ #cmakedefine HAVE_GDAL /* defined if PARQUET is available */ -#cmakedefine HAVE_PARQUET +# cmakedefine HAVE_PARQUET /* defined if GL2PS is available */ #cmakedefine HAVE_GL2PS diff --git a/src/utils/common/MsgRetrievingFunction.h b/src/utils/common/MsgRetrievingFunction.h index 90fc96b76751..693c3d8ba626 100644 --- a/src/utils/common/MsgRetrievingFunction.h +++ b/src/utils/common/MsgRetrievingFunction.h @@ -50,7 +50,11 @@ class MsgRetrievingFunction : public OutputDevice { MsgRetrievingFunction(T* object, Operation operation, MsgHandler::MsgType type) : myObject(object), myOperation(operation), - myMsgType(type) {} + myMsgType(type) { + /// @todo We should design a new formatter type for this. + myFormatter = new PlainXMLFormatter(); + myStreamDevice = new OStreamDevice(new std::ostringstream()); + } /// @brief Destructor @@ -58,27 +62,11 @@ class MsgRetrievingFunction : public OutputDevice { protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * - * The stream is an ostringstream, actually, into which the message - * is written. It is sent when postWriteHook is called. - * - * @return The used stream - * @see postWriteHook - */ - std::ostream& getOStream() { - return myMessage; - } - - /** @brief Sends the data which was written to the string stream via the retrieving function. */ virtual void postWriteHook() { - (myObject->*myOperation)(myMsgType, myMessage.str()); - myMessage.str(""); + (myObject->*myOperation)(myMsgType, getOStream().str()); + getOStream().str(""); } /// @} @@ -92,8 +80,4 @@ class MsgRetrievingFunction : public OutputDevice { /// @brief The type of message to retrieve. MsgHandler::MsgType myMsgType; - - /// @brief message buffer - std::ostringstream myMessage; - }; diff --git a/src/utils/gui/div/GUIMessageWindow.h b/src/utils/gui/div/GUIMessageWindow.h index 6d8f94e708b6..6193f7e1cf97 100644 --- a/src/utils/gui/div/GUIMessageWindow.h +++ b/src/utils/gui/div/GUIMessageWindow.h @@ -124,29 +124,26 @@ class GUIMessageWindow : public FXText { /// @brief constructor MsgOutputDevice(GUIMessageWindow* msgWindow, GUIEventType type) : myMsgWindow(msgWindow), - myType(type) { } + myType(type){ + /// @todo We should design a new formatter type for this. + myFormatter = new PlainXMLFormatter(); + myStreamDevice = new OStreamDevice(new std::ostringstream()); + } /// @brief destructor ~MsgOutputDevice() { } protected: - /// @brief get Output Stream - std::ostream& getOStream() { - return myStream; - } /// @brief write hook void postWriteHook() { - myMsgWindow->appendMsg(myType, myStream.str()); - myStream.str(""); + myMsgWindow->appendMsg(myType, getOStream().str()); + getOStream().str(""); } private: /// @brief pointer to message Windows GUIMessageWindow* myMsgWindow; - /// @brief output string stream - std::ostringstream myStream; - /// @brief type of event GUIEventType myType; }; diff --git a/src/utils/iodevices/OutputDevice.h b/src/utils/iodevices/OutputDevice.h index 789e4234147b..b9d8d536e532 100644 --- a/src/utils/iodevices/OutputDevice.h +++ b/src/utils/iodevices/OutputDevice.h @@ -436,17 +436,15 @@ class OutputDevice { /// @brief the stream device StreamDevice* myStreamDevice; + /// @brief The formatter for XML + OutputFormatter* myFormatter; + /// @brief return a type casted formatter template T* getFormatter() { return static_cast(myFormatter); } - -private: - /// @brief The formatter for XML - OutputFormatter* const myFormatter; - private: /// @brief Invalidated copy constructor. OutputDevice(const OutputDevice&) = delete; diff --git a/src/utils/iodevices/OutputFormatter.h b/src/utils/iodevices/OutputFormatter.h index 61bc31d28674..c282ed5d5817 100644 --- a/src/utils/iodevices/OutputFormatter.h +++ b/src/utils/iodevices/OutputFormatter.h @@ -119,5 +119,5 @@ class OutputFormatter { void writeRaw(StreamDevice& into, const T& val){ into << val; - }; + } }; diff --git a/src/utils/iodevices/PlainXMLFormatter.h b/src/utils/iodevices/PlainXMLFormatter.h index c437fab0e011..90705d61ace6 100644 --- a/src/utils/iodevices/PlainXMLFormatter.h +++ b/src/utils/iodevices/PlainXMLFormatter.h @@ -128,7 +128,6 @@ class PlainXMLFormatter : public OutputFormatter { into << " " << attr << "=\"" << toString(val, into.precision()) << "\""; } - template<> void writeAttr(StreamDevice& into, const std::string& attr, const double& val){ #ifdef HAVE_FMT fmt::print(into, " {}=\"{:.{}f}\"", attr, val, into.precision()); diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index edfb211f58e6..3c9bb47d9b43 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -2,7 +2,7 @@ #pragma once #include #include - +#include #include #ifdef HAVE_PARQUET @@ -24,7 +24,7 @@ class StreamDevice { }; // create a constructor that a type and raw write access - StreamDevice(Type type, bool access) : myType(type), rawWriteAccess(access) {}; + StreamDevice(Type type, bool access) : rawWriteAccess(access), myType(type) {}; // create a default constructor StreamDevice() = default; @@ -63,7 +63,7 @@ class StreamDevice { /// @brief write a string to the stream /// @param s the string to write - virtual void str(const std::string& s) {}; + virtual void str(const std::string& s) = 0; /// @brief write an endline to the stream /// @return this @@ -95,11 +95,11 @@ class OStreamDevice : public StreamDevice { public: // write a constructor that takes a std::ofstream - OStreamDevice(std::ofstream* stream) : myStream(std::move(stream)), StreamDevice(Type::OSTREAM, true) {}; - OStreamDevice(std::ostream* stream) : myStream(std::move(stream)), StreamDevice(Type::OSTREAM, true) {}; - OStreamDevice(std::ofstream stream) : myStream(new std::ofstream(std::move(stream))), StreamDevice(Type::OSTREAM, true) {}; + OStreamDevice(std::ofstream* stream) : StreamDevice(Type::OSTREAM, true), myStream(std::move(stream)) {} + OStreamDevice(std::ostream* stream) : StreamDevice(Type::OSTREAM, true), myStream(std::move(stream)) {} + OStreamDevice(std::ofstream stream) : StreamDevice(Type::OSTREAM, true), myStream(new std::ofstream(std::move(stream))) {} - virtual ~OStreamDevice() = default; + virtual ~OStreamDevice() override = default; bool ok() override { return myStream->good(); @@ -140,6 +140,10 @@ class OStreamDevice : public StreamDevice { return ""; } + void str(const std::string& s) override { + (*myStream) << s; + } + operator std::ostream& () override { return *myStream; } @@ -155,8 +159,8 @@ class OStreamDevice : public StreamDevice { } private: - const std::unique_ptr myStream; -}; + std::unique_ptr myStream; +}; // Add the missing semicolon here class ParquetStream : public StreamDevice { @@ -195,6 +199,10 @@ class ParquetStream : public StreamDevice { return ""; } + void str([[maybe_unused]] const std::string& s) override { + + }; + template void print(const T& t) { (*myStream) << t; From cdc1fc2f92faa08163c993a5fc1505b0c2050a45 Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 7 Aug 2024 03:27:55 +0000 Subject: [PATCH 04/13] update cmake lists Signed-off-by: Max --- CMakeLists.txt | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7e2f24fe9e1b..ee7950906faa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -532,6 +532,8 @@ if (EIGEN3_FOUND) set(ENABLED_FEATURES "${ENABLED_FEATURES} Eigen") endif (EIGEN3_FOUND) +option(WITH_PARQUET "Compile with Parquet support" ON) + if (CHECK_OPTIONAL_LIBS) file(GLOB GDAL_PATH "${SUMO_LIBRARIES}/gdal-?.?.?") file(GLOB FFMPEG_PATH "${SUMO_LIBRARIES}/FFMPEG-?.?.?") @@ -541,16 +543,22 @@ if (CHECK_OPTIONAL_LIBS) set(CMAKE_PREFIX_PATH "${CMAKE_PREFIX_PATH};${GDAL_PATH};${FFMPEG_PATH};${OSG_PATH};${GL2PS_PATH};${GEOS_PATH}") file(GLOB SUMO_OPTIONAL_LIBRARIES_DLL "${GDAL_PATH}/bin/*.dll" "${FFMPEG_PATH}/bin/*.dll" "${OSG_PATH}/bin/*.dll" "${GL2PS_PATH}/bin/*.dll" "${JUPEDSIM_CUSTOMDIR}/bin/*.dll") - find_package(Arrow) - find_package(Parquet) - if (Arrow_FOUND AND Parquet_FOUND) - set(HAVE_PARQUET 1) - set(ENABLED_FEATURES "${ENABLED_FEATURES} PARQUET") - set(PARQUET_LIBRARY Parquet::parquet_shared) - if (GTEST_FOUND) - add_definitions("GTest_SOURCE=Bundled") + if(WITH_PARQUET) + find_package(Arrow) + find_package(Parquet) + if (Arrow_FOUND AND Parquet_FOUND) + set(HAVE_PARQUET 1) + set(ENABLED_FEATURES "${ENABLED_FEATURES} PARQUET") + set(PARQUET_LIBRARY Parquet::parquet_shared) + if (GTEST_FOUND) + add_definitions("GTest_SOURCE=Bundled") + endif() + # this is for gtest compatibility + else() + message(WARNING "Parquet support requested but Arrow or Parquet libraries not found. Disabling Parquet support.") + set(PARQUET_LIBRARY "") + set(HAVE_PARQUET 0) endif() - # this is for gtest compatibility else() set(PARQUET_LIBRARY "") set(HAVE_PARQUET 0) From b9798391ffe368550c7fa952dcd0fbe77815ede2 Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 7 Aug 2024 18:54:47 -0600 Subject: [PATCH 05/13] Removed Gtest bundle Signed-off-by: Max --- CMakeLists.txt | 6 +++--- src/utils/iodevices/PlainXMLFormatter.h | 2 +- src/utils/iodevices/StreamDevices.h | 11 ++++++++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ee7950906faa..6d4c40bc57c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -550,9 +550,9 @@ if (CHECK_OPTIONAL_LIBS) set(HAVE_PARQUET 1) set(ENABLED_FEATURES "${ENABLED_FEATURES} PARQUET") set(PARQUET_LIBRARY Parquet::parquet_shared) - if (GTEST_FOUND) - add_definitions("GTest_SOURCE=Bundled") - endif() + # if (GTEST_FOUND) + # add_definitions("GTest_SOURCE=Bundled") + # endif() # this is for gtest compatibility else() message(WARNING "Parquet support requested but Arrow or Parquet libraries not found. Disabling Parquet support.") diff --git a/src/utils/iodevices/PlainXMLFormatter.h b/src/utils/iodevices/PlainXMLFormatter.h index 90705d61ace6..8f96318fb9ea 100644 --- a/src/utils/iodevices/PlainXMLFormatter.h +++ b/src/utils/iodevices/PlainXMLFormatter.h @@ -130,7 +130,7 @@ class PlainXMLFormatter : public OutputFormatter { void writeAttr(StreamDevice& into, const std::string& attr, const double& val){ #ifdef HAVE_FMT - fmt::print(into, " {}=\"{:.{}f}\"", attr, val, into.precision()); + fmt::print(into.getOStream(), " {}=\"{:.{}f}\"", attr, val, into.precision()); #else into << " " << attr << "=\"" << val << "\""; #endif diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index 3c9bb47d9b43..5937e6629393 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -83,6 +83,11 @@ class StreamDevice { return rawWriteAccess; } + /// @brief define the behavior of a cast to std::ostream + virtual std::ostream& getOStream() { + throw std::runtime_error("Not implemented"); + } + protected: /// @brief allow raw write access bool rawWriteAccess = false; @@ -158,6 +163,10 @@ class OStreamDevice : public StreamDevice { return Type::OSTREAM; } + std::ostream& getOStream() override { + return *myStream; + } + private: std::unique_ptr myStream; }; // Add the missing semicolon here @@ -261,4 +270,4 @@ StreamDevice& operator<<(StreamDevice& stream, const T& t) { throw std::runtime_error("Unknown stream type in StreamDevice"); } return stream; -} \ No newline at end of file +} From 131cec82e329a1f2af4e45c09d0fe889bf6a5bb5 Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 7 Aug 2024 19:15:04 -0600 Subject: [PATCH 06/13] removed str override Signed-off-by: Max --- src/utils/iodevices/StreamDevices.h | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index 5937e6629393..6dea56cef582 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -207,11 +207,7 @@ class ParquetStream : public StreamDevice { std::string str() override { return ""; } - - void str([[maybe_unused]] const std::string& s) override { - - }; - + template void print(const T& t) { (*myStream) << t; From 65ce54194f12ff6e37da30f052081866329010be Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 7 Aug 2024 21:26:24 -0600 Subject: [PATCH 07/13] hotfix of nesting Signed-off-by: Max --- src/utils/iodevices/OutputDevice_Parquet.cpp | 15 +++++++++++---- src/utils/iodevices/ParquetFormatter.h | 14 ++++++++++++-- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/utils/iodevices/OutputDevice_Parquet.cpp b/src/utils/iodevices/OutputDevice_Parquet.cpp index 99ea9959391b..19907287a4ac 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.cpp +++ b/src/utils/iodevices/OutputDevice_Parquet.cpp @@ -53,9 +53,16 @@ OutputDevice_Parquet::OutputDevice_Parquet(const std::string& fullName) bool OutputDevice_Parquet::closeTag(const std::string& comment) { - // open the file for writing - if (myFile == nullptr) { - auto formatter = dynamic_cast(this->getFormatter()); + // open the file for writing, but only if the depth is >=2 (i.e. we are closing the children tag). + //! @todo this is a bit of a hack, but it works for now + auto formatter = dynamic_cast(this->getFormatter()); + if (formatter->getDepth() < 2) { + // we have to clean up the stack, otherwise the file will not be written correctly + // when it is open + formatter->clearStack(); + return true; + } + if (myFile == nullptr) { if (formatter == nullptr) { throw IOError("Formatter is not a ParquetFormatter"); } @@ -73,7 +80,7 @@ bool OutputDevice_Parquet::closeTag(const std::string& comment) { } } // now actually write the data - return getFormatter()->closeTag(getOStream()); + return formatter->closeTag(getOStream()); } diff --git a/src/utils/iodevices/ParquetFormatter.h b/src/utils/iodevices/ParquetFormatter.h index b8eb0f1eaf62..b7b42bf3265a 100644 --- a/src/utils/iodevices/ParquetFormatter.h +++ b/src/utils/iodevices/ParquetFormatter.h @@ -51,7 +51,7 @@ constexpr bool always_false = false; // Overloaded function for different types template void AppendField(parquet::schema::NodeVector& fields, const T& val, const std::string& field_name) { - + UNUSED_PARAMETER(val); if constexpr (std::is_same_v) { fields.push_back(parquet::schema::PrimitiveNode::Make( field_name, parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY, @@ -426,7 +426,7 @@ class ParquetFormatter : public OutputFormatter { * * @param Return success */ - inline bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) override { return true; }; + inline bool writeHeader([[maybe_unused]] StreamDevice& into, [[maybe_unused]] const SumoXMLTag& rootElement) override { return true; }; template @@ -434,6 +434,16 @@ class ParquetFormatter : public OutputFormatter { throw std::runtime_error("writeRaw not implemented for ParquetFormatter"); } + int getDepth() const { + return static_cast(myXMLStack.size()); + } + + void clearStack() { + myXMLStack.clear(); + myNodeVector.clear(); + fields.clear(); + } + private: /// @brief The stack of begun xml XMLElements. From 0bbe3cb92e19764a32a73f151f18ef44e9dbc368 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 9 Aug 2024 14:05:17 -0600 Subject: [PATCH 08/13] init changes --- src/utils/iodevices/OutputDevice_Parquet.cpp | 3 ++- src/utils/iodevices/ParquetFormatter.h | 19 +++---------------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/src/utils/iodevices/OutputDevice_Parquet.cpp b/src/utils/iodevices/OutputDevice_Parquet.cpp index 19907287a4ac..dbe231a03f6e 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.cpp +++ b/src/utils/iodevices/OutputDevice_Parquet.cpp @@ -60,7 +60,8 @@ bool OutputDevice_Parquet::closeTag(const std::string& comment) { // we have to clean up the stack, otherwise the file will not be written correctly // when it is open formatter->clearStack(); - return true; + // this is critical for the file to be written correctly + return false; } if (myFile == nullptr) { if (formatter == nullptr) { diff --git a/src/utils/iodevices/ParquetFormatter.h b/src/utils/iodevices/ParquetFormatter.h index b7b42bf3265a..da31e54cb691 100644 --- a/src/utils/iodevices/ParquetFormatter.h +++ b/src/utils/iodevices/ParquetFormatter.h @@ -172,7 +172,9 @@ auto convertToParquetType(const T& value) { } else if constexpr (std::is_same_v || std::is_same_v || std::is_same_v) { - return std::string_view(value); + // have to take a copy of the string, to ensure its lifetime is long enough + auto x = std::string(value); + return x; } else { // For any other type, convert to string return toString(value); @@ -193,21 +195,6 @@ class Attribute : public AttributeBase { decltype(convertToParquetType(std::declval())) value_; }; -// Specialization for numeric types -template -class Attribute>> : public AttributeBase { -public: - Attribute(const std::string& name, const T& value) - : AttributeBase(name), value_(convertToParquetType(value)) {} - - void print(StreamDevice& os) const override { - os << value_; - } - -private: - decltype(convertToParquetType(std::declval())) value_; -}; - class XMLElement { public: From df79c60c1182bfeaafd3b1cc46ec7f4544bc17d8 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 9 Aug 2024 17:03:09 -0600 Subject: [PATCH 09/13] remove format unused Signed-off-by: Max --- src/utils/iodevices/StreamDevices.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index 6dea56cef582..b060a252b382 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -202,7 +202,7 @@ class ParquetStream : public StreamDevice { myStream.release(); } - void setPrecision(FMT_MAYBE_UNUSED const int& precision) override {} + void setPrecision(const int& precision) override {} std::string str() override { return ""; From 605ebcaca803f4f0f1c0bc7283e9e9e83d6feb73 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 9 Aug 2024 17:07:31 -0600 Subject: [PATCH 10/13] remove format unused Signed-off-by: Max --- src/utils/iodevices/StreamDevices.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index b060a252b382..4957c5520098 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -220,7 +220,7 @@ class ParquetStream : public StreamDevice { throw std::runtime_error("Not implemented"); } - void str(FMT_MAYBE_UNUSED const std::string& s) override { + void str(const std::string& s) override { throw std::runtime_error("Not implemented"); }; From 54e662ea573e52abe6eb980d047085979f214e32 Mon Sep 17 00:00:00 2001 From: Max Date: Sun, 11 Aug 2024 08:35:40 -0500 Subject: [PATCH 11/13] Fixed all warnings Signed-off-by: Max --- src/CMakeLists.txt | 4 +++- src/utils/iodevices/OutputDevice_Parquet.cpp | 1 + src/utils/iodevices/OutputDevice_Parquet.h | 13 +++++++------ src/utils/iodevices/ParquetFormatter.h | 15 +++++++++++++-- src/utils/iodevices/StreamDevices.h | 6 +++++- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fe67ec9e89dc..7fcfdd190e8c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,9 @@ set(netconvertlibs netwrite netimport netbuild foreign_eulerspiral ${GDAL_LIBRARY} netimport_vissim netimport_vissim_typeloader netimport_vissim_tempstructs ${commonlibs} ${TCMALLOC_LIBRARY}) -FIND_LIBRARY(PARQUET_LIBRARY NAMES parquet) +if(WITH_PARQUET) + FIND_LIBRARY(PARQUET_LIBRARY NAMES parquet) +endif() set(sumolibs traciserver netload microsim_cfmodels microsim_engine microsim_lcmodels microsim_devices microsim_trigger microsim_output microsim_transportables microsim_actions diff --git a/src/utils/iodevices/OutputDevice_Parquet.cpp b/src/utils/iodevices/OutputDevice_Parquet.cpp index dbe231a03f6e..97f132a6f8f5 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.cpp +++ b/src/utils/iodevices/OutputDevice_Parquet.cpp @@ -53,6 +53,7 @@ OutputDevice_Parquet::OutputDevice_Parquet(const std::string& fullName) bool OutputDevice_Parquet::closeTag(const std::string& comment) { + UNUSED_PARAMETER(comment); // open the file for writing, but only if the depth is >=2 (i.e. we are closing the children tag). //! @todo this is a bit of a hack, but it works for now auto formatter = dynamic_cast(this->getFormatter()); diff --git a/src/utils/iodevices/OutputDevice_Parquet.h b/src/utils/iodevices/OutputDevice_Parquet.h index 8d993d0f3536..82e1e282b632 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.h +++ b/src/utils/iodevices/OutputDevice_Parquet.h @@ -39,17 +39,16 @@ /** - * @class OutputDevice_File - * @brief An output device that encapsulates an ofstream + * @class OutputDevice_Parquet + * @brief An output device that encapsulates an parquet stream writer * - * Please note that the device gots responsible for the stream and deletes + * Please note that the device is responsible for the stream and deletes * it (it should not be deleted elsewhere). */ class OutputDevice_Parquet : public OutputDevice { public: /** @brief Constructor * @param[in] fullName The name of the output file to use - * @param[in] compressed whether to apply gzip compression * @exception IOError Should not be thrown by this implementation */ OutputDevice_Parquet(const std::string& fullName); @@ -67,10 +66,12 @@ class OutputDevice_Parquet : public OutputDevice { void lf() {}; // null the setPrecision method - void setPrecision(int precision) override {}; + void setPrecision(int precision) override { + UNUSED_PARAMETER(precision); + }; void setOSFlags(std::ios_base::fmtflags flags) override { - + UNUSED_PARAMETER(flags); }; protected: diff --git a/src/utils/iodevices/ParquetFormatter.h b/src/utils/iodevices/ParquetFormatter.h index da31e54cb691..8806c634bbe9 100644 --- a/src/utils/iodevices/ParquetFormatter.h +++ b/src/utils/iodevices/ParquetFormatter.h @@ -291,10 +291,13 @@ class ParquetFormatter : public OutputFormatter { * @todo Describe what is saved */ // turn off the warning for unused parameters - [[maybe_unused]] bool writeXMLHeader(StreamDevice& into, const std::string& rootXMLElement, const std::map& attrs, bool includeConfig = true) override { + UNUSED_PARAMETER(into); + UNUSED_PARAMETER(rootXMLElement); + UNUSED_PARAMETER(attrs); + UNUSED_PARAMETER(includeConfig); return 0; }; @@ -310,6 +313,7 @@ class ParquetFormatter : public OutputFormatter { * @return The OutputDevice for further processing */ void openTag(StreamDevice& into, const std::string& xmlXMLElement) override { + UNUSED_PARAMETER(into); #ifdef PARQUET_TESTING // assert that the stack does not contain the XMLElement assert(std::find(myXMLStack.begin(), myXMLStack.end(), xmlXMLElement) == myXMLStack.end()); @@ -336,6 +340,7 @@ class ParquetFormatter : public OutputFormatter { * @todo it is not verified that the topmost XMLElement was closed */ inline bool closeTag(StreamDevice& into, const std::string& comment = "") override { + UNUSED_PARAMETER(comment); if (myXMLStack.empty()) { return false; } @@ -362,12 +367,17 @@ class ParquetFormatter : public OutputFormatter { */ void writePreformattedTag(StreamDevice& into, const std::string& val) override { // don't take any action + UNUSED_PARAMETER(into); + UNUSED_PARAMETER(val); return; }; /** @brief writes arbitrary padding */ - inline void writePadding(StreamDevice& into, const std::string& val) override {}; + inline void writePadding(StreamDevice& into, const std::string& val) override { + UNUSED_PARAMETER(into); + UNUSED_PARAMETER(val); + }; /** @brief writes an arbitrary attribute @@ -378,6 +388,7 @@ class ParquetFormatter : public OutputFormatter { */ template void writeAttr(StreamDevice& into, const std::string& attr, const T& val) { + UNUSED_PARAMETER(into); std::unique_ptr typed_attr = std::make_unique>(attr, val); this->myXMLStack.back().addAttribute(std::move(typed_attr)); if (!sharedNodeVector && this->fields.find(attr) == this->fields.end()) { diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index 4957c5520098..43c1ff1c8b5e 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -14,6 +14,7 @@ #include #endif + class StreamDevice { public: @@ -202,7 +203,9 @@ class ParquetStream : public StreamDevice { myStream.release(); } - void setPrecision(const int& precision) override {} + void setPrecision(const int& precision) override { + UNUSED_PARAMETER(precision); + } std::string str() override { return ""; @@ -221,6 +224,7 @@ class ParquetStream : public StreamDevice { } void str(const std::string& s) override { + UNUSED_PARAMETER(s); throw std::runtime_error("Not implemented"); }; From 063ef329e457156701ad83d8aa2defd5fe9bcbec Mon Sep 17 00:00:00 2001 From: Max Date: Thu, 15 Aug 2024 17:54:54 -0600 Subject: [PATCH 12/13] segmentation fault gone --- src/utils/iodevices/OutputDevice.cpp | 1 + src/utils/iodevices/OutputDevice.h | 14 +++++++++++++- src/utils/iodevices/OutputDevice_String.cpp | 9 +++++---- src/utils/iodevices/OutputFormatter.h | 2 +- src/utils/iodevices/ParquetFormatter.h | 21 ++++++++++++++------- src/utils/iodevices/PlainXMLFormatter.cpp | 2 +- src/utils/iodevices/PlainXMLFormatter.h | 2 +- src/utils/iodevices/StreamDevices.h | 16 ++++++++-------- 8 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/utils/iodevices/OutputDevice.cpp b/src/utils/iodevices/OutputDevice.cpp index d564914f8107..65be2e1c73a5 100644 --- a/src/utils/iodevices/OutputDevice.cpp +++ b/src/utils/iodevices/OutputDevice.cpp @@ -229,6 +229,7 @@ OutputDevice::OutputDevice(const std::string& filename, OutputFormatter* format OutputDevice::~OutputDevice() { delete myFormatter; + delete myStreamDevice; } diff --git a/src/utils/iodevices/OutputDevice.h b/src/utils/iodevices/OutputDevice.h index b9d8d536e532..bb375d45830b 100644 --- a/src/utils/iodevices/OutputDevice.h +++ b/src/utils/iodevices/OutputDevice.h @@ -372,7 +372,7 @@ class OutputDevice { // getOStream() << t; // get the correct formatter if (this->getOStream().allowRaw()) { - this->getOStream() << t; + writeRaw(t); } else { throw IOError("Raw output is not allowed for this output device"); @@ -395,6 +395,18 @@ class OutputDevice { getOStream().setOSFlags(flags); } + // @brief handle the raw write + template + void writeRaw(const T& val) { + // cast the writer to the correct type + if (this->getType() == OutputWriterType::XML) { + getFormatter()->writeRaw(getOStream(), val); + } + else { + throw IOError("Raw output is not supported for this output type"); + } + } + protected: /// @brief Returns the associated ostream virtual StreamDevice& getOStream() { diff --git a/src/utils/iodevices/OutputDevice_String.cpp b/src/utils/iodevices/OutputDevice_String.cpp index 0760f6f2a082..c8279e4c1def 100644 --- a/src/utils/iodevices/OutputDevice_String.cpp +++ b/src/utils/iodevices/OutputDevice_String.cpp @@ -29,10 +29,11 @@ // =========================================================================== OutputDevice_String::OutputDevice_String(const int defaultIndentation) : OutputDevice(defaultIndentation) { - auto stream = new std::ostringstream(); - (*stream) << std::setiosflags(std::ios::fixed); - myStreamDevice = new OStreamDevice(stream); - setPrecision(); + // auto stream = new std::ostringstream(); + // (*stream) << std::setiosflags(std::ios::fixed); + myStreamDevice = new OStreamDevice(new std::ostringstream()); + myStreamDevice->setOSFlags(std::ios::fixed); + myStreamDevice->setPrecision(2); } diff --git a/src/utils/iodevices/OutputFormatter.h b/src/utils/iodevices/OutputFormatter.h index c282ed5d5817..29f5c566a863 100644 --- a/src/utils/iodevices/OutputFormatter.h +++ b/src/utils/iodevices/OutputFormatter.h @@ -50,7 +50,7 @@ class RGBColor; class OutputFormatter { public: /// @brief Destructor - virtual ~OutputFormatter() { } + virtual ~OutputFormatter() = default; /** @brief Writes an XML header with optional configuration diff --git a/src/utils/iodevices/ParquetFormatter.h b/src/utils/iodevices/ParquetFormatter.h index 8806c634bbe9..641e22d73683 100644 --- a/src/utils/iodevices/ParquetFormatter.h +++ b/src/utils/iodevices/ParquetFormatter.h @@ -103,8 +103,10 @@ void AppendField(parquet::schema::NodeVector& fields, const T& val, const std::s parquet::ConvertedType::TIMESTAMP_MILLIS)); } else { - // warn - fmt::print("Unsupported type for AppendField\n"); + // // warn + // fmt::print("Unsupported type for AppendField\n"); + // static_assert(always_false, "Unsupported type for AppendField"); + } // else { // static_assert(always_false, "Unsupported type for AppendField"); @@ -173,8 +175,7 @@ auto convertToParquetType(const T& value) { std::is_same_v || std::is_same_v) { // have to take a copy of the string, to ensure its lifetime is long enough - auto x = std::string(value); - return x; + return std::string(value); } else { // For any other type, convert to string return toString(value); @@ -188,11 +189,15 @@ class Attribute : public AttributeBase { : AttributeBase(name), value_(convertToParquetType(value)) {} void print(StreamDevice& os) const override { - os << value_; + if (value_){ + os << *value_; + } else{ + assert(false); + } } private: - decltype(convertToParquetType(std::declval())) value_; + std::optional()))> value_; }; @@ -278,7 +283,7 @@ class ParquetFormatter : public OutputFormatter { ParquetFormatter() {}; /// @brief Destructor - virtual ~ParquetFormatter() { } + virtual ~ParquetFormatter() = default; /** @brief Writes an XML header with optional configuration * @@ -429,6 +434,8 @@ class ParquetFormatter : public OutputFormatter { template void writeRaw(StreamDevice& into, T& val) { + UNUSED_PARAMETER(into); + UNUSED_PARAMETER(val); throw std::runtime_error("writeRaw not implemented for ParquetFormatter"); } diff --git a/src/utils/iodevices/PlainXMLFormatter.cpp b/src/utils/iodevices/PlainXMLFormatter.cpp index df07a136074e..6cb27c6b9121 100644 --- a/src/utils/iodevices/PlainXMLFormatter.cpp +++ b/src/utils/iodevices/PlainXMLFormatter.cpp @@ -68,7 +68,7 @@ PlainXMLFormatter::openTag(StreamDevice& into, const std::string& xmlElement) { into << ">\n"; } myHavePendingOpener = true; - into << std::string(4 * (myXMLStack.size() + myDefaultIndentation), ' ') << "<" << xmlElement; + into << std::string(4 * (myXMLStack.size() + myDefaultIndentation), ' ').c_str() << "<" << xmlElement; myXMLStack.push_back(xmlElement); } diff --git a/src/utils/iodevices/PlainXMLFormatter.h b/src/utils/iodevices/PlainXMLFormatter.h index 8f96318fb9ea..33d920e5b0d1 100644 --- a/src/utils/iodevices/PlainXMLFormatter.h +++ b/src/utils/iodevices/PlainXMLFormatter.h @@ -45,7 +45,7 @@ class PlainXMLFormatter : public OutputFormatter { /// @brief Destructor - virtual ~PlainXMLFormatter() { } + virtual ~PlainXMLFormatter() = default; /** @brief Writes an XML header with optional configuration diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index 43c1ff1c8b5e..562f3ca0d40a 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -53,7 +53,7 @@ class StreamDevice { /// @brief set the precision /// @param precision - virtual void setPrecision(const int& precision) = 0; + virtual void setPrecision(int precision) = 0; /// @brief get the precision /// @return the precision @@ -126,8 +126,8 @@ class OStreamDevice : public StreamDevice { return *this; } - void setPrecision(const int& precision) override { - *myStream << std::setprecision(precision); + void setPrecision(int precision) override { + (*myStream) << std::setprecision(precision); } void setOSFlags(std::ios_base::fmtflags flags) override { @@ -203,7 +203,7 @@ class ParquetStream : public StreamDevice { myStream.release(); } - void setPrecision(const int& precision) override { + void setPrecision(int precision) override { UNUSED_PARAMETER(precision); } @@ -214,9 +214,11 @@ class ParquetStream : public StreamDevice { template void print(const T& t) { (*myStream) << t; - } + // protect this to only allow types that are supported by parquet + + void setOSFlags(std::ios_base::fmtflags flags) override {UNUSED_PARAMETER(flags);} operator std::ostream& () override { @@ -255,12 +257,10 @@ StreamDevice& operator<<(StreamDevice& stream, const T& t) { case StreamDevice::Type::OSTREAM: static_cast(&stream)->print(t); break; - // case StreamDevice::Type::STRING: - // static_cast(&stream)->print(t); - // break; case StreamDevice::Type::PARQUET: #ifdef HAVE_PARQUET static_cast(&stream)->print(t); + // throw std::runtime_error("Parquet not supported in this build"); #else throw std::runtime_error("Parquet not supported in this build"); #endif From 8e7add4f62fd4433cd06e7fe02758480c3bb5647 Mon Sep 17 00:00:00 2001 From: Max Date: Sun, 18 Aug 2024 09:15:30 -0500 Subject: [PATCH 13/13] Fixing seg faults Signed-off-by: Max --- src/utils/iodevices/OutputDevice.cpp | 7 -- src/utils/iodevices/OutputDevice.h | 22 ++--- src/utils/iodevices/OutputDevice_CERR.cpp | 2 +- src/utils/iodevices/OutputDevice_COUT.cpp | 2 +- src/utils/iodevices/OutputDevice_File.cpp | 16 ++-- src/utils/iodevices/OutputDevice_File.h | 4 - src/utils/iodevices/OutputDevice_Parquet.cpp | 13 +-- src/utils/iodevices/OutputDevice_Parquet.h | 2 +- src/utils/iodevices/OutputDevice_String.cpp | 9 +- src/utils/iodevices/OutputDevice_String.h | 5 -- src/utils/iodevices/StreamDevices.h | 94 +++++++++++++++++++- 11 files changed, 120 insertions(+), 56 deletions(-) diff --git a/src/utils/iodevices/OutputDevice.cpp b/src/utils/iodevices/OutputDevice.cpp index 65be2e1c73a5..f5b473fe23a1 100644 --- a/src/utils/iodevices/OutputDevice.cpp +++ b/src/utils/iodevices/OutputDevice.cpp @@ -226,13 +226,6 @@ OutputDevice::OutputDevice(const std::string& filename, OutputFormatter* format } - -OutputDevice::~OutputDevice() { - delete myFormatter; - delete myStreamDevice; -} - - bool OutputDevice::ok() { return getOStream().good(); diff --git a/src/utils/iodevices/OutputDevice.h b/src/utils/iodevices/OutputDevice.h index bb375d45830b..2a08aaf88a67 100644 --- a/src/utils/iodevices/OutputDevice.h +++ b/src/utils/iodevices/OutputDevice.h @@ -153,7 +153,7 @@ class OutputDevice { /// @brief Destructor - virtual ~OutputDevice(); + virtual ~OutputDevice() = default; /** @brief returns the information whether one can write into the device @@ -209,7 +209,7 @@ class OutputDevice { template bool writeHeader(const SumoXMLTag& rootElement) { - return getFormatter()->writeHeader(getOStream(), rootElement); + return getFormatter().writeHeader(getOStream(), rootElement); } @@ -308,12 +308,12 @@ class OutputDevice { { case OutputWriterType::XML: // cast the writer to the correct type - getFormatter()->writeAttr(getOStream(), attr, val); + getFormatter().writeAttr(getOStream(), attr, val); break; case OutputWriterType::PARQUET: #ifdef HAVE_PARQUET // cast the writer to the correct type - getFormatter()->writeAttr(getOStream(), attr, val); + getFormatter().writeAttr(getOStream(), attr, val); #else throw IOError("Parquet output is not supported in this build. Please recompile with the correct options."); #endif @@ -400,7 +400,7 @@ class OutputDevice { void writeRaw(const T& val) { // cast the writer to the correct type if (this->getType() == OutputWriterType::XML) { - getFormatter()->writeRaw(getOStream(), val); + getFormatter().writeRaw(getOStream(), val); } else { throw IOError("Raw output is not supported for this output type"); @@ -430,8 +430,8 @@ class OutputDevice { virtual void postWriteHook(); /// @brief Returns the formatter - OutputFormatter* getFormatter() { - return myFormatter; + OutputFormatter& getFormatter() { + return *myFormatter; } @@ -446,15 +446,15 @@ class OutputDevice { const std::string myFilename; /// @brief the stream device - StreamDevice* myStreamDevice; + std::unique_ptr myStreamDevice{nullptr}; /// @brief The formatter for XML - OutputFormatter* myFormatter; + std::unique_ptr myFormatter{nullptr}; /// @brief return a type casted formatter template - T* getFormatter() { - return static_cast(myFormatter); + T& getFormatter() { + return static_cast(*myFormatter); } private: diff --git a/src/utils/iodevices/OutputDevice_CERR.cpp b/src/utils/iodevices/OutputDevice_CERR.cpp index 559bb0e2bb52..9a26749194d2 100644 --- a/src/utils/iodevices/OutputDevice_CERR.cpp +++ b/src/utils/iodevices/OutputDevice_CERR.cpp @@ -47,7 +47,7 @@ OutputDevice_CERR::getDevice() { // method definitions // =========================================================================== OutputDevice_CERR::OutputDevice_CERR() : OutputDevice(0, "CERR") { - myStreamDevice = new OStreamDevice(&std::cerr); + myStreamDevice = std::make_unique(std::cerr); } diff --git a/src/utils/iodevices/OutputDevice_COUT.cpp b/src/utils/iodevices/OutputDevice_COUT.cpp index 192d3d9d4711..4702961143f2 100644 --- a/src/utils/iodevices/OutputDevice_COUT.cpp +++ b/src/utils/iodevices/OutputDevice_COUT.cpp @@ -47,7 +47,7 @@ OutputDevice_COUT::getDevice() { // method definitions // =========================================================================== OutputDevice_COUT::OutputDevice_COUT() : OutputDevice(0, "COUT") { - myStreamDevice = new OStreamDevice(&std::cout); + myStreamDevice = std::make_unique(); } diff --git a/src/utils/iodevices/OutputDevice_File.cpp b/src/utils/iodevices/OutputDevice_File.cpp index 1a9448d72516..8bf5b6823f3a 100644 --- a/src/utils/iodevices/OutputDevice_File.cpp +++ b/src/utils/iodevices/OutputDevice_File.cpp @@ -40,7 +40,7 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com if (fullName == "/dev/null") { myAmNull = true; #ifdef WIN32 - myStreamDevice = new OStreamDevice(new std::ofstream("NUL")); + myStreamDevice = std::make_unique(new std::ofstream("NUL")); if (!myFileStream->good()) { delete myFileStream; throw IOError(TLF("Could not redirect to NUL device (%).", std::string(std::strerror(errno)))); @@ -52,28 +52,22 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com #ifdef HAVE_ZLIB if (compressed) { try { - myStreamDevice = new OStreamDevice(new zstr::ofstream(localName.c_str(), std::ios_base::out)); + myStreamDevice = std::make_unique(new zstr::ofstream(localName.c_str(), std::ios_base::out)); } catch (strict_fstream::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } catch (zstr::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } } else { - myStreamDevice = new OStreamDevice(new std::ofstream(localName.c_str(), std::ios_base::out)); + myStreamDevice = std::make_unique(new std::ofstream(localName.c_str(), std::ios_base::out)); } #else UNUSED_PARAMETER(compressed); - myFileStream = new std::ofstream(localName.c_str(), std::ios_base::out); + myStreamDevice = std::make_unique(new std::ofstream(localName.c_str(), std::ios_base::out)); #endif if (!myStreamDevice->good()) { - delete myStreamDevice; + myStreamDevice.reset(); throw IOError("Could not build output file '" + fullName + "' (" + std::strerror(errno) + ")."); } } - - -OutputDevice_File::~OutputDevice_File() { - delete myStreamDevice; -} - /****************************************************************************/ diff --git a/src/utils/iodevices/OutputDevice_File.h b/src/utils/iodevices/OutputDevice_File.h index 3d4beb2465d3..cdc9a237bc28 100644 --- a/src/utils/iodevices/OutputDevice_File.h +++ b/src/utils/iodevices/OutputDevice_File.h @@ -45,10 +45,6 @@ class OutputDevice_File : public OutputDevice { */ OutputDevice_File(const std::string& fullName, const bool compressed = false); - - /// @brief Destructor - ~OutputDevice_File(); - /** @brief returns the information whether the device will discard all output * @return Whether the device redirects to /dev/null */ diff --git a/src/utils/iodevices/OutputDevice_Parquet.cpp b/src/utils/iodevices/OutputDevice_Parquet.cpp index 97f132a6f8f5..a61ddd143734 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.cpp +++ b/src/utils/iodevices/OutputDevice_Parquet.cpp @@ -56,7 +56,7 @@ bool OutputDevice_Parquet::closeTag(const std::string& comment) { UNUSED_PARAMETER(comment); // open the file for writing, but only if the depth is >=2 (i.e. we are closing the children tag). //! @todo this is a bit of a hack, but it works for now - auto formatter = dynamic_cast(this->getFormatter()); + auto formatter = dynamic_cast(&this->getFormatter()); if (formatter->getDepth() < 2) { // we have to clean up the stack, otherwise the file will not be written correctly // when it is open @@ -64,7 +64,7 @@ bool OutputDevice_Parquet::closeTag(const std::string& comment) { // this is critical for the file to be written correctly return false; } - if (myFile == nullptr) { + if (myFile == nullptr) { if (formatter == nullptr) { throw IOError("Formatter is not a ParquetFormatter"); } @@ -72,7 +72,7 @@ bool OutputDevice_Parquet::closeTag(const std::string& comment) { PARQUET_ASSIGN_OR_THROW( this->myFile, arrow::io::FileOutputStream::Open(this->myFilename)); - this->myStreamDevice = new ParquetStream(parquet::ParquetFileWriter::Open(this->myFile, std::static_pointer_cast( + this->myStreamDevice = std::make_unique(parquet::ParquetFileWriter::Open(this->myFile, std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, formatter->getNodeVector()) ), this->builder.build())); @@ -88,9 +88,12 @@ bool OutputDevice_Parquet::closeTag(const std::string& comment) { OutputDevice_Parquet::~OutputDevice_Parquet() { // have to delete the stream device before the file. This dumps unwritten data to the file - delete myStreamDevice; + myStreamDevice.reset(); + // close the file (if open) + if (this->myFile.get() == nullptr) { + return; + } [[maybe_unused]] arrow::Status status = this->myFile->Close(); - } #endif diff --git a/src/utils/iodevices/OutputDevice_Parquet.h b/src/utils/iodevices/OutputDevice_Parquet.h index 82e1e282b632..1358395260e6 100644 --- a/src/utils/iodevices/OutputDevice_Parquet.h +++ b/src/utils/iodevices/OutputDevice_Parquet.h @@ -54,7 +54,7 @@ class OutputDevice_Parquet : public OutputDevice { OutputDevice_Parquet(const std::string& fullName); /// @brief Destructor - ~OutputDevice_Parquet(); + ~OutputDevice_Parquet() override; /** @brief implements the close tag logic. This is where the file is first opened and the schema is created. * This exploits the fact that for *most* SUMO files, all the fields are present at the first close tag event. diff --git a/src/utils/iodevices/OutputDevice_String.cpp b/src/utils/iodevices/OutputDevice_String.cpp index c8279e4c1def..a9719c119f15 100644 --- a/src/utils/iodevices/OutputDevice_String.cpp +++ b/src/utils/iodevices/OutputDevice_String.cpp @@ -29,18 +29,11 @@ // =========================================================================== OutputDevice_String::OutputDevice_String(const int defaultIndentation) : OutputDevice(defaultIndentation) { - // auto stream = new std::ostringstream(); - // (*stream) << std::setiosflags(std::ios::fixed); - myStreamDevice = new OStreamDevice(new std::ostringstream()); + myStreamDevice = std::make_unique(new std::ostringstream()); myStreamDevice->setOSFlags(std::ios::fixed); myStreamDevice->setPrecision(2); } - -OutputDevice_String::~OutputDevice_String() { -} - - std::string OutputDevice_String::getString() const { return myStreamDevice->str(); diff --git a/src/utils/iodevices/OutputDevice_String.h b/src/utils/iodevices/OutputDevice_String.h index eaeed7242939..3c6f0a20649e 100644 --- a/src/utils/iodevices/OutputDevice_String.h +++ b/src/utils/iodevices/OutputDevice_String.h @@ -41,11 +41,6 @@ class OutputDevice_String : public OutputDevice { */ OutputDevice_String(const int defaultIndentation = 0); - - /// @brief Destructor - ~OutputDevice_String(); - - /** @brief Returns the current content as a string * @return The content as string */ diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h index 562f3ca0d40a..d192415b9f25 100644 --- a/src/utils/iodevices/StreamDevices.h +++ b/src/utils/iodevices/StreamDevices.h @@ -21,6 +21,7 @@ class StreamDevice { /// @brief The type of the stream enum Type { OSTREAM, // std::ostream (or std::ofstream) + COUT, // std::cout PARQUET // parquet::StreamWriter }; @@ -104,8 +105,9 @@ class OStreamDevice : public StreamDevice { OStreamDevice(std::ofstream* stream) : StreamDevice(Type::OSTREAM, true), myStream(std::move(stream)) {} OStreamDevice(std::ostream* stream) : StreamDevice(Type::OSTREAM, true), myStream(std::move(stream)) {} OStreamDevice(std::ofstream stream) : StreamDevice(Type::OSTREAM, true), myStream(new std::ofstream(std::move(stream))) {} + OStreamDevice(std::basic_ostream stream) : StreamDevice(Type::OSTREAM, true), myStream(&stream) {} - virtual ~OStreamDevice() override = default; + virtual ~OStreamDevice() override = default; bool ok() override { return myStream->good(); @@ -143,7 +145,20 @@ class OStreamDevice : public StreamDevice { } std::string str() override { - return ""; + // Try casting to ostringstream + if (auto* oss_ptr = dynamic_cast(myStream.get())) { + return oss_ptr->str(); + } + + // Try casting to stringstream + if (auto* ss_ptr = dynamic_cast(myStream.get())) { + return ss_ptr->str(); + } + + // If it's neither, we need to use a more general approach + std::ostringstream oss; + oss << myStream->rdbuf(); + return oss.str(); } void str(const std::string& s) override { @@ -172,6 +187,78 @@ class OStreamDevice : public StreamDevice { std::unique_ptr myStream; }; // Add the missing semicolon here + +class COUTStreamDevice : public StreamDevice { +public: + + // write a constructor that takes a std::ofstream + COUTStreamDevice() : StreamDevice(Type::COUT, true), myStream(std::cout) {}; + COUTStreamDevice(std::ostream& stream) : StreamDevice(Type::COUT, true), myStream(stream) {}; + + virtual ~COUTStreamDevice() override = default; + + bool ok() override { + return myStream.good(); + } + + StreamDevice& flush() override { + myStream.flush(); + return *this; + } + + void close() override { + (void)(this->flush()); + } + + template + StreamDevice& print(const T& t) { + myStream << t; + return *this; + } + + void setPrecision(int precision) override { + myStream << std::setprecision(precision); + } + + void setOSFlags(std::ios_base::fmtflags flags) override { + myStream.setf(flags); + } + + int precision() override { + return static_cast(myStream.precision()); + } + + bool good() override { + return myStream.good(); + } + + std::string str() override { + return ""; + } + + void str(const std::string& s) override { + myStream << s; + } + + operator std::ostream& () override { + return myStream; + } + + StreamDevice& endLine() override { + myStream << std::endl; + return *this; + } + + std::ostream& getOStream() override { + return myStream; + } + +private: + + std::ostream& myStream; + +}; // Add the missing semicolon here + class ParquetStream : public StreamDevice { #ifdef HAVE_PARQUET @@ -257,6 +344,9 @@ StreamDevice& operator<<(StreamDevice& stream, const T& t) { case StreamDevice::Type::OSTREAM: static_cast(&stream)->print(t); break; + case StreamDevice::Type::COUT: + static_cast(&stream)->print(t); + break; case StreamDevice::Type::PARQUET: #ifdef HAVE_PARQUET static_cast(&stream)->print(t);