add pubsub exporter config for ordering
Some readme fixes
kevinnoel-be committed Jan 30, 2025
1 parent ed16f9c commit 3df6e99
Showing 5 changed files with 105 additions and 20 deletions.
52 changes: 46 additions & 6 deletions exporter/googlecloudpubsubexporter/README.md
Expand Up @@ -19,7 +19,7 @@ This exporter sends OTLP messages to a Google Cloud [Pubsub](https://cloud.googl
The following configuration options are supported:

* `project` (Optional): The Google Cloud Project of the topics.
* `topic` (Required): The topic name to receive OTLP data over. The topic name should be a fully qualified resource
* `topic` (Required): The topic name to send OTLP data over. The topic name should be a fully qualified resource
name (eg: `projects/otel-project/topics/otlp`).
* `compression` (Optional): Set the payload compression, only `gzip` is supported. Default is no compression.
* `watermark` Behaviour of how the `ce-time` attribute is set (see watermark section for more info)
Expand All @@ -31,17 +31,25 @@ The following configuration options are supported:
or switching between [global and regional service endpoints](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints).
* `insecure` (Optional): allows performing “insecure” SSL connections and transfers, useful when connecting to a local
emulator instance. Only has effect if Endpoint is not ""
* `ordering`: Configures the [PubSub ordering](https://cloud.google.com/pubsub/docs/ordering) feature, see the
  [ordering](#ordering) section for more info.
  * `enabled` (default = `false`): Enables ordering.
  * `from_resource_attribute` (no default): The resource attribute that will be used as the ordering key. Required when
    `ordering.enabled` is `true`. If the resource attribute is missing or has an empty value, the messages for this
    resource will not be ordered.
  * `remove_resource_attribute` (default = `false`): Whether the resource attribute named by
    `from_resource_attribute` should be removed from the resource attributes.

```yaml
exporters:
googlecloudpubsub:
project: my-project
topic: otlp-traces
topic: projects/my-project/topics/otlp-traces
```
## Pubsub topic
The Google Cloud [Pubsub](https://cloud.google.com/pubsub) export doesn't automatic create topics, it expects the topic
The Google Cloud [Pubsub](https://cloud.google.com/pubsub) exporter doesn't automatically create topics, it expects the topic
to be created upfront. Security wise it's best to give the collector its own service account and give the
topic `Pub/Sub Publisher` permission.

Expand Down Expand Up @@ -74,11 +82,11 @@ up to 20% of the cost. This can be done by setting the `compression` to `gzip`.
exporters:
googlecloudpubsub:
project: my-project
topic: otlp-traces
topic: projects/my-project/topics/otlp-traces
compression: gzip
```

The exporter with add the `content-encoding` attribute to the message. The receiver will look at this attribute
The exporter will add the `content-encoding` attribute to the message. The receiver will look at this attribute
to detect the compression that is used on the payload.

Only `gzip` is supported.
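
The attribute-based handshake described above can be sketched in Go as follows. This is a minimal illustration and not the exporter's actual code: the `message` type and both helper functions are hypothetical, and the attribute value `gzip` is an assumption (the README only names the `content-encoding` attribute).

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// message is a hypothetical stand-in for a Pub/Sub message.
type message struct {
	Data       []byte
	Attributes map[string]string
}

// compressMessage gzips the payload and tags the message so that a
// receiver can detect the compression. The value "gzip" is an assumption.
func compressMessage(payload []byte) (message, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		return message{}, err
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := zw.Close(); err != nil {
		return message{}, err
	}
	return message{
		Data:       buf.Bytes(),
		Attributes: map[string]string{"content-encoding": "gzip"},
	}, nil
}

// decodeMessage inspects the attribute and decompresses only when needed.
func decodeMessage(m message) ([]byte, error) {
	if m.Attributes["content-encoding"] != "gzip" {
		return m.Data, nil
	}
	zr, err := gzip.NewReader(bytes.NewReader(m.Data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	m, err := compressMessage([]byte("hello otlp"))
	if err != nil {
		panic(err)
	}
	out, err := decodeMessage(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Messages without the attribute pass through `decodeMessage` unchanged, which mirrors the "default is no compression" behaviour.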
Expand All @@ -100,7 +108,7 @@ timestamp , if you want to behaviour to have effect as the default is `0s`.
exporters:
googlecloudpubsub:
project: my-project
topic: otlp-traces
topic: projects/my-project/topics/otlp-traces
watermark:
behavior: earliest
allowed_drift: 1h
Expand All @@ -119,3 +127,35 @@ scenario is `behavior: earliest` with a reasonable `allow_drift` of `1h`.

Allowed behavior values are `current` or `earliest`. For `allowed_drift` the default is `0s`, so make sure to set the
value.

## Ordering

When ordering is enabled (`ordering.enabled`), you must specify the resource attribute key that will be used as
the ordering key (`ordering.from_resource_attribute`). If this resource attribute is only meant to serve as an
ordering key, you can have it removed before publishing to PubSub by enabling the
`ordering.remove_resource_attribute` configuration.

```yaml
exporters:
  googlecloudpubsub:
    project: my-project
    topic: projects/my-project/topics/otlp-traces
    ordering:
      enabled: true
      from_resource_attribute: some.resource.attribute.key
      remove_resource_attribute: true
```

### Notes

While the PubSub topic doesn't require any configuration for ordering, you will need to enable ordering on your
subscription(s) if you need it. Enabling ordering on a subscription is only possible at creation.
For composite ordering keys you'd need to compose the resource attribute value before exporting, e.g. by using a
[transform processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor).

Empty values in the ordering key are accepted, but the corresponding messages won't be ordered. See the
[PubSub ordering documentation](https://cloud.google.com/pubsub/docs/ordering) for more details.

PubSub requires one publish request per ordering key value, so this exporter groups the signals per ordering key before
publishing.
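
The grouping step mentioned in the notes can be sketched roughly like this. It is a simplified illustration under stated assumptions, not the exporter's implementation: the `signal` type and `groupByOrderingKey` function are hypothetical, and resource attributes are modelled as a plain string map.

```go
package main

import "fmt"

// signal is a hypothetical stand-in for the telemetry of one resource.
type signal struct {
	resourceAttrs map[string]string
	payload       string
}

// groupByOrderingKey buckets signals by the value of the configured
// resource attribute. Signals whose attribute is missing or empty land in
// the "" bucket, which would be published without an ordering key.
func groupByOrderingKey(signals []signal, attr string, remove bool) map[string][]signal {
	groups := make(map[string][]signal)
	for _, s := range signals {
		key := s.resourceAttrs[attr]
		if remove {
			// Mirrors the intent of remove_resource_attribute: the key is
			// dropped from the resource before it would be published.
			delete(s.resourceAttrs, attr)
		}
		groups[key] = append(groups[key], s)
	}
	return groups
}

func main() {
	signals := []signal{
		{resourceAttrs: map[string]string{"tenant": "a"}, payload: "span-1"},
		{resourceAttrs: map[string]string{"tenant": "b"}, payload: "span-2"},
		{resourceAttrs: map[string]string{"tenant": "a"}, payload: "span-3"},
		{resourceAttrs: map[string]string{}, payload: "span-4"},
	}
	groups := groupByOrderingKey(signals, "tenant", true)
	// Each group would map to one publish request.
	for _, key := range []string{"a", "b", ""} {
		fmt.Printf("key %q: %d signal(s)\n", key, len(groups[key]))
	}
}
```

Within a group the original arrival order is preserved, which is the property an ordering key is meant to carry through to ordered subscriptions.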
32 changes: 27 additions & 5 deletions exporter/googlecloudpubsubexporter/config.go
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/multierr"
)

var topicMatcher = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/topics/`)
Expand All @@ -34,6 +35,8 @@ type Config struct {
Compression string `mapstructure:"compression"`
// Watermark defines the watermark (the ce-time attribute on the message) behavior
Watermark WatermarkConfig `mapstructure:"watermark"`
// Ordering configures the ordering keys
Ordering OrderingConfig `mapstructure:"ordering"`
}

// WatermarkConfig customizes the behavior of the watermark
Expand All @@ -46,15 +49,27 @@ type WatermarkConfig struct {
AllowedDrift time.Duration `mapstructure:"allowed_drift"`
}

// OrderingConfig customizes the behavior of the ordering
type OrderingConfig struct {
	// Enabled indicates if ordering is enabled
	Enabled bool `mapstructure:"enabled"`
	// FromResourceAttribute is a resource attribute that will be used as the ordering key.
	FromResourceAttribute string `mapstructure:"from_resource_attribute"`
	// RemoveResourceAttribute indicates if the ordering key should be removed from the resource attributes.
	RemoveResourceAttribute bool `mapstructure:"remove_resource_attribute"`
}

func (config *Config) Validate() error {
var errors error
if !topicMatcher.MatchString(config.Topic) {
return fmt.Errorf("topic '%s' is not a valid format, use 'projects/<project_id>/topics/<name>'", config.Topic)
errors = multierr.Append(errors, fmt.Errorf("topic '%s' is not a valid format, use 'projects/<project_id>/topics/<name>'", config.Topic))
}
_, err := config.parseCompression()
if err != nil {
return err
if _, err := config.parseCompression(); err != nil {
errors = multierr.Append(errors, err)
}
return config.Watermark.validate()
errors = multierr.Append(errors, config.Watermark.validate())
errors = multierr.Append(errors, config.Ordering.validate())
return errors
}

func (config *WatermarkConfig) validate() error {
Expand All @@ -65,6 +80,13 @@ func (config *WatermarkConfig) validate() error {
return err
}

func (cfg *OrderingConfig) validate() error {
	if cfg.Enabled && cfg.FromResourceAttribute == "" {
		return fmt.Errorf("'from_resource_attribute' is required if ordering is enabled")
	}
	return nil
}

func (config *Config) parseCompression() (compression, error) {
switch config.Compression {
case "gzip":
Expand Down
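
The `Validate` change in this file moves from returning on the first failure to accumulating every failure before returning. The sketch below shows the same pattern in isolation. It deliberately swaps in the standard library's `errors.Join` (Go 1.20+) for `go.uber.org/multierr` so that it runs without extra dependencies, and the `configSketch` and `orderingSketch` types are hypothetical, trimmed-down stand-ins for the real config structs.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// topicPattern copies the topic regular expression shown in the diff.
var topicPattern = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/topics/`)

// orderingSketch and configSketch are hypothetical, reduced config types.
type orderingSketch struct {
	Enabled               bool
	FromResourceAttribute string
}

type configSketch struct {
	Topic    string
	Ordering orderingSketch
}

// validate collects all problems instead of stopping at the first one.
// errors.Join ignores nil arguments and returns nil when nothing failed.
func (c configSketch) validate() error {
	var errs error
	if !topicPattern.MatchString(c.Topic) {
		errs = errors.Join(errs, fmt.Errorf("topic '%s' is not a valid format", c.Topic))
	}
	if c.Ordering.Enabled && c.Ordering.FromResourceAttribute == "" {
		errs = errors.Join(errs, errors.New("'from_resource_attribute' is required if ordering is enabled"))
	}
	return errs
}

func main() {
	bad := configSketch{
		Topic:    "otlp-traces",
		Ordering: orderingSketch{Enabled: true},
	}
	// Both problems are reported in a single error value.
	fmt.Println(bad.validate())
}
```

Reporting all problems at once lets a user fix a broken configuration in one pass rather than discovering errors one restart at a time.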
32 changes: 23 additions & 9 deletions exporter/googlecloudpubsubexporter/config_test.go
Expand Up @@ -34,18 +34,21 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

customConfig := factory.CreateDefaultConfig().(*Config)
expectedConfig := factory.CreateDefaultConfig().(*Config)

customConfig.ProjectID = "my-project"
customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}"
customConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
expectedConfig.ProjectID = "my-project"
expectedConfig.UserAgent = "opentelemetry-collector-contrib {{version}}"
expectedConfig.TimeoutSettings = exporterhelper.TimeoutConfig{
Timeout: 20 * time.Second,
}
customConfig.Topic = "projects/my-project/topics/otlp-topic"
customConfig.Compression = "gzip"
customConfig.Watermark.Behavior = "earliest"
customConfig.Watermark.AllowedDrift = time.Hour
assert.Equal(t, cfg, customConfig)
expectedConfig.Topic = "projects/my-project/topics/otlp-topic"
expectedConfig.Compression = "gzip"
expectedConfig.Watermark.Behavior = "earliest"
expectedConfig.Watermark.AllowedDrift = time.Hour
expectedConfig.Ordering.Enabled = true
expectedConfig.Ordering.FromResourceAttribute = "ordering_key"
expectedConfig.Ordering.RemoveResourceAttribute = true
assert.Equal(t, expectedConfig, cfg)
}

func TestTopicConfigValidation(t *testing.T) {
Expand Down Expand Up @@ -100,3 +103,14 @@ func TestWatermarkDefaultMaxDriftValidation(t *testing.T) {
assert.NoError(t, c.Validate())
assert.Equal(t, time.Duration(9223372036854775807), c.Watermark.AllowedDrift)
}

func TestOrderConfigValidation(t *testing.T) {
	factory := NewFactory()
	c := factory.CreateDefaultConfig().(*Config)
	c.Topic = "projects/project/topics/my-topic"
	assert.NoError(t, c.Validate())
	c.Ordering.Enabled = true
	assert.Error(t, c.Validate())
	c.Ordering.FromResourceAttribute = "key"
	assert.NoError(t, c.Validate())
}
5 changes: 5 additions & 0 deletions exporter/googlecloudpubsubexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func createDefaultConfig() component.Config {
Behavior: "current",
AllowedDrift: 0,
},
Ordering: OrderingConfig{
Enabled: false,
FromResourceAttribute: "",
RemoveResourceAttribute: false,
},
}
}

Expand Down
4 changes: 4 additions & 0 deletions exporter/googlecloudpubsubexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ googlecloudpubsub/customname:
  watermark:
    behavior: earliest
    allowed_drift: 1h
  ordering:
    enabled: true
    from_resource_attribute: ordering_key
    remove_resource_attribute: true
