From 52b29ed150a813bb73bdac2afdc96f996778999d Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 22 Oct 2024 09:31:16 -0700 Subject: [PATCH 1/2] Revert "Move kj::Own to kj::Rc" This reverts commit f2e1730e18f223bac4f2208e7e296c42a5ff5312. --- src/workerd/api/hibernatable-web-socket.c++ | 4 ++-- src/workerd/api/node/diagnostics-channel.c++ | 2 +- src/workerd/api/node/util.c++ | 2 +- src/workerd/api/queue.c++ | 4 ++-- src/workerd/api/trace.c++ | 4 ++-- src/workerd/api/worker-rpc.c++ | 4 ++-- src/workerd/io/observer.h | 2 +- src/workerd/io/trace.c++ | 4 ++-- src/workerd/io/trace.h | 2 +- src/workerd/io/worker-entrypoint.c++ | 8 ++++---- src/workerd/io/worker.c++ | 6 +++--- src/workerd/util/own-util.h | 5 ----- 12 files changed, 21 insertions(+), 26 deletions(-) diff --git a/src/workerd/api/hibernatable-web-socket.c++ b/src/workerd/api/hibernatable-web-socket.c++ index bdbcfec74a2..bb534cd0173 100644 --- a/src/workerd/api/hibernatable-web-socket.c++ +++ b/src/workerd/api/hibernatable-web-socket.c++ @@ -74,7 +74,7 @@ kj::Promise HibernatableWebSocketCustomEve auto eventParameters = consumeParams(); - KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { Trace::HibernatableWebSocketEventInfo::Type type = [&]() -> Trace::HibernatableWebSocketEventInfo::Type { KJ_SWITCH_ONEOF(eventParameters.eventType) { @@ -95,7 +95,7 @@ kj::Promise HibernatableWebSocketCustomEve KJ_UNREACHABLE; }(); - tracer->setEventInfo(context.now(), Trace::HibernatableWebSocketEventInfo(kj::mv(type))); + t.setEventInfo(context.now(), Trace::HibernatableWebSocketEventInfo(kj::mv(type))); } try { diff --git a/src/workerd/api/node/diagnostics-channel.c++ b/src/workerd/api/node/diagnostics-channel.c++ index cba220c53be..f8e72b1c03d 100644 --- a/src/workerd/api/node/diagnostics-channel.c++ +++ b/src/workerd/api/node/diagnostics-channel.c++ @@ -37,7 +37,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) { Error, "Diagnostic events cannot be published with SharedArrayBuffer or " "transferred ArrayBuffer instances"); - tracer->addDiagnosticChannelEvent(context.now(), name.toString(js), kj::mv(tmp.data)); + tracer.addDiagnosticChannelEvent(context.now(), name.toString(js), kj::mv(tmp.data)); } } diff --git a/src/workerd/api/node/util.c++ b/src/workerd/api/node/util.c++ index 6b68bc11b06..e8a882bf854 100644 --- a/src/workerd/api/node/util.c++ +++ b/src/workerd/api/node/util.c++ @@ -255,7 +255,7 @@ namespace { // JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost // of creating and capturing the stack when we actually need it. auto ex = KJ_ASSERT_NONNULL(js.error(message).tryCast()); - tracer->addException(ioContext.now(), ex.get(js, "name"_kj).toString(js), + tracer.addException(ioContext.now(), ex.get(js, "name"_kj).toString(js), ex.get(js, "message"_kj).toString(js), ex.get(js, "stack"_kj).toString(js)); ioContext.abort(js.exceptionToKj(ex)); } else { diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index dd58ddf1d5c..efb0fd44af7 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -530,8 +530,8 @@ kj::Promise QueueCustomEventImpl::run( } } - KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) { - tracer->setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize)); + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize)); } // Create a custom refcounted type for holding the queueEvent so that we can pass it to the diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 31d9084acb8..6c5f07b4271 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -603,8 +603,8 @@ kj::Promise sendTracesToExportedHandler(kj::OwngetContext(); auto& metrics = incomingRequest->getMetrics(); - KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) { - tracer->setEventInfo(context.now(), Trace::TraceEventInfo(traces)); + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + t.setEventInfo(context.now(), Trace::TraceEventInfo(traces)); } auto nonEmptyTraces = kj::Vector>(kj::size(traces)); diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index 8acf4dd37c2..e4a49729614 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -1666,8 +1666,8 @@ private: } void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override { - KJ_IF_SOME(tracer, ioctx.getMetrics().getWorkerTracer()) { - tracer->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); + KJ_IF_SOME(t, ioctx.getMetrics().getWorkerTracer()) { + t.setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); } } }; diff --git a/src/workerd/io/observer.h b/src/workerd/io/observer.h index 85f7086ce66..66df85e9cc2 100644 --- a/src/workerd/io/observer.h +++ b/src/workerd/io/observer.h @@ -103,7 +103,7 @@ class RequestObserver: public kj::Refcounted { return nullptr; } - virtual kj::Maybe> getWorkerTracer() { + virtual kj::Maybe getWorkerTracer() { return kj::none; } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index bd0207d9c41..deec4246a26 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -578,7 +578,7 @@ kj::Promise>> PipelineTracer::onComplete() { return kj::mv(paf.promise); } -kj::Rc PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineLogLevel, +kj::Own PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel, kj::Maybe scriptId, kj::Maybe stableId, @@ -591,7 +591,7 @@ kj::Rc PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineL kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint), executionModel); traces.add(kj::addRef(*trace)); - return kj::rc(addRefToThis(), kj::mv(trace), pipelineLogLevel); + return kj::refcounted(kj::addRef(*this), kj::mv(trace), pipelineLogLevel); } void PipelineTracer::addTrace(rpc::Trace::Reader reader) { diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index e86d1892025..e41da6899cd 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -347,7 +347,7 @@ class PipelineTracer final: public kj::Refcounted, public kj::EnableAddRefToThis kj::Promise>> onComplete(); // Makes a tracer for a worker stage. - kj::Rc makeWorkerTracer(PipelineLogLevel pipelineLogLevel, + kj::Own makeWorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel, kj::Maybe scriptId, kj::Maybe stableId, diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index f9ac4e465a5..1bc8a212d42 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -229,7 +229,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, bool isActor = context.getActor() != kj::none; - KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { auto timestamp = context.now(); kj::String cfJson; KJ_IF_SOME(c, cfBlobJson) { @@ -253,7 +253,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, return Trace::FetchEventInfo::Header(kj::mv(entry.key), kj::strArray(entry.value, ", ")); }; - tracer->setEventInfo(timestamp, + t.setEventInfo(timestamp, Trace::FetchEventInfo(method, kj::str(url), kj::mv(cfJson), kj::mv(traceHeadersArray))); } @@ -474,7 +474,7 @@ kj::Promise WorkerEntrypoint::runScheduled( KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) { double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS; - t->setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron))); + t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron))); } // Scheduled handlers run entirely in waitUntil() tasks. @@ -532,7 +532,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( incomingRequest->delivered(); KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { - t->setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime)); + t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime)); } auto scheduleAlarmResult = co_await actor.scheduleAlarm(scheduledTime); diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index e423507e09c..7d023468977 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -139,7 +139,7 @@ void sendExceptionToInspector(jsg::Lock& js, void addExceptionToTrace(jsg::Lock& js, IoContext& ioContext, - kj::Rc& tracer, + WorkerTracer& tracer, UncaughtExceptionSource source, const jsg::JsValue& exception, const jsg::TypeHandler& errorTypeHandler) { @@ -214,7 +214,7 @@ void addExceptionToTrace(jsg::Lock& js, } // TODO(someday): Limit size of exception content? - tracer->addException(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack)); + tracer.addException(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack)); } void reportStartupError(kj::StringPtr id, @@ -1842,7 +1842,7 @@ void Worker::handleLog(jsg::Lock& js, auto& ioContext = IoContext::current(); KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { auto timestamp = ioContext.now(); - tracer->log(timestamp, level, message()); + tracer.log(timestamp, level, message()); } } diff --git a/src/workerd/util/own-util.h b/src/workerd/util/own-util.h index e9a2fb64b4c..9d2cbda0a61 100644 --- a/src/workerd/util/own-util.h +++ b/src/workerd/util/own-util.h @@ -14,11 +14,6 @@ inline auto mapAddRef(kj::Maybe>& maybe) -> kj::Maybe> { return maybe.map([](kj::Own& t) { return kj::addRef(*t); }); } -template -inline auto mapAddRef(kj::Maybe>& maybe) -> kj::Maybe> { - return maybe.map([](kj::Rc& t) { return t.addRef(); }); -} - template inline auto mapAddRef(kj::Maybe maybe) -> kj::Maybe> { return maybe.map([](T& t) { return kj::addRef(t); }); From a4a49c79d6fc6e7259c4972766380de9d22704bf Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 22 Oct 2024 09:31:57 -0700 Subject: [PATCH 2/2] Revert "Get the WorkerTracer via RequestObserver" This reverts commit f06baa7897313c455616dd4d7d46be8624eb4271. --- src/workerd/api/hibernatable-web-socket.c++ | 2 +- src/workerd/api/node/diagnostics-channel.c++ | 2 +- src/workerd/api/node/util.c++ | 2 +- src/workerd/api/queue.c++ | 2 +- src/workerd/api/trace.c++ | 2 +- src/workerd/api/worker-rpc.c++ | 15 +++++++++----- src/workerd/io/io-context.c++ | 6 ++++-- src/workerd/io/io-context.h | 13 +++++++++++- src/workerd/io/observer.h | 4 ---- src/workerd/io/trace.c++ | 2 +- src/workerd/io/worker-entrypoint.c++ | 21 ++++++++++++-------- src/workerd/io/worker-entrypoint.h | 1 + src/workerd/io/worker.c++ | 6 +++--- src/workerd/server/server.c++ | 3 ++- src/workerd/tests/test-fixture.c++ | 2 +- src/workerd/util/own-util.h | 5 +++++ 16 files changed, 57 insertions(+), 31 deletions(-) diff --git a/src/workerd/api/hibernatable-web-socket.c++ b/src/workerd/api/hibernatable-web-socket.c++ index bb534cd0173..ecb83fe7fe5 100644 --- a/src/workerd/api/hibernatable-web-socket.c++ +++ b/src/workerd/api/hibernatable-web-socket.c++ @@ -74,7 +74,7 @@ kj::Promise HibernatableWebSocketCustomEve auto eventParameters = consumeParams(); - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { Trace::HibernatableWebSocketEventInfo::Type type = [&]() -> Trace::HibernatableWebSocketEventInfo::Type { KJ_SWITCH_ONEOF(eventParameters.eventType) { diff --git a/src/workerd/api/node/diagnostics-channel.c++ b/src/workerd/api/node/diagnostics-channel.c++ index f8e72b1c03d..7e7923de50e 100644 --- a/src/workerd/api/node/diagnostics-channel.c++ +++ b/src/workerd/api/node/diagnostics-channel.c++ @@ -26,7 +26,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) { } auto& context = IoContext::current(); - KJ_IF_SOME(tracer, context.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, context.getWorkerTracer()) { jsg::Serializer ser(js, jsg::Serializer::Options{ .omitHeader = false, diff --git a/src/workerd/api/node/util.c++ b/src/workerd/api/node/util.c++ index e8a882bf854..5fe83665f23 100644 --- a/src/workerd/api/node/util.c++ +++ b/src/workerd/api/node/util.c++ @@ -249,7 +249,7 @@ namespace { kj::str("The Node.js process.exit(", code, ") API was called. Canceling the request."); auto& ioContext = IoContext::current(); // If we have a tail worker, let's report the error. - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { // Why create the error like this in tracing? Because we're adding the exception // to the trace and ideally we'd have the JS stack attached to it. Just using // JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index efb0fd44af7..c0cf06a3e43 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -530,7 +530,7 @@ kj::Promise QueueCustomEventImpl::run( } } - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize)); } diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 6c5f07b4271..c71cfe67864 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -603,7 +603,7 @@ kj::Promise sendTracesToExportedHandler(kj::OwngetContext(); auto& metrics = incomingRequest->getMetrics(); - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { t.setEventInfo(context.now(), Trace::TraceEventInfo(traces)); } diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index e4a49729614..863e23fca54 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -1602,11 +1602,14 @@ void RpcSerializerExternalHander::serializeFunction( // call of an RPC session. class EntrypointJsRpcTarget final: public JsRpcTargetBase { public: - EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe entrypointName) + EntrypointJsRpcTarget(IoContext& ioCtx, + kj::Maybe entrypointName, + kj::Maybe> tracer) : JsRpcTargetBase(ioCtx), // Most of the time we don't really have to clone this but it's hard to fully prove, so // let's be safe. - entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })) {} + entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })), + tracer(kj::mv(tracer)) {} TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override { jsg::Lock& js = lock; @@ -1644,6 +1647,7 @@ public: private: kj::Maybe entrypointName; + kj::Maybe> tracer; bool isReservedName(kj::StringPtr name) override { if ( // "fetch" and "connect" are treated specially on entrypoints. @@ -1666,8 +1670,8 @@ private: } void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override { - KJ_IF_SOME(t, ioctx.getMetrics().getWorkerTracer()) { - t.setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); + KJ_IF_SOME(t, tracer) { + t->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); } } }; @@ -1721,7 +1725,8 @@ kj::Promise JsRpcSessionCustomEventImpl::r incomingRequest->delivered(); auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller(); - capFulfiller->fulfill(capnp::membrane(kj::heap(ioctx, entrypointName), + capFulfiller->fulfill(capnp::membrane(kj::heap(ioctx, entrypointName, + mapAddRef(incomingRequest->getWorkerTracer())), kj::refcounted(kj::mv(doneFulfiller)))); KJ_DEFER({ diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 55dbfff94a0..0d0d1b46631 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -200,9 +200,11 @@ IoContext::IoContext(ThreadContext& thread, IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own contextParam, kj::Own ioChannelFactoryParam, - kj::Own metricsParam) + kj::Own metricsParam, + kj::Maybe> workerTracer) : context(kj::mv(contextParam)), metrics(kj::mv(metricsParam)), + workerTracer(kj::mv(workerTracer)), ioChannelFactory(kj::mv(ioChannelFactoryParam)) {} // A call to delivered() implies a promise to call drain() later (or one of the other methods @@ -338,7 +340,7 @@ void IoContext::logUncaughtException( void IoContext::logUncaughtExceptionAsync( UncaughtExceptionSource source, kj::Exception&& exception) { - if (getMetrics().getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) { + if (getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) { // We don't need to take the isolate lock as neither inspecting nor tracing is enabled. We // do still want to syslog if relevant, but we can do that without a lock. if (!jsg::isTunneledException(exception.getDescription()) && diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index cfaa3e1557b..b44a6aa07d5 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -107,7 +107,8 @@ class IoContext_IncomingRequest final { public: IoContext_IncomingRequest(kj::Own context, kj::Own ioChannelFactory, - kj::Own metrics); + kj::Own metrics, + kj::Maybe> workerTracer); KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest); ~IoContext_IncomingRequest() noexcept(false); @@ -153,9 +154,14 @@ class IoContext_IncomingRequest final { return *metrics; } + kj::Maybe getWorkerTracer() { + return workerTracer; + } + private: kj::Own context; kj::Own metrics; + kj::Maybe> workerTracer; kj::Own ioChannelFactory; bool wasDelivered = false; @@ -229,6 +235,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler return *getCurrentIncomingRequest().metrics; } + const kj::Maybe getWorkerTracer() { + if (incomingRequests.empty()) return kj::none; + return getCurrentIncomingRequest().getWorkerTracer(); + } + LimitEnforcer& getLimitEnforcer() { return *limitEnforcer; } diff --git a/src/workerd/io/observer.h b/src/workerd/io/observer.h index 66df85e9cc2..e1eed46e03c 100644 --- a/src/workerd/io/observer.h +++ b/src/workerd/io/observer.h @@ -103,10 +103,6 @@ class RequestObserver: public kj::Refcounted { return nullptr; } - virtual kj::Maybe getWorkerTracer() { - return kj::none; - } - virtual kj::Own addedContextTask() { return kj::Own(); } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index deec4246a26..5d53b782aae 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -591,7 +591,7 @@ kj::Own PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint), executionModel); traces.add(kj::addRef(*trace)); - return kj::refcounted(kj::addRef(*this), kj::mv(trace), pipelineLogLevel); + return kj::refcounted(addRefToThis(), kj::mv(trace), pipelineLogLevel); } void PipelineTracer::addTrace(rpc::Trace::Reader reader) { diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 1bc8a212d42..9b3bbc93fc0 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -52,6 +52,7 @@ public: kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson); kj::Promise request(kj::HttpMethod method, @@ -94,7 +95,8 @@ private: kj::Own limitEnforcer, kj::Own ioContextDependency, kj::Own ioChannelFactory, - kj::Own metrics); + kj::Own metrics, + kj::Maybe> workerTracer); template kj::Promise maybeAddGcPassForTest(IoContext& context, kj::Promise promise); @@ -155,12 +157,13 @@ kj::Own WorkerEntrypoint::construct(ThreadContext& threadContex kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson) { TRACE_EVENT("workerd", "WorkerEntrypoint::construct()"); auto obj = kj::heap(kj::Badge(), threadContext, waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson)); obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), - kj::mv(ioChannelFactory), kj::addRef(*metrics)); + kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer)); auto& wrapper = metrics->wrapWorkerInterface(*obj); return kj::attachRef(wrapper, kj::mv(obj), kj::mv(metrics)); } @@ -182,7 +185,8 @@ void WorkerEntrypoint::init(kj::Own worker, kj::Own limitEnforcer, kj::Own ioContextDependency, kj::Own ioChannelFactory, - kj::Own metrics) { + kj::Own metrics, + kj::Maybe> workerTracer) { TRACE_EVENT("workerd", "WorkerEntrypoint::init()"); // We need to construct the IoContext -- unless this is an actor and it already has a // IoContext, in which case we reuse it. @@ -208,7 +212,7 @@ void WorkerEntrypoint::init(kj::Own worker, } incomingRequest = kj::heap( - kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics)) + kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics), kj::mv(workerTracer)) .attach(kj::mv(actor)); } @@ -229,7 +233,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, bool isActor = context.getActor() != kj::none; - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { auto timestamp = context.now(); kj::String cfJson; KJ_IF_SOME(c, cfBlobJson) { @@ -472,7 +476,7 @@ kj::Promise WorkerEntrypoint::runScheduled( // calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have // to think more about this. - KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, context.getWorkerTracer()) { double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS; t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron))); } @@ -531,7 +535,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( // There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events). incomingRequest->delivered(); - KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime)); } @@ -711,10 +715,11 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson) { return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory), - kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(cfBlobJson)); + kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson)); } } // namespace workerd diff --git a/src/workerd/io/worker-entrypoint.h b/src/workerd/io/worker-entrypoint.h index ac3fc8e4e5a..e5213e8d6e5 100644 --- a/src/workerd/io/worker-entrypoint.h +++ b/src/workerd/io/worker-entrypoint.h @@ -33,6 +33,7 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, + kj::Maybe> workerTracer, kj::Maybe cfBlobJson); } // namespace workerd diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 7d023468977..753690988f0 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -1047,7 +1047,7 @@ Worker::Isolate::Isolate(kj::Own apiParam, // Only add exception to trace when running within an I/O context with a tracer. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER, error, api->getErrorInterfaceTypeHandler(js)); } @@ -1840,7 +1840,7 @@ void Worker::handleLog(jsg::Lock& js, // Only check tracing if console.log() was not invoked at the top level. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { auto timestamp = ioContext.now(); tracer.log(timestamp, level, message()); } @@ -2039,7 +2039,7 @@ void Worker::Lock::logUncaughtException( // Only add exception to trace when running within an I/O context with a tracer. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { JSG_WITHIN_CONTEXT_SCOPE(*this, getContext(), [&](jsg::Lock& js) { addExceptionToTrace(impl->inner, ioContext, tracer, source, exception, worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this)); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index a9d0859c576..ab8083dd9fe 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1541,7 +1541,8 @@ public: kj::Own(this, kj::NullDisposer::instance), kj::refcounted(), // default observer makes no observations waitUntilTasks, - true, // tunnelExceptions + true, // tunnelExceptions + kj::none, // workerTracer kj::mv(metadata.cfBlobJson)); } diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index 995697cb2e4..3aec147aefd 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -410,7 +410,7 @@ kj::Own TestFixture::createIncomingRequest() { auto context = kj::refcounted( threadContext, kj::atomicAddRef(*worker), actor, kj::heap()); auto incomingRequest = kj::heap(kj::addRef(*context), - kj::heap(*timerChannel), kj::refcounted()); + kj::heap(*timerChannel), kj::refcounted(), nullptr); incomingRequest->delivered(); return incomingRequest; } diff --git a/src/workerd/util/own-util.h b/src/workerd/util/own-util.h index 9d2cbda0a61..e9a2fb64b4c 100644 --- a/src/workerd/util/own-util.h +++ b/src/workerd/util/own-util.h @@ -14,6 +14,11 @@ inline auto mapAddRef(kj::Maybe>& maybe) -> kj::Maybe> { return maybe.map([](kj::Own& t) { return kj::addRef(*t); }); } +template +inline auto mapAddRef(kj::Maybe>& maybe) -> kj::Maybe> { + return maybe.map([](kj::Rc& t) { return t.addRef(); }); +} + template inline auto mapAddRef(kj::Maybe maybe) -> kj::Maybe> { return maybe.map([](T& t) { return kj::addRef(t); });