From d7c0bb35352ed568ce4dfd521a0410e5ff352174 Mon Sep 17 00:00:00 2001
From: Jacob Bandes-Storch
Date: Tue, 12 Mar 2024 16:36:36 -0700
Subject: [PATCH] typescript: add McapWriter.InitializeForAppending() (#1060)

### Public-Facing Changes

- Added `InitializeForAppending()` to `McapWriter`.
- `McapIndexedReader` now exposes `dataEndOffset` and `dataSectionCrc`.

### Description

Append mode works by using `McapIndexedReader` to read the summary section: all indexes, channels, schemas, etc. are loaded from the footer region into memory, then everything from the DataEnd record onward is truncated, and writing continues from there. (A usage sketch is included at the end of this patch.)

This does not work if the `DataEnd` record contains extra data or padding. This appears to be a fundamental flaw in the spec; a separate PR will be raised to change the spec to disallow this.

See also: https://github.com/foxglove/mcap/blob/4d62967abe95e755b91f16cce390f6560d18e07e/go/cli/mcap/utils/mcap_amendment.go

Supersedes / closes https://github.com/foxglove/mcap/pull/1016
Resolves FG-5821

---------

Co-authored-by: Alex Ernst
---
 typescript/core/package.json                  |   2 +-
 typescript/core/src/ISeekableWriter.ts        |  11 +
 typescript/core/src/McapIndexedReader.test.ts |  37 +-
 typescript/core/src/McapIndexedReader.ts      |  58 ++-
 typescript/core/src/McapWriter.test.ts        | 492 +++++++++++++++++-
 typescript/core/src/McapWriter.ts             |  93 +++-
 typescript/core/src/TempBuffer.ts             |  58 ++-
 typescript/core/src/index.ts                  |   1 +
 8 files changed, 701 insertions(+), 51 deletions(-)
 create mode 100644 typescript/core/src/ISeekableWriter.ts

diff --git a/typescript/core/package.json b/typescript/core/package.json
index 320cd7cb41..9d492f33c6 100644
--- a/typescript/core/package.json
+++ b/typescript/core/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@mcap/core",
-  "version": "2.0.2",
+  "version": "2.1.0",
   "description": "MCAP file support in TypeScript",
   "license": "MIT",
   "repository": {
diff --git a/typescript/core/src/ISeekableWriter.ts b/typescript/core/src/ISeekableWriter.ts
new file mode 100644
index 0000000000..a5a8bbfd30
--- /dev/null
+++ b/typescript/core/src/ISeekableWriter.ts
@@ -0,0 +1,11 @@
+import { IWritable } from "./IWritable";
+
+/**
+ * ISeekableWriter describes a writer interface with seek abilities.
+ */
+export interface ISeekableWriter extends IWritable {
+  /** Move the cursor to the given position */
+  seek(position: bigint): Promise<void>;
+  /** Remove data after the current write position */
+  truncate(): Promise<void>;
+}
diff --git a/typescript/core/src/McapIndexedReader.test.ts b/typescript/core/src/McapIndexedReader.test.ts
index 548a2f23e6..4f3752eae4 100644
--- a/typescript/core/src/McapIndexedReader.test.ts
+++ b/typescript/core/src/McapIndexedReader.test.ts
@@ -57,8 +57,8 @@ describe("McapIndexedReader", () => {
       new Uint8Array([
         ...MCAP_MAGIC,
         ...record(Opcode.FOOTER, [
-          ...uint64LE(0n), // summary offset
-          ...uint64LE(0n), // summary start offset
+          ...uint64LE(0n), // summary start
+          ...uint64LE(0n), // summary offset start
           ...uint32LE(0), // summary crc
         ]),
         ...MCAP_MAGIC,
@@ -92,8 +92,8 @@ describe("McapIndexedReader", () => {
         ...string(""), // library
       ]),
       ...record(Opcode.FOOTER, [
-        ...uint64LE(0n), // summary offset
-        ...uint64LE(0n), // summary start offset
+        ...uint64LE(0n), // summary start
+        ...uint64LE(0n), // summary offset start
         ...uint32LE(0), // summary crc
       ]),
       ...MCAP_MAGIC,
@@ -111,8 +111,8 @@ describe("McapIndexedReader", () => {
         ...string("lib"), // library
       ]),
       ...record(Opcode.FOOTER, [
-        ...uint64LE(0n), // summary offset
-        ...uint64LE(0n), // summary start offset
+        ...uint64LE(0n), // summary start
+        ...uint64LE(0n), // summary offset start
         ...uint32LE(0), // summary crc
       ]),
       ...MCAP_MAGIC,
@@ -130,6 +130,7 @@ describe("McapIndexedReader", () => {
         ...string(""), // profile
         ...string(""), // library
       ]),
+      ...record(Opcode.DATA_END, [...uint32LE(0)]),
     ];
     const summaryStart = data.length;

     data.push(
       ...record(Opcode.FOOTER, [
-        ...uint64LE(BigInt(summaryStart)), // summary offset
-        ...uint64LE(0n), // summary start offset
+        ...uint64LE(BigInt(summaryStart)), // summary start
+        ...uint64LE(0n), // summary offset start
         ...uint32LE(crc32(new Uint8Array([42]))), // summary crc
       ]),
       ...MCAP_MAGIC,
     );
     const readable = makeReadable(new Uint8Array(data));
     await expect(McapIndexedReader.Initialize({ readable })).rejects.toThrow(
-      "Incorrect summary CRC 491514153 (expected 163128923)",
+      "Incorrect summary CRC 1656343536 (expected 163128923)",
     );
   });

@@ -161,6 +162,7 @@ describe("McapIndexedReader", () => {
         ...string(""), // profile
         ...string(""), // library
       ]),
+      ...record(Opcode.DATA_END, [...uint32LE(0)]),
     ];
     const summaryStart = data.length;
     data.push(
@@ -178,8 +180,8 @@ describe("McapIndexedReader", () => {
         ...keyValues(string, string, [["foo", "bar"]]), // user data
       ]),
       ...record(Opcode.FOOTER, [
-        ...uint64LE(BigInt(summaryStart)), // summary offset
-        ...uint64LE(0n), // summary start offset
+        ...uint64LE(BigInt(summaryStart)), // summary start
+        ...uint64LE(0n), // summary offset start
         ...uint32LE(crc32(new Uint8Array(0))), // summary crc
       ]),
       ...MCAP_MAGIC,
@@ -324,6 +326,7 @@ describe("McapIndexedReader", () => {
       ]),
     );
     const messageIndexLength = BigInt(data.length) - messageIndexOffset;
+    data.push(...record(Opcode.DATA_END, [...uint32LE(0)]));
     const summaryStart = data.length;
     data.push(
       ...channel,
@@ -339,8 +342,8 @@ describe("McapIndexedReader", () => {
         ...uint64LE(BigInt(chunkContents.length)), // uncompressed size
       ]),
       ...record(Opcode.FOOTER, [
-        ...uint64LE(BigInt(summaryStart)), // summary offset
-        ...uint64LE(0n), // summary start offset
+        ...uint64LE(BigInt(summaryStart)), // summary start
+        ...uint64LE(0n), // summary offset start
         ...uint32LE(crc32(new Uint8Array(0))), // summary crc
       ]),
      ...MCAP_MAGIC,
@@ -922,8 +925,8 @@ describe("McapIndexedReader", () => {
         ...string("foo"), // name
       ]),
       ...record(Opcode.FOOTER, [
-        ...uint64LE(BigInt(summaryStart)), // summary offset
-        ...uint64LE(0n), // summary start offset
+        ...uint64LE(BigInt(summaryStart)), // summary start
+        ...uint64LE(0n), // summary offset start
         ...uint32LE(0), // summary crc
       ]),
       ...MCAP_MAGIC,
@@ -1051,8 +1054,8 @@ describe("McapIndexedReader", () => {
         ...string("application/json"), // media type
       ]),
       ...record(Opcode.FOOTER, [
-        ...uint64LE(BigInt(summaryStart)), // summary offset
-        ...uint64LE(0n), // summary start offset
+        ...uint64LE(BigInt(summaryStart)), // summary start
+        ...uint64LE(0n), // summary offset start
         ...uint32LE(0), // summary crc
       ]),
       ...MCAP_MAGIC,
diff --git a/typescript/core/src/McapIndexedReader.ts b/typescript/core/src/McapIndexedReader.ts
index 222279b36a..5955300a40 100644
--- a/typescript/core/src/McapIndexedReader.ts
+++ b/typescript/core/src/McapIndexedReader.ts
@@ -18,6 +18,8 @@ type McapIndexedReaderArgs = {
   summaryOffsetsByOpcode: ReadonlyMap<number, TypedMcapRecords["SummaryOffset"]>;
   header: TypedMcapRecords["Header"];
   footer: TypedMcapRecords["Footer"];
+  dataEndOffset: bigint;
+  dataSectionCrc?: number;
 };

 export class McapIndexedReader {
@@ -30,6 +32,9 @@ export class McapIndexedReader {
   readonly summaryOffsetsByOpcode: ReadonlyMap<number, TypedMcapRecords["SummaryOffset"]>;
   readonly header: TypedMcapRecords["Header"];
   readonly footer: TypedMcapRecords["Footer"];
+  // Used for appending attachments/metadata to existing MCAP files
+  readonly dataEndOffset: bigint;
+  readonly dataSectionCrc?: number;

   #readable: IReadable;
   #decompressHandlers?: DecompressHandlers;
@@ -51,6 +56,8 @@ export class McapIndexedReader {
     this.summaryOffsetsByOpcode = args.summaryOffsetsByOpcode;
     this.header = args.header;
     this.footer = args.footer;
+    this.dataEndOffset = args.dataEndOffset;
+    this.dataSectionCrc = args.dataSectionCrc;

     for (const chunk of args.chunkIndexes) {
       if (this.#messageStartTime == undefined || chunk.messageStartTime < this.#messageStartTime) {
@@ -93,6 +100,7 @@ export class McapIndexedReader {
     const size = await readable.size();

     let header: TypedMcapRecords["Header"];
+    let headerEndOffset: bigint;
     {
       const headerPrefix = await readable.read(
         0n,
         BigInt(MCAP_MAGIC.length + /* Opcode.HEADER */ 1 + /* record content length */ 8),
       );
       const headerPrefixView = new DataView(
         headerPrefix.buffer,
         headerPrefix.byteOffset,
         headerPrefix.byteLength,
       );
       void parseMagic(headerPrefixView, 0);
-      const headerLength = headerPrefixView.getBigUint64(
+      const headerContentLength = headerPrefixView.getBigUint64(
         MCAP_MAGIC.length + /* Opcode.HEADER */ 1,
         true,
       );
+      const headerReadLength =
+        /* Opcode.HEADER */ 1n + /* record content length */ 8n + headerContentLength;

-      const headerRecord = await readable.read(
-        BigInt(MCAP_MAGIC.length),
-        /* Opcode.HEADER */ 1n + /* record content length */ 8n + headerLength,
-      );
+      const headerRecord = await readable.read(BigInt(MCAP_MAGIC.length), headerReadLength);
+      headerEndOffset = BigInt(MCAP_MAGIC.length) + headerReadLength;
       const headerResult = parseRecord({
         view: new DataView(headerRecord.buffer, headerRecord.byteOffset, headerRecord.byteLength),
         startOffset: 0,
@@ -218,14 +226,27 @@ export class McapIndexedReader {
       ),
     );

+    const dataEndLength =
+      /* Opcode.DATA_END */ 1n + /* record content length */ 8n + /* data_section_crc */ 4n;
+
+    const dataEndOffset = footer.summaryStart - dataEndLength;
+    if (dataEndOffset < headerEndOffset) {
+      throw errorWithLibrary(
+        `Expected DataEnd position (summary start ${footer.summaryStart} - ${dataEndLength} = ${dataEndOffset}) to be after Header end offset (${headerEndOffset})`,
+      );
+    }
+
     // Future optimization: avoid holding whole summary blob in memory at once
-    const allSummaryData = await readable.read(
-      footer.summaryStart,
-      footerOffset - footer.summaryStart,
+    const dataEndAndSummarySection = await readable.read(
+      dataEndOffset,
+      footerOffset - dataEndOffset,
     );
     if (footer.summaryCrc !== 0) {
       let summaryCrc = crc32Init();
-      summaryCrc = crc32Update(summaryCrc, allSummaryData);
+      summaryCrc = crc32Update(
+        summaryCrc,
+        dataEndAndSummarySection.subarray(Number(dataEndLength)),
+      );
       summaryCrc = crc32Update(summaryCrc, footerPrefix);
       summaryCrc = crc32Final(summaryCrc);
       if (summaryCrc !== footer.summaryCrc) {
         throw errorWithLibrary(
           `Incorrect summary CRC ${summaryCrc} (expected ${footer.summaryCrc})`,
         );
       }
     }

     const indexView = new DataView(
-      allSummaryData.buffer,
-      allSummaryData.byteOffset,
-      allSummaryData.byteLength,
+      dataEndAndSummarySection.buffer,
+      dataEndAndSummarySection.byteOffset,
+      dataEndAndSummarySection.byteLength,
     );

     const channelsById = new Map<number, TypedMcapRecords["Channel"]>();
     const schemasById = new Map<number, TypedMcapRecords["Schema"]>();
     const chunkIndexes: TypedMcapRecords["ChunkIndex"][] = [];
     const attachmentIndexes: TypedMcapRecords["AttachmentIndex"][] = [];
     const metadataIndexes: TypedMcapRecords["MetadataIndex"][] = [];
     const summaryOffsetsByOpcode = new Map<number, TypedMcapRecords["SummaryOffset"]>();
     let statistics: TypedMcapRecords["Statistics"] | undefined;
+    let dataSectionCrc: number | undefined;

     let offset = 0;
     for (
       let result;
       (result = parseRecord({ view: indexView, startOffset: offset, validateCrcs: true })),
         result.record;
       offset += result.usedBytes
     ) {
+      if (offset === 0 && result.record.type !== "DataEnd") {
+        throw errorWithLibrary(
+          `Expected DataEnd record to precede summary section, but found ${result.record.type}`,
+        );
+      }
       switch (result.record.type) {
         case "Schema":
           schemasById.set(result.record.id, result.record);
           break;
         case "SummaryOffset":
           summaryOffsetsByOpcode.set(result.record.groupOpcode, result.record);
           break;
+        case "DataEnd":
+          dataSectionCrc =
+            result.record.dataSectionCrc === 0 ?
undefined : result.record.dataSectionCrc; + break; case "Header": case "Footer": case "Message": @@ -288,7 +319,6 @@ export class McapIndexedReader { case "MessageIndex": case "Attachment": case "Metadata": - case "DataEnd": throw errorWithLibrary(`${result.record.type} record not allowed in index section`); case "Unknown": break; @@ -310,6 +340,8 @@ export class McapIndexedReader { summaryOffsetsByOpcode, header, footer, + dataEndOffset, + dataSectionCrc, }); } diff --git a/typescript/core/src/McapWriter.test.ts b/typescript/core/src/McapWriter.test.ts index 881d211bc8..57bdcd82fc 100644 --- a/typescript/core/src/McapWriter.test.ts +++ b/typescript/core/src/McapWriter.test.ts @@ -4,11 +4,22 @@ import { McapIndexedReader } from "./McapIndexedReader"; import McapStreamReader from "./McapStreamReader"; import { McapWriter } from "./McapWriter"; import { TempBuffer } from "./TempBuffer"; -import { Opcode } from "./constants"; +import { MCAP_MAGIC, Opcode } from "./constants"; import { parseMagic, parseRecord } from "./parse"; import { collect, keyValues, record, string, uint16LE, uint32LE, uint64LE } from "./testUtils"; import { TypedMcapRecord } from "./types"; +function readAsMcapStream(data: Uint8Array) { + const reader = new McapStreamReader(); + reader.append(data); + const records: TypedMcapRecord[] = []; + for (let rec; (rec = reader.nextRecord()); ) { + records.push(rec); + } + expect(reader.done()).toBe(true); + return records; +} + describe("McapWriter", () => { it("supports messages with logTime 0", async () => { const tempBuffer = new TempBuffer(); @@ -344,4 +355,483 @@ describe("McapWriter", () => { }, ]); }); + + it("supports append mode", async () => { + const tempBuffer = new TempBuffer(); + + const writer = new McapWriter({ writable: tempBuffer }); + + await writer.start({ library: "", profile: "" }); + const schemaId = await writer.registerSchema({ + name: "schema1", + encoding: "json", + data: new Uint8Array(), + }); + const channelId1 = await writer.registerChannel({ + topic: "channel1", + schemaId, + messageEncoding: "json", + metadata: new Map(), + }); + await writer.addMessage({ + channelId: channelId1, + data: new Uint8Array(), + sequence: 0, + logTime: 0n, + publishTime: 0n, + }); + await writer.end(); + + // Records common to both the original and appended file + const commonRecords: TypedMcapRecord[] = [ + { + type: "Header", + library: "", + profile: "", + }, + { + type: "Schema", + id: 1, + encoding: "json", + data: new Uint8Array(), + name: "schema1", + }, + { + type: "Channel", + id: 0, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "channel1", + }, + { + type: "Message", + channelId: 0, + data: new Uint8Array(), + logTime: 0n, + publishTime: 0n, + sequence: 0, + }, + { + type: "MessageIndex", + channelId: 0, + records: [[0n, 71n]], + }, + ]; + + const originalRecords = readAsMcapStream(tempBuffer.get()); + expect(originalRecords).toEqual([ + ...commonRecords, + { + type: "DataEnd", + dataSectionCrc: 2968501716, + }, + { + type: "Schema", + id: 1, + encoding: "json", + data: new Uint8Array(), + name: "schema1", + }, + { + type: "Channel", + id: 0, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "channel1", + }, + { + type: "Statistics", + attachmentCount: 0, + channelCount: 1, + channelMessageCounts: new Map([[0, 1n]]), + chunkCount: 1, + messageCount: 1n, + messageEndTime: 0n, + messageStartTime: 0n, + metadataCount: 0, + schemaCount: 1, + }, + { + type: "ChunkIndex", + chunkLength: 151n, + chunkStartOffset: 
25n, + compressedSize: 102n, + compression: "", + messageEndTime: 0n, + messageIndexLength: 31n, + messageIndexOffsets: new Map([[0, 176n]]), + messageStartTime: 0n, + uncompressedSize: 102n, + }, + { + type: "SummaryOffset", + groupLength: 34n, + groupOpcode: Opcode.SCHEMA, + groupStart: 220n, + }, + { + type: "SummaryOffset", + groupLength: 37n, + groupOpcode: Opcode.CHANNEL, + groupStart: 254n, + }, + { + type: "SummaryOffset", + groupLength: 65n, + groupOpcode: Opcode.STATISTICS, + groupStart: 291n, + }, + { + type: "SummaryOffset", + groupLength: 83n, + groupOpcode: Opcode.CHUNK_INDEX, + groupStart: 356n, + }, + { + type: "Footer", + summaryCrc: 2739614603, + summaryOffsetStart: 439n, + summaryStart: 220n, + }, + ]); + + const appendWriter = await McapWriter.InitializeForAppending(tempBuffer, {}); + + await appendWriter.addAttachment({ + name: "attachment1", + logTime: 0n, + createTime: 0n, + mediaType: "text/plain", + data: new TextEncoder().encode("foo"), + }); + await appendWriter.addMetadata({ + name: "metadata1", + metadata: new Map([["test", "testValue"]]), + }); + await appendWriter.addMessage({ + channelId: channelId1, + data: new Uint8Array(), + sequence: 1, + logTime: 1n, + publishTime: 1n, + }); + const channelId2 = await appendWriter.registerChannel({ + topic: "channel2", + schemaId, + messageEncoding: "json", + metadata: new Map(), + }); + await appendWriter.addMessage({ + channelId: channelId2, + data: new Uint8Array(), + sequence: 2, + logTime: 2n, + publishTime: 2n, + }); + await appendWriter.end(); + + const appendedRecords = readAsMcapStream(tempBuffer.get()); + + const newSummaryStart = 546n; + const dataEndLength = 1n + 8n + 4n; + const expectedDataCrc = crc32( + tempBuffer.get().slice(0, Number(newSummaryStart - dataEndLength)), + ); + + expect(appendedRecords).toEqual([ + ...commonRecords, + { + type: "Attachment", + name: "attachment1", + logTime: 0n, + createTime: 0n, + mediaType: "text/plain", + data: new TextEncoder().encode("foo"), + }, + { + type: "Metadata", + name: "metadata1", + metadata: new Map([["test", "testValue"]]), + }, + { + type: "Message", + channelId: 0, + data: new Uint8Array(), + logTime: 1n, + publishTime: 1n, + sequence: 1, + }, + { + type: "Channel", + id: 1, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "channel2", + }, + { + type: "Message", + channelId: 1, + data: new Uint8Array(), + logTime: 2n, + publishTime: 2n, + sequence: 2, + }, + { + type: "MessageIndex", + channelId: 0, + records: [[1n, 0n]], + }, + { + type: "MessageIndex", + channelId: 1, + records: [[2n, 68n]], + }, + { + type: "DataEnd", + dataSectionCrc: expectedDataCrc, + }, + { + type: "Schema", + id: 1, + encoding: "json", + data: new Uint8Array(), + name: "schema1", + }, + { + type: "Channel", + id: 0, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "channel1", + }, + { + type: "Channel", + id: 1, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "channel2", + }, + { + type: "Statistics", + attachmentCount: 1, + channelCount: 2, + channelMessageCounts: new Map([ + [0, 2n], + [1, 1n], + ]), + chunkCount: 2, + messageCount: 3n, + messageEndTime: 2n, + messageStartTime: 0n, + metadataCount: 1, + schemaCount: 1, + }, + { + type: "MetadataIndex", + offset: 276n, + length: 47n, + name: "metadata1", + }, + { + type: "AttachmentIndex", + offset: 207n, + length: 69n, + logTime: 0n, + createTime: 0n, + dataSize: 3n, + name: "attachment1", + mediaType: "text/plain", + }, + { + type: "ChunkIndex", + 
chunkLength: 151n, + chunkStartOffset: 25n, + compressedSize: 102n, + compression: "", + messageEndTime: 0n, + messageIndexLength: 31n, + messageIndexOffsets: new Map([[0, 176n]]), + messageStartTime: 0n, + uncompressedSize: 102n, + }, + { + type: "ChunkIndex", + chunkLength: 148n, + chunkStartOffset: 323n, + compressedSize: 99n, + compression: "", + messageEndTime: 2n, + messageIndexLength: 62n, + messageIndexOffsets: new Map([ + [0, 471n], + [1, 502n], + ]), + messageStartTime: 1n, + uncompressedSize: 99n, + }, + { + type: "SummaryOffset", + groupLength: 34n, + groupOpcode: Opcode.SCHEMA, + groupStart: 546n, + }, + { + type: "SummaryOffset", + groupLength: 74n, + groupOpcode: Opcode.CHANNEL, + groupStart: 580n, + }, + { + type: "SummaryOffset", + groupLength: 75n, + groupOpcode: Opcode.STATISTICS, + groupStart: 654n, + }, + { + type: "SummaryOffset", + groupLength: 38n, + groupOpcode: Opcode.METADATA_INDEX, + groupStart: 729n, + }, + { + type: "SummaryOffset", + groupLength: 78n, + groupOpcode: Opcode.ATTACHMENT_INDEX, + groupStart: 767n, + }, + { + type: "SummaryOffset", + groupLength: 176n, + groupOpcode: Opcode.CHUNK_INDEX, + groupStart: 845n, + }, + { + type: "Footer", + summaryCrc: 758669511, + summaryOffsetStart: 1021n, + summaryStart: newSummaryStart, + }, + ]); + }); + + it.each([true, false])( + "respects data_section_crc present=%s when appending", + async (useDataSectionCrc) => { + const originalDataSection = new Uint8Array([ + ...MCAP_MAGIC, + ...record(Opcode.HEADER, [ + ...string(""), // profile + ...string("lib"), // library + ]), + ]); + const dataEndLength = 1 + 8 + 4; + const tempBuffer = new TempBuffer( + new Uint8Array([ + ...originalDataSection, + ...record(Opcode.DATA_END, [ + ...uint32LE(useDataSectionCrc ? crc32(originalDataSection) : 0), // data crc + ]), + ...record(Opcode.STATISTICS, [ + ...uint64LE(0n), // message count + ...uint16LE(0), // schema count + ...uint32LE(0), // channel count + ...uint32LE(0), // attachment count + ...uint32LE(0), // metadata count + ...uint32LE(0), // chunk count + ...uint64LE(0n), // message start time + ...uint64LE(0n), // message end time + ...uint32LE(0), // channel message counts length + ]), + ...record(Opcode.FOOTER, [ + ...uint64LE(BigInt(originalDataSection.length + dataEndLength)), // summary start + ...uint64LE(0n), // summary offset start + ...uint32LE(0), // summary crc + ]), + ...MCAP_MAGIC, + ]), + ); + const appendWriter = await McapWriter.InitializeForAppending(tempBuffer, { + repeatChannels: false, + useSummaryOffsets: false, + useChunks: false, + }); + const chanId = await appendWriter.registerChannel({ + messageEncoding: "foo", + metadata: new Map(), + schemaId: 0, + topic: "foo", + }); + await appendWriter.addMessage({ + channelId: chanId, + logTime: 0n, + publishTime: 0n, + sequence: 0, + data: new Uint8Array([]), + }); + await appendWriter.end(); + + const summarySection = new Uint8Array([ + ...record(Opcode.STATISTICS, [ + ...uint64LE(1n), // message count + ...uint16LE(0), // schema count + ...uint32LE(1), // channel count + ...uint32LE(0), // attachment count + ...uint32LE(0), // metadata count + ...uint32LE(0), // chunk count + ...uint64LE(0n), // message start time + ...uint64LE(0n), // message end time + ...keyValues(uint16LE, uint64LE, [[0, 1n]]), // channel message counts length + ]), + ]); + + const newDataSection = new Uint8Array([ + ...originalDataSection, + ...record(Opcode.CHANNEL, [ + ...uint16LE(0), // channel id + ...uint16LE(0), // schema id + ...string("foo"), // topic + ...string("foo"), 
// message encoding
+          ...keyValues(string, string, []), // user data
+        ]),
+        ...record(Opcode.MESSAGE, [
+          ...uint16LE(chanId),
+          ...uint32LE(0), // sequence
+          ...uint64LE(0n), // log time
+          ...uint64LE(0n), // publish time
+        ]),
+      ]);
+
+      expect(tempBuffer.get()).toEqual(
+        new Uint8Array([
+          ...newDataSection,
+          ...record(Opcode.DATA_END, [
+            ...uint32LE(useDataSectionCrc ? crc32(newDataSection) : 0), // data crc
+          ]),
+          ...summarySection,
+          ...record(Opcode.FOOTER, [
+            ...uint64LE(BigInt(newDataSection.length + dataEndLength)), // summary start
+            ...uint64LE(0n), // summary offset start
+            ...uint32LE(
+              // summary crc
+              crc32(
+                new Uint8Array([
+                  ...summarySection,
+                  Opcode.FOOTER,
+                  ...uint64LE(8n + 8n + 4n), // footer record length
+                  ...uint64LE(BigInt(newDataSection.length + dataEndLength)), // summary start
+                  ...uint64LE(0n), // summary offset start
+                ]),
+              ),
+            ),
+          ]),
+          ...MCAP_MAGIC,
+        ]),
+      );
+    },
+  );
 });
diff --git a/typescript/core/src/McapWriter.ts b/typescript/core/src/McapWriter.ts
index 5ff9521675..e9d266abc6 100644
--- a/typescript/core/src/McapWriter.ts
+++ b/typescript/core/src/McapWriter.ts
@@ -1,7 +1,9 @@
 import { crc32Init, crc32Update, crc32Final, crc32 } from "@foxglove/crc";

 import { ChunkBuilder } from "./ChunkBuilder";
+import { ISeekableWriter } from "./ISeekableWriter";
 import { IWritable } from "./IWritable";
+import { McapIndexedReader } from "./McapIndexedReader";
 import { McapRecordBuilder } from "./McapRecordBuilder";
 import { Opcode } from "./constants";
 import {
@@ -18,6 +20,7 @@ import {
   SummaryOffset,
   Metadata,
   Statistics,
+  IReadable,
 } from "./types";

 export type McapWriterOptions = {
@@ -57,13 +60,19 @@ export class McapWriter {
     | ((chunkData: Uint8Array) => { compression: string; compressedData: Uint8Array })
     | undefined;
   #chunkSize: number;
-  #dataSectionCrc = crc32Init();
+  /**
+   * undefined means the CRC is not calculated, e.g. when using InitializeForAppending if the
+   * original file did not have a dataSectionCrc.
+   */
+  #dataSectionCrc: number | undefined;

   public statistics: Statistics | undefined;
   #useSummaryOffsets: boolean;
   #repeatSchemas: boolean;
   #repeatChannels: boolean;
+  #appendMode = false;
+
   // indices
   #chunkIndices: ChunkIndex[] | undefined;
   #attachmentIndices: AttachmentIndex[] | undefined;
@@ -120,7 +129,64 @@ export class McapWriter {
     this.#compressChunk = compressChunk;
   }

+  /**
+   * Initializes a new McapWriter for appending to an existing MCAP file. The same `readWrite` will
+   * be used to load indexes out of the existing file, remove the DataEnd and subsequent records,
+   * and then rewrite them when the writer is closed. The existing file must be indexed, since
+   * existing indexes, channel and schema IDs, etc. are reused when appending to the file.
+   *
+   * A writer initialized with this method is already "opened" and does not require a `start()`
+   * call; however, it does require an eventual call to `end()` to produce a properly indexed MCAP
+   * file.
+   */
+  static async InitializeForAppending(
+    readWrite: IReadable & ISeekableWriter,
+    options: Omit<McapWriterOptions, "writable">,
+  ): Promise<McapWriter> {
+    const reader = await McapIndexedReader.Initialize({ readable: readWrite });
+    await readWrite.seek(reader.dataEndOffset);
+    await readWrite.truncate();
+
+    const writer = new McapWriter({ ...options, writable: readWrite });
+    writer.#appendMode = true;
+    writer.#dataSectionCrc =
+      // Invert the CRC value so we can continue updating it with new data; it will be inverted
+      // again in end()
+      reader.dataSectionCrc != undefined ? crc32Final(reader.dataSectionCrc) : undefined;
+    writer.#chunkIndices = [...reader.chunkIndexes];
+    writer.#attachmentIndices = [...reader.attachmentIndexes];
+    writer.#metadataIndices = [...reader.metadataIndexes];
+
+    if (writer.statistics) {
+      if (reader.statistics) {
+        writer.statistics = reader.statistics;
+      } else {
+        // If statistics calculation was requested, but the input file does not have statistics,
+        // then we can't write them because we don't know the correct initial values
+        writer.statistics = undefined;
+      }
+    }
+
+    writer.#schemas = new Map(reader.schemasById);
+    writer.#writtenSchemaIds = new Set(reader.schemasById.keys());
+    for (const schema of reader.schemasById.values()) {
+      writer.#nextSchemaId = Math.max(writer.#nextSchemaId, schema.id + 1);
+    }
+
+    writer.#channels = new Map(reader.channelsById);
+    writer.#writtenChannelIds = new Set(reader.channelsById.keys());
+    for (const channel of reader.channelsById.values()) {
+      writer.#nextChannelId = Math.max(writer.#nextChannelId, channel.id + 1);
+    }
+
+    return writer;
+  }
+
   async start(header: Header): Promise<void> {
+    if (this.#appendMode) {
+      throw new Error(`Cannot call start() when writer is in append mode`);
+    }
+    this.#dataSectionCrc = crc32Init();
     this.#recordWriter.writeMagic();
     this.#recordWriter.writeHeader(header);
@@ -132,11 +198,15 @@ export class McapWriter {

   async end(): Promise<void> {
     await this.#finalizeChunk();

-    this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    if (this.#dataSectionCrc != undefined) {
+      this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    }
     await this.#writable.write(this.#recordWriter.buffer);
     this.#recordWriter.reset();

-    this.#recordWriter.writeDataEnd({ dataSectionCrc: crc32Final(this.#dataSectionCrc) });
+    this.#recordWriter.writeDataEnd({
+      dataSectionCrc: this.#dataSectionCrc == undefined ? 0 : crc32Final(this.#dataSectionCrc),
+    });
     await this.#writable.write(this.#recordWriter.buffer);
     this.#recordWriter.reset();
@@ -320,6 +390,7 @@ export class McapWriter {
       );
       ++this.statistics.messageCount;
     }
+
     // write out channel and schema if we have not yet done so
     if (!this.#writtenChannelIds.has(message.channelId)) {
       const channel = this.#channels.get(message.channelId);
@@ -382,7 +453,9 @@ export class McapWriter {
       });
     }

-    this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    if (this.#dataSectionCrc != undefined) {
+      this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    }
     await this.#writable.write(this.#recordWriter.buffer);
     this.#recordWriter.reset();
   }
@@ -402,7 +475,9 @@ export class McapWriter {
       });
     }

-    this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    if (this.#dataSectionCrc != undefined) {
+      this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    }
     await this.#writable.write(this.#recordWriter.buffer);
     this.#recordWriter.reset();
   }
@@ -439,7 +514,9 @@ export class McapWriter {
     const messageIndexOffsets = this.#chunkIndices ? new Map<number, bigint>() : undefined;

-    this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    if (this.#dataSectionCrc != undefined) {
+      this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    }
     await this.#writable.write(this.#recordWriter.buffer);
     this.#recordWriter.reset();

@@ -465,7 +542,9 @@ export class McapWriter {
     }
     this.#chunkBuilder.reset();

-    this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    if (this.#dataSectionCrc != undefined) {
+      this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
+    }
     await this.#writable.write(this.#recordWriter.buffer);
     this.#recordWriter.reset();
   }
diff --git a/typescript/core/src/TempBuffer.ts b/typescript/core/src/TempBuffer.ts
index e81ba1712e..f78161628f 100644
--- a/typescript/core/src/TempBuffer.ts
+++ b/typescript/core/src/TempBuffer.ts
@@ -1,29 +1,63 @@
+import { ISeekableWriter } from "./ISeekableWriter";
 import { IWritable } from "./IWritable";
 import { IReadable } from "./types";

 /**
  * In-memory buffer used for reading and writing MCAP files in tests. Can be used as both an
  * IReadable and an IWritable.
  */
-export class TempBuffer implements IReadable, IWritable {
-  #buffer = new ArrayBuffer(1024);
-  #size = 0;
+export class TempBuffer implements IReadable, IWritable, ISeekableWriter {
+  #buffer = new ArrayBuffer(0);
+  #position = 0;
+
+  constructor(source?: ArrayBufferView | ArrayBuffer) {
+    if (source instanceof ArrayBuffer) {
+      this.#buffer = source;
+    } else if (source) {
+      this.#buffer = new Uint8Array(source.buffer, source.byteOffset, source.byteLength).buffer;
+    }
+  }
+
+  #setCapacity(capacity: number) {
+    if (this.#buffer.byteLength !== capacity) {
+      const newBuffer = new ArrayBuffer(capacity);
+      new Uint8Array(newBuffer).set(
+        new Uint8Array(this.#buffer, 0, Math.min(this.#buffer.byteLength, capacity)),
+      );
+      this.#buffer = newBuffer;
+    }
+  }

   position(): bigint {
-    return BigInt(this.#size);
+    return BigInt(this.#position);
   }
+
+  async seek(position: bigint): Promise<void> {
+    if (position < 0n) {
+      throw new Error(`Attempted to seek to negative position ${position}`);
+    } else if (position > this.#buffer.byteLength) {
+      this.#setCapacity(Number(position));
+    }
+    this.#position = Number(position);
+  }
+
+  async truncate(): Promise<void> {
+    const newBuffer = new ArrayBuffer(this.#position);
+    new Uint8Array(newBuffer).set(new Uint8Array(this.#buffer, 0, this.#position));
+    this.#buffer = newBuffer;
+  }
+
   async write(data: Uint8Array): Promise<void> {
-    if (this.#size + data.byteLength > this.#buffer.byteLength) {
-      const newBuffer = new ArrayBuffer(this.#size + data.byteLength);
-      new Uint8Array(newBuffer).set(new Uint8Array(this.#buffer));
-      this.#buffer = newBuffer;
+    if (this.#position + data.byteLength > this.#buffer.byteLength) {
+      this.#setCapacity(this.#position + data.byteLength);
     }
-    new Uint8Array(this.#buffer, this.#size).set(data);
-    this.#size += data.byteLength;
+    new Uint8Array(this.#buffer, this.#position).set(data);
+    this.#position += data.byteLength;
   }

   async size(): Promise<bigint> {
-    return BigInt(this.#size);
+    return BigInt(this.#buffer.byteLength);
   }
+
   async read(offset: bigint, size: bigint): Promise<Uint8Array> {
     if (offset < 0n || offset + size > BigInt(this.#buffer.byteLength)) {
       throw new Error("read out of range");
@@ -32,6 +66,6 @@ export class TempBuffer implements IReadable, IWritable {
   }

   get(): Uint8Array {
-    return new Uint8Array(this.#buffer, 0, this.#size);
+    return new Uint8Array(this.#buffer);
   }
 }
diff --git
a/typescript/core/src/index.ts b/typescript/core/src/index.ts index 051122da3b..e4ff8ddece 100644 --- a/typescript/core/src/index.ts +++ b/typescript/core/src/index.ts @@ -7,6 +7,7 @@ export { ChunkBuilder as McapChunkBuilder } from "./ChunkBuilder"; export * as McapTypes from "./types"; export * as McapConstants from "./constants"; export type { IWritable } from "./IWritable"; +export type { ISeekableWriter } from "./ISeekableWriter"; export * from "./hasMcapPrefix"; export * from "./parse";
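
### Usage sketch

A minimal sketch of the append flow added in this PR (not part of the diff above). It uses the `TempBuffer` test helper as the combined `IReadable & ISeekableWriter`; topic names and payloads are illustrative:

```typescript
import { McapWriter } from "./McapWriter";
import { TempBuffer } from "./TempBuffer";

async function appendExample(): Promise<void> {
  const buffer = new TempBuffer();

  // 1. Write an initial, indexed MCAP file. end() emits DataEnd, the summary
  //    section (indexes, statistics), and the footer.
  const writer = new McapWriter({ writable: buffer });
  await writer.start({ profile: "", library: "example" });
  const channelId = await writer.registerChannel({
    topic: "topic",
    schemaId: 0, // 0 = no schema
    messageEncoding: "json",
    metadata: new Map(),
  });
  await writer.addMessage({
    channelId,
    sequence: 0,
    logTime: 0n,
    publishTime: 0n,
    data: new TextEncoder().encode("{}"),
  });
  await writer.end();

  // 2. Reopen the same buffer for appending. start() must not be called (it
  //    throws in append mode); InitializeForAppending() loads the existing
  //    indexes, truncates at DataEnd, and resumes writing. Previously
  //    registered channel and schema IDs remain valid.
  const appendWriter = await McapWriter.InitializeForAppending(buffer, {});
  await appendWriter.addMessage({
    channelId,
    sequence: 1,
    logTime: 1n,
    publishTime: 1n,
    data: new TextEncoder().encode("{}"),
  });
  await appendWriter.end(); // rewrites DataEnd + merged summary + footer
}

void appendExample();
```

Appending to a file on disk requires the same pair of interfaces over a file handle. A hypothetical Node.js adapter, not included in this change (the class name and the mapping of `seek`/`truncate` onto `FileHandle` calls are assumptions):

```typescript
import type { FileHandle } from "fs/promises";

import { ISeekableWriter } from "./ISeekableWriter";
import { IReadable } from "./types";

/** Hypothetical adapter: IReadable & ISeekableWriter backed by a Node.js FileHandle. */
class FileHandleReadWrite implements IReadable, ISeekableWriter {
  #handle: FileHandle;
  #position = 0n; // current write cursor, tracked in userland

  constructor(handle: FileHandle) {
    this.#handle = handle;
  }

  position(): bigint {
    return this.#position;
  }

  async seek(position: bigint): Promise<void> {
    this.#position = position;
  }

  async truncate(): Promise<void> {
    // Drop everything after the current write position
    await this.#handle.truncate(Number(this.#position));
  }

  async write(data: Uint8Array): Promise<void> {
    await this.#handle.write(data, 0, data.byteLength, Number(this.#position));
    this.#position += BigInt(data.byteLength);
  }

  async size(): Promise<bigint> {
    return BigInt((await this.#handle.stat()).size);
  }

  async read(offset: bigint, size: bigint): Promise<Uint8Array> {
    const out = new Uint8Array(Number(size));
    await this.#handle.read(out, 0, Number(size), Number(offset));
    return out;
  }
}
```

Opened with `fs.promises.open(path, "r+")`, an instance of this adapter could be passed directly to `McapWriter.InitializeForAppending()`.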