Skip to content

Commit

Permalink
Merge pull request #2974 from cloudflare/jsnell/revert-workerd-2957
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored Oct 22, 2024
2 parents 26a707b + a4a49c7 commit d570c21
Show file tree
Hide file tree
Showing 16 changed files with 65 additions and 44 deletions.
4 changes: 2 additions & 2 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve

auto eventParameters = consumeParams();

KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
Trace::HibernatableWebSocketEventInfo::Type type =
[&]() -> Trace::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
Expand All @@ -95,7 +95,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
KJ_UNREACHABLE;
}();

tracer->setEventInfo(context.now(), Trace::HibernatableWebSocketEventInfo(kj::mv(type)));
t.setEventInfo(context.now(), Trace::HibernatableWebSocketEventInfo(kj::mv(type)));
}

try {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/node/diagnostics-channel.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) {
}

auto& context = IoContext::current();
KJ_IF_SOME(tracer, context.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, context.getWorkerTracer()) {
jsg::Serializer ser(js,
jsg::Serializer::Options{
.omitHeader = false,
Expand All @@ -37,7 +37,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) {
Error,
"Diagnostic events cannot be published with SharedArrayBuffer or "
"transferred ArrayBuffer instances");
tracer->addDiagnosticChannelEvent(context.now(), name.toString(js), kj::mv(tmp.data));
tracer.addDiagnosticChannelEvent(context.now(), name.toString(js), kj::mv(tmp.data));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/node/util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ namespace {
kj::str("The Node.js process.exit(", code, ") API was called. Canceling the request.");
auto& ioContext = IoContext::current();
// If we have a tail worker, let's report the error.
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
// Why create the error like this in tracing? Because we're adding the exception
// to the trace and ideally we'd have the JS stack attached to it. Just using
// JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost
// of creating and capturing the stack when we actually need it.
auto ex = KJ_ASSERT_NONNULL(js.error(message).tryCast<jsg::JsObject>());
tracer->addException(ioContext.now(), ex.get(js, "name"_kj).toString(js),
tracer.addException(ioContext.now(), ex.get(js, "name"_kj).toString(js),
ex.get(js, "message"_kj).toString(js), ex.get(js, "stack"_kj).toString(js));
ioContext.abort(js.exceptionToKj(ex));
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
}
}

KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
tracer->setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
}

// Create a custom refcounted type for holding the queueEvent so that we can pass it to the
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
auto& context = incomingRequest->getContext();
auto& metrics = incomingRequest->getMetrics();

KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
tracer->setEventInfo(context.now(), Trace::TraceEventInfo(traces));
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::TraceEventInfo(traces));
}

auto nonEmptyTraces = kj::Vector<kj::Own<Trace>>(kj::size(traces));
Expand Down
15 changes: 10 additions & 5 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1602,11 +1602,14 @@ void RpcSerializerExternalHander::serializeFunction(
// call of an RPC session.
class EntrypointJsRpcTarget final: public JsRpcTargetBase {
public:
EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe<kj::StringPtr> entrypointName)
EntrypointJsRpcTarget(IoContext& ioCtx,
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<kj::Own<WorkerTracer>> tracer)
: JsRpcTargetBase(ioCtx),
// Most of the time we don't really have to clone this but it's hard to fully prove, so
// let's be safe.
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })) {}
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })),
tracer(kj::mv(tracer)) {}

TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override {
jsg::Lock& js = lock;
Expand Down Expand Up @@ -1644,6 +1647,7 @@ public:

private:
kj::Maybe<kj::String> entrypointName;
kj::Maybe<kj::Own<WorkerTracer>> tracer;

bool isReservedName(kj::StringPtr name) override {
if ( // "fetch" and "connect" are treated specially on entrypoints.
Expand All @@ -1666,8 +1670,8 @@ private:
}

void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override {
KJ_IF_SOME(tracer, ioctx.getMetrics().getWorkerTracer()) {
tracer->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
KJ_IF_SOME(t, tracer) {
t->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
}
}
};
Expand Down Expand Up @@ -1721,7 +1725,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::r
incomingRequest->delivered();

auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller<void>();
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName),
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName,
mapAddRef(incomingRequest->getWorkerTracer())),
kj::refcounted<ServerTopLevelMembrane>(kj::mv(doneFulfiller))));

KJ_DEFER({
Expand Down
6 changes: 4 additions & 2 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ IoContext::IoContext(ThreadContext& thread,

IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own<IoContext> contextParam,
kj::Own<IoChannelFactory> ioChannelFactoryParam,
kj::Own<RequestObserver> metricsParam)
kj::Own<RequestObserver> metricsParam,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer)
: context(kj::mv(contextParam)),
metrics(kj::mv(metricsParam)),
workerTracer(kj::mv(workerTracer)),
ioChannelFactory(kj::mv(ioChannelFactoryParam)) {}

// A call to delivered() implies a promise to call drain() later (or one of the other methods
Expand Down Expand Up @@ -338,7 +340,7 @@ void IoContext::logUncaughtException(

void IoContext::logUncaughtExceptionAsync(
UncaughtExceptionSource source, kj::Exception&& exception) {
if (getMetrics().getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
if (getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
// We don't need to take the isolate lock as neither inspecting nor tracing is enabled. We
// do still want to syslog if relevant, but we can do that without a lock.
if (!jsg::isTunneledException(exception.getDescription()) &&
Expand Down
13 changes: 12 additions & 1 deletion src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class IoContext_IncomingRequest final {
public:
IoContext_IncomingRequest(kj::Own<IoContext> context,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics);
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);
KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest);
~IoContext_IncomingRequest() noexcept(false);

Expand Down Expand Up @@ -153,9 +154,14 @@ class IoContext_IncomingRequest final {
return *metrics;
}

kj::Maybe<WorkerTracer&> getWorkerTracer() {
return workerTracer;
}

private:
kj::Own<IoContext> context;
kj::Own<RequestObserver> metrics;
kj::Maybe<kj::Own<WorkerTracer>> workerTracer;
kj::Own<IoChannelFactory> ioChannelFactory;

bool wasDelivered = false;
Expand Down Expand Up @@ -229,6 +235,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
return *getCurrentIncomingRequest().metrics;
}

const kj::Maybe<WorkerTracer&> getWorkerTracer() {
if (incomingRequests.empty()) return kj::none;
return getCurrentIncomingRequest().getWorkerTracer();
}

LimitEnforcer& getLimitEnforcer() {
return *limitEnforcer;
}
Expand Down
4 changes: 0 additions & 4 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ class RequestObserver: public kj::Refcounted {
return nullptr;
}

virtual kj::Maybe<kj::Rc<WorkerTracer>> getWorkerTracer() {
return kj::none;
}

virtual kj::Own<void> addedContextTask() {
return kj::Own<void>();
}
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ kj::Promise<kj::Array<kj::Own<Trace>>> PipelineTracer::onComplete() {
return kj::mv(paf.promise);
}

kj::Rc<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptId,
kj::Maybe<kj::String> stableId,
Expand All @@ -591,7 +591,7 @@ kj::Rc<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineL
kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint),
executionModel);
traces.add(kj::addRef(*trace));
return kj::rc<WorkerTracer>(addRefToThis(), kj::mv(trace), pipelineLogLevel);
return kj::refcounted<WorkerTracer>(addRefToThis(), kj::mv(trace), pipelineLogLevel);
}

void PipelineTracer::addTrace(rpc::Trace::Reader reader) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class PipelineTracer final: public kj::Refcounted, public kj::EnableAddRefToThis
kj::Promise<kj::Array<kj::Own<Trace>>> onComplete();

// Makes a tracer for a worker stage.
kj::Rc<WorkerTracer> makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
kj::Own<WorkerTracer> makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptId,
kj::Maybe<kj::String> stableId,
Expand Down
27 changes: 16 additions & 11 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public:
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

kj::Promise<void> request(kj::HttpMethod method,
Expand Down Expand Up @@ -94,7 +95,8 @@ private:
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics);
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);

template <typename T>
kj::Promise<T> maybeAddGcPassForTest(IoContext& context, kj::Promise<T> promise);
Expand Down Expand Up @@ -155,12 +157,13 @@ kj::Own<WorkerInterface> WorkerEntrypoint::construct(ThreadContext& threadContex
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
TRACE_EVENT("workerd", "WorkerEntrypoint::construct()");
auto obj = kj::heap<WorkerEntrypoint>(kj::Badge<WorkerEntrypoint>(), threadContext,
waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson));
obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency),
kj::mv(ioChannelFactory), kj::addRef(*metrics));
kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer));
auto& wrapper = metrics->wrapWorkerInterface(*obj);
return kj::attachRef(wrapper, kj::mv(obj), kj::mv(metrics));
}
Expand All @@ -182,7 +185,8 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics) {
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer) {
TRACE_EVENT("workerd", "WorkerEntrypoint::init()");
// We need to construct the IoContext -- unless this is an actor and it already has a
// IoContext, in which case we reuse it.
Expand All @@ -208,7 +212,7 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
}

incomingRequest = kj::heap<IoContext::IncomingRequest>(
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics))
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics), kj::mv(workerTracer))
.attach(kj::mv(actor));
}

Expand All @@ -229,7 +233,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,

bool isActor = context.getActor() != kj::none;

KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
auto timestamp = context.now();
kj::String cfJson;
KJ_IF_SOME(c, cfBlobJson) {
Expand All @@ -253,7 +257,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
return Trace::FetchEventInfo::Header(kj::mv(entry.key), kj::strArray(entry.value, ", "));
};

tracer->setEventInfo(timestamp,
t.setEventInfo(timestamp,
Trace::FetchEventInfo(method, kj::str(url), kj::mv(cfJson), kj::mv(traceHeadersArray)));
}

Expand Down Expand Up @@ -472,9 +476,9 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
// calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have
// to think more about this.

KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(t, context.getWorkerTracer()) {
double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;
t->setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron)));
t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron)));
}

// Scheduled handlers run entirely in waitUntil() tasks.
Expand Down Expand Up @@ -531,8 +535,8 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
// There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events).
incomingRequest->delivered();

KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
t->setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime));
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime));
}

auto scheduleAlarmResult = co_await actor.scheduleAlarm(scheduledTime);
Expand Down Expand Up @@ -711,10 +715,11 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName),
kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory),
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(cfBlobJson));
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson));
}

} // namespace workerd
1 change: 1 addition & 0 deletions src/workerd/io/worker-entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

} // namespace workerd
12 changes: 6 additions & 6 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void sendExceptionToInspector(jsg::Lock& js,

void addExceptionToTrace(jsg::Lock& js,
IoContext& ioContext,
kj::Rc<WorkerTracer>& tracer,
WorkerTracer& tracer,
UncaughtExceptionSource source,
const jsg::JsValue& exception,
const jsg::TypeHandler<Worker::Api::ErrorInterface>& errorTypeHandler) {
Expand Down Expand Up @@ -214,7 +214,7 @@ void addExceptionToTrace(jsg::Lock& js,
}

// TODO(someday): Limit size of exception content?
tracer->addException(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack));
tracer.addException(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack));
}

void reportStartupError(kj::StringPtr id,
Expand Down Expand Up @@ -1047,7 +1047,7 @@ Worker::Isolate::Isolate(kj::Own<Api> apiParam,
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER,
error, api->getErrorInterfaceTypeHandler(js));
}
Expand Down Expand Up @@ -1840,9 +1840,9 @@ void Worker::handleLog(jsg::Lock& js,
// Only check tracing if console.log() was not invoked at the top level.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
auto timestamp = ioContext.now();
tracer->log(timestamp, level, message());
tracer.log(timestamp, level, message());
}
}

Expand Down Expand Up @@ -2039,7 +2039,7 @@ void Worker::Lock::logUncaughtException(
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
JSG_WITHIN_CONTEXT_SCOPE(*this, getContext(), [&](jsg::Lock& js) {
addExceptionToTrace(impl->inner, ioContext, tracer, source, exception,
worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this));
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1541,7 +1541,8 @@ public:
kj::Own<IoChannelFactory>(this, kj::NullDisposer::instance),
kj::refcounted<RequestObserver>(), // default observer makes no observations
waitUntilTasks,
true, // tunnelExceptions
true, // tunnelExceptions
kj::none, // workerTracer
kj::mv(metadata.cfBlobJson));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ kj::Own<IoContext::IncomingRequest> TestFixture::createIncomingRequest() {
auto context = kj::refcounted<IoContext>(
threadContext, kj::atomicAddRef(*worker), actor, kj::heap<MockLimitEnforcer>());
auto incomingRequest = kj::heap<IoContext::IncomingRequest>(kj::addRef(*context),
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>());
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>(), nullptr);
incomingRequest->delivered();
return incomingRequest;
}
Expand Down

0 comments on commit d570c21

Please sign in to comment.