Skip to content

Commit

Permalink
typescript: add McapWriter.InitializeForAppending() (#1060)
Browse files Browse the repository at this point in the history
### Public-Facing Changes

- Added `InitializeForAppending()` to `McapWriter`.
- `McapIndexedReader` now exposes `dataEndOffset` and `dataSectionCrc`.

### Description

Append mode works by using `McapIndexedReader` to read the summary
section, loading all indexes, channels, etc. from it into memory,
then chopping off everything from the `DataEnd` record onwards and
continuing to write from there.

This does not work if the `DataEnd` record contains extra data or
padding. This seems to be a fundamental flaw with the spec; a separate
PR will be raised to change the spec to disallow this.

See also:
https://github.com/foxglove/mcap/blob/4d62967abe95e755b91f16cce390f6560d18e07e/go/cli/mcap/utils/mcap_amendment.go

Supersedes / closes #1016
Resolves FG-5821

---------

Co-authored-by: Alex Ernst <alex.m.ernst14@gmail.com>
  • Loading branch information
jtbandes and alexern14 authored Mar 12, 2024
1 parent 6addfd5 commit d7c0bb3
Show file tree
Hide file tree
Showing 8 changed files with 701 additions and 51 deletions.
2 changes: 1 addition & 1 deletion typescript/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mcap/core",
"version": "2.0.2",
"version": "2.1.0",
"description": "MCAP file support in TypeScript",
"license": "MIT",
"repository": {
Expand Down
11 changes: 11 additions & 0 deletions typescript/core/src/ISeekableWriter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { IWritable } from "./IWritable";

/**
* ISeekableWriter describes a writer interface with seek abilities.
*
* It extends IWritable with two asynchronous operations: repositioning the
* write cursor and discarding the data that follows it.
*/
export interface ISeekableWriter extends IWritable {
/**
* Move the cursor to the given position.
*
* @param position Target cursor position. NOTE(review): presumably an absolute
* byte offset from the start of the output — confirm against implementations.
* @returns A promise that settles when the seek has completed.
*/
seek(position: bigint): Promise<void>;
/**
* Remove data after the current write position.
*
* @returns A promise that settles when the truncation has completed.
*/
truncate(): Promise<void>;
}
37 changes: 20 additions & 17 deletions typescript/core/src/McapIndexedReader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ describe("McapIndexedReader", () => {
new Uint8Array([
...MCAP_MAGIC,
...record(Opcode.FOOTER, [
...uint64LE(0n), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(0n), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(0), // summary crc
]),
...MCAP_MAGIC,
Expand Down Expand Up @@ -92,8 +92,8 @@ describe("McapIndexedReader", () => {
...string(""), // library
]),
...record(Opcode.FOOTER, [
...uint64LE(0n), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(0n), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(0), // summary crc
]),
...MCAP_MAGIC,
Expand All @@ -111,8 +111,8 @@ describe("McapIndexedReader", () => {
...string("lib"), // library
]),
...record(Opcode.FOOTER, [
...uint64LE(0n), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(0n), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(0), // summary crc
]),
...MCAP_MAGIC,
Expand All @@ -130,6 +130,7 @@ describe("McapIndexedReader", () => {
...string(""), // profile
...string(""), // library
]),
...record(Opcode.DATA_END, [...uint32LE(0)]),
];
const summaryStart = data.length;

Expand All @@ -142,15 +143,15 @@ describe("McapIndexedReader", () => {

data.push(
...record(Opcode.FOOTER, [
...uint64LE(BigInt(summaryStart)), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(BigInt(summaryStart)), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(crc32(new Uint8Array([42]))), // summary crc
]),
...MCAP_MAGIC,
);
const readable = makeReadable(new Uint8Array(data));
await expect(McapIndexedReader.Initialize({ readable })).rejects.toThrow(
"Incorrect summary CRC 491514153 (expected 163128923)",
"Incorrect summary CRC 1656343536 (expected 163128923)",
);
});

Expand All @@ -161,6 +162,7 @@ describe("McapIndexedReader", () => {
...string(""), // profile
...string(""), // library
]),
...record(Opcode.DATA_END, [...uint32LE(0)]),
];
const summaryStart = data.length;
data.push(
Expand All @@ -178,8 +180,8 @@ describe("McapIndexedReader", () => {
...keyValues(string, string, [["foo", "bar"]]), // user data
]),
...record(Opcode.FOOTER, [
...uint64LE(BigInt(summaryStart)), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(BigInt(summaryStart)), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(crc32(new Uint8Array(0))), // summary crc
]),
...MCAP_MAGIC,
Expand Down Expand Up @@ -324,6 +326,7 @@ describe("McapIndexedReader", () => {
]),
);
const messageIndexLength = BigInt(data.length) - messageIndexOffset;
data.push(...record(Opcode.DATA_END, [...uint32LE(0)]));
const summaryStart = data.length;
data.push(
...channel,
Expand All @@ -339,8 +342,8 @@ describe("McapIndexedReader", () => {
...uint64LE(BigInt(chunkContents.length)), // uncompressed size
]),
...record(Opcode.FOOTER, [
...uint64LE(BigInt(summaryStart)), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(BigInt(summaryStart)), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(crc32(new Uint8Array(0))), // summary crc
]),
...MCAP_MAGIC,
Expand Down Expand Up @@ -922,8 +925,8 @@ describe("McapIndexedReader", () => {
...string("foo"), // name
]),
...record(Opcode.FOOTER, [
...uint64LE(BigInt(summaryStart)), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(BigInt(summaryStart)), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(0), // summary crc
]),
...MCAP_MAGIC,
Expand Down Expand Up @@ -1051,8 +1054,8 @@ describe("McapIndexedReader", () => {
...string("application/json"), // media type
]),
...record(Opcode.FOOTER, [
...uint64LE(BigInt(summaryStart)), // summary offset
...uint64LE(0n), // summary start offset
...uint64LE(BigInt(summaryStart)), // summary start
...uint64LE(0n), // summary offset start
...uint32LE(0), // summary crc
]),
...MCAP_MAGIC,
Expand Down
58 changes: 45 additions & 13 deletions typescript/core/src/McapIndexedReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type McapIndexedReaderArgs = {
summaryOffsetsByOpcode: ReadonlyMap<number, TypedMcapRecords["SummaryOffset"]>;
header: TypedMcapRecords["Header"];
footer: TypedMcapRecords["Footer"];
dataEndOffset: bigint;
dataSectionCrc?: number;
};

export class McapIndexedReader {
Expand All @@ -30,6 +32,9 @@ export class McapIndexedReader {
readonly summaryOffsetsByOpcode: ReadonlyMap<number, TypedMcapRecords["SummaryOffset"]>;
readonly header: TypedMcapRecords["Header"];
readonly footer: TypedMcapRecords["Footer"];
// Used for appending attachments/metadata to existing MCAP files
readonly dataEndOffset: bigint;
readonly dataSectionCrc?: number;

#readable: IReadable;
#decompressHandlers?: DecompressHandlers;
Expand All @@ -51,6 +56,8 @@ export class McapIndexedReader {
this.summaryOffsetsByOpcode = args.summaryOffsetsByOpcode;
this.header = args.header;
this.footer = args.footer;
this.dataEndOffset = args.dataEndOffset;
this.dataSectionCrc = args.dataSectionCrc;

for (const chunk of args.chunkIndexes) {
if (this.#messageStartTime == undefined || chunk.messageStartTime < this.#messageStartTime) {
Expand Down Expand Up @@ -93,6 +100,7 @@ export class McapIndexedReader {
const size = await readable.size();

let header: TypedMcapRecords["Header"];
let headerEndOffset: bigint;
{
const headerPrefix = await readable.read(
0n,
Expand All @@ -104,15 +112,15 @@ export class McapIndexedReader {
headerPrefix.byteLength,
);
void parseMagic(headerPrefixView, 0);
const headerLength = headerPrefixView.getBigUint64(
const headerContentLength = headerPrefixView.getBigUint64(
MCAP_MAGIC.length + /* Opcode.HEADER */ 1,
true,
);
const headerReadLength =
/* Opcode.HEADER */ 1n + /* record content length */ 8n + headerContentLength;

const headerRecord = await readable.read(
BigInt(MCAP_MAGIC.length),
/* Opcode.HEADER */ 1n + /* record content length */ 8n + headerLength,
);
const headerRecord = await readable.read(BigInt(MCAP_MAGIC.length), headerReadLength);
headerEndOffset = BigInt(MCAP_MAGIC.length) + headerReadLength;
const headerResult = parseRecord({
view: new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength),
startOffset: 0,
Expand Down Expand Up @@ -218,14 +226,27 @@ export class McapIndexedReader {
),
);

const dataEndLength =
/* Opcode.DATA_END */ 1n + /* record content length */ 8n + /* data_section_crc */ 4n;

const dataEndOffset = footer.summaryStart - dataEndLength;
if (dataEndOffset < headerEndOffset) {
throw errorWithLibrary(
`Expected DataEnd position (summary start ${footer.summaryStart} - ${dataEndLength} = ${dataEndOffset}) to be after Header end offset (${headerEndOffset})`,
);
}

// Future optimization: avoid holding whole summary blob in memory at once
const allSummaryData = await readable.read(
footer.summaryStart,
footerOffset - footer.summaryStart,
const dataEndAndSummarySection = await readable.read(
dataEndOffset,
footerOffset - dataEndOffset,
);
if (footer.summaryCrc !== 0) {
let summaryCrc = crc32Init();
summaryCrc = crc32Update(summaryCrc, allSummaryData);
summaryCrc = crc32Update(
summaryCrc,
dataEndAndSummarySection.subarray(Number(dataEndLength)),
);
summaryCrc = crc32Update(summaryCrc, footerPrefix);
summaryCrc = crc32Final(summaryCrc);
if (summaryCrc !== footer.summaryCrc) {
Expand All @@ -236,9 +257,9 @@ export class McapIndexedReader {
}

const indexView = new DataView(
allSummaryData.buffer,
allSummaryData.byteOffset,
allSummaryData.byteLength,
dataEndAndSummarySection.buffer,
dataEndAndSummarySection.byteOffset,
dataEndAndSummarySection.byteLength,
);

const channelsById = new Map<number, TypedMcapRecords["Channel"]>();
Expand All @@ -248,6 +269,7 @@ export class McapIndexedReader {
const metadataIndexes: TypedMcapRecords["MetadataIndex"][] = [];
const summaryOffsetsByOpcode = new Map<number, TypedMcapRecords["SummaryOffset"]>();
let statistics: TypedMcapRecords["Statistics"] | undefined;
let dataSectionCrc: number | undefined;

let offset = 0;
for (
Expand All @@ -256,6 +278,11 @@ export class McapIndexedReader {
result.record;
offset += result.usedBytes
) {
if (offset === 0 && result.record.type !== "DataEnd") {
throw errorWithLibrary(
`Expected DataEnd record to precede summary section, but found ${result.record.type}`,
);
}
switch (result.record.type) {
case "Schema":
schemasById.set(result.record.id, result.record);
Expand All @@ -281,14 +308,17 @@ export class McapIndexedReader {
case "SummaryOffset":
summaryOffsetsByOpcode.set(result.record.groupOpcode, result.record);
break;
case "DataEnd":
dataSectionCrc =
result.record.dataSectionCrc === 0 ? undefined : result.record.dataSectionCrc;
break;
case "Header":
case "Footer":
case "Message":
case "Chunk":
case "MessageIndex":
case "Attachment":
case "Metadata":
case "DataEnd":
throw errorWithLibrary(`${result.record.type} record not allowed in index section`);
case "Unknown":
break;
Expand All @@ -310,6 +340,8 @@ export class McapIndexedReader {
summaryOffsetsByOpcode,
header,
footer,
dataEndOffset,
dataSectionCrc,
});
}

Expand Down
Loading

0 comments on commit d7c0bb3

Please sign in to comment.