From 05384b88c30590604afc79eb986b86e28ac1d568 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 21 Nov 2023 10:44:13 -0700 Subject: [PATCH] Add tests for @mcap/core append mode --- typescript/core/src/McapWriter.test.ts | 309 +++++++++++++++++++++++++ typescript/core/src/TempBuffer.ts | 6 +- 2 files changed, 314 insertions(+), 1 deletion(-) diff --git a/typescript/core/src/McapWriter.test.ts b/typescript/core/src/McapWriter.test.ts index 881d211bc8..4f81759449 100644 --- a/typescript/core/src/McapWriter.test.ts +++ b/typescript/core/src/McapWriter.test.ts @@ -344,4 +344,313 @@ describe("McapWriter", () => { }, ]); }); + + it("supports append mode", async () => { + const tempBuffer = new TempBuffer(); + + const writer = new McapWriter({ writable: tempBuffer, chunkSize: 0 }); + + await writer.start({ library: "", profile: "" }); + const schemaId = await writer.registerSchema({ + name: "test1", + encoding: "json", + data: new Uint8Array(), + }); + const channelId = await writer.registerChannel({ + topic: "test", + schemaId, + messageEncoding: "json", + metadata: new Map(), + }); + await writer.addMessage({ + channelId, + data: new Uint8Array(), + sequence: 0, + logTime: 0n, + publishTime: 0n, + }); + await writer.end(); + + const writerAppendMode = await McapWriter.InitializeInAppendMode({ + writable: tempBuffer, + readable: tempBuffer, + }); + + await writerAppendMode.addAttachment({ + name: "attachment test", + logTime: 0n, + createTime: 0n, + mediaType: "application/json", + data: new TextEncoder().encode(`{"test": "testValue"}`), + }); + await writerAppendMode.addMetadata({ + name: "metadata test", + metadata: new Map([["test", "testValue"]]), + }); + await writerAppendMode.addMessage({ + channelId, + data: new Uint8Array(), + sequence: 1, + logTime: 1n, + publishTime: 1n, + }); + const newChannelId = await writerAppendMode.registerChannel({ + topic: "test/append", + schemaId, + messageEncoding: "json", + metadata: new Map(), + }); + await writerAppendMode.addMessage({ + channelId: newChannelId, + data: new Uint8Array(), + sequence: 2, + logTime: 2n, + publishTime: 2n, + }); + await writerAppendMode.end(); + + const reader = new McapStreamReader(); + reader.append(tempBuffer.get()); + const records: TypedMcapRecord[] = []; + for (let rec; (rec = reader.nextRecord()); ) { + records.push(rec); + } + + expect(reader.done()).toEqual(true); + + expect(records).toEqual([ + { + type: "Header", + library: "", + profile: "", + }, + { + type: "Schema", + id: 1, + encoding: "json", + data: new Uint8Array(), + name: "test1", + }, + { + type: "Channel", + id: 0, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "test", + }, + { + type: "Message", + channelId: 0, + data: new Uint8Array(), + logTime: 0n, + publishTime: 0n, + sequence: 0, + }, + { + type: "MessageIndex", + channelId: 0, + records: [[0n, 65n]], + }, + { + type: "Attachment", + name: "attachment test", + logTime: 0n, + createTime: 0n, + mediaType: "application/json", + data: new TextEncoder().encode(`{"test": "testValue"}`), + }, + { + type: "Metadata", + name: "metadata test", + metadata: new Map([["test", "testValue"]]), + }, + { + type: "Schema", + id: 1, + encoding: "json", + data: new Uint8Array(), + name: "test1", + }, + { + type: "Channel", + id: 0, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "test", + }, + { + type: "Message", + channelId: 0, + data: new Uint8Array(), + logTime: 1n, + publishTime: 1n, + sequence: 1, + }, + { + type: "Channel", + id: 1, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "test/append", + }, + { + type: "Message", + channelId: 1, + data: new Uint8Array(), + logTime: 2n, + publishTime: 2n, + sequence: 2, + }, + { + type: "MessageIndex", + channelId: 0, + records: [[1n, 65n]], + }, + { + type: "MessageIndex", + channelId: 1, + records: [[2n, 136n]], + }, + { + type: "DataEnd", + dataSectionCrc: 1361490560, + }, + { + type: "Schema", + id: 1, + encoding: "json", + data: new Uint8Array(), + name: "test1", + }, + { + type: "Channel", + id: 0, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "test", + }, + { + type: "Channel", + id: 1, + messageEncoding: "json", + metadata: new Map(), + schemaId: 1, + topic: "test/append", + }, + { + type: "Statistics", + attachmentCount: 1, + channelCount: 2, + channelMessageCounts: new Map([ + [0, 2n], + [1, 1n], + ]), + chunkCount: 2, + messageCount: 3n, + messageEndTime: 2n, + messageStartTime: 0n, + metadataCount: 1, + schemaCount: 1, + }, + { + type: "MetadataIndex", + offset: 298n, + length: 51n, + name: "metadata test", + }, + { + type: "AttachmentIndex", + offset: 201n, + length: 97n, + logTime: 0n, + createTime: 0n, + dataSize: 21n, + name: "attachment test", + mediaType: "application/json", + }, + { + type: "ChunkIndex", + chunkLength: 145n, + chunkStartOffset: 25n, + compressedSize: 96n, + compression: "", + messageEndTime: 0n, + messageIndexLength: 31n, + messageIndexOffsets: new Map([[0, 170n]]), + messageStartTime: 0n, + uncompressedSize: 96n, + }, + { + type: "ChunkIndex", + chunkLength: 216n, + chunkStartOffset: 349n, + compressedSize: 167n, + compression: "", + messageEndTime: 2n, + messageIndexLength: 62n, + messageIndexOffsets: new Map([ + [0, 565n], + [1, 596n], + ]), + messageStartTime: 1n, + uncompressedSize: 167n, + }, + { + type: "SummaryOffset", + groupLength: 32n, + groupOpcode: Opcode.SCHEMA, + groupStart: 640n, + }, + { + type: "SummaryOffset", + groupLength: 73n, + groupOpcode: Opcode.CHANNEL, + groupStart: 672n, + }, + { + type: "SummaryOffset", + groupLength: 75n, + groupOpcode: Opcode.STATISTICS, + groupStart: 745n, + }, + { + type: "SummaryOffset", + groupLength: 42n, + groupOpcode: Opcode.METADATA_INDEX, + groupStart: 820n, + }, + { + type: "SummaryOffset", + groupLength: 88n, + groupOpcode: Opcode.ATTACHMENT_INDEX, + groupStart: 862n, + }, + { + type: "SummaryOffset", + groupLength: 176n, + groupOpcode: Opcode.CHUNK_INDEX, + groupStart: 950n, + }, + { + type: "Footer", + summaryCrc: 1244986820, + summaryOffsetStart: 1126n, + summaryStart: 640n, + }, + ]); + + // Confirm that the appended file is still readable + const readAppendedFile = new McapStreamReader(); + readAppendedFile.append(tempBuffer.get()); + const appendedRecords: TypedMcapRecord[] = []; + for (let rec; (rec = readAppendedFile.nextRecord()); ) { + appendedRecords.push(rec); + } + + expect(readAppendedFile.done()).toEqual(true); + }); }); diff --git a/typescript/core/src/TempBuffer.ts b/typescript/core/src/TempBuffer.ts index e81ba1712e..09d618c022 100644 --- a/typescript/core/src/TempBuffer.ts +++ b/typescript/core/src/TempBuffer.ts @@ -1,16 +1,20 @@ +import { ISeekableWriter } from "./ISeekableWriter"; import { IWritable } from "./IWritable"; import { IReadable } from "./types"; /** * In-memory buffer used for reading and writing MCAP files in tests. Can be used as both an IReadable and an IWritable. */ -export class TempBuffer implements IReadable, IWritable { +export class TempBuffer implements IReadable, IWritable, ISeekableWriter { #buffer = new ArrayBuffer(1024); #size = 0; position(): bigint { return BigInt(this.#size); } + async seek(position: bigint): Promise { + this.#size = Number(position); + } async write(data: Uint8Array): Promise { if (this.#size + data.byteLength > this.#buffer.byteLength) { const newBuffer = new ArrayBuffer(this.#size + data.byteLength);