botocore: handle exceptions when consuming streaming versions of bedrock APIs #3211

Merged (2 commits) on Jan 28, 2025

Changes from 1 commit
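This change covers the case where botocore raises an EventStreamError while a Bedrock response stream is being consumed. Previously, ConverseStreamWrapper and InvokeModelWithResponseStreamWrapper only invoked their completion callback on a clean finish, so an error raised mid-stream left the instrumentation span open. Both wrappers now accept a stream_error_callback that sets the span status to ERROR, records the error.type attribute, and ends the span before the exception propagates. Tests and VCR cassettes are added for both streaming APIs.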
@@ -266,6 +266,12 @@ def _invoke_model_on_success(
        if original_body is not None:
            original_body.close()

    def _on_stream_error_callback(self, span: Span, exception):
        span.set_status(Status(StatusCode.ERROR, str(exception)))
        if span.is_recording():
            span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
        span.end()

    def on_success(self, span: Span, result: dict[str, Any]):
        if self._call_context.operation not in self._HANDLED_OPERATIONS:
            return
@@ -282,8 +288,11 @@ def stream_done_callback(response):
                self._converse_on_success(span, response)
                span.end()

            def stream_error_callback(exception):
                self._on_stream_error_callback(span, exception)

            result["stream"] = ConverseStreamWrapper(
                result["stream"], stream_done_callback, stream_error_callback
            )
            return

@@ -307,8 +316,14 @@ def invoke_model_stream_done_callback(response):
                self._converse_on_success(span, response)
                span.end()

            def invoke_model_stream_error_callback(exception):
                self._on_stream_error_callback(span, exception)

            result["body"] = InvokeModelWithResponseStreamWrapper(
                result["body"],
                invoke_model_stream_done_callback,
                invoke_model_stream_error_callback,
                model_id,
            )
            return
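Note that on_success returns before the caller consumes the stream, so from this point the wrappers own the span's lifetime: stream_done_callback ends the span on normal completion, and the new error callback ends it when botocore raises mid-stream.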

@@ -19,10 +19,14 @@
from __future__ import annotations

import json
from typing import Callable, Dict, Union

from botocore.eventstream import EventStream, EventStreamError
from wrapt import ObjectProxy

_StreamDoneCallableT = Callable[[Dict[str, Union[int, str]]], None]
_StreamErrorCallableT = Callable[[Exception], None]


# pylint: disable=abstract-method
class ConverseStreamWrapper(ObjectProxy):
@@ -31,19 +35,25 @@ class ConverseStreamWrapper(ObjectProxy):
    def __init__(
        self,
        stream: EventStream,
        stream_done_callback: _StreamDoneCallableT,
        stream_error_callback: _StreamErrorCallableT,
    ):
        super().__init__(stream)

        self._stream_done_callback = stream_done_callback
        self._stream_error_callback = stream_error_callback
        # accumulate data in the same shape as the non-streaming version:
        # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
        self._response = {}

    def __iter__(self):
        try:
            for event in self.__wrapped__:
                self._process_event(event)
                yield event
        except EventStreamError as exc:
            self._stream_error_callback(exc)
            raise

    def _process_event(self, event):
        if "messageStart" in event:
@@ -85,22 +95,28 @@ class InvokeModelWithResponseStreamWrapper(ObjectProxy):
    def __init__(
        self,
        stream: EventStream,
        stream_done_callback: _StreamDoneCallableT,
        stream_error_callback: _StreamErrorCallableT,
        model_id: str,
    ):
        super().__init__(stream)

        self._stream_done_callback = stream_done_callback
        self._stream_error_callback = stream_error_callback
        self._model_id = model_id

        # accumulate data in the same shape as the Converse API:
        # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
        self._response = {}

    def __iter__(self):
        try:
            for event in self.__wrapped__:
                self._process_event(event)
                yield event
        except EventStreamError as exc:
            self._stream_error_callback(exc)
            raise

    def _process_event(self, event):
        if "chunk" not in event:
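For context, here is a minimal sketch (not part of this PR) of how the wrapper surfaces a mid-stream failure. The FakeEventStream class and the print callbacks are illustrative stand-ins for a real botocore EventStream and the instrumentation's span-ending callbacks, and the bedrock_utils import path is assumed from the package layout:

from botocore.eventstream import EventStreamError

# assumed module path for the wrappers shown in this diff
from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
    ConverseStreamWrapper,
)


class FakeEventStream:
    """Stand-in for a botocore EventStream: one good event, then a failure."""

    # a plain class (not a bare generator) so wrapt can delegate the
    # wrapper's attribute writes to the wrapped object
    def __iter__(self):
        yield {"messageStart": {"role": "assistant"}}
        raise EventStreamError(
            {"modelStreamErrorException": {}}, "ConverseStream"
        )


wrapper = ConverseStreamWrapper(
    FakeEventStream(),
    stream_done_callback=lambda response: print("done:", response),
    stream_error_callback=lambda exc: print("error:", type(exc).__qualname__),
)

try:
    for event in wrapper:
        pass  # prints "error: EventStreamError" when the stream fails
except EventStreamError:
    pass  # the callback has already ended the span; the error still propagates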
New VCR cassette (converse-stream):
@@ -0,0 +1,71 @@
interactions:
- request:
    body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}],
      "inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences":
      ["|"]}}'
    headers:
      Content-Length:
      - '170'
      Content-Type:
      - !!binary |
        YXBwbGljYXRpb24vanNvbg==
      User-Agent:
      - !!binary |
        Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
        MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
        aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
      X-Amz-Date:
      - !!binary |
        MjAyNTAxMjdUMTE0NjAyWg==
      X-Amz-Security-Token:
      - test_aws_security_token
      X-Amzn-Trace-Id:
      - !!binary |
        Um9vdD0xLWI5YzVlMjRlLWRmYzBjYTYyMmFiYjA2ZWEyMjAzZDZkYjtQYXJlbnQ9NDE0MWM4NWIx
        ODkzMmI3OTtTYW1wbGVkPTE=
      amz-sdk-invocation-id:
      - !!binary |
        YjA0ZTAzYWEtMDg2MS00NGIzLTk3NmMtMWZjOGE5MzY5YTFl
      amz-sdk-request:
      - !!binary |
        YXR0ZW1wdD0x
      authorization:
      - Bearer test_aws_authorization
    method: POST
    uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
  response:
    body:
      string: !!binary |
        AAAAswAAAFK3IJ11CzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
        cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
        b3BxcnN0dXZ3eHl6QUJDREVGR0hJSktMTU5PUFFSU1RVVldYWVowMSIsInJvbGUiOiJhc3Npc3Rh
        bnQifRl7p7oAAAC3AAAAVzLKzzoLOmV2ZW50LXR5cGUHABFjb250ZW50QmxvY2tEZWx0YQ06Y29u
        dGVudC10eXBlBwAQYXBwbGljYXRpb24vanNvbg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRl
        bnRCbG9ja0luZGV4IjowLCJkZWx0YSI6eyJ0ZXh0IjoiSGkhIEknbSBhbiBBSSBsYW5ndWFnZSJ9
        LCJwIjoiYWJjZGVmZ2gifUn9+AsAAACUAAAAVsOsqngLOmV2ZW50LXR5cGUHABBjb250ZW50Qmxv
        Y2tTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
        dmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsInAiOiJhYmNkZWZnaGlqa2xtbm9wIn3KsHRKAAAA
        pgAAAFGGKdQ9CzpldmVudC10eXBlBwALbWVzc2FnZVN0b3ANOmNvbnRlbnQtdHlwZQcAEGFwcGxp
        Y2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJwIjoiYWJjZGVmZ2hpamtsbW5vcHFy
        c3R1dnd4eXpBQkNERUZHSEkiLCJzdG9wUmVhc29uIjoibWF4X3Rva2VucyJ9eRUDZQAAAPUAAABO
        dJJs0ws6ZXZlbnQtdHlwZQcACG1ldGFkYXRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9q
        c29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsibWV0cmljcyI6eyJsYXRlbmN5TXMiOjY2NH0sInAi
        OiJhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0RFRkdISUpLTE1OT1BRUlNUVVZXWFlaMDEi
        LCJ1c2FnZSI6eyJpbnB1dFRva2VucyI6OCwib3V0cHV0VG9rZW5zIjoxMCwidG90YWxUb2tlbnMi
        OjE4fX3B+Dpy
    headers:
      Connection:
      - keep-alive
      Content-Type:
      - application/vnd.amazon.eventstream
      Date:
      - Mon, 27 Jan 2025 11:46:02 GMT
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      x-amzn-RequestId:
      - 657e0bef-5ebb-4387-be65-d3ceafd53dea
    status:
      code: 200
      message: OK
version: 1
New VCR cassette (invoke-with-response-stream):
@@ -0,0 +1,62 @@
interactions:
- request:
    body: '{"inputText": "Say this is a test", "textGenerationConfig": {"maxTokenCount":
      10, "temperature": 0.8, "topP": 1, "stopSequences": ["|"]}}'
    headers:
      Content-Length:
      - '137'
      User-Agent:
      - !!binary |
        Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
        MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
        aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
      X-Amz-Date:
      - !!binary |
        MjAyNTAxMjdUMTIwMTU0Wg==
      X-Amz-Security-Token:
      - test_aws_security_token
      X-Amzn-Trace-Id:
      - !!binary |
        Um9vdD0xLWJhYTFjOTdhLTI3M2UxYTlhYjIyMTM1NGQwN2JjNGNhYztQYXJlbnQ9OTVhNmQzZGEx
        YTZkZjM4ZjtTYW1wbGVkPTE=
      amz-sdk-invocation-id:
      - !!binary |
        ZWQxZGViZmQtZTE5NS00N2RiLWIyMzItMTY1MzJhYjQzZTM0
      amz-sdk-request:
      - !!binary |
        YXR0ZW1wdD0x
      authorization:
      - Bearer test_aws_authorization
    method: POST
    uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/invoke-with-response-stream
  response:
    body:
      string: !!binary |
        AAACBAAAAEs8ZEC6CzpldmVudC10eXBlBwAFY2h1bmsNOmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0
        aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJieXRlcyI6ImV5SnZkWFJ3ZFhSVVpYaDBJ
        am9pSUdOdmJXMWxiblJjYmtobGJHeHZJU0JKSUdGdElHRWdZMjl0Y0hWMFpYSWdjSEp2WjNKaGJT
        QmtaWE5wWjI1bFpDSXNJbWx1WkdWNElqb3dMQ0owYjNSaGJFOTFkSEIxZEZSbGVIUlViMnRsYmtO
        dmRXNTBJam94TUN3aVkyOXRjR3hsZEdsdmJsSmxZWE52YmlJNklreEZUa2RVU0NJc0ltbHVjSFYw
        VkdWNGRGUnZhMlZ1UTI5MWJuUWlPalVzSW1GdFlYcHZiaTFpWldSeWIyTnJMV2x1ZG05allYUnBi
        MjVOWlhSeWFXTnpJanA3SW1sdWNIVjBWRzlyWlc1RGIzVnVkQ0k2TlN3aWIzVjBjSFYwVkc5clpX
        NURiM1Z1ZENJNk1UQXNJbWx1ZG05allYUnBiMjVNWVhSbGJtTjVJam8yTnpRc0ltWnBjbk4wUW5s
        MFpVeGhkR1Z1WTNraU9qWTNNMzE5IiwicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6In2J
        Hw51
    headers:
      Connection:
      - keep-alive
      Content-Type:
      - application/vnd.amazon.eventstream
      Date:
      - Mon, 27 Jan 2025 12:01:55 GMT
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      X-Amzn-Bedrock-Content-Type:
      - application/json
      x-amzn-RequestId:
      - 1eb1af77-fb2f-400f-9bf8-049e38b90f02
    status:
      code: 200
      message: OK
version: 1
@@ -15,9 +15,11 @@
from __future__ import annotations

import json
from unittest import mock

import boto3
import pytest
from botocore.eventstream import EventStream, EventStreamError

from opentelemetry.semconv._incubating.attributes.error_attributes import (
    ERROR_TYPE,
@@ -171,6 +173,65 @@ def test_converse_stream_with_content(
    assert len(logs) == 0


@pytest.mark.skipif(
    BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
)
@pytest.mark.vcr()
def test_converse_stream_handles_event_stream_error(
    span_exporter,
    log_exporter,
    bedrock_runtime_client,
    instrument_with_content,
):
    # pylint:disable=too-many-locals
    messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}]

    llm_model_value = "amazon.titan-text-lite-v1"
    max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"]
    response = bedrock_runtime_client.converse_stream(
        messages=messages,
        modelId=llm_model_value,
        inferenceConfig={
            "maxTokens": max_tokens,
            "temperature": temperature,
            "topP": top_p,
            "stopSequences": stop_sequences,
        },
    )

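    # Patch botocore's private _parse_event so iterating the recorded stream
    # raises, simulating an error event arriving mid-stream.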
    with mock.patch.object(
        EventStream,
        "_parse_event",
        side_effect=EventStreamError(
            {"modelStreamErrorException": {}}, "ConverseStream"
        ),
    ):
        with pytest.raises(EventStreamError):
            for _event in response["stream"]:
                pass

    (span,) = span_exporter.get_finished_spans()
    input_tokens, output_tokens, finish_reason = None, None, None
    assert_stream_completion_attributes(
        span,
        llm_model_value,
        input_tokens,
        output_tokens,
        finish_reason,
        "chat",
        top_p,
        temperature,
        max_tokens,
        stop_sequences,
    )

    assert span.status.status_code == StatusCode.ERROR
    assert span.attributes[ERROR_TYPE] == "EventStreamError"

    logs = log_exporter.get_finished_logs()
    assert len(logs) == 0


@pytest.mark.skipif(
    BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
)
@@ -413,6 +474,56 @@ def test_invoke_model_with_response_stream_with_content(
    assert len(logs) == 0


@pytest.mark.vcr()
def test_invoke_model_with_response_stream_handles_stream_error(
    span_exporter,
    log_exporter,
    bedrock_runtime_client,
    instrument_with_content,
):
    # pylint:disable=too-many-locals
    llm_model_value = "amazon.titan-text-lite-v1"
    max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"]
    body = get_invoke_model_body(
        llm_model_value, max_tokens, temperature, top_p, stop_sequences
    )
    response = bedrock_runtime_client.invoke_model_with_response_stream(
        body=body,
        modelId=llm_model_value,
    )

    # consume the stream in order to have it traced
    finish_reason = None
    input_tokens, output_tokens = None, None
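    # As above, patch botocore's private parser so consuming the stream raises.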
    with mock.patch.object(
        EventStream,
        "_parse_event",
        side_effect=EventStreamError(
            {"modelStreamErrorException": {}}, "InvokeModelWithResponseStream"
        ),
    ):
        with pytest.raises(EventStreamError):
            for _event in response["body"]:
                pass

    (span,) = span_exporter.get_finished_spans()
    assert_stream_completion_attributes(
        span,
        llm_model_value,
        input_tokens,
        output_tokens,
        finish_reason,
        "text_completion",
        top_p,
        temperature,
        max_tokens,
        stop_sequences,
    )

    logs = log_exporter.get_finished_logs()
    assert len(logs) == 0


@pytest.mark.vcr()
def test_invoke_model_with_response_stream_invalid_model(
    span_exporter,