-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[receiver/googlecloudpubsub] Add support for encoding extensions (#37109) #37137
[receiver/googlecloudpubsub] Add support for encoding extensions (#37109) #37137
Conversation
8952521
to
fa9e362
Compare
…n-telemetry#37109) Added support for encoding extensions. Setting the encoding field in the config now references the extension. If it didn't find the extension it will fall back to searching the internal encoders. To make the build in encoders consistent with the extensions they now have the same interface. The README is adapted accordingly.
fa9e362
to
29ffa42
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, need a codeowner review.
I am the codeowner (sorry, should have stated that in the PR) |
spiderman meme here |
if receiver.config.Encoding != "" { | ||
if receiver.consumerCount() > 1 { | ||
return errors.New("cannot start receiver: multiple consumers were attached, but encoding was specified") | ||
} | ||
encodingID := convertEncoding(receiver.config.Encoding) | ||
if encodingID == unknown { | ||
extensionID := component.ID{} | ||
err := extensionID.UnmarshalText([]byte(receiver.config.Encoding)) | ||
if err != nil { | ||
return errors.New("cannot start receiver: neither a build in encoder, or an extension") | ||
} | ||
extensions := host.GetExtensions() | ||
if extension, ok := extensions[extensionID]; ok { | ||
if receiver.tracesConsumer != nil { | ||
receiver.tracesUnmarshaler, ok = extension.(encoding.TracesUnmarshalerExtension) | ||
if !ok { | ||
return fmt.Errorf("cannot start receiver: extension %q is not a trace unmarshaler", extensionID) | ||
} | ||
} | ||
if receiver.logsConsumer != nil { | ||
receiver.logsUnmarshaler, ok = extension.(encoding.LogsUnmarshalerExtension) | ||
if !ok { | ||
return fmt.Errorf("cannot start receiver: extension %q is not a logs unmarshaler", extensionID) | ||
} | ||
} | ||
if receiver.metricsConsumer != nil { | ||
receiver.metricsUnmarshaler, ok = extension.(encoding.MetricsUnmarshalerExtension) | ||
if !ok { | ||
return fmt.Errorf("cannot start receiver: extension %q is not a metrics unmarshaler", extensionID) | ||
} | ||
} | ||
} else { | ||
return fmt.Errorf("cannot start receiver: extension %q not found", extensionID) | ||
} | ||
} else { | ||
if receiver.tracesConsumer != nil { | ||
switch encodingID { | ||
case otlpProtoTrace: | ||
receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} | ||
default: | ||
return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for traces", receiver.config.Encoding) | ||
} | ||
} | ||
if receiver.logsConsumer != nil { | ||
switch encodingID { | ||
case otlpProtoLog: | ||
receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} | ||
case rawTextLog: | ||
receiver.logsUnmarshaler = unmarshalLogStrings{} | ||
case cloudLogging: | ||
receiver.logsUnmarshaler = unmarshalCloudLoggingLogEntry{} | ||
default: | ||
return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for logs", receiver.config.Encoding) | ||
} | ||
} | ||
if receiver.metricsConsumer != nil { | ||
switch encodingID { | ||
case otlpProtoMetric: | ||
receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} | ||
default: | ||
return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for metrics", receiver.config.Encoding) | ||
} | ||
} | ||
} | ||
createHandlerFn = receiver.createReceiverHandler | ||
} else { | ||
// we will rely on the attributes of the message to determine the signal, so we need all proto unmarshalers | ||
receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} | ||
receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} | ||
receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} | ||
createHandlerFn = receiver.createMultiplexingReceiverHandler | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a lot of branching conditions here, can you break it up into some helper functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pushed, update... but I have the impression there is something wrong with the CI/CD. Waiting until that is resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TylerHelmuth I've split it into two functions (mainly grouping built-in and external encoders).
98cca94
to
7512333
Compare
06d3e3c
to
c9f46c8
Compare
Re-merged to fix the merge conflict. |
Description
Added support for encoding extensions. Setting the encoding field in the config now references the extension. If it didn't find the extension, it will fall back to searching the internal encoders.
To make the build in encoders consistent with the extensions, they now have the same interface.
Link to tracking issue
Fixes #37109
Testing
Documentation
Reworked the encoding section of the README, with an example of a text_encoding extension