Skip to content

Commit

Permalink
AVRO-4111: [C++] Replace boost::iostreams with zlib library (#3290)
Browse files Browse the repository at this point in the history
* AVRO-4111: [C++] Replace boost::iostreams with zlib library

* declare buf as uint8_t

* fix lint

* remove unused cmake variables
  • Loading branch information
wgtmac authored Jan 16, 2025
1 parent 8e51c7e commit f59db49
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-lang-c++-ARM.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update -q
sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev cmake
sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake
- name: Build
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-lang-c++.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- uses: actions/checkout@v4

- name: Install Dependencies
run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev cmake
run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake

- name: Print Versions
run: |
Expand Down
17 changes: 12 additions & 5 deletions lang/c++/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ else (SNAPPY_FOUND)
message("Disabled snappy codec. libsnappy not found.")
endif (SNAPPY_FOUND)

find_package(ZLIB REQUIRED)
if (ZLIB_FOUND)
message("Enabled zlib codec")
else (ZLIB_FOUND)
message(FATAL_ERROR "ZLIB is not found")
endif (ZLIB_FOUND)

add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS})

add_definitions (-DAVRO_VERSION="${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH}")
Expand Down Expand Up @@ -140,8 +147,8 @@ set_property (TARGET avrocpp
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_DYN_LINK)

add_library (avrocpp_s STATIC ${AVRO_SOURCE_FILES})
target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR})
target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})
target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)

set_property (TARGET avrocpp avrocpp_s
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_SOURCE)
Expand All @@ -152,8 +159,8 @@ set_target_properties (avrocpp PROPERTIES
set_target_properties (avrocpp_s PROPERTIES
VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH})

target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR})
target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)
target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})

target_include_directories(avrocpp PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
Expand Down Expand Up @@ -209,7 +216,7 @@ if (AVRO_BUILD_TESTS)

macro (unittest name)
add_executable (${name} test/${name}.cc)
target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES})
add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
endmacro (unittest)
Expand Down
160 changes: 100 additions & 60 deletions lang/c++/impl/DataFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
#include <random>
#include <sstream>

#include <boost/crc.hpp> // for boost::crc_32_type
#include <boost/iostreams/device/file.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filter/zlib.hpp>

#ifdef SNAPPY_CODEC_AVAILABLE
#include <snappy.h>
#endif

#include <zlib.h>

namespace avro {
using std::copy;
using std::istringstream;
Expand All @@ -55,12 +52,8 @@ const string AVRO_SNAPPY_CODEC = "snappy";
const size_t minSyncInterval = 32;
const size_t maxSyncInterval = 1u << 30;

boost::iostreams::zlib_params get_zlib_params() {
boost::iostreams::zlib_params ret;
ret.method = boost::iostreams::zlib::deflated;
ret.noheader = true;
return ret;
}
// Recommended by https://www.zlib.net/zlib_how.html
const size_t zlibBufGrowSize = 128 * 1024;

} // namespace

Expand Down Expand Up @@ -144,21 +137,45 @@ void DataFileWriterBase::sync() {
std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
copy(*in, *stream_);
} else if (codec_ == DEFLATE_CODEC) {
std::vector<char> buf;
std::vector<uint8_t> buf;
{
boost::iostreams::filtering_ostream os;
os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
os.push(boost::iostreams::back_inserter(buf));
const uint8_t *data;
size_t len;
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;

int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
if (ret != Z_OK) {
throw Exception("Failed to initialize deflate, error: {}", ret);
}

std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
while (input->next(&data, &len)) {
boost::iostreams::write(os, reinterpret_cast<const char *>(data), len);
const uint8_t *data;
size_t len;
while (ret != Z_STREAM_END && input->next(&data, &len)) {
zs.avail_in = static_cast<uInt>(len);
zs.next_in = const_cast<Bytef *>(data);
bool flush = (zs.total_in + len) >= buffer_->byteCount();
do {
if (zs.total_out == buf.size()) {
buf.resize(buf.size() + zlibBufGrowSize);
}
zs.avail_out = static_cast<uInt>(buf.size() - zs.total_out);
zs.next_out = buf.data() + zs.total_out;
ret = deflate(&zs, flush ? Z_FINISH : Z_NO_FLUSH);
if (ret == Z_STREAM_END) {
break;
}
if (ret != Z_OK) {
throw Exception("Failed to deflate, error: {}", ret);
}
} while (zs.avail_out == 0);
}

buf.resize(zs.total_out);
(void) deflateEnd(&zs);
} // make sure all is flushed
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(buf.data()), buf.size());
std::unique_ptr<InputStream> in = memoryInputStream(buf.data(), buf.size());
int64_t byteCount = buf.size();
avro::encode(*encoderPtr_, byteCount);
encoderPtr_->flush();
Expand All @@ -167,35 +184,28 @@ void DataFileWriterBase::sync() {
} else if (codec_ == SNAPPY_CODEC) {
std::vector<char> temp;
std::string compressed;
boost::crc_32_type crc;
{
boost::iostreams::filtering_ostream os;
os.push(boost::iostreams::back_inserter(temp));
const uint8_t *data;
size_t len;

std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
while (input->next(&data, &len)) {
boost::iostreams::write(os, reinterpret_cast<const char *>(data),
len);
}
} // make sure all is flushed
const uint8_t *data;
size_t len;
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
while (input->next(&data, &len)) {
temp.insert(temp.end(), reinterpret_cast<const char *>(data),
reinterpret_cast<const char *>(data) + len);
}

crc.process_bytes(reinterpret_cast<const char *>(temp.data()),
temp.size());
// For Snappy, add the CRC32 checksum
auto checksum = crc();
auto checksum = crc32(0, reinterpret_cast<const Bytef *>(temp.data()),
static_cast<uInt>(temp.size()));

// Now compress
size_t compressed_size = snappy::Compress(
reinterpret_cast<const char *>(temp.data()), temp.size(),
&compressed);

temp.clear();
{
boost::iostreams::filtering_ostream os;
os.push(boost::iostreams::back_inserter(temp));
boost::iostreams::write(os, compressed.c_str(), compressed_size);
}
temp.insert(temp.end(), compressed.c_str(),
compressed.c_str() + compressed_size);

temp.push_back(static_cast<char>((checksum >> 24) & 0xFF));
temp.push_back(static_cast<char>((checksum >> 16) & 0xFF));
temp.push_back(static_cast<char>((checksum >> 8) & 0xFF));
Expand Down Expand Up @@ -285,8 +295,7 @@ void DataFileReaderBase::init(const ValidSchema &readerSchema) {
static void drain(InputStream &in) {
const uint8_t *p = nullptr;
size_t n = 0;
while (in.next(&p, &n))
;
while (in.next(&p, &n));
}

char hex(unsigned int x) {
Expand Down Expand Up @@ -384,7 +393,6 @@ void DataFileReaderBase::readDataBlock() {
dataStream_ = std::move(st);
#ifdef SNAPPY_CODEC_AVAILABLE
} else if (codec_ == SNAPPY_CODEC) {
boost::crc_32_type crc;
uint32_t checksum = 0;
compressed_.clear();
uncompressed.clear();
Expand All @@ -408,35 +416,67 @@ void DataFileReaderBase::readDataBlock() {
throw Exception(
"Snappy Compression reported an error when decompressing");
}
crc.process_bytes(uncompressed.c_str(), uncompressed.size());
auto c = crc();
auto c = crc32(0, reinterpret_cast<const Bytef *>(uncompressed.c_str()),
static_cast<uInt>(uncompressed.size()));
if (checksum != c) {
throw Exception(
"Checksum did not match for Snappy compression: Expected: {}, computed: {}",
checksum, c);
}
os_.reset(new boost::iostreams::filtering_istream());
os_->push(
boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
uncompressed.size()));
std::unique_ptr<InputStream> in = istreamInputStream(*os_);

std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
uncompressed.size());

dataDecoder_->init(*in);
dataStream_ = std::move(in);
#endif
} else {
compressed_.clear();
const uint8_t *data;
size_t len;
while (st->next(&data, &len)) {
compressed_.insert(compressed_.end(), data, data + len);
uncompressed.clear();

{
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
zs.avail_in = 0;
zs.next_in = Z_NULL;

int ret = inflateInit2(&zs, /*windowBits=*/-15);
if (ret != Z_OK) {
throw Exception("Failed to initialize inflate, error: {}", ret);
}

const uint8_t *data;
size_t len;
while (ret != Z_STREAM_END && st->next(&data, &len)) {
zs.avail_in = static_cast<uInt>(len);
zs.next_in = const_cast<Bytef *>(data);
do {
if (zs.total_out == uncompressed.size()) {
uncompressed.resize(uncompressed.size() + zlibBufGrowSize);
}
zs.avail_out = static_cast<uInt>(uncompressed.size() - zs.total_out);
zs.next_out = reinterpret_cast<Bytef *>(uncompressed.data() + zs.total_out);
ret = inflate(&zs, Z_NO_FLUSH);
if (ret == Z_STREAM_END) {
break;
}
if (ret != Z_OK) {
throw Exception("Failed to inflate, error: {}", ret);
}
} while (zs.avail_out == 0);
}

uncompressed.resize(zs.total_out);
(void) inflateEnd(&zs);
}
os_.reset(new boost::iostreams::filtering_istream());
os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
os_->push(boost::iostreams::basic_array_source<char>(
compressed_.data(), compressed_.size()));

std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
std::unique_ptr<InputStream> in = memoryInputStream(
reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
uncompressed.size());

dataDecoder_->init(*in);
dataStream_ = std::move(in);
}
Expand Down
3 changes: 0 additions & 3 deletions lang/c++/include/avro/DataFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
#include <string>
#include <vector>

#include <boost/iostreams/filtering_stream.hpp>

namespace avro {

/** Specify type of compression to use when writing data files. */
Expand Down Expand Up @@ -216,7 +214,6 @@ class AVRO_DECL DataFileReaderBase {
DataFileSync sync_{};

// for compressed buffer
std::unique_ptr<boost::iostreams::filtering_istream> os_;
std::vector<char> compressed_;
std::string uncompressed;
void readHeader();
Expand Down

0 comments on commit f59db49

Please sign in to comment.