Skip to content

Commit

Permalink
C++: Make LZ4 and ZSTD dependencies optional + clang-format
Browse files Browse the repository at this point in the history
New defines: MCAP_COMPRESSION_NO_ZSTD, MCAP_COMPRESSION_NO_LZ4
  • Loading branch information
asherikov committed Nov 12, 2023
1 parent 4ab424e commit 070e3b3
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 19 deletions.
1 change: 1 addition & 0 deletions cpp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ test: dev-image
.PHONY: test-host
test-host:
./test/build/Debug/bin/unit-tests
./test/build/Debug/bin/unit-tests-nocompress

.PHONY: hdoc-build
hdoc-build:
Expand Down
10 changes: 10 additions & 0 deletions cpp/mcap/include/mcap/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class MCAP_PUBLIC BufferReader final : public ICompressedReader {
uint64_t size_;
};

#ifndef MCAP_COMPRESSION_NO_ZSTD
/**
* @brief ICompressedReader implementation that decompresses Zstandard
* (https://facebook.github.io/zstd/) data.
Expand Down Expand Up @@ -183,7 +184,9 @@ class MCAP_PUBLIC ZStdReader final : public ICompressedReader {
Status status_;
ByteArray uncompressedData_;
};
#endif

#ifndef MCAP_COMPRESSION_NO_LZ4
/**
* @brief ICompressedReader implementation that decompresses LZ4
* (https://lz4.github.io/lz4/) data.
Expand Down Expand Up @@ -222,6 +225,7 @@ class MCAP_PUBLIC LZ4Reader final : public ICompressedReader {
uint64_t compressedSize_;
uint64_t uncompressedSize_;
};
#endif

struct LinearMessageView;

Expand Down Expand Up @@ -539,8 +543,12 @@ struct MCAP_PUBLIC TypedChunkReader {
RecordReader reader_;
Status status_;
BufferReader uncompressedReader_;
#ifndef MCAP_COMPRESSION_NO_LZ4
LZ4Reader lz4Reader_;
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
ZStdReader zstdReader_;
#endif
};

/**
Expand Down Expand Up @@ -627,7 +635,9 @@ struct MCAP_PUBLIC IndexedMessageReader {
Status status_;
McapReader& mcapReader_;
RecordReader recordReader_;
#ifndef MCAP_COMPRESSION_NO_LZ4
LZ4Reader lz4Reader_;
#endif
ReadMessageOptions options_;
std::unordered_set<ChannelId> selectedChannels_;
std::function<void(const Message&, RecordOffset)> onMessage_;
Expand Down
64 changes: 49 additions & 15 deletions cpp/mcap/include/mcap/reader.inl
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#include "internal.hpp"
#include <algorithm>
#include <cassert>
#include <lz4frame.h>
#include <zstd.h>
#include <zstd_errors.h>
#ifndef MCAP_COMPRESSION_NO_LZ4
# include <lz4frame.h>
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
# include <zstd.h>
# include <zstd_errors.h>
#endif

namespace mcap {

Expand Down Expand Up @@ -119,6 +123,7 @@ uint64_t FileStreamReader::read(std::byte** output, uint64_t offset, uint64_t si

// LZ4Reader ///////////////////////////////////////////////////////////////////

#ifndef MCAP_COMPRESSION_NO_LZ4
LZ4Reader::LZ4Reader() {
const LZ4F_errorCode_t err =
LZ4F_createDecompressionContext((LZ4F_dctx**)&decompressionContext_, LZ4F_VERSION);
Expand Down Expand Up @@ -206,9 +211,11 @@ Status LZ4Reader::decompressAll(const std::byte* data, uint64_t compressedSize,
}
return result;
}
#endif

// ZStdReader //////////////////////////////////////////////////////////////////

#ifndef MCAP_COMPRESSION_NO_ZSTD
void ZStdReader::reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) {
status_ = DecompressAll(data, size, uncompressedSize, &uncompressedData_);
}
Expand Down Expand Up @@ -255,6 +262,7 @@ Status ZStdReader::DecompressAll(const std::byte* data, uint64_t compressedSize,
}
return result;
}
#endif

// McapReader //////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -1251,10 +1259,26 @@ TypedChunkReader::TypedChunkReader()
, status_{StatusCode::Success} {}

void TypedChunkReader::reset(const Chunk& chunk, Compression compression) {
ICompressedReader* decompressor =
(compression == Compression::None) ? static_cast<ICompressedReader*>(&uncompressedReader_)
: (compression == Compression::Lz4) ? static_cast<ICompressedReader*>(&lz4Reader_)
: static_cast<ICompressedReader*>(&zstdReader_);
ICompressedReader* decompressor;

switch (compression) {
#ifndef MCAP_COMPRESSION_NO_LZ4
case Compression::Lz4:
decompressor = static_cast<ICompressedReader*>(&lz4Reader_);
break;
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
default:
case Compression::Zstd:
decompressor = static_cast<ICompressedReader*>(&zstdReader_);
break;
#else
default:
#endif
case Compression::None:
decompressor = static_cast<ICompressedReader*>(&uncompressedReader_);
break;
}
decompressor->reset(chunk.records, chunk.compressedSize, chunk.uncompressedSize);
reader_.reset(*decompressor, 0, decompressor->size());
status_ = decompressor->status();
Expand Down Expand Up @@ -1604,7 +1628,8 @@ LinearMessageView::Iterator::Iterator(LinearMessageView& view)
}
}

LinearMessageView::Iterator::Impl::Impl(LinearMessageView& view) : view_(view) {
LinearMessageView::Iterator::Impl::Impl(LinearMessageView& view)
: view_(view) {
auto dataStart = view.dataStart_;
auto dataEnd = view.dataEnd_;
auto readMessageOptions = view.readMessageOptions_;
Expand Down Expand Up @@ -1658,16 +1683,18 @@ void LinearMessageView::Iterator::Impl::onMessage(const Message& message, Record

auto& channel = *maybeChannel;
// make sure the message is on the right topic
if (view_.readMessageOptions_.topicFilter && !view_.readMessageOptions_.topicFilter(channel.topic)) {
if (view_.readMessageOptions_.topicFilter &&
!view_.readMessageOptions_.topicFilter(channel.topic)) {
return;
}
SchemaPtr maybeSchema;
if (channel.schemaId != 0) {
maybeSchema = view_.mcapReader_.schema(channel.schemaId);
if (!maybeSchema) {
view_.onProblem_(Status{StatusCode::InvalidSchemaId,
internal::StrCat("channel ", channel.id, " (", channel.topic,
") references missing schema id ", channel.schemaId)});
view_.onProblem_(
Status{StatusCode::InvalidSchemaId,
internal::StrCat("channel ", channel.id, " (", channel.topic,
") references missing schema id ", channel.schemaId)});
return;
}
}
Expand Down Expand Up @@ -1840,13 +1867,20 @@ void IndexedMessageReader::decompressChunk(const Chunk& chunk,
if (*compression == Compression::None) {
slot.decompressedChunk.insert(slot.decompressedChunk.end(), &chunk.records[0],
&chunk.records[chunk.uncompressedSize]);
} else if (*compression == Compression::Lz4) {
}
#ifndef MCAP_COMPRESSION_NO_LZ4
else if (*compression == Compression::Lz4) {
status_ = lz4Reader_.decompressAll(chunk.records, chunk.compressedSize, chunk.uncompressedSize,
&slot.decompressedChunk);
} else if (*compression == Compression::Zstd) {
}
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
else if (*compression == Compression::Zstd) {
status_ = ZStdReader::DecompressAll(chunk.records, chunk.compressedSize, chunk.uncompressedSize,
&slot.decompressedChunk);
} else {
}
#endif
else {
status_ = Status(StatusCode::UnrecognizedCompression,
internal::StrCat("unhandled compression: ", chunk.compression));
}
Expand Down
10 changes: 10 additions & 0 deletions cpp/mcap/include/mcap/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <vector>

// Forward declaration
#ifndef MCAP_COMPRESSION_NO_ZSTD
struct ZSTD_CCtx_s;
#endif

namespace mcap {

Expand Down Expand Up @@ -251,6 +253,7 @@ class MCAP_PUBLIC BufferWriter final : public IChunkWriter {
std::vector<std::byte> buffer_;
};

#ifndef MCAP_COMPRESSION_NO_LZ4
/**
* @brief An in-memory IChunkWriter implementation that holds data in a
* temporary buffer before flushing to an LZ4-compressed buffer.
Expand All @@ -273,7 +276,9 @@ class MCAP_PUBLIC LZ4Writer final : public IChunkWriter {
std::vector<std::byte> compressedBuffer_;
CompressionLevel compressionLevel_;
};
#endif

#ifndef MCAP_COMPRESSION_NO_ZSTD
/**
* @brief An in-memory IChunkWriter implementation that holds data in a
* temporary buffer before flushing to an ZStandard-compressed buffer.
Expand All @@ -297,6 +302,7 @@ class MCAP_PUBLIC ZStdWriter final : public IChunkWriter {
std::vector<std::byte> compressedBuffer_;
ZSTD_CCtx_s* zstdContext_ = nullptr;
};
#endif

/**
* @brief Provides a write interface to an MCAP file.
Expand Down Expand Up @@ -445,8 +451,12 @@ class MCAP_PUBLIC McapWriter final {
std::unique_ptr<FileWriter> fileOutput_;
std::unique_ptr<StreamWriter> streamOutput_;
std::unique_ptr<BufferWriter> uncompressedChunk_;
#ifndef MCAP_COMPRESSION_NO_LZ4
std::unique_ptr<LZ4Writer> lz4Chunk_;
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
std::unique_ptr<ZStdWriter> zstdChunk_;
#endif
std::vector<Schema> schemas_;
std::vector<Channel> channels_;
std::vector<AttachmentIndex> attachmentIndex_;
Expand Down
30 changes: 26 additions & 4 deletions cpp/mcap/include/mcap/writer.inl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
#include <algorithm>
#include <cassert>
#include <iostream>
#include <lz4frame.h>
#include <lz4hc.h>
#include <zstd.h>
#include <zstd_errors.h>
#ifndef MCAP_COMPRESSION_NO_LZ4
# include <lz4frame.h>
# include <lz4hc.h>
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
# include <zstd.h>
# include <zstd_errors.h>
#endif

namespace mcap {

Expand Down Expand Up @@ -131,6 +135,7 @@ const std::byte* BufferWriter::compressedData() const {

// LZ4Writer ///////////////////////////////////////////////////////////////////

#ifndef MCAP_COMPRESSION_NO_LZ4
namespace internal {

int LZ4CompressionLevel(CompressionLevel level) {
Expand Down Expand Up @@ -199,9 +204,11 @@ const std::byte* LZ4Writer::data() const {
const std::byte* LZ4Writer::compressedData() const {
return compressedBuffer_.data();
}
#endif

// ZStdWriter //////////////////////////////////////////////////////////////////

#ifndef MCAP_COMPRESSION_NO_ZSTD
namespace internal {

int ZStdCompressionLevel(CompressionLevel level) {
Expand Down Expand Up @@ -278,6 +285,7 @@ const std::byte* ZStdWriter::data() const {
const std::byte* ZStdWriter::compressedData() const {
return compressedBuffer_.data();
}
#endif

// McapWriter //////////////////////////////////////////////////////////////////

Expand All @@ -295,12 +303,16 @@ void McapWriter::open(IWritable& writer, const McapWriterOptions& options) {
default:
uncompressedChunk_ = std::make_unique<BufferWriter>();
break;
#ifndef MCAP_COMPRESSION_NO_LZ4
case Compression::Lz4:
lz4Chunk_ = std::make_unique<LZ4Writer>(options.compressionLevel, chunkSize_);
break;
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
case Compression::Zstd:
zstdChunk_ = std::make_unique<ZStdWriter>(options.compressionLevel, chunkSize_);
break;
#endif
}
auto* chunkWriter = getChunkWriter();
if (chunkWriter) {
Expand Down Expand Up @@ -457,7 +469,9 @@ void McapWriter::terminate() {
fileOutput_.reset();
streamOutput_.reset();
uncompressedChunk_.reset();
#ifndef MCAP_COMPRESSION_NO_ZSTD
zstdChunk_.reset();
#endif

channels_.clear();
attachmentIndex_.clear();
Expand Down Expand Up @@ -658,10 +672,14 @@ IWritable& McapWriter::getOutput() {
default:
case Compression::None:
return *uncompressedChunk_;
#ifndef MCAP_COMPRESSION_NO_ZSTD
case Compression::Zstd:
return *zstdChunk_;
#endif
#ifndef MCAP_COMPRESSION_NO_LZ4
case Compression::Lz4:
return *lz4Chunk_;
#endif
}
}

Expand All @@ -674,10 +692,14 @@ IChunkWriter* McapWriter::getChunkWriter() {
case Compression::None:
default:
return uncompressedChunk_.get();
#ifndef MCAP_COMPRESSION_NO_LZ4
case Compression::Lz4:
return lz4Chunk_.get();
#endif
#ifndef MCAP_COMPRESSION_NO_ZSTD
case Compression::Zstd:
return zstdChunk_.get();
#endif
}
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ target_link_libraries(streamed-writer-conformance ${CONAN_LIBS})

add_executable(unit-tests unit_tests.cpp)
target_link_libraries(unit-tests ${CONAN_LIBS})

add_executable(unit-tests-nocompress unit_tests.cpp)
target_link_libraries(unit-tests-nocompress ${CONAN_LIBS})
target_compile_definitions(unit-tests-nocompress PRIVATE MCAP_COMPRESSION_NO_LZ4 MCAP_COMPRESSION_NO_ZSTD)
2 changes: 2 additions & 0 deletions cpp/test/unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ TEST_CASE("Message index records", "[writer]") {
REQUIRE(messageIndexChannelIds[1] == channel2.id);
}

#ifndef MCAP_COMPRESSION_NO_LZ4
TEST_CASE("LZ4 compression", "[reader][writer]") {
SECTION("Roundtrip") {
Buffer buffer;
Expand Down Expand Up @@ -781,6 +782,7 @@ TEST_CASE("Read Order", "[reader][writer]") {
REQUIRE(count == reverse_order_expected.size());
}
}
#endif

TEST_CASE("ReadJobQueue order", "[reader]") {
SECTION("successive chunks with out-of-order timestamps") {
Expand Down

0 comments on commit 070e3b3

Please sign in to comment.