Skip to content

Commit

Permalink
Revert "Get the WorkerTracer via RequestObserver"
Browse files Browse the repository at this point in the history
This reverts commit f06baa7.
  • Loading branch information
jasnell committed Oct 22, 2024
1 parent 52b29ed commit a4a49c7
Show file tree
Hide file tree
Showing 16 changed files with 57 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve

auto eventParameters = consumeParams();

KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
Trace::HibernatableWebSocketEventInfo::Type type =
[&]() -> Trace::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/node/diagnostics-channel.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) {
}

auto& context = IoContext::current();
KJ_IF_SOME(tracer, context.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, context.getWorkerTracer()) {
jsg::Serializer ser(js,
jsg::Serializer::Options{
.omitHeader = false,
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/node/util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ namespace {
kj::str("The Node.js process.exit(", code, ") API was called. Canceling the request.");
auto& ioContext = IoContext::current();
// If we have a tail worker, let's report the error.
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
// Why create the error like this in tracing? Because we're adding the exception
// to the trace and ideally we'd have the JS stack attached to it. Just using
// JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
}
}

KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
auto& context = incomingRequest->getContext();
auto& metrics = incomingRequest->getMetrics();

KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::TraceEventInfo(traces));
}

Expand Down
15 changes: 10 additions & 5 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1602,11 +1602,14 @@ void RpcSerializerExternalHander::serializeFunction(
// call of an RPC session.
class EntrypointJsRpcTarget final: public JsRpcTargetBase {
public:
EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe<kj::StringPtr> entrypointName)
EntrypointJsRpcTarget(IoContext& ioCtx,
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<kj::Own<WorkerTracer>> tracer)
: JsRpcTargetBase(ioCtx),
// Most of the time we don't really have to clone this but it's hard to fully prove, so
// let's be safe.
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })) {}
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })),
tracer(kj::mv(tracer)) {}

TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override {
jsg::Lock& js = lock;
Expand Down Expand Up @@ -1644,6 +1647,7 @@ public:

private:
kj::Maybe<kj::String> entrypointName;
kj::Maybe<kj::Own<WorkerTracer>> tracer;

bool isReservedName(kj::StringPtr name) override {
if ( // "fetch" and "connect" are treated specially on entrypoints.
Expand All @@ -1666,8 +1670,8 @@ private:
}

void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override {
KJ_IF_SOME(t, ioctx.getMetrics().getWorkerTracer()) {
t.setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
KJ_IF_SOME(t, tracer) {
t->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
}
}
};
Expand Down Expand Up @@ -1721,7 +1725,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::r
incomingRequest->delivered();

auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller<void>();
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName),
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName,
mapAddRef(incomingRequest->getWorkerTracer())),
kj::refcounted<ServerTopLevelMembrane>(kj::mv(doneFulfiller))));

KJ_DEFER({
Expand Down
6 changes: 4 additions & 2 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ IoContext::IoContext(ThreadContext& thread,

IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own<IoContext> contextParam,
kj::Own<IoChannelFactory> ioChannelFactoryParam,
kj::Own<RequestObserver> metricsParam)
kj::Own<RequestObserver> metricsParam,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer)
: context(kj::mv(contextParam)),
metrics(kj::mv(metricsParam)),
workerTracer(kj::mv(workerTracer)),
ioChannelFactory(kj::mv(ioChannelFactoryParam)) {}

// A call to delivered() implies a promise to call drain() later (or one of the other methods
Expand Down Expand Up @@ -338,7 +340,7 @@ void IoContext::logUncaughtException(

void IoContext::logUncaughtExceptionAsync(
UncaughtExceptionSource source, kj::Exception&& exception) {
if (getMetrics().getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
if (getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
// We don't need to take the isolate lock as neither inspecting nor tracing is enabled. We
// do still want to syslog if relevant, but we can do that without a lock.
if (!jsg::isTunneledException(exception.getDescription()) &&
Expand Down
13 changes: 12 additions & 1 deletion src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class IoContext_IncomingRequest final {
public:
IoContext_IncomingRequest(kj::Own<IoContext> context,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics);
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);
KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest);
~IoContext_IncomingRequest() noexcept(false);

Expand Down Expand Up @@ -153,9 +154,14 @@ class IoContext_IncomingRequest final {
return *metrics;
}

kj::Maybe<WorkerTracer&> getWorkerTracer() {
return workerTracer;
}

private:
kj::Own<IoContext> context;
kj::Own<RequestObserver> metrics;
kj::Maybe<kj::Own<WorkerTracer>> workerTracer;
kj::Own<IoChannelFactory> ioChannelFactory;

bool wasDelivered = false;
Expand Down Expand Up @@ -229,6 +235,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
return *getCurrentIncomingRequest().metrics;
}

const kj::Maybe<WorkerTracer&> getWorkerTracer() {
if (incomingRequests.empty()) return kj::none;
return getCurrentIncomingRequest().getWorkerTracer();
}

LimitEnforcer& getLimitEnforcer() {
return *limitEnforcer;
}
Expand Down
4 changes: 0 additions & 4 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ class RequestObserver: public kj::Refcounted {
return nullptr;
}

virtual kj::Maybe<WorkerTracer&> getWorkerTracer() {
return kj::none;
}

virtual kj::Own<void> addedContextTask() {
return kj::Own<void>();
}
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline
kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint),
executionModel);
traces.add(kj::addRef(*trace));
return kj::refcounted<WorkerTracer>(kj::addRef(*this), kj::mv(trace), pipelineLogLevel);
return kj::refcounted<WorkerTracer>(addRefToThis(), kj::mv(trace), pipelineLogLevel);
}

void PipelineTracer::addTrace(rpc::Trace::Reader reader) {
Expand Down
21 changes: 13 additions & 8 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public:
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

kj::Promise<void> request(kj::HttpMethod method,
Expand Down Expand Up @@ -94,7 +95,8 @@ private:
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics);
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);

template <typename T>
kj::Promise<T> maybeAddGcPassForTest(IoContext& context, kj::Promise<T> promise);
Expand Down Expand Up @@ -155,12 +157,13 @@ kj::Own<WorkerInterface> WorkerEntrypoint::construct(ThreadContext& threadContex
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
TRACE_EVENT("workerd", "WorkerEntrypoint::construct()");
auto obj = kj::heap<WorkerEntrypoint>(kj::Badge<WorkerEntrypoint>(), threadContext,
waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson));
obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency),
kj::mv(ioChannelFactory), kj::addRef(*metrics));
kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer));
auto& wrapper = metrics->wrapWorkerInterface(*obj);
return kj::attachRef(wrapper, kj::mv(obj), kj::mv(metrics));
}
Expand All @@ -182,7 +185,8 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics) {
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer) {
TRACE_EVENT("workerd", "WorkerEntrypoint::init()");
// We need to construct the IoContext -- unless this is an actor and it already has a
// IoContext, in which case we reuse it.
Expand All @@ -208,7 +212,7 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
}

incomingRequest = kj::heap<IoContext::IncomingRequest>(
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics))
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics), kj::mv(workerTracer))
.attach(kj::mv(actor));
}

Expand All @@ -229,7 +233,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,

bool isActor = context.getActor() != kj::none;

KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
auto timestamp = context.now();
kj::String cfJson;
KJ_IF_SOME(c, cfBlobJson) {
Expand Down Expand Up @@ -472,7 +476,7 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
// calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have
// to think more about this.

KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, context.getWorkerTracer()) {
double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;
t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron)));
}
Expand Down Expand Up @@ -531,7 +535,7 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
// There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events).
incomingRequest->delivered();

KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime));
}

Expand Down Expand Up @@ -711,10 +715,11 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName),
kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory),
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(cfBlobJson));
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson));
}

} // namespace workerd
1 change: 1 addition & 0 deletions src/workerd/io/worker-entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

} // namespace workerd
6 changes: 3 additions & 3 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ Worker::Isolate::Isolate(kj::Own<Api> apiParam,
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER,
error, api->getErrorInterfaceTypeHandler(js));
}
Expand Down Expand Up @@ -1840,7 +1840,7 @@ void Worker::handleLog(jsg::Lock& js,
// Only check tracing if console.log() was not invoked at the top level.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
auto timestamp = ioContext.now();
tracer.log(timestamp, level, message());
}
Expand Down Expand Up @@ -2039,7 +2039,7 @@ void Worker::Lock::logUncaughtException(
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
JSG_WITHIN_CONTEXT_SCOPE(*this, getContext(), [&](jsg::Lock& js) {
addExceptionToTrace(impl->inner, ioContext, tracer, source, exception,
worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this));
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1541,7 +1541,8 @@ public:
kj::Own<IoChannelFactory>(this, kj::NullDisposer::instance),
kj::refcounted<RequestObserver>(), // default observer makes no observations
waitUntilTasks,
true, // tunnelExceptions
true, // tunnelExceptions
kj::none, // workerTracer
kj::mv(metadata.cfBlobJson));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ kj::Own<IoContext::IncomingRequest> TestFixture::createIncomingRequest() {
auto context = kj::refcounted<IoContext>(
threadContext, kj::atomicAddRef(*worker), actor, kj::heap<MockLimitEnforcer>());
auto incomingRequest = kj::heap<IoContext::IncomingRequest>(kj::addRef(*context),
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>());
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>(), nullptr);
incomingRequest->delivered();
return incomingRequest;
}
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/util/own-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ inline auto mapAddRef(kj::Maybe<kj::Own<T>>& maybe) -> kj::Maybe<kj::Own<T>> {
return maybe.map([](kj::Own<T>& t) { return kj::addRef(*t); });
}

template <typename T>
inline auto mapAddRef(kj::Maybe<kj::Rc<T>>& maybe) -> kj::Maybe<kj::Rc<T>> {
return maybe.map([](kj::Rc<T>& t) { return t.addRef(); });
}

template <typename T>
inline auto mapAddRef(kj::Maybe<T&> maybe) -> kj::Maybe<kj::Own<T>> {
return maybe.map([](T& t) { return kj::addRef(t); });
Expand Down

0 comments on commit a4a49c7

Please sign in to comment.