diff --git a/itk/main.py b/itk/main.py index 7be7a5a20..d4f3eb65f 100644 --- a/itk/main.py +++ b/itk/main.py @@ -36,7 +36,7 @@ from a2a.utils import TransportProtocol -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) diff --git a/itk/run_itk.sh b/itk/run_itk.sh index 80e96f9c2..7914b00a9 100755 --- a/itk/run_itk.sh +++ b/itk/run_itk.sh @@ -8,6 +8,9 @@ RESULT=1 cleanup() { set +x echo "Cleaning up artifacts..." + echo "=== ITK Service Logs ===" + docker logs itk-service || true + echo "========================" docker stop itk-service > /dev/null 2>&1 || true docker rm itk-service > /dev/null 2>&1 || true docker rmi itk_service > /dev/null 2>&1 || true @@ -67,6 +70,7 @@ docker run -d --name itk-service \ -v "$A2A_PYTHON_ROOT:/app/agents/repo" \ -v "$ITK_DIR:/app/agents/repo/itk" \ -p 8000:8000 \ + -e A2A_LOG_LEVEL=DEBUG \ itk_service # 5.1. Fix dubious ownership for git (needed for uv-dynamic-versioning) @@ -156,10 +160,11 @@ except Exception as e: RESULT=$? set -e -if [ $RESULT -ne 0 ]; then - echo "Tests failed. Container logs:" - docker logs itk-service +if [ $RESULT -eq 0 ]; then + echo "=== ITK SUCCESS: All tests passed! ===" fi + +# Logs are always printed in cleanup() echo "--------------------------------------------------------" # Final exit result will be captured by trap cleanup diff --git a/src/a2a/server/agent_execution/active_task.py b/src/a2a/server/agent_execution/active_task.py index defdd5244..02440ab84 100644 --- a/src/a2a/server/agent_execution/active_task.py +++ b/src/a2a/server/agent_execution/active_task.py @@ -256,10 +256,14 @@ async def _run_producer(self) -> None: try: active = True while active: - ( - request_context, - request_id, - ) = await self._request_queue.get() + try: + ( + request_context, + request_id, + ) = await self._request_queue.get() + except QueueShutDown: + logger.debug('Producer[%s]: Request queue shut down during get', self._task_id) + break await self._request_lock.acquire() # TODO: Should we create task manager every time? self._task_manager._call_context = request_context.call_context @@ -283,7 +287,7 @@ async def _run_producer(self) -> None: 'Producer[%s]: Executing agent task %s', self._task_id, request_context.current_task, - ) + ) try: await self._agent_executor.execute( @@ -295,9 +299,9 @@ async def _run_producer(self) -> None: ) except QueueShutDown: logger.debug( - 'Producer[%s]: Request queue shut down', self._task_id + 'Producer[%s]: Request queue shut down during execution', self._task_id ) - raise + break except asyncio.CancelledError: logger.debug('Producer[%s]: Cancelled', self._task_id) raise @@ -410,7 +414,7 @@ async def _run_consumer(self) -> None: # noqa: PLR0915, PLR0912 # Terminate the ActiveTask globally. self._is_finished.set() - self._request_queue.shutdown(immediate=True) + self._request_queue.shutdown(immediate=False) if is_interrupted: logger.debug( @@ -447,7 +451,7 @@ async def _run_consumer(self) -> None: # noqa: PLR0915, PLR0912 finally: # The consumer is dead. The ActiveTask is permanently finished. self._is_finished.set() - self._request_queue.shutdown(immediate=True) + self._request_queue.shutdown(immediate=False) logger.debug('Consumer[%s]: Finishing', self._task_id) await self._maybe_cleanup() @@ -526,6 +530,37 @@ async def subscribe( # noqa: PLR0912, PLR0915 if self._is_finished.is_set(): if self._exception: raise self._exception from None + + logger.debug('Subscriber[%s]: Entering draining loop', self._task_id) + # Drain remaining events before breaking + while True: + try: + event = await asyncio.wait_for( + tapped_queue.dequeue_event(), timeout=0.1 + ) + logger.debug( + 'Subscriber[%s]: Race condition handled! Drained event after timeout: %s', + self._task_id, + event, + ) + except (asyncio.TimeoutError, TimeoutError): + break + + try: + if isinstance(event, _RequestCompleted): + if ( + request_id is not None + and event.request_id == request_id + ): + logger.debug( + 'Subscriber[%s]: Request completed (drained)', + self._task_id, + ) + return + continue + yield event + finally: + tapped_queue.task_done() break continue