From 71d99cdda1920fb6d22bbd01839306ce09fe55f3 Mon Sep 17 00:00:00 2001 From: Olivier Tabone Date: Mon, 26 Jan 2026 16:21:28 +0100 Subject: [PATCH] fix: make stream_query_reasoning_engine async in AsyncClient - Add async keyword to stream_query_reasoning_engine method - Add await to rpc call in async client - Use execution_async_client instead of execution_api_client - Use async for loop to iterate over response stream --- .../reasoning_engine_execution_service/async_client.py | 4 ++-- vertexai/agent_engines/_agent_engines.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/aiplatform_v1/services/reasoning_engine_execution_service/async_client.py b/google/cloud/aiplatform_v1/services/reasoning_engine_execution_service/async_client.py index 4d36c2d70d..083291832b 100644 --- a/google/cloud/aiplatform_v1/services/reasoning_engine_execution_service/async_client.py +++ b/google/cloud/aiplatform_v1/services/reasoning_engine_execution_service/async_client.py @@ -408,7 +408,7 @@ async def sample_query_reasoning_engine(): # Done; return the response. return response - def stream_query_reasoning_engine( + async def stream_query_reasoning_engine( self, request: Optional[ Union[ @@ -541,7 +541,7 @@ async def sample_stream_query_reasoning_engine(): self._client._validate_universe_domain() # Send the request. - response = rpc( + response = await rpc( request, retry=retry, timeout=timeout, diff --git a/vertexai/agent_engines/_agent_engines.py b/vertexai/agent_engines/_agent_engines.py index dd4e35269d..9af30d429c 100644 --- a/vertexai/agent_engines/_agent_engines.py +++ b/vertexai/agent_engines/_agent_engines.py @@ -1643,14 +1643,14 @@ def _wrap_async_stream_query_operation( """ async def _method(self, **kwargs) -> AsyncIterable[Any]: - response = self.execution_api_client.stream_query_reasoning_engine( + response = await self.execution_async_client.stream_query_reasoning_engine( request=aip_types.StreamQueryReasoningEngineRequest( name=self.resource_name, input=kwargs, class_method=method_name, ), ) - for chunk in response: + async for chunk in response: for parsed_json in _utils.yield_parsed_json(chunk): if parsed_json is not None: yield parsed_json