Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[o11y] Add remaining R2 span tags #3256

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
KJ_UNREACHABLE;
}();

kj::Vector<Span::Tag> tags;
tags.add("db.system"_kjc, kj::str("cloudflare-kv"_kjc));
auto client = context.getHttpClientWithSpans(
subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}});
subrequestChannel, true, kj::none, operationName, kj::mv(tags));
headers.add(FLPROD_405_HEADER, urlStr);
for (const auto& header: additionalHeaders) {
headers.add(header.name.asPtr(), header.value.asPtr());
Expand Down
14 changes: 6 additions & 8 deletions src/workerd/api/r2-admin.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ jsg::Ref<R2Bucket> R2Admin::get(jsg::Lock& js, kj::String bucketName) {
jsg::Promise<jsg::Ref<R2Bucket>> R2Admin::create(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket here.
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}});
auto client = r2GetClient(context, subrequestChannel,
{"r2_create"_kjc, {"rpc.method"_kjc, "CreateBucket"_kjc}, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -59,8 +58,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType,
CompatibilityFlags::Reader flags) {
auto& context = IoContext::current();
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}});
auto client = r2GetClient(
context, subrequestChannel, {"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -116,9 +115,8 @@ jsg::Promise<R2Admin::ListResult> R2Admin::list(jsg::Lock& js,
jsg::Promise<void> R2Admin::delete_(
jsg::Lock& js, kj::String name, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
auto& context = IoContext::current();
// TODO(o11y): Add cloudflare.r2.bucket
auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc,
{{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}});
auto client = r2GetClient(context, subrequestChannel,
{"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteBucket"_kjc}, name.asPtr()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
49 changes: 42 additions & 7 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@
#include <regex>

namespace workerd::api::public_beta {
kj::Own<kj::HttpClient> r2GetClient(
IoContext& context, uint subrequestChannel, R2UserTracing user) {
kj::Vector<Span::Tag> tags;
tags.add("rpc.service"_kjc, kj::str("r2"_kjc));
tags.add(user.method.key, kj::str(user.method.value));
KJ_IF_SOME(b, user.bucket) {
tags.add("cloudflare.r2.bucket"_kjc, kj::str(b));
}
KJ_IF_SOME(tag, user.extraTag) {
tags.add(tag.key, kj::str(tag.value));
}

return context.getHttpClientWithSpans(subrequestChannel, true, kj::none, user.op, kj::mv(tags));
}

static bool isWholeNumber(double x) {
double intpart;
return modf(x, &intpart) == 0;
Expand Down Expand Up @@ -341,7 +356,8 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::head(jsg::Lock
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -377,7 +393,9 @@ R2Bucket::get(jsg::Lock& js,
return js.evalNow([&] {
auto& context = IoContext::current();

auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.bucket"_kjc, name.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -439,7 +457,9 @@ jsg::Promise<kj::Maybe<jsg::Ref<R2Bucket::HeadResult>>> R2Bucket::put(jsg::Lock&
});

auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_put"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_put"_kjc, {"rpc.method"_kjc, "PutObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.key"_kjc, name.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -630,8 +650,9 @@ jsg::Promise<jsg::Ref<R2MultipartUpload>> R2Bucket::createMultipartUpload(jsg::L
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client =
context.getHttpClient(clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_createMultipartUpload"_kjc, {"rpc.method"_kjc, "CreateMultipartUpload"_kjc},
this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -729,7 +750,20 @@ jsg::Promise<void> R2Bucket::delete_(jsg::Lock& js,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_delete"_kjc);
auto deleteKey = [&]() {
fhanau marked this conversation as resolved.
Show resolved Hide resolved
KJ_SWITCH_ONEOF(keys) {
KJ_CASE_ONEOF(ks, kj::Array<kj::String>) {
return kj::str(ks);
}
KJ_CASE_ONEOF(k, kj::String) {
return kj::str(k);
}
}
KJ_UNREACHABLE;
}();
auto client = r2GetClient(context, clientIndex,
{"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteObject"_kjc}, this->adminBucketName(),
{{"cloudflare.r2.delete"_kjc, deleteKey.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -774,7 +808,8 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js,
CompatibilityFlags::Reader flags) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_list"_kjc);
auto client = r2GetClient(context, clientIndex,
{"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}, this->adminBucketName()});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
18 changes: 18 additions & 0 deletions src/workerd/api/r2-bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@ class Headers;

namespace workerd::api::public_beta {

struct StringTagParams {
kj::LiteralStringConst key;
kj::StringPtr value;
};

struct R2UserTracing {
kj::LiteralStringConst op;
StringTagParams method;
// Passing Maybe<kj::StringPtr> instead of Maybe<StringTagParams> here – this avoids a branch on
// the caller side when bucket is already a Maybe, which is more convenient.
kj::Maybe<kj::StringPtr> bucket;
kj::Maybe<StringTagParams> extraTag;
};

// Helper for creating R2 HTTP Client with the right span tags across operations. This is much
// cleaner than setting span tags directly in each function.
kj::Own<kj::HttpClient> r2GetClient(IoContext& context, uint subrequestChannel, R2UserTracing user);

kj::Array<kj::byte> cloneByteArray(const kj::Array<kj::byte>& arr);
kj::ArrayPtr<kj::StringPtr> fillR2Path(
kj::StringPtr pathStorage[1], const kj::Maybe<kj::String>& bucket);
Expand Down
15 changes: 9 additions & 6 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ jsg::Promise<R2MultipartUpload::UploadedPart> R2MultipartUpload::uploadPart(jsg:
"Part number must be between 1 and 10000 (inclusive). Actual value was: ", partNumber);

auto& context = IoContext::current();
auto client =
context.getHttpClient(this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_uploadPart"_kjc, {"rpc.method"_kjc, "UploadPart"_kjc}, this->bucket->adminBucketName(),
{{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -95,8 +96,9 @@ jsg::Promise<jsg::Ref<R2Bucket::HeadResult>> R2MultipartUpload::complete(jsg::Lo
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_completeMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_completeMultipartUpload"_kjc, {"rpc.method"_kjc, "CompleteMultipartUpload"_kjc},
this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down Expand Up @@ -145,8 +147,9 @@ jsg::Promise<void> R2MultipartUpload::abort(
jsg::Lock& js, const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();
auto client = context.getHttpClient(
this->bucket->clientIndex, true, kj::none, "r2_abortMultipartUpload"_kjc);
auto client = r2GetClient(context, this->bucket->clientIndex,
{"r2_abortMultipartUpload"_kjc, {"rpc.method"_kjc, "AbortMultipartUpload"_kjc},
this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}});

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
Expand Down
8 changes: 4 additions & 4 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -839,11 +839,11 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannelWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags) {
kj::Vector<Span::Tag> tags) {
return getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& channelFactory) {
for (const SpanTagParams& tag: tags) {
tracing.userSpan.setTag(kj::mv(tag.key), kj::str(tag.value));
for (Span::Tag& tag: tags) {
tracing.userSpan.setTag(kj::mv(tag.key), kj::mv(tag.value));
}
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory);
Expand Down Expand Up @@ -897,7 +897,7 @@ kj::Own<kj::HttpClient> IoContext::getHttpClientWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags) {
kj::Vector<Span::Tag> tags) {
return asHttpClient(getSubrequestChannelWithSpans(
channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName), kj::mv(tags)));
}
Expand Down
12 changes: 2 additions & 10 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -710,19 +710,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName);

// As above, but with list of span tags to add.
// TODO(o11y): For now this only supports literal values based on initializer_list constraints.
// Add syntactic sugar to kj::vector so that we can pass in a vector more ergonomically and use
// that instead to support other value types.
struct SpanTagParams {
kj::LiteralStringConst key;
kj::LiteralStringConst value;
};
kj::Own<WorkerInterface> getSubrequestChannelWithSpans(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags);
kj::Vector<Span::Tag> tags);

// Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only.
kj::Own<WorkerInterface> getSubrequestChannelNoChecks(uint channel,
Expand All @@ -742,7 +734,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
kj::ConstString operationName,
std::initializer_list<SpanTagParams> tags);
kj::Vector<Span::Tag> tags);

// Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects
// to HttpClient.
Expand Down
Loading