This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
asyncdefStartStream(self,request:Any,context:Any)->Empty:"""Handle broker stream start."""logger.debug("StartStream called")try:# Wait for the stream to be ready with a timeoutawaitasyncio.wait_for(self._setup_complete.wait(),timeout=2.0)returnEmpty()exceptTimeoutError:logger.error("Timeout waiting for StreamStdio")context.set_code("UNIMPLEMENTED")context.set_details("Timeout waiting for StreamStdio setup")raiseexceptExceptionase:logger.error(f"StartStream error: {e}")context.set_code("UNIMPLEMENTED")context.set_details(f"Internal error: {e!s}")raise
asyncdefhandle_shutdown(self,force:bool=False)->None:"""Handle graceful shutdown."""self._stream_active=Falseifforce:whilenotself._message_queue.empty():try:self._message_queue.get_nowait()exceptasyncio.QueueEmpty:breakawaitasyncio.sleep(0.1)# Allow pending messages to process