Skip to content

Commit

Permalink
Add Mcap Append mode to Mcap Writer in order to add attachments/metadata
Browse files Browse the repository at this point in the history
to an existing Mcap.
  • Loading branch information
alexern14 committed Nov 16, 2023
1 parent c6526f3 commit 6553151
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 1 deletion.
9 changes: 9 additions & 0 deletions typescript/core/src/IAppendWritable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { IWritable } from "./IWritable";

/**
* IAppendWritable describes a writer interface with append abilities.
*/
export interface IAppendWritable extends IWritable {
// Seek the cursor to the given position
seek(position: bigint): Promise<void>;
}
159 changes: 158 additions & 1 deletion typescript/core/src/McapWriter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { crc32Init, crc32Update, crc32Final, crc32 } from "@foxglove/crc";

import { ChunkBuilder } from "./ChunkBuilder";
import { IAppendWritable } from "./IAppendWritable";
import { IWritable } from "./IWritable";
import { McapIndexedReader } from "./McapIndexedReader";
import { McapRecordBuilder } from "./McapRecordBuilder";
import { Opcode } from "./constants";
import {
Expand All @@ -18,6 +20,7 @@ import {
SummaryOffset,
Metadata,
Statistics,
IReadable,
} from "./types";

export type McapWriterOptions = {
Expand All @@ -34,6 +37,7 @@ export type McapWriterOptions = {
startChannelId?: number;
chunkSize?: number;
compressChunk?: (chunkData: Uint8Array) => { compression: string; compressedData: Uint8Array };
appendMode?: boolean;
};

/**
Expand Down Expand Up @@ -64,6 +68,9 @@ export class McapWriter {
#repeatSchemas: boolean;
#repeatChannels: boolean;

// If using append mode, InitializeInAppendMode() should be used to create the McapWriter
#appendMode: boolean;

// indices
#chunkIndices: ChunkIndex[] | undefined;
#attachmentIndices: AttachmentIndex[] | undefined;
Expand All @@ -84,6 +91,7 @@ export class McapWriter {
startChannelId = 0,
chunkSize = 1024 * 1024,
compressChunk,
appendMode = false,
} = options;

this.#writable = writable;
Expand All @@ -106,6 +114,7 @@ export class McapWriter {
}
this.#repeatSchemas = repeatSchemas;
this.#repeatChannels = repeatChannels;
this.#appendMode = appendMode;
if (useAttachmentIndex) {
this.#attachmentIndices = [];
}
Expand All @@ -120,7 +129,60 @@ export class McapWriter {
this.#compressChunk = compressChunk;
}

/**
* Initializes a new McapWriter in append mode.
*/
static async InitializeInAppendMode({
writable,
readable,
}: {
writable: IAppendWritable;
readable: IReadable;
}): Promise<McapWriter> {
const reader = await McapIndexedReader.Initialize({ readable });

await writable.seek(reader.dataEndOffset!);

const writer = new McapWriter({
writable,
appendMode: true,
});

for (const attachmentIndex of reader.attachmentIndexes) {
await writer.addAttachmentIndex(attachmentIndex);
}

for (const metadataIndex of reader.metadataIndexes) {
await writer.addMetadataIndex(metadataIndex);
}

for (const schema of reader.schemasById) {
await writer.registerExistingSchema(schema[1]);
}

for (const channel of reader.channelsById) {
await writer.registerExistingChannel(channel[1]);
}

for (const chunkIndex of reader.chunkIndexes) {
await writer.addChunkIndex(chunkIndex);
}

await writer.setStatistics(
reader.statistics!.messageCount,
reader.statistics!.messageStartTime,
reader.statistics!.messageEndTime,
reader.statistics!.chunkCount,
reader.statistics!.channelMessageCounts,
);

return writer;
}

async start(header: Header): Promise<void> {
if (this.#appendMode) {
return;
}
this.#recordWriter.writeMagic();
this.#recordWriter.writeHeader(header);

Expand All @@ -130,7 +192,9 @@ export class McapWriter {
}

async end(): Promise<void> {
await this.#finalizeChunk();
if (!this.#appendMode) {
await this.#finalizeChunk();
}

this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
await this.#writable.write(this.#recordWriter.buffer);
Expand Down Expand Up @@ -469,4 +533,97 @@ export class McapWriter {
await this.#writable.write(this.#recordWriter.buffer);
this.#recordWriter.reset();
}

/**
* Add attachment index from existing MCAP file.
* The purpose of this is to update the information for the new summary with the existing data from the existing MCAP file.
*/
private async addAttachmentIndex(attachmentIndex: AttachmentIndex): Promise<void> {
if (this.statistics) {
++this.statistics.attachmentCount;
}

if (this.#attachmentIndices) {
this.#attachmentIndices.push(attachmentIndex);
}

this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
this.#recordWriter.reset();
}

/**
* Add metadata index from existing MCAP file.
* The purpose of this is to update the information for the new summary with the existing data from the existing MCAP file.
*/
private async addMetadataIndex(metadataIndex: MetadataIndex): Promise<void> {
if (this.statistics) {
++this.statistics.metadataCount;
}

if (this.#metadataIndices) {
this.#metadataIndices.push(metadataIndex);
}

this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
this.#recordWriter.reset();
}

/**
* Add chunk index from existing MCAP file.
* The purpose of this is to update the information for the new summary with the existing data from the existing MCAP file.
*/
private async addChunkIndex(chunkIndex: ChunkIndex): Promise<void> {
if (this.statistics) {
++this.statistics.chunkCount;
}

if (this.#chunkIndices) {
this.#chunkIndices.push(chunkIndex);
}

this.#dataSectionCrc = crc32Update(this.#dataSectionCrc, this.#recordWriter.buffer);
this.#recordWriter.reset();
}

/**
* Set statistics from existing MCAP file.
* The purpose of this is to update the information for the new summary with the existing data from the existing MCAP file.
*/
private async setStatistics(
messageCount: bigint,
messageStartTime: bigint,
messageEndTime: bigint,
chunkCount: number,
channelMessageCounts: Map<number, bigint>,
): Promise<void> {
if (this.statistics) {
this.statistics.messageCount = messageCount;
this.statistics.messageStartTime = messageStartTime;
this.statistics.messageEndTime = messageEndTime;
this.statistics.chunkCount = chunkCount;
this.statistics.channelMessageCounts = channelMessageCounts;
}
}

/**
* Register schema from existing MCAP file.
* The purpose of this is to update the information for the new summary with the existing data from the existing MCAP file.
*/
private async registerExistingSchema(info: Schema): Promise<void> {
this.#schemas.set(info.id, info);
if (this.statistics) {
++this.statistics.schemaCount;
}
}

/**
* Register channel from existing MCAP file.
* The purpose of this is to update the information for the new summary with the existing data from the existing MCAP file.
*/
private async registerExistingChannel(info: Channel): Promise<void> {
this.#channels.set(info.id, info);
if (this.statistics) {
++this.statistics.channelCount;
}
}
}

0 comments on commit 6553151

Please sign in to comment.