Skip to content

Commit

Permalink
Add tests for @mcap/core append mode
Browse files Browse the repository at this point in the history
  • Loading branch information
alexern14 committed Dec 1, 2023
1 parent 8af7372 commit 4e58b74
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 1 deletion.
306 changes: 306 additions & 0 deletions typescript/core/src/McapWriter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,310 @@ describe("McapWriter", () => {
},
]);
});

it("supports append mode", async () => {
const tempBuffer = new TempBuffer();

const writer = new McapWriter({ writable: tempBuffer, chunkSize: 0 });

await writer.start({ library: "", profile: "" });
const schemaId = await writer.registerSchema({
name: "test1",
encoding: "json",
data: new Uint8Array(),
});
const channelId = await writer.registerChannel({
topic: "test",
schemaId,
messageEncoding: "json",
metadata: new Map(),
});
await writer.addMessage({
channelId,
data: new Uint8Array(),
sequence: 0,
logTime: 0n,
publishTime: 0n,
});
await writer.end();

const writerAppendMode = await McapWriter.InitializeInAppendMode(tempBuffer);

await writerAppendMode.addAttachment({
name: "attachment test",
logTime: 0n,
createTime: 0n,
mediaType: "application/json",
data: new TextEncoder().encode(`{"test": "testValue"}`),
});
await writerAppendMode.addMetadata({
name: "metadata test",
metadata: new Map<string, string>([["test", "testValue"]]),
});
await writerAppendMode.addMessage({
channelId,
data: new Uint8Array(),
sequence: 1,
logTime: 1n,
publishTime: 1n,
});
const newChannelId = await writerAppendMode.registerChannel({
topic: "test/append",
schemaId,
messageEncoding: "json",
metadata: new Map(),
});
await writerAppendMode.addMessage({
channelId: newChannelId,
data: new Uint8Array(),
sequence: 2,
logTime: 2n,
publishTime: 2n,
});
await writerAppendMode.end();

const reader = new McapStreamReader();
reader.append(tempBuffer.get());
const records: TypedMcapRecord[] = [];
for (let rec; (rec = reader.nextRecord()); ) {
records.push(rec);
}

expect(reader.done()).toEqual(true);

expect(records).toEqual<TypedMcapRecord[]>([
{
type: "Header",
library: "",
profile: "",
},
{
type: "Schema",
id: 1,
encoding: "json",
data: new Uint8Array(),
name: "test1",
},
{
type: "Channel",
id: 0,
messageEncoding: "json",
metadata: new Map(),
schemaId: 1,
topic: "test",
},
{
type: "Message",
channelId: 0,
data: new Uint8Array(),
logTime: 0n,
publishTime: 0n,
sequence: 0,
},
{
type: "MessageIndex",
channelId: 0,
records: [[0n, 65n]],
},
{
type: "Attachment",
name: "attachment test",
logTime: 0n,
createTime: 0n,
mediaType: "application/json",
data: new TextEncoder().encode(`{"test": "testValue"}`),
},
{
type: "Metadata",
name: "metadata test",
metadata: new Map([["test", "testValue"]]),
},
{
type: "Schema",
id: 1,
encoding: "json",
data: new Uint8Array(),
name: "test1",
},
{
type: "Channel",
id: 0,
messageEncoding: "json",
metadata: new Map(),
schemaId: 1,
topic: "test",
},
{
type: "Message",
channelId: 0,
data: new Uint8Array(),
logTime: 1n,
publishTime: 1n,
sequence: 1,
},
{
type: "Channel",
id: 1,
messageEncoding: "json",
metadata: new Map(),
schemaId: 1,
topic: "test/append",
},
{
type: "Message",
channelId: 1,
data: new Uint8Array(),
logTime: 2n,
publishTime: 2n,
sequence: 2,
},
{
type: "MessageIndex",
channelId: 0,
records: [[1n, 65n]],
},
{
type: "MessageIndex",
channelId: 1,
records: [[2n, 136n]],
},
{
type: "DataEnd",
dataSectionCrc: 1361490560,
},
{
type: "Schema",
id: 1,
encoding: "json",
data: new Uint8Array(),
name: "test1",
},
{
type: "Channel",
id: 0,
messageEncoding: "json",
metadata: new Map(),
schemaId: 1,
topic: "test",
},
{
type: "Channel",
id: 1,
messageEncoding: "json",
metadata: new Map(),
schemaId: 1,
topic: "test/append",
},
{
type: "Statistics",
attachmentCount: 1,
channelCount: 2,
channelMessageCounts: new Map([
[0, 2n],
[1, 1n],
]),
chunkCount: 2,
messageCount: 3n,
messageEndTime: 2n,
messageStartTime: 0n,
metadataCount: 1,
schemaCount: 1,
},
{
type: "MetadataIndex",
offset: 298n,
length: 51n,
name: "metadata test",
},
{
type: "AttachmentIndex",
offset: 201n,
length: 97n,
logTime: 0n,
createTime: 0n,
dataSize: 21n,
name: "attachment test",
mediaType: "application/json",
},
{
type: "ChunkIndex",
chunkLength: 145n,
chunkStartOffset: 25n,
compressedSize: 96n,
compression: "",
messageEndTime: 0n,
messageIndexLength: 31n,
messageIndexOffsets: new Map([[0, 170n]]),
messageStartTime: 0n,
uncompressedSize: 96n,
},
{
type: "ChunkIndex",
chunkLength: 216n,
chunkStartOffset: 349n,
compressedSize: 167n,
compression: "",
messageEndTime: 2n,
messageIndexLength: 62n,
messageIndexOffsets: new Map([
[0, 565n],
[1, 596n],
]),
messageStartTime: 1n,
uncompressedSize: 167n,
},
{
type: "SummaryOffset",
groupLength: 32n,
groupOpcode: Opcode.SCHEMA,
groupStart: 640n,
},
{
type: "SummaryOffset",
groupLength: 73n,
groupOpcode: Opcode.CHANNEL,
groupStart: 672n,
},
{
type: "SummaryOffset",
groupLength: 75n,
groupOpcode: Opcode.STATISTICS,
groupStart: 745n,
},
{
type: "SummaryOffset",
groupLength: 42n,
groupOpcode: Opcode.METADATA_INDEX,
groupStart: 820n,
},
{
type: "SummaryOffset",
groupLength: 88n,
groupOpcode: Opcode.ATTACHMENT_INDEX,
groupStart: 862n,
},
{
type: "SummaryOffset",
groupLength: 176n,
groupOpcode: Opcode.CHUNK_INDEX,
groupStart: 950n,
},
{
type: "Footer",
summaryCrc: 1244986820,
summaryOffsetStart: 1126n,
summaryStart: 640n,
},
]);

// Confirm that the appended file is still readable
const readAppendedFile = new McapStreamReader();
readAppendedFile.append(tempBuffer.get());
const appendedRecords: TypedMcapRecord[] = [];
for (let rec; (rec = readAppendedFile.nextRecord()); ) {
appendedRecords.push(rec);
}

expect(readAppendedFile.done()).toEqual(true);
});
});
6 changes: 5 additions & 1 deletion typescript/core/src/TempBuffer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { ISeekableWriter } from "./ISeekableWriter";
import { IWritable } from "./IWritable";
import { IReadable } from "./types";

/**
* In-memory buffer used for reading and writing MCAP files in tests. Can be used as both an IReadable and an IWritable.
*/
export class TempBuffer implements IReadable, IWritable {
export class TempBuffer implements IReadable, IWritable, ISeekableWriter {
#buffer = new ArrayBuffer(1024);
#size = 0;

position(): bigint {
return BigInt(this.#size);
}
async seek(position: bigint): Promise<void> {
this.#size = Number(position);
}
async write(data: Uint8Array): Promise<void> {
if (this.#size + data.byteLength > this.#buffer.byteLength) {
const newBuffer = new ArrayBuffer(this.#size + data.byteLength);
Expand Down

0 comments on commit 4e58b74

Please sign in to comment.