[receiver/googlecloudpubsub] Add support for encoding extensions (#37109) #37137

Merged
11 changes: 11 additions & 0 deletions .chloggen/pubsubreceiver-encodingextensions.yaml
@@ -0,0 +1,11 @@
change_type: enhancement

component: googlecloudpubsubreceiver

note: Added support for encoding extensions.

issues: [37109]

subtext:

change_logs: [user]
54 changes: 39 additions & 15 deletions receiver/googlecloudpubsubreceiver/README.md
@@ -41,29 +41,53 @@ receivers:

## Encoding

You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data
by looking at the `ce-type` and `content-type` attributes of the message. Only when those attributes are not set
must the `encoding` field in the configuration be set.
The `encoding` option allows you to specify an Encoding Extension for decoding messages on the subscription. The
extension needs to be configured in the `extensions` section and added to the `service` section of the collector's
configuration file.

| ce-type | ce-datacontenttype | encoding | description |
|-----------------------------------|----------------------|-------------------|------------------------------------------------|
| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message |
| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message |
| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message |
| - | - | otlp_proto_trace | Decode OTLP trace message |
| -                                 | -                    | otlp_proto_metric | Decode OTLP metric message                     |
| -                                 | -                    | otlp_proto_log    | Decode OTLP log message                        |
| - | - | cloud_logging | Decode [Cloud Logging] [LogEntry] message type |
| - | - | raw_text | Wrap in an OTLP log message |
The following example shows how to use the text encoding extension for ingesting arbitrary text messages on a
subscription, wrapping them in OTLP Log messages. Note that not all extensions support all signals.

```yaml
extensions:
  text_encoding:
    encoding: utf8
    unmarshaling_separator: "\r?\n"

service:
  extensions: [text_encoding]
  pipelines:
    logs:
      receivers: [googlecloudpubsub]
      processors: []
      exporters: [debug]
```

When the `encoding` configuration is set, the attributes on the message are ignored.
The receiver also supports built-in encodings for the native OTLP formats, without the need to specify an Encoding
Extension. The non-OTLP built-in encodings will be deprecated as soon as extensions for those formats are available.

| encoding | description |
|-------------------|------------------------------------------------|
| otlp_proto_trace | Decode OTLP trace message |
| otlp_proto_metric | Decode OTLP metric message                     |
| otlp_proto_log    | Decode OTLP log message                        |
| cloud_logging | Decode [Cloud Logging] [LogEntry] message type |
| raw_text | Wrap in an OTLP log message |

With `cloud_logging`, the receiver can be used to bring Cloud Logging messages into an OpenTelemetry pipeline. You'll
first need to [set up a logging sink][sink-docs] with a Pub/Sub topic as its destination. Note that the `cloud_logging`
integration is considered **alpha** because the semantic conventions for some of the conversions are not yet stable.

With `raw_text`, the receiver can be used for ingesting arbitrary text messages on a Pub/Sub subscription, wrapping them
in OTLP Log messages, making it a convenient way to ingest raw log lines from Pubsub.
in OTLP Log messages.

When no encoding is specified, the receiver tries to discover the type of the data by looking at the `ce-type` and
`content-type` attributes of the message. These message attributes are set by the `googlecloudpubsubexporter`.

| ce-type | ce-datacontenttype | encoding | description |
|-----------------------------------|----------------------|-------------------|------------------------------------------------|
| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message |
| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message |
| org.opentelemetry.otlp.logs.v1 | application/protobuf | | Decode OTLP log message |
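
The attribute-based discovery described above can be sketched roughly as follows. This is a minimal illustration, not
the receiver's actual code: `detectSignal` and the `signal` type are hypothetical names, and the sketch assumes the
content type arrives in a `content-type` message attribute with the value `application/protobuf`, as listed in the table.

```go
package main

import "fmt"

// signal names the OTLP payload type a message is assumed to carry.
// This type is hypothetical and exists only for this sketch.
type signal string

const (
	signalUnknown signal = ""
	signalTraces  signal = "traces"
	signalMetrics signal = "metrics"
	signalLogs    signal = "logs"
)

// detectSignal is a hypothetical helper that maps Pub/Sub message attributes
// to a signal, following the table above. It returns signalUnknown when the
// attributes are missing or do not match any known combination.
func detectSignal(attrs map[string]string) signal {
	// Assumption: only protobuf-encoded OTLP payloads are auto-detected.
	if attrs["content-type"] != "application/protobuf" {
		return signalUnknown
	}
	switch attrs["ce-type"] {
	case "org.opentelemetry.otlp.traces.v1":
		return signalTraces
	case "org.opentelemetry.otlp.metrics.v1":
		return signalMetrics
	case "org.opentelemetry.otlp.logs.v1":
		return signalLogs
	default:
		return signalUnknown
	}
}

func main() {
	attrs := map[string]string{
		"ce-type":      "org.opentelemetry.otlp.logs.v1",
		"content-type": "application/protobuf",
	}
	fmt.Println(detectSignal(attrs))
}
```

A message without these attributes yields no match in this sketch, which corresponds to the case where the `encoding`
option has to be set explicitly.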

[Cloud Logging]: https://cloud.google.com/logging
[LogEntry]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
45 changes: 0 additions & 45 deletions receiver/googlecloudpubsubreceiver/config.go
@@ -35,51 +35,6 @@ type Config struct {
	ClientID string `mapstructure:"client_id"`
}

func (config *Config) validateForLog() error {
	err := config.validate()
	if err != nil {
		return err
	}
	switch config.Encoding {
	case "":
	case "otlp_proto_log":
	case "raw_text":
	case "raw_json":
	case "cloud_logging":
	default:
		return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json,cloud_logging]", config.Encoding)
	}
	return nil
}

func (config *Config) validateForTrace() error {
	err := config.validate()
	if err != nil {
		return err
	}
	switch config.Encoding {
	case "":
	case "otlp_proto_trace":
	default:
		return fmt.Errorf("trace encoding %v is not supported. supported encoding formats include [otlp_proto_trace]", config.Encoding)
	}
	return nil
}

func (config *Config) validateForMetric() error {
	err := config.validate()
	if err != nil {
		return err
	}
	switch config.Encoding {
	case "":
	case "otlp_proto_metric":
	default:
		return fmt.Errorf("metric encoding %v is not supported. supported encoding formats include [otlp_proto_metric]", config.Encoding)
	}
	return nil
}

func (config *Config) validate() error {
	if !subscriptionMatcher.MatchString(config.Subscription) {
		return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", config.Subscription)
60 changes: 0 additions & 60 deletions receiver/googlecloudpubsubreceiver/config_test.go
@@ -63,70 +63,10 @@ func TestLoadConfig(t *testing.T) {
func TestConfigValidation(t *testing.T) {
	factory := NewFactory()
	c := factory.CreateDefaultConfig().(*Config)
	assert.Error(t, c.validateForTrace())
	assert.Error(t, c.validateForLog())
	assert.Error(t, c.validateForMetric())
	c.Subscription = "projects/000project/subscriptions/my-subscription"
	assert.Error(t, c.validate())
	c.Subscription = "projects/my-project/topics/my-topic"
	assert.Error(t, c.validate())
	c.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, c.validate())
}

func TestTraceConfigValidation(t *testing.T) {
	factory := NewFactory()
	c := factory.CreateDefaultConfig().(*Config)
	c.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, c.validateForTrace())

	c.Encoding = "otlp_proto_metric"
	assert.Error(t, c.validateForTrace())
	c.Encoding = "otlp_proto_log"
	assert.Error(t, c.validateForTrace())
	c.Encoding = "raw_text"
	assert.Error(t, c.validateForTrace())
	c.Encoding = "raw_json"
	assert.Error(t, c.validateForTrace())

	c.Encoding = "otlp_proto_trace"
	assert.NoError(t, c.validateForTrace())
}

func TestMetricConfigValidation(t *testing.T) {
	factory := NewFactory()
	c := factory.CreateDefaultConfig().(*Config)
	c.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, c.validateForMetric())

	c.Encoding = "otlp_proto_trace"
	assert.Error(t, c.validateForMetric())
	c.Encoding = "otlp_proto_log"
	assert.Error(t, c.validateForMetric())
	c.Encoding = "raw_text"
	assert.Error(t, c.validateForMetric())
	c.Encoding = "raw_json"
	assert.Error(t, c.validateForMetric())

	c.Encoding = "otlp_proto_metric"
	assert.NoError(t, c.validateForMetric())
}

func TestLogConfigValidation(t *testing.T) {
	factory := NewFactory()
	c := factory.CreateDefaultConfig().(*Config)
	c.Subscription = "projects/my-project/subscriptions/my-subscription"
	assert.NoError(t, c.validateForLog())

	c.Encoding = "otlp_proto_trace"
	assert.Error(t, c.validateForLog())
	c.Encoding = "otlp_proto_metric"
	assert.Error(t, c.validateForLog())

	c.Encoding = "raw_text"
	assert.NoError(t, c.validateForLog())
	c.Encoding = "raw_json"
	assert.NoError(t, c.validateForLog())
	c.Encoding = "otlp_proto_log"
	assert.NoError(t, c.validateForLog())
}
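
The subscription format check exercised by `TestConfigValidation` can be sketched in isolation as follows. The definition of `subscriptionMatcher` is not part of this diff, so the pattern below is an assumption chosen only to agree with the three test inputs above; `subscriptionPattern` and `validateSubscription` are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// subscriptionPattern is an assumed pattern, not the receiver's own matcher.
// It requires a project ID that starts with a lowercase letter, followed by
// a non-empty subscription name without further path segments.
var subscriptionPattern = regexp.MustCompile(`^projects/[a-z][a-z0-9-]*/subscriptions/[^/]+$`)

// validateSubscription is a hypothetical stand-in for Config.validate that
// checks only the subscription name format.
func validateSubscription(name string) error {
	if !subscriptionPattern.MatchString(name) {
		return fmt.Errorf("subscription '%s' is not a valid format, use 'projects/<project_id>/subscriptions/<name>'", name)
	}
	return nil
}

func main() {
	names := []string{
		"projects/000project/subscriptions/my-subscription", // project ID starts with a digit
		"projects/my-project/topics/my-topic",               // a topic, not a subscription
		"projects/my-project/subscriptions/my-subscription", // well-formed
	}
	for _, name := range names {
		fmt.Printf("%s valid=%t\n", name, validateSubscription(name) == nil)
	}
}
```

With the per-signal `validateForTrace`, `validateForMetric` and `validateForLog` methods removed, a format check of this kind is the only validation the factory still runs before creating a receiver.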
6 changes: 3 additions & 3 deletions receiver/googlecloudpubsubreceiver/factory.go
@@ -71,7 +71,7 @@ func (factory *pubsubReceiverFactory) CreateTraces(
	cfg component.Config,
	consumer consumer.Traces,
) (receiver.Traces, error) {
	err := cfg.(*Config).validateForTrace()
	err := cfg.(*Config).validate()
	if err != nil {
		return nil, err
	}
@@ -89,7 +89,7 @@ func (factory *pubsubReceiverFactory) CreateMetrics(
	cfg component.Config,
	consumer consumer.Metrics,
) (receiver.Metrics, error) {
	err := cfg.(*Config).validateForMetric()
	err := cfg.(*Config).validate()
	if err != nil {
		return nil, err
	}
@@ -107,7 +107,7 @@ func (factory *pubsubReceiverFactory) CreateLogs(
	cfg component.Config,
	consumer consumer.Logs,
) (receiver.Logs, error) {
	err := cfg.(*Config).validateForLog()
	err := cfg.(*Config).validate()
	if err != nil {
		return nil, err
	}
7 changes: 5 additions & 2 deletions receiver/googlecloudpubsubreceiver/go.mod
@@ -9,6 +9,7 @@ require (
	github.com/googleapis/gax-go/v2 v2.14.1
	github.com/iancoleman/strcase v0.3.0
	github.com/json-iterator/go v1.1.12
	github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.114.0
	github.com/stretchr/testify v1.10.0
	go.opentelemetry.io/collector/component v0.118.1-0.20250121185328-fbefb22cc2b3
	go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250121185328-fbefb22cc2b3
@@ -37,7 +38,7 @@ require (
	cloud.google.com/go/iam v1.2.2 // indirect
	cloud.google.com/go/longrunning v0.6.2 // indirect
	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
	github.com/felixge/httpsnoop v1.0.4 // indirect
	github.com/go-logr/logr v1.4.2 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
@@ -55,7 +56,7 @@ require (
	github.com/mitchellh/reflectwalk v1.0.2 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/pmezard/go-difflib v1.0.0 // indirect
	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
	go.einride.tech/aip v0.68.0 // indirect
	go.opencensus.io v0.24.0 // indirect
	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
@@ -92,3 +93,5 @@ retract (
	v0.76.1
	v0.65.0
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding
6 changes: 4 additions & 2 deletions receiver/googlecloudpubsubreceiver/go.sum
4 changes: 1 addition & 3 deletions receiver/googlecloudpubsubreceiver/internal/log_entry.go
@@ -5,7 +5,6 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co

import (
	"bytes"
	"context"
	"encoding/hex"
	stdjson "encoding/json"
	"errors"
@@ -20,7 +19,6 @@ import (
	jsoniter "github.com/json-iterator/go"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.uber.org/zap"
	"google.golang.org/genproto/googleapis/api/monitoredres"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/reflect/protoreflect"
@@ -113,7 +111,7 @@ func getLogEntryDescriptor() protoreflect.MessageDescriptor {
// schema; this ensures that a numeric value in the input is correctly
// translated to either an integer or a double in the output. It falls back to
// plain JSON decoding if payload type is not available in the proto registry.
func TranslateLogEntry(_ context.Context, _ *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) {
func TranslateLogEntry(data []byte) (pcommon.Resource, plog.LogRecord, error) {
	lr := plog.NewLogRecord()
	res := pcommon.NewResource()

Expand Down
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
package internal

import (
	"context"
	"fmt"
	"testing"
	"time"
@@ -15,7 +14,6 @@ import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.uber.org/multierr"
	"go.uber.org/zap"
)

type Log struct {
@@ -70,15 +68,12 @@ func TestTranslateLogEntry(t *testing.T) {
	}{
		// TODO: Add publicly shareable log test data.
	}

	logger, _ := zap.NewDevelopment()

	for _, tt := range tests {
		var errs error
		wantRes, wantLr, err := generateLog(t, tt.want)
		errs = multierr.Append(errs, err)

		gotRes, gotLr, err := TranslateLogEntry(context.TODO(), logger, []byte(tt.input))
		gotRes, gotLr, err := TranslateLogEntry([]byte(tt.input))
		errs = multierr.Append(errs, err)
		errs = multierr.Combine(errs, compareResources(wantRes, gotRes), compareLogRecords(wantLr, gotLr))
