Process management and gRPC operations for RPC plugin clients.
This module handles subprocess launching, gRPC channel creation,
stub initialization, and stdio/broker operations.
Classes
ClientProcessMixin
Mixin class containing process and gRPC methods for RPCPluginClient.
Functions
open_broker_subchannel
async
open_broker_subchannel(sub_id: int, address: str) -> None
Open a broker subchannel for multi-service communication.
Parameters:
| Name |
Type |
Description |
Default |
sub_id
|
int
|
Unique identifier for the subchannel
|
required
|
address
|
str
|
Network address for the subchannel
|
required
|
Raises:
Source code in pyvider/rpcplugin/client/process.py
| async def open_broker_subchannel(self: RPCPluginClient, sub_id: int, address: str) -> None: # type: ignore[misc]
"""
Open a broker subchannel for multi-service communication.
Args:
sub_id: Unique identifier for the subchannel
address: Network address for the subchannel
Raises:
ProtocolError: If broker operations fail
"""
if not self._broker_stub:
self.logger.warning("Broker stub not available for subchannel operations")
return
try:
self.logger.debug(f"Opening broker subchannel {sub_id} at address {address}")
# Create connection info
conn_info = ConnInfo()
conn_info.service_id = sub_id
conn_info.network = "tcp" # Typically TCP for subchannels
conn_info.address = address
# Start broker stream
stream = self._broker_stub.StartStream()
# Send connection request
await stream.write(conn_info)
# Wait for acknowledgment
response = await stream.read()
if response and response.service_id == sub_id:
# Check knock acknowledgment
if hasattr(response, "knock") and hasattr(response.knock, "ack"):
if response.knock.ack:
self.logger.debug(f"Broker subchannel {sub_id} opened successfully")
else:
error_msg = (
response.knock.error if hasattr(response.knock, "error") else "Unknown error"
)
self.logger.error(f"Subchannel open failed: {error_msg}")
# Don't raise exception, just log error and continue
else:
self.logger.debug(f"Broker subchannel {sub_id} opened successfully")
else:
raise ProtocolError(f"Failed to get acknowledgment for broker subchannel {sub_id}")
await stream.done_writing()
except grpc.RpcError as e:
raise ProtocolError(f"gRPC error opening broker subchannel {sub_id}: {e}") from e
except Exception as e:
raise ProtocolError(f"Error opening broker subchannel {sub_id}: {e}") from e
|
Functions