Skip to content

Commit

Permalink
Switch to the semantic conventions for 1.27
Browse files Browse the repository at this point in the history
  • Loading branch information
danschultzer committed Jan 9, 2025
1 parent a60f775 commit 81ac160
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 76 deletions.
67 changes: 54 additions & 13 deletions instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ defmodule OpentelemetryOban do

alias Ecto.Changeset
alias OpenTelemetry.Span
alias OpenTelemetry.SemanticConventions.Trace
alias OpenTelemetry.SemConv.Incubating.MessagingAttributes

require Trace
require OpenTelemetry.Tracer

@doc """
Expand Down Expand Up @@ -53,9 +52,9 @@ defmodule OpentelemetryOban do

def insert(name \\ Oban, %Changeset{} = changeset) do
attributes = attributes_before_insert(changeset)
worker = Changeset.get_field(changeset, :worker)
span_name = span_name(attributes)

OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do
changeset = add_tracing_information_to_meta(changeset)

case Oban.insert(name, changeset) do
Expand All @@ -75,9 +74,9 @@ defmodule OpentelemetryOban do

def insert!(name \\ Oban, %Changeset{} = changeset) do
attributes = attributes_before_insert(changeset)
worker = Changeset.get_field(changeset, :worker)
span_name = span_name(attributes)

OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
OpenTelemetry.Tracer.with_span span_name, attributes: attributes, kind: :producer do
changeset = add_tracing_information_to_meta(changeset)

try do
Expand All @@ -101,10 +100,10 @@ defmodule OpentelemetryOban do
end

def insert_all(name, changesets) when is_list(changesets) do
# changesets in insert_all can include different workers and different
# queues. This means we cannot provide much information here, but we can
# still record the insert and propagate the context information.
OpenTelemetry.Tracer.with_span :"Oban bulk insert", kind: :producer do
attributes = attributes_before_insert(changesets)
span_name = span_name(attributes)

OpenTelemetry.Tracer.with_span span_name, kind: :producer, attributes: attributes do
changesets = Enum.map(changesets, &add_tracing_information_to_meta/1)
Oban.insert_all(name, changesets)
end
Expand All @@ -125,23 +124,65 @@ defmodule OpentelemetryOban do
Changeset.change(changeset, %{meta: new_meta})
end

defp attributes_before_insert(changesets) when is_list(changesets) do
{queues, workers} =
Enum.reduce(changesets, {[], []}, fn changeset, {queues, workers} ->
queue = Changeset.get_field(changeset, :queue)
worker = Changeset.get_field(changeset, :worker)

{Enum.uniq([queue | queues]), Enum.uniq([worker | workers])}
end)

%{
unquote(MessagingAttributes.messaging_system()) => :oban,
unquote(MessagingAttributes.messaging_operation_name()) => :send,
unquote(MessagingAttributes.messaging_operation_type()) => unquote(MessagingAttributes.messaging_operation_type_values().publish),
}
# If the attribute value is the same for all messages in the batch, the instrumentation SHOULD set such attribute on the span representing the batch operation.
|> then(fn attributes ->
case queues do
[queue] -> Map.put(attributes, unquote(MessagingAttributes.messaging_consumer_group_name()), queue)
_ -> attributes
end
end)
|> then(fn attributes ->
case workers do
[worker] -> Map.put(attributes, unquote(MessagingAttributes.messaging_destination_name()), worker)
_ -> attributes
end
end)
end

defp attributes_before_insert(changeset) do
queue = Changeset.get_field(changeset, :queue)
worker = Changeset.get_field(changeset, :worker)

%{
Trace.messaging_system() => :oban,
Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue,
unquote(MessagingAttributes.messaging_system()) => :oban,
unquote(MessagingAttributes.messaging_consumer_group_name()) => queue,
unquote(MessagingAttributes.messaging_destination_name()) => worker,
unquote(MessagingAttributes.messaging_operation_name()) => :send,
unquote(MessagingAttributes.messaging_operation_type()) => unquote(MessagingAttributes.messaging_operation_type_values().publish),
:"oban.job.worker" => worker
}
end

defp attributes_after_insert(job) do
%{
unquote(MessagingAttributes.messaging_message_id()) => job.id,
"oban.job.job_id": job.id,
"oban.job.priority": job.priority,
"oban.job.max_attempts": job.max_attempts
}
end

# `messaging.destination.name` SHOULD be used when the destination is known to be neither temporary nor anonymous.
defp span_name(%{unquote(MessagingAttributes.messaging_operation_name()) => operation, unquote(MessagingAttributes.messaging_destination_name()) => destination}) do
"#{operation} #{destination}"
end

# If a corresponding `{destination}` value is not available for a specific operation, the instrumentation SHOULD omit the {destination}.
defp span_name(%{unquote(MessagingAttributes.messaging_operation_name()) => operation}) do
"#{operation}"
end
end
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule OpentelemetryOban.JobHandler do
alias OpenTelemetry.Span
alias OpenTelemetry.SemanticConventions.Trace

require Trace
alias OpenTelemetry.SemConv.Incubating.MessagingAttributes

@tracer_id __MODULE__

Expand Down Expand Up @@ -60,10 +58,12 @@ defmodule OpentelemetryOban.JobHandler do
OpenTelemetry.Tracer.set_current_span(:undefined)

attributes = %{
Trace.messaging_system() => :oban,
Trace.messaging_destination() => queue,
Trace.messaging_destination_kind() => :queue,
Trace.messaging_operation() => :process,
unquote(MessagingAttributes.messaging_system()) => :oban,
unquote(MessagingAttributes.messaging_destination_name()) => worker,
unquote(MessagingAttributes.messaging_consumer_group_name()) => queue,
unquote(MessagingAttributes.messaging_operation_name()) => :process,
unquote(MessagingAttributes.messaging_operation_type()) => :process,
unquote(MessagingAttributes.messaging_message_id()) => id,
:"oban.job.job_id" => id,
:"oban.job.worker" => worker,
:"oban.job.priority" => priority,
Expand All @@ -73,15 +73,17 @@ defmodule OpentelemetryOban.JobHandler do
:"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at)
}

span_name = "#{worker} process"

OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name(attributes), metadata, %{
kind: :consumer,
links: links,
attributes: attributes
})
end

defp span_name(%{unquote(MessagingAttributes.messaging_destination_name()) => destination_name, unquote(MessagingAttributes.messaging_operation_name()) => operation}) do
"#{operation} #{destination_name}"
end

def handle_job_stop(_event, _measurements, metadata, _config) do
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,19 @@ defmodule OpentelemetryOban.PluginHandler do
end

def handle_plugin_start(_event, _measurements, %{plugin: plugin} = metadata, _config) do
attributes = %{"oban.plugin": plugin}
span_name = span_name(attributes)

OpentelemetryTelemetry.start_telemetry_span(
@tracer_id,
"#{plugin} process",
span_name,
metadata,
%{attributes: %{"oban.plugin": plugin}}
%{attributes: attributes}
)
end

defp span_name(%{"oban.plugin": plugin}), do: "oban.plugin #{inspect plugin}"

def handle_plugin_stop(_event, _measurements, metadata, _config) do
Tracer.set_attributes(end_span_plugin_attrs(metadata))
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do
%{plugin: Elixir.Oban.Plugins.Stager}
)

refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
refute_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")}
end

test "records span on plugin execution" do
Expand All @@ -61,7 +61,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do
%{plugin: Elixir.Oban.Plugins.Stager}
)

assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
assert_receive {:span, span(name: "oban.plugin Oban.Plugins.Stager")}
end

test "records span on plugin error" do
Expand Down Expand Up @@ -96,7 +96,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do

assert_receive {:span,
span(
name: "Elixir.Oban.Plugins.Stager process",
name: "oban.plugin Oban.Plugins.Stager",
events: events,
status: ^expected_status
)}
Expand Down Expand Up @@ -210,7 +210,7 @@ defmodule OpentelemetryOban.PluginHandlerTest do
end

defp receive_span_attrs(name) do
name = "#{name} process"
name = "oban.plugin #{inspect name}"

assert_receive(
{:span, span(name: ^name, attributes: attributes)},
Expand Down
Loading

0 comments on commit 81ac160

Please sign in to comment.