diff --git a/CHANGELOG.md b/CHANGELOG.md index 72c372ecb9..80da336aee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3161](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3161)) - `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock InvokeModel API ([#3200](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3200)) +- `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock ConverseStream API + ([#3204](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3204)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-botocore/examples/bedrock-runtime/zero-code/README.rst b/instrumentation/opentelemetry-instrumentation-botocore/examples/bedrock-runtime/zero-code/README.rst index 37e1db9b30..cdd678c765 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/examples/bedrock-runtime/zero-code/README.rst +++ b/instrumentation/opentelemetry-instrumentation-botocore/examples/bedrock-runtime/zero-code/README.rst @@ -18,6 +18,8 @@ Available examples ------------------ - `converse.py` uses `bedrock-runtime` `Converse API _`. +- `converse_stream.py` uses `bedrock-runtime` `ConverseStream API _`. +- `invoke_model.py` uses `bedrock-runtime` `InvokeModel API _`. Setup ----- diff --git a/instrumentation/opentelemetry-instrumentation-botocore/examples/bedrock-runtime/zero-code/converse_stream.py b/instrumentation/opentelemetry-instrumentation-botocore/examples/bedrock-runtime/zero-code/converse_stream.py new file mode 100644 index 0000000000..6bc0b33fdf --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/examples/bedrock-runtime/zero-code/converse_stream.py @@ -0,0 +1,26 @@ +import os + +import boto3 + + +def main(): + client = boto3.client("bedrock-runtime") + stream = client.converse_stream( + modelId=os.getenv("CHAT_MODEL", "amazon.titan-text-lite-v1"), + messages=[ + { + "role": "user", + "content": [{"text": "Write a short poem on OpenTelemetry."}], + }, + ], + ) + + response = "" + for event in stream["stream"]: + if "contentBlockDelta" in event: + response += event["contentBlockDelta"]["delta"]["text"] + print(response) + + +if __name__ == "__main__": + main() diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index 0481b248aa..b5598a3cf7 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -188,11 +188,15 @@ def _patched_api_call(self, original_func, instance, args, kwargs): } _safe_invoke(extension.extract_attributes, attributes) + end_span_on_exit = extension.should_end_span_on_exit() with self._tracer.start_as_current_span( call_context.span_name, kind=call_context.span_kind, attributes=attributes, + # tracing streaming services require to close the span manually + # at a later time after the stream has been consumed + end_on_exit=end_span_on_exit, ) as span: _safe_invoke(extension.before_service_call, span) self._call_request_hook(span, call_context) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py index 66021d34ff..fb664bb1e4 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py @@ -23,8 +23,12 @@ import logging from typing import Any +from botocore.eventstream import EventStream from botocore.response import StreamingBody +from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import ( + ConverseStreamWrapper, +) from opentelemetry.instrumentation.botocore.extensions.types import ( _AttributeMapT, _AwsSdkExtension, @@ -62,7 +66,14 @@ class _BedrockRuntimeExtension(_AwsSdkExtension): Amazon Bedrock Runtime. """ - _HANDLED_OPERATIONS = {"Converse", "InvokeModel"} + _HANDLED_OPERATIONS = {"Converse", "ConverseStream", "InvokeModel"} + _DONT_CLOSE_SPAN_ON_END_OPERATIONS = {"ConverseStream"} + + def should_end_span_on_exit(self): + return ( + self._call_context.operation + not in self._DONT_CLOSE_SPAN_ON_END_OPERATIONS + ) def extract_attributes(self, attributes: _AttributeMapT): if self._call_context.operation not in self._HANDLED_OPERATIONS: @@ -77,7 +88,7 @@ def extract_attributes(self, attributes: _AttributeMapT): GenAiOperationNameValues.CHAT.value ) - # Converse + # Converse / ConverseStream if inference_config := self._call_context.params.get( "inferenceConfig" ): @@ -251,6 +262,20 @@ def on_success(self, span: Span, result: dict[str, Any]): return if not span.is_recording(): + if not self.should_end_span_on_exit(): + span.end() + return + + # ConverseStream + if "stream" in result and isinstance(result["stream"], EventStream): + + def stream_done_callback(response): + self._converse_on_success(span, response) + span.end() + + result["stream"] = ConverseStreamWrapper( + result["stream"], stream_done_callback + ) return # Converse @@ -328,3 +353,6 @@ def on_error(self, span: Span, exception: _BotoClientErrorT): span.set_status(Status(StatusCode.ERROR, str(exception))) if span.is_recording(): span.set_attribute(ERROR_TYPE, type(exception).__qualname__) + + if not self.should_end_span_on_exit(): + span.end() diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py new file mode 100644 index 0000000000..55d90a2b9f --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py @@ -0,0 +1,74 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Includes work from: +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from botocore.eventstream import EventStream +from wrapt import ObjectProxy + + +# pylint: disable=abstract-method +class ConverseStreamWrapper(ObjectProxy): + """Wrapper for botocore.eventstream.EventStream""" + + def __init__( + self, + stream: EventStream, + stream_done_callback, + ): + super().__init__(stream) + + self._stream_done_callback = stream_done_callback + # accumulating things in the same shape of non-streaming version + # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"} + self._response = {} + + def __iter__(self): + for event in self.__wrapped__: + self._process_event(event) + yield event + + def _process_event(self, event): + if "messageStart" in event: + # {'messageStart': {'role': 'assistant'}} + pass + + if "contentBlockDelta" in event: + # {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}} + pass + + if "contentBlockStop" in event: + # {'contentBlockStop': {'contentBlockIndex': 0}} + pass + + if "messageStop" in event: + # {'messageStop': {'stopReason': 'end_turn'}} + if stop_reason := event["messageStop"].get("stopReason"): + self._response["stopReason"] = stop_reason + + if "metadata" in event: + # {'metadata': {'usage': {'inputTokens': 12, 'outputTokens': 15, 'totalTokens': 27}, 'metrics': {'latencyMs': 2980}}} + if usage := event["metadata"].get("usage"): + self._response["usage"] = {} + if input_tokens := usage.get("inputTokens"): + self._response["usage"]["inputTokens"] = input_tokens + + if output_tokens := usage.get("outputTokens"): + self._response["usage"]["outputTokens"] = output_tokens + + self._stream_done_callback(self._response) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py index a3c73af65c..2927c67e93 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/types.py @@ -101,6 +101,14 @@ def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use """ return True + def should_end_span_on_exit(self) -> bool: # pylint:disable=no-self-use + """Returns if the span should be closed automatically on exit + + Extensions might override this function to disable automatic closing + of the span if they need to close it at a later time themselves. + """ + return True + def extract_attributes(self, attributes: _AttributeMapT): """Callback which gets invoked before the span is created. diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py index 460d3a4fb5..1467817e2e 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py @@ -91,7 +91,7 @@ def assert_completion_attributes_from_streaming_body( ) -def assert_completion_attributes( +def assert_converse_completion_attributes( span: ReadableSpan, request_model: str, response: dict[str, Any] | None, @@ -128,6 +128,34 @@ def assert_completion_attributes( ) +def assert_converse_stream_completion_attributes( + span: ReadableSpan, + request_model: str, + input_tokens: int | None = None, + output_tokens: int | None = None, + finish_reason: tuple[str] | None = None, + operation_name: str = "chat", + request_top_p: int | None = None, + request_temperature: int | None = None, + request_max_tokens: int | None = None, + request_stop_sequences: list[str] | None = None, +): + return assert_all_attributes( + span, + request_model, + input_tokens, + output_tokens, + finish_reason, + operation_name, + request_top_p, + request_temperature, + request_max_tokens, + tuple(request_stop_sequences) + if request_stop_sequences is not None + else request_stop_sequences, + ) + + def assert_equal_or_not_present(value, attribute_name, span): if value is not None: assert value == span.attributes[attribute_name] diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_with_content.yaml b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_with_content.yaml new file mode 100644 index 0000000000..96976f1e7c --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_with_content.yaml @@ -0,0 +1,69 @@ +interactions: +- request: + body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}], + "inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences": + ["|"]}}' + headers: + Content-Length: + - '170' + Content-Type: + - !!binary | + YXBwbGljYXRpb24vanNvbg== + User-Agent: + - !!binary | + Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x + MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0 + aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2 + X-Amz-Date: + - !!binary | + MjAyNTAxMjNUMDk1MTU2Wg== + X-Amz-Security-Token: + - test_aws_security_token + X-Amzn-Trace-Id: + - !!binary | + Um9vdD0xLTA0YmY4MjVjLTAxMTY5NjdhYWM1NmIxM2RlMDI1N2QwMjtQYXJlbnQ9MDdkM2U3N2Rl + OGFjMzJhNDtTYW1wbGVkPTE= + amz-sdk-invocation-id: + - !!binary | + ZGQ1MTZiNTEtOGU1Yi00NGYyLTk5MzMtZjAwYzBiOGFkYWYw + amz-sdk-request: + - !!binary | + YXR0ZW1wdD0x + authorization: + - Bearer test_aws_authorization + method: POST + uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream + response: + body: + string: !!binary | + AAAAlAAAAFLEwW5hCzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh + cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u + b3BxcnN0dXZ3Iiwicm9sZSI6ImFzc2lzdGFudCJ9P+wfRAAAAMQAAABXjLhVJQs6ZXZlbnQtdHlw + ZQcAEWNvbnRlbnRCbG9ja0RlbHRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTpt + ZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsImRlbHRhIjp7InRleHQi + OiJIaSEgSG93IGNhbiBJIGhlbHAgeW91In0sInAiOiJhYmNkZWZnaGlqa2xtbm9wcXJzdHUifeBJ + 9mIAAACJAAAAVlvc+UsLOmV2ZW50LXR5cGUHABBjb250ZW50QmxvY2tTdG9wDTpjb250ZW50LXR5 + cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2Nr + SW5kZXgiOjAsInAiOiJhYmNkZSJ95xzwrwAAAKcAAABRu0n9jQs6ZXZlbnQtdHlwZQcAC21lc3Nh + Z2VTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl + dmVudHsicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6QUJDREVGR0hJSiIsInN0b3BSZWFz + b24iOiJtYXhfdG9rZW5zIn1LR3pNAAAAygAAAE5X40OECzpldmVudC10eXBlBwAIbWV0YWRhdGEN + OmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJt + ZXRyaWNzIjp7ImxhdGVuY3lNcyI6NjA4fSwicCI6ImFiY2RlZmdoaWprIiwidXNhZ2UiOnsiaW5w + dXRUb2tlbnMiOjgsIm91dHB1dFRva2VucyI6MTAsInRvdGFsVG9rZW5zIjoxOH19iiQr+w== + headers: + Connection: + - keep-alive + Content-Type: + - application/vnd.amazon.eventstream + Date: + - Thu, 23 Jan 2025 09:51:56 GMT + Set-Cookie: test_set_cookie + Transfer-Encoding: + - chunked + x-amzn-RequestId: + - 2b74a5d3-615a-4f81-b00f-f0b10a618e23 + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_with_invalid_model.yaml b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_with_invalid_model.yaml new file mode 100644 index 0000000000..59929a1bc7 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_with_invalid_model.yaml @@ -0,0 +1,54 @@ +interactions: +- request: + body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}]}' + headers: + Content-Length: + - '77' + Content-Type: + - !!binary | + YXBwbGljYXRpb24vanNvbg== + User-Agent: + - !!binary | + Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x + MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0 + aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2 + X-Amz-Date: + - !!binary | + MjAyNTAxMjNUMDk1MTU3Wg== + X-Amz-Security-Token: + - test_aws_security_token + X-Amzn-Trace-Id: + - !!binary | + Um9vdD0xLTI5NzA1OTZhLTEyZWI5NDk2ODA1ZjZhYzE5YmU3ODM2NztQYXJlbnQ9Y2M0OTA0YWE2 + ZjQ2NmYxYTtTYW1wbGVkPTE= + amz-sdk-invocation-id: + - !!binary | + MjQzZWY2ZDgtNGJhNy00YTVlLWI0MGEtYThiNDE2ZDIzYjhk + amz-sdk-request: + - !!binary | + YXR0ZW1wdD0x + authorization: + - Bearer test_aws_authorization + method: POST + uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/does-not-exist/converse-stream + response: + body: + string: '{"message":"The provided model identifier is invalid."}' + headers: + Connection: + - keep-alive + Content-Length: + - '55' + Content-Type: + - application/json + Date: + - Thu, 23 Jan 2025 09:51:57 GMT + Set-Cookie: test_set_cookie + x-amzn-ErrorType: + - ValidationException:http://internal.amazon.com/coral/com.amazon.bedrock/ + x-amzn-RequestId: + - 358b122c-d045-4d8f-a5bb-b0bd8cf6ee59 + status: + code: 400 + message: Bad Request +version: 1 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py index 9ee625eb3e..ce3b4375e9 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py @@ -25,8 +25,9 @@ from opentelemetry.trace.status import StatusCode from .bedrock_utils import ( - assert_completion_attributes, assert_completion_attributes_from_streaming_body, + assert_converse_completion_attributes, + assert_converse_stream_completion_attributes, ) BOTO3_VERSION = tuple(int(x) for x in boto3.__version__.split(".")) @@ -58,7 +59,7 @@ def test_converse_with_content( ) (span,) = span_exporter.get_finished_spans() - assert_completion_attributes( + assert_converse_completion_attributes( span, llm_model_value, response, @@ -93,7 +94,7 @@ def test_converse_with_invalid_model( ) (span,) = span_exporter.get_finished_spans() - assert_completion_attributes( + assert_converse_completion_attributes( span, llm_model_value, None, @@ -107,6 +108,99 @@ def test_converse_with_invalid_model( assert len(logs) == 0 +@pytest.mark.skipif( + BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available" +) +@pytest.mark.vcr() +def test_converse_stream_with_content( + span_exporter, + log_exporter, + bedrock_runtime_client, + instrument_with_content, +): + # pylint:disable=too-many-locals + messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}] + + llm_model_value = "amazon.titan-text-lite-v1" + max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"] + response = bedrock_runtime_client.converse_stream( + messages=messages, + modelId=llm_model_value, + inferenceConfig={ + "maxTokens": max_tokens, + "temperature": temperature, + "topP": top_p, + "stopSequences": stop_sequences, + }, + ) + + # consume the stream in order to have it traced + finish_reason = None + input_tokens, output_tokens = None, None + text = "" + for event in response["stream"]: + if "contentBlockDelta" in event: + text += event["contentBlockDelta"]["delta"]["text"] + if "messageStop" in event: + finish_reason = (event["messageStop"]["stopReason"],) + if "metadata" in event: + usage = event["metadata"]["usage"] + input_tokens = usage["inputTokens"] + output_tokens = usage["outputTokens"] + + assert text + + (span,) = span_exporter.get_finished_spans() + assert_converse_stream_completion_attributes( + span, + llm_model_value, + input_tokens, + output_tokens, + finish_reason, + "chat", + top_p, + temperature, + max_tokens, + stop_sequences, + ) + + logs = log_exporter.get_finished_logs() + assert len(logs) == 0 + + +@pytest.mark.skipif( + BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available" +) +@pytest.mark.vcr() +def test_converse_stream_with_invalid_model( + span_exporter, + log_exporter, + bedrock_runtime_client, + instrument_with_content, +): + messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}] + + llm_model_value = "does-not-exist" + with pytest.raises(bedrock_runtime_client.exceptions.ValidationException): + bedrock_runtime_client.converse_stream( + messages=messages, + modelId=llm_model_value, + ) + + (span,) = span_exporter.get_finished_spans() + assert_converse_stream_completion_attributes( + span, + llm_model_value, + operation_name="chat", + ) + + assert span.status.status_code == StatusCode.ERROR + assert span.attributes[ERROR_TYPE] == "ValidationException" + + logs = log_exporter.get_finished_logs() + assert len(logs) == 0 + + def get_invoke_model_body( llm_model, max_tokens=None,