Skip to content

Commit

Permalink
[web-wasm] Improve queue tick logic (#935)
Browse files Browse the repository at this point in the history
  • Loading branch information
wkozyra95 authored Jan 29, 2025
1 parent 09b21ef commit 0809e0d
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 30 deletions.
3 changes: 1 addition & 2 deletions ts/@live-compositor/web-wasm/src/compositor/compositor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export default class LiveCompositor {
public async init(): Promise<void> {
assert(wasmBundleUrl, 'Location of WASM bundle is not defined, call setWasmBundleUrl() first.');
this.instance = new WasmInstance({
framerate: this.options.framerate ?? { num: 60, den: 1 },
framerate: this.options.framerate ?? { num: 30, den: 1 },
wasmBundleUrl,
logger: this.logger.child({ element: 'wasmInstance' }),
});
Expand Down Expand Up @@ -108,6 +108,5 @@ export default class LiveCompositor {
*/
public async terminate(): Promise<void> {
await this.coreCompositor?.terminate();
await this.instance?.terminate();
}
}
3 changes: 2 additions & 1 deletion ts/@live-compositor/web-wasm/src/mainContext/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class WasmInstance implements CompositorManager {
}

public async terminate(): Promise<void> {
this.logger.debug('Terminate WASM instance.');
await Promise.all(Object.values(this.outputs).map(output => output.terminate()));
await Promise.all(Object.values(this.inputs).map(input => input.terminate()));
await this.worker.postMessage({ type: 'terminate' });
Expand Down Expand Up @@ -136,7 +137,7 @@ class WasmInstance implements CompositorManager {
this.outputs[route.id] = output;
return result;
} else if (route.operation === 'unregister') {
const output = this.inputs[route.id];
const output = this.outputs[route.id];
if (output) {
delete this.outputs[route.id];
await output.terminate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class WhipOutput implements Output {
}

public async terminate(): Promise<void> {
this.options.logger.debug('terminate WHIP connection');
try {
await fetch(this.location ?? this.options.endpointUrl, {
method: 'DELETE',
Expand Down
1 change: 0 additions & 1 deletion ts/@live-compositor/web-wasm/src/workerContext/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export class Pipeline {
}

public async terminate(): Promise<void> {
// TODO(noituri): Clean all remaining `InputFrame`s & stop input processing
this.queue.stop();
}

Expand Down
106 changes: 81 additions & 25 deletions ts/@live-compositor/web-wasm/src/workerContext/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import type { Frame, FrameSet, InputId, OutputId, Renderer } from '@live-composi
import type { Framerate } from '../compositor/compositor';
import type { Input } from './input/input';
import type { Output } from './output/output';
import type { Interval } from '../utils';
import { framerateToDurationMs } from '../utils';
import type { Timeout } from '../utils';
import type { Logger } from 'pino';

export type StopQueueFn = () => void;
Expand All @@ -12,40 +11,29 @@ export class Queue {
private inputs: Record<InputId, Input> = {};
private outputs: Record<OutputId, Output> = {};
private renderer: Renderer;
private framerate: Framerate;
private currentPts: number;
private startTimeMs?: number;
private queueInterval?: Interval;
private logger: Logger;
private frameTicker: FrameTicker;
private startTimeMs?: number;

public constructor(framerate: Framerate, renderer: Renderer, logger: Logger) {
this.renderer = renderer;
this.framerate = framerate;
this.currentPts = 0;
this.logger = logger;
this.frameTicker = new FrameTicker(framerate, logger);
}

public start() {
this.logger.debug('Start queue');
if (this.queueInterval) {
throw new Error('Queue was already started.');
}
const tickDuration = framerateToDurationMs(this.framerate);
// TODO: setInterval can drift, this implementation needs to be replaced
this.queueInterval = setInterval(async () => {
await this.onTick();
this.currentPts += tickDuration;
}, tickDuration);
this.startTimeMs = Date.now();
this.frameTicker.start(this.startTimeMs, async (pts: number) => {
await this.onTick(pts);
});
for (const input of Object.values(this.inputs)) {
input.updateQueueStartTime(this.startTimeMs);
}
}

public stop() {
if (this.queueInterval) {
clearTimeout(this.queueInterval);
}
this.frameTicker.stop();
for (const input of Object.values(this.inputs)) {
input.close();
}
Expand Down Expand Up @@ -84,22 +72,22 @@ export class Queue {
return this.outputs[outputId];
}

private async onTick(): Promise<void> {
const frames = await this.getInputFrames();
private async onTick(currentPtsMs: number): Promise<void> {
const frames = await this.getInputFrames(currentPtsMs);
this.logger.trace({ frames }, 'onQueueTick');

const outputs = this.renderer.render({
ptsMs: this.currentPts,
ptsMs: currentPtsMs,
frames,
});
this.sendOutputs(outputs);
}

private async getInputFrames(): Promise<Record<InputId, Frame>> {
private async getInputFrames(currentPtsMs: number): Promise<Record<InputId, Frame>> {
const frames: Array<[InputId, Frame | undefined]> = await Promise.all(
Object.entries(this.inputs).map(async ([inputId, input]) => [
inputId,
await input.getFrame(this.currentPts),
await input.getFrame(currentPtsMs),
])
);
const validFrames = frames.filter((entry): entry is [string, Frame] => !!entry[1]);
Expand All @@ -118,3 +106,71 @@ export class Queue {
}
}
}

class FrameTicker {
private framerate: Framerate;
private onTick?: (pts: number) => Promise<void>;
private logger: Logger;

private timeout?: Timeout;
private pendingTick?: Promise<void>;

private startTimeMs: number = 0; // init on start
private frameCounter: number = 0;

constructor(framerate: Framerate, logger: Logger) {
this.framerate = framerate;
this.logger = logger;
}

public start(startTimeMs: number, onTick: (pts: number) => Promise<void>) {
this.onTick = onTick;
this.startTimeMs = startTimeMs;
this.scheduleNext();
}

public stop() {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = undefined;
}
}

private scheduleNext() {
const timeoutDuration = this.startTimeMs + this.currentPtsMs() - Date.now();
this.timeout = setTimeout(
() => {
void this.doTick();
this.scheduleNext();
},
Math.max(timeoutDuration, 0)
);
}

private async doTick(): Promise<void> {
if (this.pendingTick) {
return;
}
this.maybeSkipFrames();
try {
this.pendingTick = this.onTick?.(this.currentPtsMs());
await this.pendingTick;
} catch (err: any) {
this.logger.warn(err, 'Queue tick failed.');
}
this.pendingTick = undefined;
this.frameCounter += 1;
}

private currentPtsMs(): number {
return this.frameCounter * 1000 * (this.framerate.den / this.framerate.num);
}

private maybeSkipFrames() {
const frameDurationMs = 1000 * (this.framerate.den / this.framerate.num);
while (Date.now() - this.startTimeMs > this.currentPtsMs() + frameDurationMs) {
this.logger.info(`Processing to slow, dropping frame (frameCounter: ${this.frameCounter})`);
this.frameCounter += 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function WhipExample() {
bearerToken: streamKey,
video: {
resolution: { width: 1920, height: 1080 },
maxBitrate: 1_000_000,
maxBitrate: 2_000_000,
},
});

Expand Down

0 comments on commit 0809e0d

Please sign in to comment.