Skip to content

Commit

Permalink
core[patch]: Keep event stream for streamEvents v2 open until end of …
Browse files Browse the repository at this point in the history
…run (#5561)

* Keep event stream for streamEvents v2 open until end of run

* Style nit

* Lint

* Small fix
  • Loading branch information
jacoblee93 authored May 28, 2024
1 parent d79e80d commit a6b9508
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 13 deletions.
22 changes: 9 additions & 13 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ export abstract class Runnable<
): AsyncGenerator<StreamEvent> {
const eventStreamer = new EventStreamCallbackHandler({
...streamOptions,
autoClose: false,
autoClose: true,
});
const config = ensureConfig(options);
const runId = config.runId ?? uuidv4();
Expand All @@ -845,18 +845,14 @@ export abstract class Runnable<
// add each chunk to the output stream
const outerThis = this;
async function consumeRunnableStream() {
try {
const runnableStream = await outerThis.stream(input, config);
const tappedStream = eventStreamer.tapOutputIterable(
runId,
runnableStream
);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of tappedStream) {
// Just iterate so that the callback handler picks up events
}
} finally {
await eventStreamer.writer.close();
const runnableStream = await outerThis.stream(input, config);
const tappedStream = eventStreamer.tapOutputIterable(
runId,
runnableStream
);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of tappedStream) {
// Just iterate so that the callback handler picks up events
}
}
const runnableStreamConsumePromise = consumeRunnableStream();
Expand Down
16 changes: 16 additions & 0 deletions langchain-core/src/tracers/event_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ export class EventStreamCallbackHandler extends BaseTracer {
} finally {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
tappedPromiseResolver!();
// Don't delete from the map to keep track of which runs have been tapped.
}
} else {
// otherwise just pass through
Expand Down Expand Up @@ -596,4 +597,19 @@ export class EventStreamCallbackHandler extends BaseTracer {
runInfo
);
}

async onRunCreate(run: Run): Promise<void> {
if (this.rootId === undefined) {
this.rootId = run.id;
}
}

async onRunUpdate(run: Run): Promise<void> {
if (run.id === this.rootId && this.autoClose) {
const pendingPromises = [...this.tappedPromises.values()];
void Promise.all(pendingPromises).finally(() => {
void this.writer.close();
});
}
}
}
39 changes: 39 additions & 0 deletions langchain/src/agents/tests/create_tool_calling_agent.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,42 @@ test("createToolCallingAgent works", async () => {
// an investigation into why such a short generation was returned.
expect(result.output.length).toBeGreaterThan(10);
});

test("createToolCallingAgent stream events works", async () => {
const prompt = ChatPromptTemplate.fromMessages([
["system", "You are a helpful assistant"],
["placeholder", "{chat_history}"],
["human", "{input}"],
["placeholder", "{agent_scratchpad}"],
]);
const llm = new ChatOpenAI({
modelName: "gpt-4o",
temperature: 0,
});
const agent = await createToolCallingAgent({
llm,
tools,
prompt,
});
const agentExecutor = new AgentExecutor({
agent,
tools,
});
const input = "what is the current weather in SF?";
const eventStream = agentExecutor.streamEvents(
{
input,
},
{
version: "v2",
}
);

for await (const event of eventStream) {
const eventType = event.event;
console.log("Event type: ", eventType);
if (eventType === "on_chat_model_stream") {
console.log("Content: ", event.data);
}
}
});

0 comments on commit a6b9508

Please sign in to comment.