Skip to content

Process

pyvider.rpcplugin.client.process

Process management and gRPC operations for RPC plugin clients.

This module handles subprocess launching, gRPC channel creation, stub initialization, and stdio/broker operations.

Classes

ClientProcessMixin

Mixin class containing process and gRPC methods for RPCPluginClient.

Functions
open_broker_subchannel async
open_broker_subchannel(sub_id: int, address: str) -> None

Open a broker subchannel for multi-service communication.

Parameters:

Name Type Description Default
sub_id int

Unique identifier for the subchannel

required
address str

Network address for the subchannel

required

Raises:

Type Description
ProtocolError

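If the broker stream raises an error, or if no acknowledgment matching sub_id is received. A missing broker stub or a negative knock acknowledgment is only logged and does not raise.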

Source code in pyvider/rpcplugin/client/process.py
async def open_broker_subchannel(self: RPCPluginClient, sub_id: int, address: str) -> None:  # type: ignore[misc]
    """
    Open a broker subchannel for multi-service communication.

    Sends a ``ConnInfo`` request for ``sub_id`` over a new broker stream and
    waits for a single response carrying the same ``service_id``.

    If no broker stub is available, a warning is logged and the call returns
    without doing anything. If the response carries a negative knock
    acknowledgment, the error is logged but no exception is raised.

    Args:
        sub_id: Unique identifier for the subchannel
        address: Network address for the subchannel

    Raises:
        ProtocolError: If no response matching ``sub_id`` is received, or if
            any gRPC or other error occurs while talking to the broker
    """
    if not self._broker_stub:
        self.logger.warning("Broker stub not available for subchannel operations")
        return

    try:
        self.logger.debug(f"Opening broker subchannel {sub_id} at address {address}")

        # Describe the subchannel we want the broker to open.
        conn_info = ConnInfo()
        conn_info.service_id = sub_id
        conn_info.network = "tcp"  # Typically TCP for subchannels
        conn_info.address = address

        # Start the broker stream, send the request, then wait for the reply.
        stream = self._broker_stub.StartStream()
        await stream.write(conn_info)
        response = await stream.read()

        # A falsy response or one for a different service means no acknowledgment.
        if not response or response.service_id != sub_id:
            raise ProtocolError(f"Failed to get acknowledgment for broker subchannel {sub_id}")

        has_knock_ack = hasattr(response, "knock") and hasattr(response.knock, "ack")
        if has_knock_ack and not response.knock.ack:
            error_msg = getattr(response.knock, "error", "Unknown error")
            # Deliberately best-effort: a rejected knock is logged, not raised.
            self.logger.error(f"Subchannel open failed: {error_msg}")
        else:
            # Either the knock was acknowledged or the response has no knock info.
            self.logger.debug(f"Broker subchannel {sub_id} opened successfully")

        await stream.done_writing()

    except ProtocolError:
        # Already a ProtocolError with a precise message; do not let the
        # generic handler below wrap it in a second ProtocolError.
        raise
    except grpc.RpcError as e:
        raise ProtocolError(f"gRPC error opening broker subchannel {sub_id}: {e}") from e
    except Exception as e:
        raise ProtocolError(f"Error opening broker subchannel {sub_id}: {e}") from e

Functions