Commit 8da2e26: Handle exception when consuming EventStream

xrmx committed Jan 27, 2025
1 parent 1575884 commit 8da2e26
Showing 5 changed files with 286 additions and 11 deletions.
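Before this change, both Bedrock stream wrappers assumed the wrapped botocore EventStream would always be exhausted cleanly, so an EventStreamError raised mid-stream left the instrumentation span open. The diff threads an error callback through the wrappers so the span is ended with ERROR status instead. A minimal sketch of the pattern, assuming only botocore and wrapt (the class name here is illustrative, not the exact code changed below):

from botocore.eventstream import EventStreamError
from wrapt import ObjectProxy


class ErrorReportingStream(ObjectProxy):
    """Illustrative proxy: report abnormal stream termination, then re-raise."""

    def __init__(self, stream, on_error):
        super().__init__(stream)
        # wrapt convention: the _self_ prefix keeps the attribute on the
        # proxy instead of the wrapped stream
        self._self_on_error = on_error

    def __iter__(self):
        try:
            for event in self.__wrapped__:
                yield event
        except EventStreamError as exc:
            self._self_on_error(exc)  # e.g. end the span with ERROR status
            raise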
File 1 of 5 (Bedrock extension: span success/error handling)

@@ -266,6 +266,12 @@ def _invoke_model_on_success(
            if original_body is not None:
                original_body.close()

+    def _on_stream_error_callback(self, span: Span, exception):
+        span.set_status(Status(StatusCode.ERROR, str(exception)))
+        if span.is_recording():
+            span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
+        span.end()
+
    def on_success(self, span: Span, result: dict[str, Any]):
        if self._call_context.operation not in self._HANDLED_OPERATIONS:
            return
@@ -282,8 +288,11 @@ def stream_done_callback(response):
                self._converse_on_success(span, response)
                span.end()

+            def stream_error_callback(exception):
+                self._on_stream_error_callback(span, exception)
+
            result["stream"] = ConverseStreamWrapper(
-                result["stream"], stream_done_callback
+                result["stream"], stream_done_callback, stream_error_callback
            )
            return

@@ -307,8 +316,14 @@ def invoke_model_stream_done_callback(response):
                self._converse_on_success(span, response)
                span.end()

+            def invoke_model_stream_error_callback(exception):
+                self._on_stream_error_callback(span, exception)
+
            result["body"] = InvokeModelWithResponseStreamWrapper(
-                result["body"], invoke_model_stream_done_callback, model_id
+                result["body"],
+                invoke_model_stream_done_callback,
+                invoke_model_stream_error_callback,
+                model_id,
            )
            return

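From the caller's side the happy path is unchanged; the difference is that a stream failing mid-flight now closes the span with ERROR status and an error.type attribute before the exception surfaces. A hedged usage sketch (the client setup and model id are assumptions, not part of the diff):

import boto3
from botocore.eventstream import EventStreamError

client = boto3.client("bedrock-runtime")
response = client.converse_stream(
    modelId="amazon.titan-text-lite-v1",  # assumed model
    messages=[{"role": "user", "content": [{"text": "Say this is a test"}]}],
)
try:
    for event in response["stream"]:
        pass  # consume events as usual
except EventStreamError:
    # by this point the instrumentation has already ended the span
    raise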
File 2 of 5 (Bedrock stream wrapper utilities)

@@ -19,10 +19,14 @@
from __future__ import annotations

import json
+from typing import Callable, Union

-from botocore.eventstream import EventStream
+from botocore.eventstream import EventStream, EventStreamError
from wrapt import ObjectProxy

+_StreamDoneCallableT = Callable[[dict[str, Union[int, str]]], None]
+_StreamErrorCallableT = Callable[[Exception], None]
+

# pylint: disable=abstract-method
class ConverseStreamWrapper(ObjectProxy):
@@ -31,19 +35,25 @@ class ConverseStreamWrapper(ObjectProxy):
    def __init__(
        self,
        stream: EventStream,
-        stream_done_callback,
+        stream_done_callback: _StreamDoneCallableT,
+        stream_error_callback: _StreamErrorCallableT,
    ):
        super().__init__(stream)

        self._stream_done_callback = stream_done_callback
+        self._stream_error_callback = stream_error_callback
        # accumulating things in the same shape of non-streaming version
        # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
        self._response = {}

    def __iter__(self):
-        for event in self.__wrapped__:
-            self._process_event(event)
-            yield event
+        try:
+            for event in self.__wrapped__:
+                self._process_event(event)
+                yield event
+        except EventStreamError as exc:
+            self._stream_error_callback(exc)
+            raise

    def _process_event(self, event):
        if "messageStart" in event:
@@ -85,22 +95,28 @@ class InvokeModelWithResponseStreamWrapper(ObjectProxy):
    def __init__(
        self,
        stream: EventStream,
-        stream_done_callback,
+        stream_done_callback: _StreamDoneCallableT,
+        stream_error_callback: _StreamErrorCallableT,
        model_id: str,
    ):
        super().__init__(stream)

        self._stream_done_callback = stream_done_callback
+        self._stream_error_callback = stream_error_callback
        self._model_id = model_id

        # accumulating things in the same shape of the Converse API
        # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
        self._response = {}

    def __iter__(self):
-        for event in self.__wrapped__:
-            self._process_event(event)
-            yield event
+        try:
+            for event in self.__wrapped__:
+                self._process_event(event)
+                yield event
+        except EventStreamError as exc:
+            self._stream_error_callback(exc)
+            raise

    def _process_event(self, event):
        if "chunk" not in event:
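Both wrappers can be exercised without AWS traffic by wrapping any iterable object that allows attribute assignment (ObjectProxy stores the callbacks on the wrapped object here, since their names lack the _self_ prefix). A sketch under that assumption; FakeStream is hypothetical, the import path is assumed, and the done callback is expected to fire once the wrapper sees the final metadata event (handled in _process_event, which this diff does not change):

from botocore.eventstream import EventStreamError

# assumed module path for the wrapper
from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
    ConverseStreamWrapper,
)


class FakeStream:
    """Stands in for a botocore EventStream in this sketch."""

    def __init__(self, events, error=None):
        self._events = events
        self._error = error

    def __iter__(self):
        yield from self._events
        if self._error is not None:
            raise self._error


failing = FakeStream(
    [], error=EventStreamError({"modelStreamErrorException": {}}, "ConverseStream")
)
wrapper = ConverseStreamWrapper(
    failing,
    stream_done_callback=lambda response: print("done:", response),
    stream_error_callback=lambda exc: print("error:", exc),
)
try:
    for _ in wrapper:
        pass
except EventStreamError:
    pass  # the error callback already fired with the exception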
File 3 of 5 (new VCR cassette for the ConverseStream error test)

@@ -0,0 +1,71 @@
interactions:
- request:
    body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}],
      "inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences":
      ["|"]}}'
    headers:
      Content-Length:
      - '170'
      Content-Type:
      - !!binary |
        YXBwbGljYXRpb24vanNvbg==
      User-Agent:
      - !!binary |
        Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
        MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
        aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
      X-Amz-Date:
      - !!binary |
        MjAyNTAxMjdUMTE0NjAyWg==
      X-Amz-Security-Token:
      - test_aws_security_token
      X-Amzn-Trace-Id:
      - !!binary |
        Um9vdD0xLWI5YzVlMjRlLWRmYzBjYTYyMmFiYjA2ZWEyMjAzZDZkYjtQYXJlbnQ9NDE0MWM4NWIx
        ODkzMmI3OTtTYW1wbGVkPTE=
      amz-sdk-invocation-id:
      - !!binary |
        YjA0ZTAzYWEtMDg2MS00NGIzLTk3NmMtMWZjOGE5MzY5YTFl
      amz-sdk-request:
      - !!binary |
        YXR0ZW1wdD0x
      authorization:
      - Bearer test_aws_authorization
    method: POST
    uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
  response:
    body:
      string: !!binary |
        AAAAswAAAFK3IJ11CzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
        cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
        b3BxcnN0dXZ3eHl6QUJDREVGR0hJSktMTU5PUFFSU1RVVldYWVowMSIsInJvbGUiOiJhc3Npc3Rh
        bnQifRl7p7oAAAC3AAAAVzLKzzoLOmV2ZW50LXR5cGUHABFjb250ZW50QmxvY2tEZWx0YQ06Y29u
        dGVudC10eXBlBwAQYXBwbGljYXRpb24vanNvbg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRl
        bnRCbG9ja0luZGV4IjowLCJkZWx0YSI6eyJ0ZXh0IjoiSGkhIEknbSBhbiBBSSBsYW5ndWFnZSJ9
        LCJwIjoiYWJjZGVmZ2gifUn9+AsAAACUAAAAVsOsqngLOmV2ZW50LXR5cGUHABBjb250ZW50Qmxv
        Y2tTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
        dmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsInAiOiJhYmNkZWZnaGlqa2xtbm9wIn3KsHRKAAAA
        pgAAAFGGKdQ9CzpldmVudC10eXBlBwALbWVzc2FnZVN0b3ANOmNvbnRlbnQtdHlwZQcAEGFwcGxp
        Y2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJwIjoiYWJjZGVmZ2hpamtsbW5vcHFy
        c3R1dnd4eXpBQkNERUZHSEkiLCJzdG9wUmVhc29uIjoibWF4X3Rva2VucyJ9eRUDZQAAAPUAAABO
        dJJs0ws6ZXZlbnQtdHlwZQcACG1ldGFkYXRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9q
        c29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsibWV0cmljcyI6eyJsYXRlbmN5TXMiOjY2NH0sInAi
        OiJhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0RFRkdISUpLTE1OT1BRUlNUVVZXWFlaMDEi
        LCJ1c2FnZSI6eyJpbnB1dFRva2VucyI6OCwib3V0cHV0VG9rZW5zIjoxMCwidG90YWxUb2tlbnMi
        OjE4fX3B+Dpy
    headers:
      Connection:
      - keep-alive
      Content-Type:
      - application/vnd.amazon.eventstream
      Date:
      - Mon, 27 Jan 2025 11:46:02 GMT
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      x-amzn-RequestId:
      - 657e0bef-5ebb-4387-be65-d3ceafd53dea
    status:
      code: 200
      message: OK
version: 1
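The !!binary scalars in the cassette are ordinary base64. Decoding the recorded User-Agent, for instance, recovers the client versions that the BOTO3_VERSION skipif guards in the tests refer to:

import base64

print(base64.b64decode(
    "Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x"
    "MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0"
    "aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2"
).decode())
# Boto3/1.35.56 md/Botocore#1.35.56 ua/2.0 os/linux#6.1.0-1034-oem
# md/arch#x86_64 lang/python#3.10.12 md/pyimpl#CPython
# cfg/retry-mode#legacy Botocore/1.35.56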
File 4 of 5 (new VCR cassette for the InvokeModelWithResponseStream error test)

@@ -0,0 +1,62 @@
interactions:
- request:
    body: '{"inputText": "Say this is a test", "textGenerationConfig": {"maxTokenCount":
      10, "temperature": 0.8, "topP": 1, "stopSequences": ["|"]}}'
    headers:
      Content-Length:
      - '137'
      User-Agent:
      - !!binary |
        Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
        MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
        aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
      X-Amz-Date:
      - !!binary |
        MjAyNTAxMjdUMTIwMTU0Wg==
      X-Amz-Security-Token:
      - test_aws_security_token
      X-Amzn-Trace-Id:
      - !!binary |
        Um9vdD0xLWJhYTFjOTdhLTI3M2UxYTlhYjIyMTM1NGQwN2JjNGNhYztQYXJlbnQ9OTVhNmQzZGEx
        YTZkZjM4ZjtTYW1wbGVkPTE=
      amz-sdk-invocation-id:
      - !!binary |
        ZWQxZGViZmQtZTE5NS00N2RiLWIyMzItMTY1MzJhYjQzZTM0
      amz-sdk-request:
      - !!binary |
        YXR0ZW1wdD0x
      authorization:
      - Bearer test_aws_authorization
    method: POST
    uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/invoke-with-response-stream
  response:
    body:
      string: !!binary |
        AAACBAAAAEs8ZEC6CzpldmVudC10eXBlBwAFY2h1bmsNOmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0
        aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJieXRlcyI6ImV5SnZkWFJ3ZFhSVVpYaDBJ
        am9pSUdOdmJXMWxiblJjYmtobGJHeHZJU0JKSUdGdElHRWdZMjl0Y0hWMFpYSWdjSEp2WjNKaGJT
        QmtaWE5wWjI1bFpDSXNJbWx1WkdWNElqb3dMQ0owYjNSaGJFOTFkSEIxZEZSbGVIUlViMnRsYmtO
        dmRXNTBJam94TUN3aVkyOXRjR3hsZEdsdmJsSmxZWE52YmlJNklreEZUa2RVU0NJc0ltbHVjSFYw
        VkdWNGRGUnZhMlZ1UTI5MWJuUWlPalVzSW1GdFlYcHZiaTFpWldSeWIyTnJMV2x1ZG05allYUnBi
        MjVOWlhSeWFXTnpJanA3SW1sdWNIVjBWRzlyWlc1RGIzVnVkQ0k2TlN3aWIzVjBjSFYwVkc5clpX
        NURiM1Z1ZENJNk1UQXNJbWx1ZG05allYUnBiMjVNWVhSbGJtTjVJam8yTnpRc0ltWnBjbk4wUW5s
        MFpVeGhkR1Z1WTNraU9qWTNNMzE5IiwicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6In2J
        Hw51
    headers:
      Connection:
      - keep-alive
      Content-Type:
      - application/vnd.amazon.eventstream
      Date:
      - Mon, 27 Jan 2025 12:01:55 GMT
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      X-Amzn-Bedrock-Content-Type:
      - application/json
      x-amzn-RequestId:
      - 1eb1af77-fb2f-400f-9bf8-049e38b90f02
    status:
      code: 200
      message: OK
version: 1
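One wrinkle in this cassette: the recorded eventstream frame wraps a JSON body whose "bytes" field is itself base64-encoded JSON (the Titan completion payload). botocore's blob parsing undoes that inner encoding before events reach the iterator, which is why the wrapper's _process_event can load the chunk directly. A hedged sketch with an abridged, illustrative event:

import json

def decode_chunk(event: dict) -> dict:
    # botocore has already base64-decoded the blob, so the value is raw
    # JSON bytes and a single json.loads suffices
    return json.loads(event["chunk"]["bytes"])

event = {"chunk": {"bytes": b'{"outputText": "Hi!", "completionReason": "LENGTH"}'}}
assert decode_chunk(event)["completionReason"] == "LENGTH"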
File 5 of 5 (Bedrock tests)

@@ -15,9 +15,11 @@
from __future__ import annotations

import json
+from unittest import mock

import boto3
import pytest
+from botocore.eventstream import EventStream, EventStreamError

from opentelemetry.semconv._incubating.attributes.error_attributes import (
    ERROR_TYPE,
@@ -171,6 +173,65 @@ def test_converse_stream_with_content(
    assert len(logs) == 0


+@pytest.mark.skipif(
+    BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
+)
+@pytest.mark.vcr()
+def test_converse_stream_handles_event_stream_error(
+    span_exporter,
+    log_exporter,
+    bedrock_runtime_client,
+    instrument_with_content,
+):
+    # pylint:disable=too-many-locals
+    messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}]
+
+    llm_model_value = "amazon.titan-text-lite-v1"
+    max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"]
+    response = bedrock_runtime_client.converse_stream(
+        messages=messages,
+        modelId=llm_model_value,
+        inferenceConfig={
+            "maxTokens": max_tokens,
+            "temperature": temperature,
+            "topP": top_p,
+            "stopSequences": stop_sequences,
+        },
+    )
+
+    with mock.patch.object(
+        EventStream,
+        "_parse_event",
+        side_effect=EventStreamError(
+            {"modelStreamErrorException": {}}, "ConverseStream"
+        ),
+    ):
+        with pytest.raises(EventStreamError):
+            for _event in response["stream"]:
+                pass
+
+    (span,) = span_exporter.get_finished_spans()
+    input_tokens, output_tokens, finish_reason = None, None, None
+    assert_stream_completion_attributes(
+        span,
+        llm_model_value,
+        input_tokens,
+        output_tokens,
+        finish_reason,
+        "chat",
+        top_p,
+        temperature,
+        max_tokens,
+        stop_sequences,
+    )
+
+    assert span.status.status_code == StatusCode.ERROR
+    assert span.attributes[ERROR_TYPE] == "EventStreamError"
+
+    logs = log_exporter.get_finished_logs()
+    assert len(logs) == 0
+
+
@pytest.mark.skipif(
    BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
)
@@ -413,6 +474,56 @@ def test_invoke_model_with_response_stream_with_content(
    assert len(logs) == 0


+@pytest.mark.vcr()
+def test_invoke_model_with_response_stream_handles_stream_error(
+    span_exporter,
+    log_exporter,
+    bedrock_runtime_client,
+    instrument_with_content,
+):
+    # pylint:disable=too-many-locals
+    llm_model_value = "amazon.titan-text-lite-v1"
+    max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"]
+    body = get_invoke_model_body(
+        llm_model_value, max_tokens, temperature, top_p, stop_sequences
+    )
+    response = bedrock_runtime_client.invoke_model_with_response_stream(
+        body=body,
+        modelId=llm_model_value,
+    )
+
+    # consume the stream in order to have it traced
+    finish_reason = None
+    input_tokens, output_tokens = None, None
+    with mock.patch.object(
+        EventStream,
+        "_parse_event",
+        side_effect=EventStreamError(
+            {"modelStreamErrorException": {}}, "InvokeModelWithResponseStream"
+        ),
+    ):
+        with pytest.raises(EventStreamError):
+            for _event in response["body"]:
+                pass
+
+    (span,) = span_exporter.get_finished_spans()
+    assert_stream_completion_attributes(
+        span,
+        llm_model_value,
+        input_tokens,
+        output_tokens,
+        finish_reason,
+        "text_completion",
+        top_p,
+        temperature,
+        max_tokens,
+        stop_sequences,
+    )
+
+    logs = log_exporter.get_finished_logs()
+    assert len(logs) == 0
+
+
@pytest.mark.vcr()
def test_invoke_model_with_response_stream_invalid_model(
    span_exporter,
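Both error tests rely on the same injection trick: EventStream._parse_event is a private botocore hook through which every recorded frame flows, so patching it makes the very first event raise. That is also why the token-count and finish-reason assertions are None: the stream dies before any usage metadata accumulates. The trick in isolation (a sketch; the patched method is private botocore API):

from unittest import mock
from botocore.eventstream import EventStream, EventStreamError

error = EventStreamError({"modelStreamErrorException": {}}, "ConverseStream")
with mock.patch.object(EventStream, "_parse_event", side_effect=error):
    pass  # any EventStream iterated inside this block raises `error`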
