Skip to content

Commit

Permalink
implement pubsub exporter with ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinnoel-be committed Jan 30, 2025
1 parent 3df6e99 commit 37eac56
Show file tree
Hide file tree
Showing 4 changed files with 459 additions and 84 deletions.
4 changes: 4 additions & 0 deletions .chloggen/add-pubsub-ordering.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
change_type: enhancement
component: googlecloudpubsubexporter
note: Add support for exporting ordered messages to GCP Pub/Sub
issues: [32850]
232 changes: 191 additions & 41 deletions exporter/googlecloudpubsubexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,14 @@ func (ex *pubsubExporter) shutdown(_ context.Context) error {
return client.Close()
}

func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error {
if len(data) == 0 {
return nil
}

func (ex *pubsubExporter) getMessageAttributes(encoding encoding, watermark time.Time) (map[string]string, error) {
id, err := ex.makeUUID()
if err != nil {
return err
return nil, err
}
ceTime, err := watermark.MarshalText()
if err != nil {
return err
return nil, err
}

attributes := map[string]string{
Expand All @@ -118,21 +114,199 @@ func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding,
}
if ex.ceCompression == gZip {
attributes["content-encoding"] = "gzip"
data, err = ex.compress(data)
if err != nil {
}
return attributes, err
}

func (ex *pubsubExporter) consumeTraces(ctx context.Context, traces ptrace.Traces) error {
if !ex.config.Ordering.Enabled {
return ex.publishTraces(ctx, traces, "")
}

tracesByOrderingKey := map[string]ptrace.Traces{
"": ptrace.NewTraces(),
}
traces.ResourceSpans().RemoveIf(func(resourceSpans ptrace.ResourceSpans) bool {
orderingKey, found := resourceSpans.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute)
if !found {
return false
}

orderingKeyValue := orderingKey.AsString()
if _, exists := tracesByOrderingKey[orderingKeyValue]; !exists {
tracesByOrderingKey[orderingKeyValue] = ptrace.NewTraces()
}

if ex.config.Ordering.RemoveResourceAttribute {
_ = resourceSpans.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute)
}

resourceSpans.MoveTo(tracesByOrderingKey[orderingKeyValue].ResourceSpans().AppendEmpty())
return true
})

// No ordering key
if traces.SpanCount() > 0 {
traces.ResourceSpans().MoveAndAppendTo(tracesByOrderingKey[""].ResourceSpans())
}

for key, tracesForKey := range tracesByOrderingKey {
if err := ex.publishTraces(ctx, tracesForKey, key); err != nil {
return err
}
}
return nil
}

func (ex *pubsubExporter) publishTraces(ctx context.Context, tracesForKey ptrace.Traces, orderingKey string) error {
watermark := ex.tracesWatermarkFunc(tracesForKey, time.Now(), ex.config.Watermark.AllowedDrift).UTC()
attributes, attributesErr := ex.getMessageAttributes(otlpProtoTrace, watermark)
if attributesErr != nil {
return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr)
}

data, err := ex.tracesMarshaler.MarshalTraces(tracesForKey)
if err != nil {
return fmt.Errorf("error while marshaling traces: %w", err)
}

return ex.publishMessage(ctx, data, attributes, orderingKey)
}

func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
if !ex.config.Ordering.Enabled {
return ex.publishMetrics(ctx, metrics, "")
}

metricsByOrderingKey := map[string]pmetric.Metrics{
"": pmetric.NewMetrics(),
}
metrics.ResourceMetrics().RemoveIf(func(resourceMetrics pmetric.ResourceMetrics) bool {
orderingKey, found := resourceMetrics.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute)
if !found {
return false
}

orderingKeyValue := orderingKey.AsString()
if _, exists := metricsByOrderingKey[orderingKeyValue]; !exists {
metricsByOrderingKey[orderingKeyValue] = pmetric.NewMetrics()
}

if ex.config.Ordering.RemoveResourceAttribute {
_ = resourceMetrics.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute)
}

resourceMetrics.MoveTo(metricsByOrderingKey[orderingKeyValue].ResourceMetrics().AppendEmpty())
return true
})

// No ordering key
if metrics.DataPointCount() > 0 {
metrics.ResourceMetrics().MoveAndAppendTo(metricsByOrderingKey[""].ResourceMetrics())
}

for key, metricsForKey := range metricsByOrderingKey {
if err := ex.publishMetrics(ctx, metricsForKey, key); err != nil {
return err
}
}
return nil
}

func (ex *pubsubExporter) publishMetrics(ctx context.Context, metricsForKey pmetric.Metrics, orderingKey string) error {
watermark := ex.metricsWatermarkFunc(metricsForKey, time.Now(), ex.config.Watermark.AllowedDrift).UTC()
attributes, attributesErr := ex.getMessageAttributes(otlpProtoMetric, watermark)
if attributesErr != nil {
return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr)
}

data, err := ex.metricsMarshaler.MarshalMetrics(metricsForKey)
if err != nil {
return fmt.Errorf("error while marshaling metrics: %w", err)
}

return ex.publishMessage(ctx, data, attributes, orderingKey)
}

func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error {
if !ex.config.Ordering.Enabled {
return ex.publishLogs(ctx, logs, "")
}

logsByOrderingKey := map[string]plog.Logs{
"": plog.NewLogs(),
}
if ex.config.Ordering.Enabled {
logs.ResourceLogs().RemoveIf(func(resourceLogs plog.ResourceLogs) bool {
orderingKey, found := resourceLogs.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute)
if !found {
return false
}

orderingKeyValue := orderingKey.AsString()
if _, exists := logsByOrderingKey[orderingKeyValue]; !exists {
logsByOrderingKey[orderingKeyValue] = plog.NewLogs()
}

if ex.config.Ordering.RemoveResourceAttribute {
_ = resourceLogs.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute)
}

resourceLogs.MoveTo(logsByOrderingKey[orderingKeyValue].ResourceLogs().AppendEmpty())
return true
})
}

// No ordering key
if logs.LogRecordCount() > 0 {
logs.ResourceLogs().MoveAndAppendTo(logsByOrderingKey[""].ResourceLogs())
}

for key, logsForKey := range logsByOrderingKey {
if err := ex.publishLogs(ctx, logsForKey, key); err != nil {
return err
}
}
_, err = ex.client.Publish(ctx, &pubsubpb.PublishRequest{
return nil
}

func (ex *pubsubExporter) publishLogs(ctx context.Context, logs plog.Logs, orderingKey string) error {
watermark := ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC()
attributes, attributesErr := ex.getMessageAttributes(otlpProtoLog, watermark)
if attributesErr != nil {
return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr)
}

data, err := ex.logsMarshaler.MarshalLogs(logs)
if err != nil {
return fmt.Errorf("error while marshaling logs: %w", err)
}

return ex.publishMessage(ctx, data, attributes, orderingKey)
}

func (ex *pubsubExporter) publishMessage(ctx context.Context, data []byte, attributes map[string]string, orderingKey string) error {
if len(data) == 0 {
return nil
}

data, compressErr := ex.compress(data)
if compressErr != nil {
return fmt.Errorf("error while compressing pubsub message payload: %w", compressErr)
}

_, publishErr := ex.client.Publish(ctx, &pubsubpb.PublishRequest{
Topic: ex.config.Topic,
Messages: []*pubsubpb.PubsubMessage{
{
Attributes: attributes,
Data: data,
},
},
Messages: []*pubsubpb.PubsubMessage{{
Attributes: attributes,
OrderingKey: orderingKey,
Data: data,
}},
})
return err
if publishErr != nil {
return fmt.Errorf("failed to publish pubsub message for ordering key %q: %w", orderingKey, publishErr)
}
return nil
}

func (ex *pubsubExporter) compress(payload []byte) ([]byte, error) {
Expand All @@ -151,27 +325,3 @@ func (ex *pubsubExporter) compress(payload []byte) ([]byte, error) {
}
return payload, nil
}

func (ex *pubsubExporter) consumeTraces(ctx context.Context, traces ptrace.Traces) error {
buffer, err := ex.tracesMarshaler.MarshalTraces(traces)
if err != nil {
return err
}
return ex.publishMessage(ctx, otlpProtoTrace, buffer, ex.tracesWatermarkFunc(traces, time.Now(), ex.config.Watermark.AllowedDrift).UTC())
}

func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
buffer, err := ex.metricsMarshaler.MarshalMetrics(metrics)
if err != nil {
return err
}
return ex.publishMessage(ctx, otlpProtoMetric, buffer, ex.metricsWatermarkFunc(metrics, time.Now(), ex.config.Watermark.AllowedDrift).UTC())
}

func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error {
buffer, err := ex.logsMarshaler.MarshalLogs(logs)
if err != nil {
return err
}
return ex.publishMessage(ctx, otlpProtoLog, buffer, ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC())
}
Loading

0 comments on commit 37eac56

Please sign in to comment.