Skip to content

Commit

Permalink
Implement the tail stream RPC interface (#3228)
Browse files Browse the repository at this point in the history
* Implement tracing::TailStreamWriter
* Add the streaming-tail-workers autogate
* Implement the tail stream rpc interface
* Update type snapshot
  • Loading branch information
jasnell authored Dec 13, 2024
1 parent 02ff150 commit 83eb364
Show file tree
Hide file tree
Showing 37 changed files with 5,609 additions and 43 deletions.
5 changes: 4 additions & 1 deletion samples/tail-workers/config.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ const tailWorkerExample :Workerd.Config = (
(name = "main", worker = .helloWorld),
(name = "log", worker = .logWorker),
],
sockets = [ ( name = "http", address = "*:8080", http = (), service = "main" ) ]
sockets = [ ( name = "http", address = "*:8080", http = (), service = "main" ) ],
autogates = [
"workerd-autogate-streaming-tail-workers",
],
);

const helloWorld :Workerd.Worker = (
Expand Down
5 changes: 4 additions & 1 deletion samples/tail-workers/tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ export default {
// https://developers.cloudflare.com/workers/observability/logs/tail-workers/
tail(traces) {
console.log(traces[0].logs);
}
},
tailStream() {
return {};
},
};
7 changes: 7 additions & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ struct ExportedHandler {
jsg::LenientOptional<jsg::Function<TailHandler>> tail;
jsg::LenientOptional<jsg::Function<TailHandler>> trace;

typedef kj::Promise<void> TailStreamHandler(
jsg::JsObject obj, jsg::Value env, jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
jsg::LenientOptional<jsg::Function<TailStreamHandler>> tailStream;

typedef kj::Promise<void> ScheduledHandler(jsg::Ref<ScheduledController> controller,
jsg::Value env,
jsg::Optional<jsg::Ref<ExecutionContext>> ctx);
Expand Down Expand Up @@ -319,6 +323,7 @@ struct ExportedHandler {
JSG_STRUCT(fetch,
tail,
trace,
tailStream,
scheduled,
alarm,
test,
Expand All @@ -335,6 +340,7 @@ struct ExportedHandler {
type ExportedHandlerFetchHandler<Env = unknown, CfHostMetadata = unknown> = (request: Request<CfHostMetadata, IncomingRequestCfProperties<CfHostMetadata>>, env: Env, ctx: ExecutionContext) => Response | Promise<Response>;
type ExportedHandlerTailHandler<Env = unknown> = (events: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise<void>;
type ExportedHandlerTraceHandler<Env = unknown> = (traces: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise<void>;
type ExportedHandlerTailStreamHandler<Env = unknown> = (event : TailStream.TailEvent, env: Env, ctx: ExecutionContext) => TailStream.TailEventHandlerType | Promise<TailStream.TailEventHandlerType>;
type ExportedHandlerScheduledHandler<Env = unknown> = (controller: ScheduledController, env: Env, ctx: ExecutionContext) => void | Promise<void>;
type ExportedHandlerQueueHandler<Env = unknown, Message = unknown> = (batch: MessageBatch<Message>, env: Env, ctx: ExecutionContext) => void | Promise<void>;
type ExportedHandlerTestHandler<Env = unknown> = (controller: TestController, env: Env, ctx: ExecutionContext) => void | Promise<void>;
Expand All @@ -344,6 +350,7 @@ struct ExportedHandler {
fetch?: ExportedHandlerFetchHandler<Env, CfHostMetadata>;
tail?: ExportedHandlerTailHandler<Env>;
trace?: ExportedHandlerTraceHandler<Env>;
tailStream?: ExportedHandlerTailStreamHandler<Env>;
scheduled?: ExportedHandlerScheduledHandler<Env>;
alarm: never;
webSocketMessage: never;
Expand Down
12 changes: 12 additions & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ wd_cc_library(
],
)

wd_cc_library(
name = "trace-stream",
srcs = ["trace-stream.c++"],
hdrs = ["trace-stream.h"],
visibility = ["//visibility:public"],
deps = [
":io",
"//src/workerd/util:completion-membrane",
],
)

wd_cc_library(
name = "io",
# HACK: Currently, the `io` and `api` packages are interdependent. We fold all the sources
Expand Down Expand Up @@ -106,6 +117,7 @@ wd_cc_library(
"//src/workerd/api:url",
"//src/workerd/jsg",
"//src/workerd/util:autogate",
"//src/workerd/util:completion-membrane",
"//src/workerd/util:sqlite",
"//src/workerd/util:thread-scopes",
"//src/workerd/util:uuid",
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,10 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
SpanParent getCurrentTraceSpan();
SpanParent getCurrentUserTraceSpan();

tracing::InvocationSpanContext& getInvocationSpanContext() {
return getCurrentIncomingRequest().invocationSpanContext;
}

// Returns a builder for recording tracing spans (or a no-op builder if tracing is inactive).
// If called while the JS lock is held, uses the trace information from the current async
// context, if available.
Expand Down
15 changes: 15 additions & 0 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

namespace workerd {

class IoContext;
class WorkerInterface;
class LimitEnforcer;
class TimerChannel;
Expand Down Expand Up @@ -127,6 +128,20 @@ class RequestObserver: public kj::Refcounted {
return nullptr;
}

// If the worker is configured to support streaming tail workers, reportTailEvent
// will forward the given event on to the collection of streaming tail workers
// that are configured with this observer. Otherwise, this is a non-op.
virtual void reportTailEvent(IoContext& ioContext, tracing::TailEvent::Event&& event) {
reportTailEvent(ioContext, [event = kj::mv(event)]() mutable { return kj::mv(event); });
}

// If the worker is configured to support streaming tail workers, reportTailEvent
// will forward the event returned by the callback on to the collection of streaming
// fail workers that are configured with this observer. The callback will only be
// invoked if there are tail workers.
virtual void reportTailEvent(
IoContext& ioContext, kj::FunctionParam<tracing::TailEvent::Event()> fn) {}

virtual kj::Own<void> addedContextTask() {
return kj::Own<void>();
}
Expand Down
Loading

0 comments on commit 83eb364

Please sign in to comment.