From 6950477dc83bd44eb0b80a41bb698d237fbf4eca Mon Sep 17 00:00:00 2001 From: Robert Craigie Date: Mon, 13 Jan 2025 20:23:17 +0000 Subject: [PATCH] Revert "chore(internal): streaming refactors (#2012)" This reverts commit d76a748f606743407f94dfc26758095560e2082a. --- src/openai/_streaming.py | 104 +++++++++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 32 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index 7aa7b62f6b..0fda992cff 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -59,22 +59,42 @@ def __stream__(self) -> Iterator[_T]: if sse.data.startswith("[DONE]"): break - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - - yield process_data(data=data, cast_to=cast_to, response=response) + if sse.event is None: + data = sse.json() + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + yield process_data(data=data, cast_to=cast_to, response=response) + + else: + data = sse.json() + + if sse.event == "error" and is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) # Ensure the entire stream is consumed for _sse in iterator: @@ -141,22 +161,42 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.data.startswith("[DONE]"): break - data = sse.json() - if is_mapping(data) and data.get("error"): - message = None - error = data.get("error") - if is_mapping(error): - message = error.get("message") - if not message or not isinstance(message, str): - message = "An error occurred during streaming" - - raise APIError( - message=message, - request=self.response.request, - body=data["error"], - ) - - yield process_data(data=data, cast_to=cast_to, response=response) + if sse.event is None: + data = sse.json() + if is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + yield process_data(data=data, cast_to=cast_to, response=response) + + else: + data = sse.json() + + if sse.event == "error" and is_mapping(data) and data.get("error"): + message = None + error = data.get("error") + if is_mapping(error): + message = error.get("message") + if not message or not isinstance(message, str): + message = "An error occurred during streaming" + + raise APIError( + message=message, + request=self.response.request, + body=data["error"], + ) + + yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) # Ensure the entire stream is consumed async for _sse in iterator: