diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py index 186029eadf..6d6bbce6ac 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py @@ -266,6 +266,12 @@ def _invoke_model_on_success( if original_body is not None: original_body.close() + def _on_stream_error_callback(self, span: Span, exception): + span.set_status(Status(StatusCode.ERROR, str(exception))) + if span.is_recording(): + span.set_attribute(ERROR_TYPE, type(exception).__qualname__) + span.end() + def on_success(self, span: Span, result: dict[str, Any]): if self._call_context.operation not in self._HANDLED_OPERATIONS: return @@ -282,8 +288,11 @@ def stream_done_callback(response): self._converse_on_success(span, response) span.end() + def stream_error_callback(exception): + self._on_stream_error_callback(span, exception) + result["stream"] = ConverseStreamWrapper( - result["stream"], stream_done_callback + result["stream"], stream_done_callback, stream_error_callback ) return @@ -307,8 +316,14 @@ def invoke_model_stream_done_callback(response): self._converse_on_success(span, response) span.end() + def invoke_model_stream_error_callback(exception): + self._on_stream_error_callback(span, exception) + result["body"] = InvokeModelWithResponseStreamWrapper( - result["body"], invoke_model_stream_done_callback, model_id + result["body"], + invoke_model_stream_done_callback, + invoke_model_stream_error_callback, + model_id, ) return diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py index 55f0fb0757..5911c91445 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py @@ -19,10 +19,14 @@ from __future__ import annotations import json +from typing import Callable, Dict, Union -from botocore.eventstream import EventStream +from botocore.eventstream import EventStream, EventStreamError from wrapt import ObjectProxy +_StreamDoneCallableT = Callable[[Dict[str, Union[int, str]]], None] +_StreamErrorCallableT = Callable[[Exception], None] + # pylint: disable=abstract-method class ConverseStreamWrapper(ObjectProxy): @@ -31,19 +35,25 @@ class ConverseStreamWrapper(ObjectProxy): def __init__( self, stream: EventStream, - stream_done_callback, + stream_done_callback: _StreamDoneCallableT, + stream_error_callback: _StreamErrorCallableT, ): super().__init__(stream) self._stream_done_callback = stream_done_callback + self._stream_error_callback = stream_error_callback # accumulating things in the same shape of non-streaming version # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"} self._response = {} def __iter__(self): - for event in self.__wrapped__: - self._process_event(event) - yield event + try: + for event in self.__wrapped__: + self._process_event(event) + yield event + except EventStreamError as exc: + self._stream_error_callback(exc) + raise def _process_event(self, event): if "messageStart" in event: @@ -85,12 +95,14 @@ class InvokeModelWithResponseStreamWrapper(ObjectProxy): def __init__( self, stream: EventStream, - stream_done_callback, + stream_done_callback: _StreamDoneCallableT, + stream_error_callback: _StreamErrorCallableT, model_id: str, ): super().__init__(stream) self._stream_done_callback = stream_done_callback + self._stream_error_callback = stream_error_callback self._model_id = model_id # accumulating things in the same shape of the Converse API @@ -98,9 +110,13 @@ def __init__( self._response = {} def __iter__(self): - for event in self.__wrapped__: - self._process_event(event) - yield event + try: + for event in self.__wrapped__: + self._process_event(event) + yield event + except EventStreamError as exc: + self._stream_error_callback(exc) + raise def _process_event(self, event): if "chunk" not in event: diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_handles_event_stream_error.yaml b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_handles_event_stream_error.yaml new file mode 100644 index 0000000000..07e18ab0e7 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_converse_stream_handles_event_stream_error.yaml @@ -0,0 +1,71 @@ +interactions: +- request: + body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}], + "inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences": + ["|"]}}' + headers: + Content-Length: + - '170' + Content-Type: + - !!binary | + YXBwbGljYXRpb24vanNvbg== + User-Agent: + - !!binary | + Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x + MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0 + aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2 + X-Amz-Date: + - !!binary | + MjAyNTAxMjdUMTE0NjAyWg== + X-Amz-Security-Token: + - test_aws_security_token + X-Amzn-Trace-Id: + - !!binary | + Um9vdD0xLWI5YzVlMjRlLWRmYzBjYTYyMmFiYjA2ZWEyMjAzZDZkYjtQYXJlbnQ9NDE0MWM4NWIx + ODkzMmI3OTtTYW1wbGVkPTE= + amz-sdk-invocation-id: + - !!binary | + YjA0ZTAzYWEtMDg2MS00NGIzLTk3NmMtMWZjOGE5MzY5YTFl + amz-sdk-request: + - !!binary | + YXR0ZW1wdD0x + authorization: + - Bearer test_aws_authorization + method: POST + uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream + response: + body: + string: !!binary | + AAAAswAAAFK3IJ11CzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh + cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u + b3BxcnN0dXZ3eHl6QUJDREVGR0hJSktMTU5PUFFSU1RVVldYWVowMSIsInJvbGUiOiJhc3Npc3Rh + bnQifRl7p7oAAAC3AAAAVzLKzzoLOmV2ZW50LXR5cGUHABFjb250ZW50QmxvY2tEZWx0YQ06Y29u + dGVudC10eXBlBwAQYXBwbGljYXRpb24vanNvbg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRl + bnRCbG9ja0luZGV4IjowLCJkZWx0YSI6eyJ0ZXh0IjoiSGkhIEknbSBhbiBBSSBsYW5ndWFnZSJ9 + LCJwIjoiYWJjZGVmZ2gifUn9+AsAAACUAAAAVsOsqngLOmV2ZW50LXR5cGUHABBjb250ZW50Qmxv + Y2tTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl + dmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsInAiOiJhYmNkZWZnaGlqa2xtbm9wIn3KsHRKAAAA + pgAAAFGGKdQ9CzpldmVudC10eXBlBwALbWVzc2FnZVN0b3ANOmNvbnRlbnQtdHlwZQcAEGFwcGxp + Y2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJwIjoiYWJjZGVmZ2hpamtsbW5vcHFy + c3R1dnd4eXpBQkNERUZHSEkiLCJzdG9wUmVhc29uIjoibWF4X3Rva2VucyJ9eRUDZQAAAPUAAABO + dJJs0ws6ZXZlbnQtdHlwZQcACG1ldGFkYXRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9q + c29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsibWV0cmljcyI6eyJsYXRlbmN5TXMiOjY2NH0sInAi + OiJhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0RFRkdISUpLTE1OT1BRUlNUVVZXWFlaMDEi + LCJ1c2FnZSI6eyJpbnB1dFRva2VucyI6OCwib3V0cHV0VG9rZW5zIjoxMCwidG90YWxUb2tlbnMi + OjE4fX3B+Dpy + headers: + Connection: + - keep-alive + Content-Type: + - application/vnd.amazon.eventstream + Date: + - Mon, 27 Jan 2025 11:46:02 GMT + Set-Cookie: test_set_cookie + Transfer-Encoding: + - chunked + x-amzn-RequestId: + - 657e0bef-5ebb-4387-be65-d3ceafd53dea + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_invoke_model_with_response_stream_handles_stream_error.yaml b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_invoke_model_with_response_stream_handles_stream_error.yaml new file mode 100644 index 0000000000..e29ddf9fc3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/cassettes/test_invoke_model_with_response_stream_handles_stream_error.yaml @@ -0,0 +1,62 @@ +interactions: +- request: + body: '{"inputText": "Say this is a test", "textGenerationConfig": {"maxTokenCount": + 10, "temperature": 0.8, "topP": 1, "stopSequences": ["|"]}}' + headers: + Content-Length: + - '137' + User-Agent: + - !!binary | + Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x + MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0 + aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2 + X-Amz-Date: + - !!binary | + MjAyNTAxMjdUMTIwMTU0Wg== + X-Amz-Security-Token: + - test_aws_security_token + X-Amzn-Trace-Id: + - !!binary | + Um9vdD0xLWJhYTFjOTdhLTI3M2UxYTlhYjIyMTM1NGQwN2JjNGNhYztQYXJlbnQ9OTVhNmQzZGEx + YTZkZjM4ZjtTYW1wbGVkPTE= + amz-sdk-invocation-id: + - !!binary | + ZWQxZGViZmQtZTE5NS00N2RiLWIyMzItMTY1MzJhYjQzZTM0 + amz-sdk-request: + - !!binary | + YXR0ZW1wdD0x + authorization: + - Bearer test_aws_authorization + method: POST + uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/invoke-with-response-stream + response: + body: + string: !!binary | + AAACBAAAAEs8ZEC6CzpldmVudC10eXBlBwAFY2h1bmsNOmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0 + aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJieXRlcyI6ImV5SnZkWFJ3ZFhSVVpYaDBJ + am9pSUdOdmJXMWxiblJjYmtobGJHeHZJU0JKSUdGdElHRWdZMjl0Y0hWMFpYSWdjSEp2WjNKaGJT + QmtaWE5wWjI1bFpDSXNJbWx1WkdWNElqb3dMQ0owYjNSaGJFOTFkSEIxZEZSbGVIUlViMnRsYmtO + dmRXNTBJam94TUN3aVkyOXRjR3hsZEdsdmJsSmxZWE52YmlJNklreEZUa2RVU0NJc0ltbHVjSFYw + VkdWNGRGUnZhMlZ1UTI5MWJuUWlPalVzSW1GdFlYcHZiaTFpWldSeWIyTnJMV2x1ZG05allYUnBi + MjVOWlhSeWFXTnpJanA3SW1sdWNIVjBWRzlyWlc1RGIzVnVkQ0k2TlN3aWIzVjBjSFYwVkc5clpX + NURiM1Z1ZENJNk1UQXNJbWx1ZG05allYUnBiMjVNWVhSbGJtTjVJam8yTnpRc0ltWnBjbk4wUW5s + MFpVeGhkR1Z1WTNraU9qWTNNMzE5IiwicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6In2J + Hw51 + headers: + Connection: + - keep-alive + Content-Type: + - application/vnd.amazon.eventstream + Date: + - Mon, 27 Jan 2025 12:01:55 GMT + Set-Cookie: test_set_cookie + Transfer-Encoding: + - chunked + X-Amzn-Bedrock-Content-Type: + - application/json + x-amzn-RequestId: + - 1eb1af77-fb2f-400f-9bf8-049e38b90f02 + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py index b9f5589988..f277ba895e 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py @@ -15,9 +15,11 @@ from __future__ import annotations import json +from unittest import mock import boto3 import pytest +from botocore.eventstream import EventStream, EventStreamError from opentelemetry.semconv._incubating.attributes.error_attributes import ( ERROR_TYPE, @@ -171,6 +173,65 @@ def test_converse_stream_with_content( assert len(logs) == 0 +@pytest.mark.skipif( + BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available" +) +@pytest.mark.vcr() +def test_converse_stream_handles_event_stream_error( + span_exporter, + log_exporter, + bedrock_runtime_client, + instrument_with_content, +): + # pylint:disable=too-many-locals + messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}] + + llm_model_value = "amazon.titan-text-lite-v1" + max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"] + response = bedrock_runtime_client.converse_stream( + messages=messages, + modelId=llm_model_value, + inferenceConfig={ + "maxTokens": max_tokens, + "temperature": temperature, + "topP": top_p, + "stopSequences": stop_sequences, + }, + ) + + with mock.patch.object( + EventStream, + "_parse_event", + side_effect=EventStreamError( + {"modelStreamErrorException": {}}, "ConverseStream" + ), + ): + with pytest.raises(EventStreamError): + for _event in response["stream"]: + pass + + (span,) = span_exporter.get_finished_spans() + input_tokens, output_tokens, finish_reason = None, None, None + assert_stream_completion_attributes( + span, + llm_model_value, + input_tokens, + output_tokens, + finish_reason, + "chat", + top_p, + temperature, + max_tokens, + stop_sequences, + ) + + assert span.status.status_code == StatusCode.ERROR + assert span.attributes[ERROR_TYPE] == "EventStreamError" + + logs = log_exporter.get_finished_logs() + assert len(logs) == 0 + + @pytest.mark.skipif( BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available" ) @@ -413,6 +474,56 @@ def test_invoke_model_with_response_stream_with_content( assert len(logs) == 0 +@pytest.mark.vcr() +def test_invoke_model_with_response_stream_handles_stream_error( + span_exporter, + log_exporter, + bedrock_runtime_client, + instrument_with_content, +): + # pylint:disable=too-many-locals + llm_model_value = "amazon.titan-text-lite-v1" + max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"] + body = get_invoke_model_body( + llm_model_value, max_tokens, temperature, top_p, stop_sequences + ) + response = bedrock_runtime_client.invoke_model_with_response_stream( + body=body, + modelId=llm_model_value, + ) + + # consume the stream in order to have it traced + finish_reason = None + input_tokens, output_tokens = None, None + with mock.patch.object( + EventStream, + "_parse_event", + side_effect=EventStreamError( + {"modelStreamErrorException": {}}, "InvokeModelWithRespnseStream" + ), + ): + with pytest.raises(EventStreamError): + for _event in response["body"]: + pass + + (span,) = span_exporter.get_finished_spans() + assert_stream_completion_attributes( + span, + llm_model_value, + input_tokens, + output_tokens, + finish_reason, + "text_completion", + top_p, + temperature, + max_tokens, + stop_sequences, + ) + + logs = log_exporter.get_finished_logs() + assert len(logs) == 0 + + @pytest.mark.vcr() def test_invoke_model_with_response_stream_invalid_model( span_exporter,