diff --git a/src/workerd/api/hibernatable-web-socket.c++ b/src/workerd/api/hibernatable-web-socket.c++ index bb534cd0173..ecb83fe7fe5 100644 --- a/src/workerd/api/hibernatable-web-socket.c++ +++ b/src/workerd/api/hibernatable-web-socket.c++ @@ -74,7 +74,7 @@ kj::Promise HibernatableWebSocketCustomEve auto eventParameters = consumeParams(); - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { Trace::HibernatableWebSocketEventInfo::Type type = [&]() -> Trace::HibernatableWebSocketEventInfo::Type { KJ_SWITCH_ONEOF(eventParameters.eventType) { diff --git a/src/workerd/api/node/diagnostics-channel.c++ b/src/workerd/api/node/diagnostics-channel.c++ index f8e72b1c03d..7e7923de50e 100644 --- a/src/workerd/api/node/diagnostics-channel.c++ +++ b/src/workerd/api/node/diagnostics-channel.c++ @@ -26,7 +26,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) { } auto& context = IoContext::current(); - KJ_IF_SOME(tracer, context.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, context.getWorkerTracer()) { jsg::Serializer ser(js, jsg::Serializer::Options{ .omitHeader = false, diff --git a/src/workerd/api/node/util.c++ b/src/workerd/api/node/util.c++ index e8a882bf854..5fe83665f23 100644 --- a/src/workerd/api/node/util.c++ +++ b/src/workerd/api/node/util.c++ @@ -249,7 +249,7 @@ namespace { kj::str("The Node.js process.exit(", code, ") API was called. Canceling the request."); auto& ioContext = IoContext::current(); // If we have a tail worker, let's report the error. - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { // Why create the error like this in tracing? Because we're adding the exception // to the trace and ideally we'd have the JS stack attached to it. Just using // JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index efb0fd44af7..c0cf06a3e43 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -530,7 +530,7 @@ kj::Promise QueueCustomEventImpl::run( } } - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize)); } diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 6c5f07b4271..c71cfe67864 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -603,7 +603,7 @@ kj::Promise sendTracesToExportedHandler(kj::OwngetContext(); auto& metrics = incomingRequest->getMetrics(); - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { t.setEventInfo(context.now(), Trace::TraceEventInfo(traces)); } diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index e4a49729614..863e23fca54 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -1602,11 +1602,14 @@ void RpcSerializerExternalHander::serializeFunction( // call of an RPC session. class EntrypointJsRpcTarget final: public JsRpcTargetBase { public: - EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe entrypointName) + EntrypointJsRpcTarget(IoContext& ioCtx, + kj::Maybe entrypointName, + kj::Maybe> tracer) : JsRpcTargetBase(ioCtx), // Most of the time we don't really have to clone this but it's hard to fully prove, so // let's be safe. - entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })) {} + entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })), + tracer(kj::mv(tracer)) {} TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override { jsg::Lock& js = lock; @@ -1644,6 +1647,7 @@ public: private: kj::Maybe entrypointName; + kj::Maybe> tracer; bool isReservedName(kj::StringPtr name) override { if ( // "fetch" and "connect" are treated specially on entrypoints. @@ -1666,8 +1670,8 @@ private: } void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override { - KJ_IF_SOME(t, ioctx.getMetrics().getWorkerTracer()) { - t.setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); + KJ_IF_SOME(t, tracer) { + t->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); } } }; @@ -1721,7 +1725,8 @@ kj::Promise JsRpcSessionCustomEventImpl::r incomingRequest->delivered(); auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller(); - capFulfiller->fulfill(capnp::membrane(kj::heap(ioctx, entrypointName), + capFulfiller->fulfill(capnp::membrane(kj::heap(ioctx, entrypointName, + mapAddRef(incomingRequest->getWorkerTracer())), kj::refcounted(kj::mv(doneFulfiller)))); KJ_DEFER({ diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 55dbfff94a0..0d0d1b46631 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -200,9 +200,11 @@ IoContext::IoContext(ThreadContext& thread, IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own contextParam, kj::Own ioChannelFactoryParam, - kj::Own metricsParam) + kj::Own metricsParam, + kj::Maybe> workerTracer) : context(kj::mv(contextParam)), metrics(kj::mv(metricsParam)), + workerTracer(kj::mv(workerTracer)), ioChannelFactory(kj::mv(ioChannelFactoryParam)) {} // A call to delivered() implies a promise to call drain() later (or one of the other methods @@ -338,7 +340,7 @@ void IoContext::logUncaughtException( void IoContext::logUncaughtExceptionAsync( UncaughtExceptionSource source, kj::Exception&& exception) { - if (getMetrics().getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) { + if (getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) { // We don't need to take the isolate lock as neither inspecting nor tracing is enabled. We // do still want to syslog if relevant, but we can do that without a lock. if (!jsg::isTunneledException(exception.getDescription()) && diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index cfaa3e1557b..b44a6aa07d5 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -107,7 +107,8 @@ class IoContext_IncomingRequest final { public: IoContext_IncomingRequest(kj::Own context, kj::Own ioChannelFactory, - kj::Own metrics); + kj::Own metrics, + kj::Maybe> workerTracer); KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest); ~IoContext_IncomingRequest() noexcept(false); @@ -153,9 +154,14 @@ class IoContext_IncomingRequest final { return *metrics; } + kj::Maybe getWorkerTracer() { + return workerTracer; + } + private: kj::Own context; kj::Own metrics; + kj::Maybe> workerTracer; kj::Own ioChannelFactory; bool wasDelivered = false; @@ -229,6 +235,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler return *getCurrentIncomingRequest().metrics; } + const kj::Maybe getWorkerTracer() { + if (incomingRequests.empty()) return kj::none; + return getCurrentIncomingRequest().getWorkerTracer(); + } + LimitEnforcer& getLimitEnforcer() { return *limitEnforcer; } diff --git a/src/workerd/io/observer.h b/src/workerd/io/observer.h index 66df85e9cc2..e1eed46e03c 100644 --- a/src/workerd/io/observer.h +++ b/src/workerd/io/observer.h @@ -103,10 +103,6 @@ class RequestObserver: public kj::Refcounted { return nullptr; } - virtual kj::Maybe getWorkerTracer() { - return kj::none; - } - virtual kj::Own addedContextTask() { return kj::Own(); } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index deec4246a26..5d53b782aae 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -591,7 +591,7 @@ kj::Own PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint), executionModel); traces.add(kj::addRef(*trace)); - return kj::refcounted(kj::addRef(*this), kj::mv(trace), pipelineLogLevel); + return kj::refcounted(addRefToThis(), kj::mv(trace), pipelineLogLevel); } void PipelineTracer::addTrace(rpc::Trace::Reader reader) { diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 1bc8a212d42..9b3bbc93fc0 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -52,6 +52,7 @@ public: kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson); kj::Promise request(kj::HttpMethod method, @@ -94,7 +95,8 @@ private: kj::Own limitEnforcer, kj::Own ioContextDependency, kj::Own ioChannelFactory, - kj::Own metrics); + kj::Own metrics, + kj::Maybe> workerTracer); template kj::Promise maybeAddGcPassForTest(IoContext& context, kj::Promise promise); @@ -155,12 +157,13 @@ kj::Own WorkerEntrypoint::construct(ThreadContext& threadContex kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson) { TRACE_EVENT("workerd", "WorkerEntrypoint::construct()"); auto obj = kj::heap(kj::Badge(), threadContext, waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson)); obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), - kj::mv(ioChannelFactory), kj::addRef(*metrics)); + kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer)); auto& wrapper = metrics->wrapWorkerInterface(*obj); return kj::attachRef(wrapper, kj::mv(obj), kj::mv(metrics)); } @@ -182,7 +185,8 @@ void WorkerEntrypoint::init(kj::Own worker, kj::Own limitEnforcer, kj::Own ioContextDependency, kj::Own ioChannelFactory, - kj::Own metrics) { + kj::Own metrics, + kj::Maybe> workerTracer) { TRACE_EVENT("workerd", "WorkerEntrypoint::init()"); // We need to construct the IoContext -- unless this is an actor and it already has a // IoContext, in which case we reuse it. @@ -208,7 +212,7 @@ void WorkerEntrypoint::init(kj::Own worker, } incomingRequest = kj::heap( - kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics)) + kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics), kj::mv(workerTracer)) .attach(kj::mv(actor)); } @@ -229,7 +233,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, bool isActor = context.getActor() != kj::none; - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { auto timestamp = context.now(); kj::String cfJson; KJ_IF_SOME(c, cfBlobJson) { @@ -472,7 +476,7 @@ kj::Promise WorkerEntrypoint::runScheduled( // calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have // to think more about this. - KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, context.getWorkerTracer()) { double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS; t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron))); } @@ -531,7 +535,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( // There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events). incomingRequest->delivered(); - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime)); } @@ -711,10 +715,11 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson) { return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory), - kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(cfBlobJson)); + kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson)); } } // namespace workerd diff --git a/src/workerd/io/worker-entrypoint.h b/src/workerd/io/worker-entrypoint.h index ac3fc8e4e5a..e5213e8d6e5 100644 --- a/src/workerd/io/worker-entrypoint.h +++ b/src/workerd/io/worker-entrypoint.h @@ -33,6 +33,7 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson); } // namespace workerd diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 7d023468977..753690988f0 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -1047,7 +1047,7 @@ Worker::Isolate::Isolate(kj::Own apiParam, // Only add exception to trace when running within an I/O context with a tracer. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER, error, api->getErrorInterfaceTypeHandler(js)); } @@ -1840,7 +1840,7 @@ void Worker::handleLog(jsg::Lock& js, // Only check tracing if console.log() was not invoked at the top level. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { auto timestamp = ioContext.now(); tracer.log(timestamp, level, message()); } @@ -2039,7 +2039,7 @@ void Worker::Lock::logUncaughtException( // Only add exception to trace when running within an I/O context with a tracer. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { JSG_WITHIN_CONTEXT_SCOPE(*this, getContext(), [&](jsg::Lock& js) { addExceptionToTrace(impl->inner, ioContext, tracer, source, exception, worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this)); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index a9d0859c576..ab8083dd9fe 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1541,7 +1541,8 @@ public: kj::Own(this, kj::NullDisposer::instance), kj::refcounted(), // default observer makes no observations waitUntilTasks, - true, // tunnelExceptions + true, // tunnelExceptions + kj::none, // workerTracer kj::mv(metadata.cfBlobJson)); } diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index 995697cb2e4..3aec147aefd 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -410,7 +410,7 @@ kj::Own TestFixture::createIncomingRequest() { auto context = kj::refcounted( threadContext, kj::atomicAddRef(*worker), actor, kj::heap()); auto incomingRequest = kj::heap(kj::addRef(*context), - kj::heap(*timerChannel), kj::refcounted()); + kj::heap(*timerChannel), kj::refcounted(), nullptr); incomingRequest->delivered(); return incomingRequest; } diff --git a/src/workerd/util/own-util.h b/src/workerd/util/own-util.h index 9d2cbda0a61..e9a2fb64b4c 100644 --- a/src/workerd/util/own-util.h +++ b/src/workerd/util/own-util.h @@ -14,6 +14,11 @@ inline auto mapAddRef(kj::Maybe>& maybe) -> kj::Maybe> { return maybe.map([](kj::Own& t) { return kj::addRef(*t); }); } +template +inline auto mapAddRef(kj::Maybe>& maybe) -> kj::Maybe> { + return maybe.map([](kj::Rc& t) { return t.addRef(); }); +} + template inline auto mapAddRef(kj::Maybe maybe) -> kj::Maybe> { return maybe.map([](T& t) { return kj::addRef(t); });