From 37eac56139c956cf39beecf2ed525f04e7a0b77f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kevin=20NO=C3=8BL?= Date: Mon, 27 Jan 2025 18:24:30 +0100 Subject: [PATCH] implement pubsub exporter with ordering --- .chloggen/add-pubsub-ordering.yaml | 4 + .../googlecloudpubsubexporter/exporter.go | 232 ++++++++++++++--- .../exporter_test.go | 233 ++++++++++++++++++ exporter/googlecloudpubsubexporter/factory.go | 74 +++--- 4 files changed, 459 insertions(+), 84 deletions(-) create mode 100644 .chloggen/add-pubsub-ordering.yaml diff --git a/.chloggen/add-pubsub-ordering.yaml b/.chloggen/add-pubsub-ordering.yaml new file mode 100644 index 000000000000..35e366472082 --- /dev/null +++ b/.chloggen/add-pubsub-ordering.yaml @@ -0,0 +1,4 @@ +change_type: enhancement +component: googlecloudpubsubexporter +note: Add support for exporting ordered messages to GCP Pub/Sub +issues: [32850] diff --git a/exporter/googlecloudpubsubexporter/exporter.go b/exporter/googlecloudpubsubexporter/exporter.go index f2b94d7cf4f3..f9cbe363cc16 100644 --- a/exporter/googlecloudpubsubexporter/exporter.go +++ b/exporter/googlecloudpubsubexporter/exporter.go @@ -85,18 +85,14 @@ func (ex *pubsubExporter) shutdown(_ context.Context) error { return client.Close() } -func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error { - if len(data) == 0 { - return nil - } - +func (ex *pubsubExporter) getMessageAttributes(encoding encoding, watermark time.Time) (map[string]string, error) { id, err := ex.makeUUID() if err != nil { - return err + return nil, err } ceTime, err := watermark.MarshalText() if err != nil { - return err + return nil, err } attributes := map[string]string{ @@ -118,21 +114,199 @@ func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, } if ex.ceCompression == gZip { attributes["content-encoding"] = "gzip" - data, err = ex.compress(data) - if err != nil { + } + return attributes, err +} + +func (ex *pubsubExporter) consumeTraces(ctx context.Context, traces ptrace.Traces) error { + if !ex.config.Ordering.Enabled { + return ex.publishTraces(ctx, traces, "") + } + + tracesByOrderingKey := map[string]ptrace.Traces{ + "": ptrace.NewTraces(), + } + traces.ResourceSpans().RemoveIf(func(resourceSpans ptrace.ResourceSpans) bool { + orderingKey, found := resourceSpans.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute) + if !found { + return false + } + + orderingKeyValue := orderingKey.AsString() + if _, exists := tracesByOrderingKey[orderingKeyValue]; !exists { + tracesByOrderingKey[orderingKeyValue] = ptrace.NewTraces() + } + + if ex.config.Ordering.RemoveResourceAttribute { + _ = resourceSpans.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute) + } + + resourceSpans.MoveTo(tracesByOrderingKey[orderingKeyValue].ResourceSpans().AppendEmpty()) + return true + }) + + // No ordering key + if traces.SpanCount() > 0 { + traces.ResourceSpans().MoveAndAppendTo(tracesByOrderingKey[""].ResourceSpans()) + } + + for key, tracesForKey := range tracesByOrderingKey { + if err := ex.publishTraces(ctx, tracesForKey, key); err != nil { + return err + } + } + return nil +} + +func (ex *pubsubExporter) publishTraces(ctx context.Context, tracesForKey ptrace.Traces, orderingKey string) error { + watermark := ex.tracesWatermarkFunc(tracesForKey, time.Now(), ex.config.Watermark.AllowedDrift).UTC() + attributes, attributesErr := ex.getMessageAttributes(otlpProtoTrace, watermark) + if attributesErr != nil { + return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr) + } + + data, err := ex.tracesMarshaler.MarshalTraces(tracesForKey) + if err != nil { + return fmt.Errorf("error while marshaling traces: %w", err) + } + + return ex.publishMessage(ctx, data, attributes, orderingKey) +} + +func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { + if !ex.config.Ordering.Enabled { + return ex.publishMetrics(ctx, metrics, "") + } + + metricsByOrderingKey := map[string]pmetric.Metrics{ + "": pmetric.NewMetrics(), + } + metrics.ResourceMetrics().RemoveIf(func(resourceMetrics pmetric.ResourceMetrics) bool { + orderingKey, found := resourceMetrics.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute) + if !found { + return false + } + + orderingKeyValue := orderingKey.AsString() + if _, exists := metricsByOrderingKey[orderingKeyValue]; !exists { + metricsByOrderingKey[orderingKeyValue] = pmetric.NewMetrics() + } + + if ex.config.Ordering.RemoveResourceAttribute { + _ = resourceMetrics.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute) + } + + resourceMetrics.MoveTo(metricsByOrderingKey[orderingKeyValue].ResourceMetrics().AppendEmpty()) + return true + }) + + // No ordering key + if metrics.DataPointCount() > 0 { + metrics.ResourceMetrics().MoveAndAppendTo(metricsByOrderingKey[""].ResourceMetrics()) + } + + for key, metricsForKey := range metricsByOrderingKey { + if err := ex.publishMetrics(ctx, metricsForKey, key); err != nil { + return err + } + } + return nil +} + +func (ex *pubsubExporter) publishMetrics(ctx context.Context, metricsForKey pmetric.Metrics, orderingKey string) error { + watermark := ex.metricsWatermarkFunc(metricsForKey, time.Now(), ex.config.Watermark.AllowedDrift).UTC() + attributes, attributesErr := ex.getMessageAttributes(otlpProtoMetric, watermark) + if attributesErr != nil { + return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr) + } + + data, err := ex.metricsMarshaler.MarshalMetrics(metricsForKey) + if err != nil { + return fmt.Errorf("error while marshaling metrics: %w", err) + } + + return ex.publishMessage(ctx, data, attributes, orderingKey) +} + +func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error { + if !ex.config.Ordering.Enabled { + return ex.publishLogs(ctx, logs, "") + } + + logsByOrderingKey := map[string]plog.Logs{ + "": plog.NewLogs(), + } + if ex.config.Ordering.Enabled { + logs.ResourceLogs().RemoveIf(func(resourceLogs plog.ResourceLogs) bool { + orderingKey, found := resourceLogs.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute) + if !found { + return false + } + + orderingKeyValue := orderingKey.AsString() + if _, exists := logsByOrderingKey[orderingKeyValue]; !exists { + logsByOrderingKey[orderingKeyValue] = plog.NewLogs() + } + + if ex.config.Ordering.RemoveResourceAttribute { + _ = resourceLogs.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute) + } + + resourceLogs.MoveTo(logsByOrderingKey[orderingKeyValue].ResourceLogs().AppendEmpty()) + return true + }) + } + + // No ordering key + if logs.LogRecordCount() > 0 { + logs.ResourceLogs().MoveAndAppendTo(logsByOrderingKey[""].ResourceLogs()) + } + + for key, logsForKey := range logsByOrderingKey { + if err := ex.publishLogs(ctx, logsForKey, key); err != nil { return err } } - _, err = ex.client.Publish(ctx, &pubsubpb.PublishRequest{ + return nil +} + +func (ex *pubsubExporter) publishLogs(ctx context.Context, logs plog.Logs, orderingKey string) error { + watermark := ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC() + attributes, attributesErr := ex.getMessageAttributes(otlpProtoLog, watermark) + if attributesErr != nil { + return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr) + } + + data, err := ex.logsMarshaler.MarshalLogs(logs) + if err != nil { + return fmt.Errorf("error while marshaling logs: %w", err) + } + + return ex.publishMessage(ctx, data, attributes, orderingKey) +} + +func (ex *pubsubExporter) publishMessage(ctx context.Context, data []byte, attributes map[string]string, orderingKey string) error { + if len(data) == 0 { + return nil + } + + data, compressErr := ex.compress(data) + if compressErr != nil { + return fmt.Errorf("error while compressing pubsub message payload: %w", compressErr) + } + + _, publishErr := ex.client.Publish(ctx, &pubsubpb.PublishRequest{ Topic: ex.config.Topic, - Messages: []*pubsubpb.PubsubMessage{ - { - Attributes: attributes, - Data: data, - }, - }, + Messages: []*pubsubpb.PubsubMessage{{ + Attributes: attributes, + OrderingKey: orderingKey, + Data: data, + }}, }) - return err + if publishErr != nil { + return fmt.Errorf("failed to publish pubsub message for ordering key %q: %w", orderingKey, publishErr) + } + return nil } func (ex *pubsubExporter) compress(payload []byte) ([]byte, error) { @@ -151,27 +325,3 @@ func (ex *pubsubExporter) compress(payload []byte) ([]byte, error) { } return payload, nil } - -func (ex *pubsubExporter) consumeTraces(ctx context.Context, traces ptrace.Traces) error { - buffer, err := ex.tracesMarshaler.MarshalTraces(traces) - if err != nil { - return err - } - return ex.publishMessage(ctx, otlpProtoTrace, buffer, ex.tracesWatermarkFunc(traces, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) -} - -func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { - buffer, err := ex.metricsMarshaler.MarshalMetrics(metrics) - if err != nil { - return err - } - return ex.publishMessage(ctx, otlpProtoMetric, buffer, ex.metricsWatermarkFunc(metrics, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) -} - -func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error { - buffer, err := ex.logsMarshaler.MarshalLogs(logs) - if err != nil { - return err - } - return ex.publishMessage(ctx, otlpProtoLog, buffer, ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) -} diff --git a/exporter/googlecloudpubsubexporter/exporter_test.go b/exporter/googlecloudpubsubexporter/exporter_test.go index 829628f391ce..4cfb656ea334 100644 --- a/exporter/googlecloudpubsubexporter/exporter_test.go +++ b/exporter/googlecloudpubsubexporter/exporter_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "testing" + "time" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "github.com/google/uuid" @@ -26,6 +27,81 @@ const ( defaultTopic = "projects/my-project/topics/otlp" ) +func TestGetMessageAttributes(t *testing.T) { + date := time.Date(2021, time.January, 1, 2, 3, 4, 5, time.UTC) + + t.Run("logs", func(t *testing.T) { + exporter, _ := newTestExporter(t) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoLog, date) + require.NoError(t, err) + + expectedAttributes := map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.logs.v1", + "content-type": "application/protobuf", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) + + t.Run("metrics", func(t *testing.T) { + exporter, _ := newTestExporter(t) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoMetric, date) + require.NoError(t, err) + + expectedAttributes := map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.metrics.v1", + "content-type": "application/protobuf", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) + + t.Run("traces", func(t *testing.T) { + exporter, _ := newTestExporter(t) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoTrace, date) + require.NoError(t, err) + + expectedAttributes := map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.traces.v1", + "content-type": "application/protobuf", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) + + t.Run("logs with compression", func(t *testing.T) { + exporter, _ := newTestExporter(t, func(cfg *Config) { + cfg.Compression = "gzip" + }) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoLog, date) + require.NoError(t, err) + + expectedAttributes := map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.logs.v1", + "content-type": "application/protobuf", + "content-encoding": "gzip", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) +} + func TestExporterNoData(t *testing.T) { exporter, publisher := newTestExporter(t, func(config *Config) { config.Watermark.Behavior = "earliest" @@ -199,6 +275,163 @@ func TestExporterSimpleDataWithCompression(t *testing.T) { }) } +func TestExporterWithOrdering(t *testing.T) { + const orderingKey = "ordering.key" + withOrdering := func(cfg *Config) { + cfg.Ordering.Enabled = true + cfg.Ordering.FromResourceAttribute = orderingKey + cfg.Ordering.RemoveResourceAttribute = true + } + + ctx := context.Background() + + t.Run("logs", func(t *testing.T) { + exporter, publisher := newTestExporter(t, withOrdering) + + logs := plog.NewLogs() + { + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 2") + } + { + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.Resource().Attributes().PutStr(orderingKey, "") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 2") + } + { + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.Resource().Attributes().PutStr(orderingKey, "value 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 1 with ordering key 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 2 with ordering key 1") + } + { + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.Resource().Attributes().PutStr(orderingKey, "value 2") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 1 with ordering key 2") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 2 with ordering key 2") + } + + require.NoError(t, exporter.consumeLogs(ctx, logs)) + require.Len(t, publisher.requests, 3, "one publish call per ordering key should've been made") + + var orderingKeyValues []string + for _, request := range publisher.requests { + assert.Equal(t, defaultTopic, request.Topic) + assert.Len(t, request.Messages, 1) + + for _, msg := range request.Messages { + orderingKeyValues = append(orderingKeyValues, msg.OrderingKey) + + assert.NotEmpty(t, msg.Data) + assert.NotEmpty(t, msg.Attributes) + } + } + assert.ElementsMatch(t, orderingKeyValues, []string{"", "value 1", "value 2"}) + }) + + t.Run("metrics", func(t *testing.T) { + exporter, publisher := newTestExporter(t, withOrdering) + + metrics := pmetric.NewMetrics() + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + resourceMetrics.Resource().Attributes().PutStr(orderingKey, "") + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + resourceMetrics.Resource().Attributes().PutStr(orderingKey, "value 1") + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + resourceMetrics.Resource().Attributes().PutStr(orderingKey, "value 2") + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + + require.NoError(t, exporter.consumeMetrics(ctx, metrics)) + require.Len(t, publisher.requests, 3, "one publish call per ordering key should've been made") + + var orderingKeyValues []string + for _, request := range publisher.requests { + assert.Equal(t, defaultTopic, request.Topic) + assert.Len(t, request.Messages, 1) + + for _, msg := range request.Messages { + orderingKeyValues = append(orderingKeyValues, msg.OrderingKey) + + assert.NotEmpty(t, msg.Data) + assert.NotEmpty(t, msg.Attributes) + } + } + assert.ElementsMatch(t, orderingKeyValues, []string{"", "value 1", "value 2"}) + }) + + t.Run("traces", func(t *testing.T) { + exporter, publisher := newTestExporter(t, withOrdering) + + traces := ptrace.NewTraces() + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 1") + } + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + resourceSpans.Resource().Attributes().PutStr(orderingKey, "") + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 1") + } + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + resourceSpans.Resource().Attributes().PutStr(orderingKey, "value 1") + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 2") + } + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + resourceSpans.Resource().Attributes().PutStr(orderingKey, "value 2") + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 3") + } + + require.NoError(t, exporter.consumeTraces(ctx, traces)) + require.Len(t, publisher.requests, 3, "one publish call per ordering key should've been made") + + var orderingKeyValues []string + for _, request := range publisher.requests { + assert.Equal(t, defaultTopic, request.Topic) + assert.Len(t, request.Messages, 1) + + for _, msg := range request.Messages { + orderingKeyValues = append(orderingKeyValues, msg.OrderingKey) + + assert.NotEmpty(t, msg.Data) + assert.NotEmpty(t, msg.Attributes) + } + } + assert.ElementsMatch(t, orderingKeyValues, []string{"", "value 1", "value 2"}) + }) +} + // Helpers func newTestExporter(t *testing.T, options ...func(*Config)) (*pubsubExporter, *mockPublisher) { diff --git a/exporter/googlecloudpubsubexporter/factory.go b/exporter/googlecloudpubsubexporter/factory.go index ec723119cdfa..16a2112e9680 100644 --- a/exporter/googlecloudpubsubexporter/factory.go +++ b/exporter/googlecloudpubsubexporter/factory.go @@ -40,11 +40,11 @@ func NewFactory() exporter.Factory { var exporters = map[*Config]*pubsubExporter{} func ensureExporter(params exporter.Settings, pCfg *Config) *pubsubExporter { - receiver := exporters[pCfg] - if receiver != nil { - return receiver + exp := exporters[pCfg] + if exp != nil { + return exp } - receiver = &pubsubExporter{ + exp = &pubsubExporter{ logger: params.Logger, userAgent: strings.ReplaceAll(pCfg.UserAgent, "{{version}}", params.BuildInfo.Version), ceSource: fmt.Sprintf("/opentelemetry/collector/%s/%s", metadata.Type.String(), params.BuildInfo.Version), @@ -56,20 +56,20 @@ func ensureExporter(params exporter.Settings, pCfg *Config) *pubsubExporter { makeClient: newPublisherClient, } // we ignore the error here as the config is already validated with the same method - receiver.ceCompression, _ = pCfg.parseCompression() + exp.ceCompression, _ = pCfg.parseCompression() watermarkBehavior, _ := pCfg.Watermark.parseWatermarkBehavior() switch watermarkBehavior { case earliest: - receiver.tracesWatermarkFunc = earliestTracesWatermark - receiver.metricsWatermarkFunc = earliestMetricsWatermark - receiver.logsWatermarkFunc = earliestLogsWatermark + exp.tracesWatermarkFunc = earliestTracesWatermark + exp.metricsWatermarkFunc = earliestMetricsWatermark + exp.logsWatermarkFunc = earliestLogsWatermark case current: - receiver.tracesWatermarkFunc = currentTracesWatermark - receiver.metricsWatermarkFunc = currentMetricsWatermark - receiver.logsWatermarkFunc = currentLogsWatermark + exp.tracesWatermarkFunc = currentTracesWatermark + exp.metricsWatermarkFunc = currentMetricsWatermark + exp.logsWatermarkFunc = currentLogsWatermark } - exporters[pCfg] = receiver - return receiver + exporters[pCfg] = exp + return exp } // createDefaultConfig creates the default configuration for exporter. @@ -89,67 +89,55 @@ func createDefaultConfig() component.Config { } } -func createTracesExporter( - ctx context.Context, - set exporter.Settings, - cfg component.Config, -) (exporter.Traces, error) { +func createTracesExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Traces, error) { pCfg := cfg.(*Config) - pubsubExporter := ensureExporter(set, pCfg) + exp := ensureExporter(set, pCfg) return exporterhelper.NewTraces( ctx, set, cfg, - pubsubExporter.consumeTraces, - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exp.consumeTraces, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithTimeout(pCfg.TimeoutSettings), exporterhelper.WithRetry(pCfg.BackOffConfig), exporterhelper.WithQueue(pCfg.QueueSettings), - exporterhelper.WithStart(pubsubExporter.start), - exporterhelper.WithShutdown(pubsubExporter.shutdown), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown), ) } -func createMetricsExporter( - ctx context.Context, - set exporter.Settings, - cfg component.Config, -) (exporter.Metrics, error) { +func createMetricsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Metrics, error) { pCfg := cfg.(*Config) - pubsubExporter := ensureExporter(set, pCfg) + exp := ensureExporter(set, pCfg) return exporterhelper.NewMetrics( ctx, set, cfg, - pubsubExporter.consumeMetrics, - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exp.consumeMetrics, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithTimeout(pCfg.TimeoutSettings), exporterhelper.WithRetry(pCfg.BackOffConfig), exporterhelper.WithQueue(pCfg.QueueSettings), - exporterhelper.WithStart(pubsubExporter.start), - exporterhelper.WithShutdown(pubsubExporter.shutdown), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown), ) } -func createLogsExporter( - ctx context.Context, - set exporter.Settings, - cfg component.Config, -) (exporter.Logs, error) { +func createLogsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Logs, error) { pCfg := cfg.(*Config) - pubsubExporter := ensureExporter(set, pCfg) + exp := ensureExporter(set, pCfg) return exporterhelper.NewLogs( ctx, set, cfg, - pubsubExporter.consumeLogs, - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exp.consumeLogs, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithTimeout(pCfg.TimeoutSettings), exporterhelper.WithRetry(pCfg.BackOffConfig), exporterhelper.WithQueue(pCfg.QueueSettings), - exporterhelper.WithStart(pubsubExporter.start), - exporterhelper.WithShutdown(pubsubExporter.shutdown), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown), ) }