Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fhanau committed Dec 30, 2024
1 parent a673c3b commit d368098
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 57 deletions.
6 changes: 4 additions & 2 deletions src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
KJ_UNREACHABLE;
}();

auto client = context.getHttpClientWithSpans<std::initializer_list>(
subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}});
kj::Vector<SpanTagParams> tags;
tags.add("db.system"_kjc, kj::str("cloudflare-kv"_kjc));
auto client = context.getHttpClientWithSpans(
subrequestChannel, true, kj::none, operationName, kj::mv(tags));
headers.add(FLPROD_405_HEADER, urlStr);
for (const auto& header: additionalHeaders) {
headers.add(header.name.asPtr(), header.value.asPtr());
Expand Down
23 changes: 10 additions & 13 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ kj::Own<kj::HttpClient> r2GetClient(IoContext& context,
kj::LiteralStringConst method,
kj::Maybe<kj::StringPtr> bucket,
kj::Maybe<StringTagParams> extraTag) {
kj::Vector<IoContext::SpanTagParams> tags;
tags.add(IoContext::SpanTagParams{"rpc.service"_kjc, "r2"_kjc});
tags.add(IoContext::SpanTagParams{"rpc.method"_kjc, kj::str(method)});
kj::Vector<SpanTagParams> tags;
tags.add("rpc.service"_kjc, kj::str("r2"_kjc));
tags.add("rpc.method"_kjc, kj::str(method));
KJ_IF_SOME(b, bucket) {
tags.add(IoContext::SpanTagParams{"cloudflare.r2.bucket"_kjc, kj::str(b)});
tags.add("cloudflare.r2.bucket"_kjc, kj::str(b));
}
KJ_IF_SOME(tag, extraTag) {
tags.add(IoContext::SpanTagParams{tag.key, kj::str(tag.value)});
tags.add(tag.key, kj::str(tag.value));
}

return context.getHttpClientWithSpans(subrequestChannel, true, kj::none, op, kj::mv(tags));
Expand Down Expand Up @@ -397,9 +397,8 @@ R2Bucket::get(jsg::Lock& js,
return js.evalNow([&] {
auto& context = IoContext::current();

auto client =
r2GetClient(context, clientIndex, "r2_get"_kjc, "GetObject"_kjc, this->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.key"_kjc, name.asPtr()}));
auto client = r2GetClient(context, clientIndex, "r2_get"_kjc, "GetObject"_kjc,
this->adminBucketName(), StringTagParams{"cloudflare.r2.key"_kjc, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -461,9 +460,8 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::put(jsg::Lock&
});

auto& context = IoContext::current();
auto client =
r2GetClient(context, clientIndex, "r2_put"_kjc, "PutObject"_kjc, this->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.key"_kjc, name.asPtr()}));
auto client = r2GetClient(context, clientIndex, "r2_put"_kjc, "PutObject"_kjc,
this->adminBucketName(), StringTagParams{"cloudflare.r2.key"_kjc, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -765,8 +763,7 @@ jsg::Promise<void> R2Bucket::delete_(jsg::Lock& js,
KJ_UNREACHABLE;
};
auto client = r2GetClient(context, clientIndex, "r2_delete"_kjc, "DeleteObject"_kjc,
this->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.delete"_kjc, deleteKey().asPtr()}));
this->adminBucketName(), StringTagParams{"cloudflare.r2.delete"_kjc, deleteKey().asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jsg::Promise<R2MultipartUpload::UploadedPart> R2MultipartUpload::uploadPart(jsg:
auto& context = IoContext::current();
auto client = r2GetClient(context, this->bucket->clientIndex, "r2_uploadPart"_kjc,
"UploadPart"_kjc, this->bucket->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}));
StringTagParams{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -98,7 +98,7 @@ jsg::Promise<jsg::Ref<R2Bucket::HeadResult>> R2MultipartUpload::complete(jsg::Lo
auto& context = IoContext::current();
auto client = r2GetClient(context, this->bucket->clientIndex, "r2_completeMultipartUpload"_kjc,
"CompleteMultipartUpload"_kjc, this->bucket->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}));
StringTagParams{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -149,7 +149,7 @@ jsg::Promise<void> R2MultipartUpload::abort(
auto& context = IoContext::current();
auto client = r2GetClient(context, this->bucket->clientIndex, "r2_abortMultipartUpload"_kjc,
"AbortMultipartUpload"_kjc, this->bucket->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}));
StringTagParams{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
30 changes: 4 additions & 26 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -835,23 +835,15 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannel(
});
}

template <template <class> class T>
kj::Own<WorkerInterface> IoContext::getSubrequestChannelWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
T<SpanTagParams> tags) {
kj::Vector<SpanTagParams> tags) {
return getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& channelFactory) {
for (const SpanTagParams& tag: tags) {
KJ_SWITCH_ONEOF(tag.value) {
KJ_CASE_ONEOF(c, kj::LiteralStringConst) {
tracing.userSpan.setTag(kj::mv(tag.key), kj::str(c));
}
KJ_CASE_ONEOF(str, kj::String) {
tracing.userSpan.setTag(kj::mv(tag.key), kj::str(str));
}
}
for (SpanTagParams& tag: tags) {
tracing.userSpan.setTag(kj::mv(tag.key), kj::mv(tag.value));
}
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory);
Expand Down Expand Up @@ -901,29 +893,15 @@ kj::Own<kj::HttpClient> IoContext::getHttpClient(
getSubrequestChannel(channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName)));
}

template <template <class> class T>
kj::Own<kj::HttpClient> IoContext::getHttpClientWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
T<SpanTagParams> tags) {
kj::Vector<SpanTagParams> tags) {
return asHttpClient(getSubrequestChannelWithSpans(
channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName), kj::mv(tags)));
}

// We don't want to declare the templated methods in the header – just generate code for the two
// supported container types explicitly.
template kj::Own<kj::HttpClient> IoContext::getHttpClientWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
kj::Vector<SpanTagParams> tags);
template kj::Own<kj::HttpClient> IoContext::getHttpClientWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags);

kj::Own<kj::HttpClient> IoContext::getHttpClientNoChecks(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
Expand Down
21 changes: 8 additions & 13 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ class WarningAggregator final: public kj::AtomicRefcounted {
kj::MutexGuarded<kj::Vector<kj::Own<WarningContext>>> warnings;
};

// Used to add span tags within user tracing.
struct SpanTagParams {
kj::LiteralStringConst key;
kj::String value;
};

// Represents one incoming request being handled by a IoContext. In non-actor scenarios,
// there is only ever one IncomingRequest per IoContext, but with actors there could be many.
//
Expand Down Expand Up @@ -710,21 +716,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName);

// As above, but with list of span tags to add.
// TODO(o11y): For now this only supports literal values based on initializer_list constraints.
// Add syntactic sugar to kj::vector so that we can pass in a vector more ergonomically and use
// that instead to support other value types.
struct SpanTagParams {
kj::LiteralStringConst key;
kj::OneOf<kj::LiteralStringConst, kj::String> value;
};

template <template <class> class T>
kj::Own<WorkerInterface> getSubrequestChannelWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
T<SpanTagParams> tags);
kj::Vector<SpanTagParams> tags);

// Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only.
kj::Own<WorkerInterface> getSubrequestChannelNoChecks(uint channel,
Expand All @@ -740,12 +736,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
kj::ConstString operationName);

// As above, but with list of span tags to add, analogous to getSubrequestChannelWithSpans().
template <template <class> class T>
kj::Own<kj::HttpClient> getHttpClientWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
T<SpanTagParams> tags);
kj::Vector<SpanTagParams> tags);

// Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects
// to HttpClient.
Expand Down

0 comments on commit d368098

Please sign in to comment.