From 13135a2aa4c1dbe8106d0ed3f8c49627b1cef7a3 Mon Sep 17 00:00:00 2001 From: Felix Hanau Date: Wed, 25 Sep 2024 14:58:10 +0000 Subject: [PATCH] [o11y] Add remaining R2 span tags --- src/workerd/api/kv.c++ | 4 ++- src/workerd/api/r2-admin.c++ | 14 ++++----- src/workerd/api/r2-bucket.c++ | 49 +++++++++++++++++++++++++++----- src/workerd/api/r2-bucket.h | 18 ++++++++++++ src/workerd/api/r2-multipart.c++ | 15 ++++++---- src/workerd/io/io-context.c++ | 8 +++--- src/workerd/io/io-context.h | 12 ++------ 7 files changed, 84 insertions(+), 36 deletions(-) diff --git a/src/workerd/api/kv.c++ b/src/workerd/api/kv.c++ index e8593275a14..12f0d7afd15 100644 --- a/src/workerd/api/kv.c++ +++ b/src/workerd/api/kv.c++ @@ -95,8 +95,10 @@ kj::Own KvNamespace::getHttpClient(IoContext& context, KJ_UNREACHABLE; }(); + kj::Vector tags; + tags.add("db.system"_kjc, kj::str("cloudflare-kv"_kjc)); auto client = context.getHttpClientWithSpans( - subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}}); + subrequestChannel, true, kj::none, operationName, kj::mv(tags)); headers.add(FLPROD_405_HEADER, urlStr); for (const auto& header: additionalHeaders) { headers.add(header.name.asPtr(), header.value.asPtr()); diff --git a/src/workerd/api/r2-admin.c++ b/src/workerd/api/r2-admin.c++ index f4eaa78cb28..788fefe9080 100644 --- a/src/workerd/api/r2-admin.c++ +++ b/src/workerd/api/r2-admin.c++ @@ -26,9 +26,8 @@ jsg::Ref R2Admin::get(jsg::Lock& js, kj::String bucketName) { jsg::Promise> R2Admin::create( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - // TODO(o11y): Add cloudflare.r2.bucket here. 
- auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc, - {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}}); + auto client = r2GetClient(context, subrequestChannel, + {"r2_create"_kjc, {"rpc.method"_kjc, "CreateBucket"_kjc}, name.asPtr()}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -59,8 +58,8 @@ jsg::Promise R2Admin::list(jsg::Lock& js, const jsg::TypeHandler>& errorType, CompatibilityFlags::Reader flags) { auto& context = IoContext::current(); - auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc, - {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}}); + auto client = r2GetClient( + context, subrequestChannel, {"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -116,9 +115,8 @@ jsg::Promise R2Admin::list(jsg::Lock& js, jsg::Promise R2Admin::delete_( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - // TODO(o11y): Add cloudflare.r2.bucket - auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc, - {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}}); + auto client = r2GetClient(context, subrequestChannel, + {"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteBucket"_kjc}, name.asPtr()}); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/api/r2-bucket.c++ b/src/workerd/api/r2-bucket.c++ index 2c6893f4a77..21eac01fc78 100644 --- a/src/workerd/api/r2-bucket.c++ +++ b/src/workerd/api/r2-bucket.c++ @@ -24,6 +24,21 @@ #include namespace workerd::api::public_beta { +kj::Own r2GetClient( + IoContext& context, uint subrequestChannel, R2UserTracing user) { + kj::Vector tags; + tags.add("rpc.service"_kjc, kj::str("r2"_kjc)); + tags.add(user.method.key, kj::str(user.method.value)); + KJ_IF_SOME(b, user.bucket) { + 
tags.add("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + KJ_IF_SOME(tag, user.extraTag) { + tags.add(tag.key, kj::str(tag.value)); + } + + return context.getHttpClientWithSpans(subrequestChannel, true, kj::none, user.op, kj::mv(tags)); +} + static bool isWholeNumber(double x) { double intpart; return modf(x, &intpart) == 0; @@ -341,7 +356,8 @@ jsg::Promise>> R2Bucket::head(jsg::Lock return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName()}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -377,7 +393,9 @@ R2Bucket::get(jsg::Lock& js, return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_get"_kjc, {"rpc.method"_kjc, "GetObject"_kjc}, this->adminBucketName(), + {{"cloudflare.r2.key"_kjc, name.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -439,7 +457,9 @@ jsg::Promise>> R2Bucket::put(jsg::Lock& }); auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_put"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_put"_kjc, {"rpc.method"_kjc, "PutObject"_kjc}, this->adminBucketName(), + {{"cloudflare.r2.key"_kjc, name.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -630,8 +650,9 @@ jsg::Promise> R2Bucket::createMultipartUpload(jsg::L const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = - context.getHttpClient(clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_createMultipartUpload"_kjc, {"rpc.method"_kjc, "CreateMultipartUpload"_kjc}, + this->adminBucketName()}); capnp::JsonCodec
json; json.handleByAnnotation(); @@ -729,7 +750,20 @@ jsg::Promise R2Bucket::delete_(jsg::Lock& js, const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_delete"_kjc); + auto deleteKey = [&]() { + KJ_SWITCH_ONEOF(keys) { + KJ_CASE_ONEOF(ks, kj::Array) { + return kj::str(ks); + } + KJ_CASE_ONEOF(k, kj::String) { + return kj::str(k); + } + } + KJ_UNREACHABLE; + }(); + auto client = r2GetClient(context, clientIndex, + {"r2_delete"_kjc, {"rpc.method"_kjc, "DeleteObject"_kjc}, this->adminBucketName(), + {{"cloudflare.r2.delete"_kjc, deleteKey.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -774,7 +808,8 @@ jsg::Promise R2Bucket::list(jsg::Lock& js, CompatibilityFlags::Reader flags) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_list"_kjc); + auto client = r2GetClient(context, clientIndex, + {"r2_list"_kjc, {"rpc.method"_kjc, "ListObjects"_kjc}, this->adminBucketName()}); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/api/r2-bucket.h b/src/workerd/api/r2-bucket.h index e6772e0f491..fa6a65bf0e6 100644 --- a/src/workerd/api/r2-bucket.h +++ b/src/workerd/api/r2-bucket.h @@ -15,6 +15,24 @@ class Headers; namespace workerd::api::public_beta { +struct StringTagParams { + kj::LiteralStringConst key; + kj::StringPtr value; +}; + +struct R2UserTracing { + kj::LiteralStringConst op; + StringTagParams method; + // Passing Maybe instead of Maybe here – this avoids a branch on + // the caller side when bucket is already a Maybe, which is more convenient. + kj::Maybe bucket; + kj::Maybe extraTag; +}; + +// Helper for creating R2 HTTP Client with the right span tags across operations. This is much +// cleaner than setting span tags directly in each function. 
+kj::Own r2GetClient(IoContext& context, uint subrequestChannel, R2UserTracing user); + kj::Array cloneByteArray(const kj::Array& arr); kj::ArrayPtr fillR2Path( kj::StringPtr pathStorage[1], const kj::Maybe& bucket); diff --git a/src/workerd/api/r2-multipart.c++ b/src/workerd/api/r2-multipart.c++ index c3f00ac791f..de539af68d2 100644 --- a/src/workerd/api/r2-multipart.c++ +++ b/src/workerd/api/r2-multipart.c++ @@ -30,8 +30,9 @@ jsg::Promise R2MultipartUpload::uploadPart(jsg: "Part number must be between 1 and 10000 (inclusive). Actual value was: ", partNumber); auto& context = IoContext::current(); - auto client = - context.getHttpClient(this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc); + auto client = r2GetClient(context, this->bucket->clientIndex, + {"r2_uploadPart"_kjc, {"rpc.method"_kjc, "UploadPart"_kjc}, this->bucket->adminBucketName(), + {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -95,8 +96,9 @@ jsg::Promise> R2MultipartUpload::complete(jsg::Lo const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient( - this->bucket->clientIndex, true, kj::none, "r2_completeMultipartUpload"_kjc); + auto client = r2GetClient(context, this->bucket->clientIndex, + {"r2_completeMultipartUpload"_kjc, {"rpc.method"_kjc, "CompleteMultipartUpload"_kjc}, + this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -145,8 +147,9 @@ jsg::Promise R2MultipartUpload::abort( jsg::Lock& js, const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient( - this->bucket->clientIndex, true, kj::none, "r2_abortMultipartUpload"_kjc); + auto client = r2GetClient(context, this->bucket->clientIndex, + {"r2_abortMultipartUpload"_kjc, {"rpc.method"_kjc, "AbortMultipartUpload"_kjc}, + 
this->bucket->adminBucketName(), {{"cloudflare.r2.upload_id"_kjc, uploadId.asPtr()}}}); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 09c3439926b..10a70545d72 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -839,11 +839,11 @@ kj::Own IoContext::getSubrequestChannelWithSpans(uint channel, bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags) { + kj::Vector tags) { return getSubrequest( [&](TraceContext& tracing, IoChannelFactory& channelFactory) { - for (const SpanTagParams& tag: tags) { - tracing.userSpan.setTag(kj::mv(tag.key), kj::str(tag.value)); + for (Span::Tag& tag: tags) { + tracing.userSpan.setTag(kj::mv(tag.key), kj::mv(tag.value)); } return getSubrequestChannelImpl( channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory); @@ -897,7 +897,7 @@ kj::Own IoContext::getHttpClientWithSpans(uint channel, bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags) { + kj::Vector tags) { return asHttpClient(getSubrequestChannelWithSpans( channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName), kj::mv(tags))); } diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 7aa3c2687cf..f15d7c6da7e 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -710,19 +710,11 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler kj::Maybe cfBlobJson, kj::ConstString operationName); - // As above, but with list of span tags to add. - // TODO(o11y): For now this only supports literal values based on initializer_list constraints. - // Add syntactic sugar to kj::vector so that we can pass in a vector more ergonomically and use - // that instead to support other value types. 
- struct SpanTagParams { - kj::LiteralStringConst key; - kj::LiteralStringConst value; - }; kj::Own getSubrequestChannelWithSpans(uint channel, bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags); + kj::Vector tags); // Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only. kj::Own getSubrequestChannelNoChecks(uint channel, @@ -742,7 +734,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName, - std::initializer_list tags); + kj::Vector tags); // Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects // to HttpClient.