Skip to content

Commit

Permalink
[o11y] Add remaining R2 span tags
Browse files Browse the repository at this point in the history
  • Loading branch information
fhanau committed Dec 18, 2024
1 parent 7d0e7cd commit a8d937b
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 21 deletions.
14 changes: 6 additions & 8 deletions src/workerd/api/r2-admin.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ jsg::Ref<R2Bucket> R2Admin::get(jsg::Lock& js, kj::String bucketName) {
jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket here.
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}});
auto client =
r2GetClient(context, subrequestChannel, "r2_create"_kjc, "CreateBucket"_kjc, name.asPtr());

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -59,8 +58,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType,
CompatibilityFlags::Reader flags) {
auto& context = IoContext::current();
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}});
auto client =
r2GetClient(context, subrequestChannel, "r2_create"_kjc, "CreateBucket"_kjc, kj::none);

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -116,9 +115,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
jsg::Promise<void> R2Admin::delete_(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}});
auto client =
r2GetClient(context, subrequestChannel, "r2_delete"_kjc, "DeleteBucket"_kjc, name.asPtr());

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
52 changes: 45 additions & 7 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@
#include <regex>

namespace workerd::api::public_beta {
kj::Own<kj::HttpClient> r2GetClient(IoContext& context,
uint subrequestChannel,
kj::LiteralStringConst op,
kj::LiteralStringConst method,
kj::Maybe<kj::StringPtr> bucket,
kj::Maybe<StringTagParams> extraTag) {
return context.getHttpClientWithCallback(
subrequestChannel, true, kj::none, op, [&](TraceContext& tracing) {
tracing.userSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj));
tracing.userSpan.setTag("rpc.method"_kjc, kj::str(method));
KJ_IF_SOME(b, bucket) {
tracing.userSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b));
}
KJ_IF_SOME(tag, extraTag) {
tracing.userSpan.setTag(tag.key, kj::str(tag.value));
}
});
}

static bool isWholeNumber(double x) {
double intpart;
return modf(x, &intpart) == 0;
Expand Down Expand Up @@ -341,7 +360,8 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::head(jsg::Lock
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client =
r2GetClient(context, clientIndex, "r2_get"_kjc, "GetObject"_kjc, this->adminBucketName());

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -377,7 +397,9 @@ R2Bucket::get(jsg::Lock& js,
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client =
r2GetClient(context, clientIndex, "r2_get"_kjc, "GetObject"_kjc, this->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.key"_kjc, name.asPtr()}));

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -439,7 +461,9 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::put(jsg::Lock&
});

auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_put"_kjc);
auto client =
r2GetClient(context, clientIndex, "r2_put"_kjc, "PutObject"_kjc, this->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.key"_kjc, name.asPtr()}));

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -630,8 +654,8 @@ jsg::Promise<jsg::Ref<R2MultipartUpload>> R2Bucket::createMultipartUpload(jsg::L
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client =
context.getHttpClient(clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc);
auto client = r2GetClient(context, clientIndex, "r2_createMultipartUpload"_kjc,
"CreateMultipartUpload"_kjc, this->adminBucketName());

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -729,7 +753,20 @@ jsg::Promise<void> R2Bucket::delete_(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_delete"_kjc);
auto deleteKey = [&]() {
KJ_SWITCH_ONEOF(keys) {
KJ_CASE_ONEOF(ks, kj::Array<kj::String>) {
return kj::str(ks);
}
KJ_CASE_ONEOF(k, kj::String) {
return kj::str(k);
}
}
KJ_UNREACHABLE;
};
auto client = r2GetClient(context, clientIndex, "r2_delete"_kjc, "DeleteObject"_kjc,
this->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.delete"_kjc, deleteKey().asPtr()}));

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -774,7 +811,8 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js,
CompatibilityFlags::Reader flags) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_list"_kjc);
auto client = r2GetClient(
context, clientIndex, "r2_list"_kjc, "ListObjects"_kjc, this->adminBucketName());

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
13 changes: 13 additions & 0 deletions src/workerd/api/r2-bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ class Headers;

namespace workerd::api::public_beta {

struct StringTagParams {
kj::LiteralStringConst key;
kj::StringPtr value;
};
// Helper for creating R2 HTTP Client with the right span tags across operations. This is much
// cleaner than directly using a callback to set span tags.
kj::Own<kj::HttpClient> r2GetClient(IoContext& context,
uint subrequestChannel,
kj::LiteralStringConst op,
kj::LiteralStringConst method,
kj::Maybe<kj::StringPtr> bucket,
kj::Maybe<StringTagParams> extraTag = kj::none);

kj::Array<kj::byte> cloneByteArray(const kj::Array<kj::byte>& arr);
kj::ArrayPtr<kj::StringPtr> fillR2Path(
kj::StringPtr pathStorage[1], const kj::Maybe<kj::String>& bucket);
Expand Down
15 changes: 9 additions & 6 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ jsg::Promise<R2MultipartUpload::UploadedPart> R2MultipartUpload::uploadPart(jsg:
"Part number must be between 1 and 10000 (inclusive). Actual value was: ", partNumber);

auto& context = IoContext::current();
auto client =
context.getHttpClient(this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex, "r2_uploadPart"_kjc,
"UploadPart"_kjc, this->bucket->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}));

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -95,8 +96,9 @@ jsg::Promise<jsg::Ref<R2Bucket::HeadResult>> R2MultipartUpload::complete(jsg::Lo
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_completeMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex, "r2_completeMultipartUpload"_kjc,
"CompleteMultipartUpload"_kjc, this->bucket->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}));

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -145,8 +147,9 @@ jsg::Promise<void> R2MultipartUpload::abort(
jsg::Lock& js, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_abortMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex, "r2_abortMultipartUpload"_kjc,
"AbortMultipartUpload"_kjc, this->bucket->adminBucketName(),
kj::Maybe<StringTagParams>({"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}));

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
18 changes: 18 additions & 0 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,24 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannel(
});
}

kj::Own<kj::HttpClient> IoContext::getHttpClientWithCallback(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
kj::FunctionParam<void(TraceContext&)> func) {
return asHttpClient(getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& channelFactory) {
func(tracing);
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory);
},
SubrequestOptions{
.inHouse = isInHouse,
.wrapMetrics = !isInHouse,
.operationName = kj::mv(operationName),
}));
}

kj::Own<WorkerInterface> IoContext::getSubrequestChannelWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,13 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags);

// As above, but with callback to add span tags to the trace context for the operation.
kj::Own<kj::HttpClient> getHttpClientWithCallback(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
kj::FunctionParam<void(TraceContext&)> func);

// Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects
// to HttpClient.
kj::Own<kj::HttpClient> getHttpClientNoChecks(uint channel,
Expand Down

0 comments on commit a8d937b

Please sign in to comment.