Service

pyvider.rpcplugin.protocol.service

gRPC Service Implementations for Pyvider RPC Plugin.

This module provides the Python implementations for the standard gRPC services defined in the common go-plugin protocol:

- GRPCBrokerService: For managing brokered subchannels.
- GRPCStdioService: For streaming stdin/stdout/stderr.
- GRPCControllerService: For controlling the plugin lifecycle (e.g., shutdown).

It also includes helper classes like SubchannelConnection and a registration function to add these services to a gRPC server.

Classes

GRPCBrokerService

GRPCBrokerService()

Bases: GRPCBrokerServicer

Implementation of the gRPC Broker logic.

This matches the StartStream(...) signature in grpc_broker.proto, which transmits a stream of ConnInfo messages in both directions. In go-plugin, the plugin side uses 'StartStream(stream ConnInfo) returns (stream ConnInfo)' to set up a subchannel for callbacks or bridging. This implementation is a simplified version of that exchange.

Source code in pyvider/rpcplugin/protocol/service.py
def __init__(self) -> None:
    # We hold subchannel references here.
    self._subchannels: dict[int, SubchannelConnection] = {}
Functions
StartStream async
StartStream(
    request_iterator: AsyncIterator[ConnInfo],
    context: ServicerContext[ConnInfo, ConnInfo],
) -> AsyncIterator[ConnInfo]

Handles the bidirectional stream for broker connections.

This gRPC method allows the client and server to exchange ConnInfo messages to manage subchannels for additional services or callbacks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request_iterator | AsyncIterator[ConnInfo] | An async iterator yielding incoming ConnInfo messages from the client. | required |
| context | ServicerContext[ConnInfo, ConnInfo] | The gRPC request context. | required |

Yields:

| Type | Description |
| --- | --- |
| AsyncIterator[ConnInfo] | Outgoing ConnInfo messages to the client. |

Source code in pyvider/rpcplugin/protocol/service.py
async def StartStream(
    self,
    request_iterator: AsyncIterator[ConnInfo],
    context: grpc.aio.ServicerContext[ConnInfo, ConnInfo],
) -> AsyncIterator[ConnInfo]:
    """
    Handles the bidirectional stream for broker connections.

    This gRPC method allows the client and server to exchange `ConnInfo`
    messages to manage subchannels for additional services or callbacks.

    Args:
        request_iterator: An async iterator yielding incoming `ConnInfo`
                          messages from the client.
        context: The gRPC request context.

    Yields:
        Outgoing `ConnInfo` messages to the client.
    """
    incoming: ConnInfo | None = None  # Initialize to avoid unbound variable in exception handler
    try:  # Outer try for iterator errors
        async for incoming in request_iterator:
            sub_id = incoming.service_id
            try:  # Inner try for processing each item
                logger.debug(
                    "Broker received subchannel request",
                    sub_id=sub_id,
                    network=incoming.network,
                    address=incoming.address,
                )

                if incoming.knock.knock:  # Request to open/ensure channel
                    if sub_id in self._subchannels and self._subchannels[sub_id].is_open:
                        yield ConnInfo(
                            service_id=sub_id,
                            network=incoming.network,
                            address=incoming.address,
                            knock=ConnInfo.Knock(knock=False, ack=True, error=""),
                        )
                    else:  # New subchannel request or existing but not open
                        subchan = SubchannelConnection(sub_id, incoming.address)
                        await subchan.open()
                        self._subchannels[sub_id] = subchan
                        yield ConnInfo(
                            service_id=sub_id,
                            network=incoming.network,
                            address=incoming.address,
                            knock=ConnInfo.Knock(knock=False, ack=True, error=""),
                        )
                else:  # Request to close channel (knock=False)
                    if sub_id in self._subchannels:
                        await self._subchannels[sub_id].close()
                        del self._subchannels[sub_id]
                        yield ConnInfo(  # Ack the close
                            service_id=sub_id,
                            knock=ConnInfo.Knock(knock=False, ack=True, error=""),
                        )
                    else:
                        yield ConnInfo(
                            service_id=sub_id,
                            knock=ConnInfo.Knock(knock=False, ack=True, error="Channel not found"),
                        )
            except Exception as ex_inner:
                err_str_inner = f"Broker error processing item for sub_id {sub_id}: {ex_inner}"
                logger.error(
                    "Broker error processing subchannel item",
                    sub_id=sub_id,
                    error=str(ex_inner),
                    trace=traceback.format_exc(),
                )
                yield ConnInfo(
                    service_id=sub_id,
                    knock=ConnInfo.Knock(knock=False, ack=False, error=err_str_inner),
                )
                # Crucial: process next item, don't fall into ex_outer
                continue
    except Exception as ex_outer:
        outer_error_sub_id = getattr(incoming, "service_id", 0) if incoming is not None else 0
        err_str_outer = (
            "Broker stream error from client iterator for sub_id "
            f"{outer_error_sub_id} (outer loop): {ex_outer}"
        )
        try:
            yield ConnInfo(
                service_id=0,
                knock=ConnInfo.Knock(knock=False, ack=False, error=err_str_outer),
            )
        except Exception as ex_yield:
            logger.error("Failed to yield broker error response", error=str(ex_yield))
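
The open/close branches of StartStream can be easier to follow in isolation. The following is a minimal sketch of that knock/ack exchange, not the library's implementation: `Knock` and `FakeConnInfo` are hypothetical dataclass stand-ins for the generated protobuf messages, a plain set replaces the SubchannelConnection registry, and the error handling of the real method is left out.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field


@dataclass
class Knock:
    """Hypothetical stand-in for ConnInfo.Knock (not the generated class)."""
    knock: bool = False
    ack: bool = False
    error: str = ""


@dataclass
class FakeConnInfo:
    """Hypothetical stand-in for ConnInfo with only the fields used here."""
    service_id: int = 0
    address: str = ""
    knock: Knock = field(default_factory=Knock)


async def broker_replies(
    requests: AsyncIterator[FakeConnInfo],
) -> AsyncIterator[FakeConnInfo]:
    """Mirror the open/close branches of StartStream using a set of open ids."""
    open_ids: set[int] = set()
    async for msg in requests:
        if msg.knock.knock:
            # Open request: register the id (idempotent) and acknowledge.
            open_ids.add(msg.service_id)
            yield FakeConnInfo(msg.service_id, msg.address, Knock(ack=True))
        elif msg.service_id in open_ids:
            # Close request for a known id: forget it and acknowledge.
            open_ids.discard(msg.service_id)
            yield FakeConnInfo(msg.service_id, knock=Knock(ack=True))
        else:
            # Close request for an unknown id: as in the source, ack stays True
            # and the problem is reported through the error field.
            yield FakeConnInfo(
                msg.service_id, knock=Knock(ack=True, error="Channel not found")
            )


async def demo() -> list[tuple[int, bool, str]]:
    async def client() -> AsyncIterator[FakeConnInfo]:
        yield FakeConnInfo(1, "127.0.0.1:9000", Knock(knock=True))  # open id 1
        yield FakeConnInfo(1, knock=Knock(knock=False))  # close id 1
        yield FakeConnInfo(2, knock=Knock(knock=False))  # close unknown id 2

    return [
        (r.service_id, r.knock.ack, r.knock.error)
        async for r in broker_replies(client())
    ]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each request produces exactly one reply, so a caller can pair requests and replies by position or by service_id.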

GRPCControllerService

GRPCControllerService(
    shutdown_event: Event, stdio_service: GRPCStdioService
)

Bases: GRPCControllerServicer

Implements the GRPCController service for plugin lifecycle management. Specifically, it handles the Shutdown RPC to gracefully terminate the plugin.

Initializes the GRPCControllerService.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| shutdown_event | Event | An asyncio.Event to signal plugin shutdown. | required |
| stdio_service | GRPCStdioService | The GRPCStdioService instance to shut down as well. | required |

Source code in pyvider/rpcplugin/protocol/service.py
def __init__(self, shutdown_event: asyncio.Event, stdio_service: GRPCStdioService) -> None:
    """
    Initializes the GRPCControllerService.

    Args:
        shutdown_event: An asyncio.Event to signal plugin shutdown.
        stdio_service: The GRPCStdioService instance to shut down as well.
    """
    self._shutdown_event = shutdown_event or asyncio.Event()
    self._stdio_service = stdio_service
Functions
Shutdown async
Shutdown(
    request: Empty, context: ServicerContext[Empty, Empty]
) -> CEmpty

Handles the Shutdown RPC request from the client.

This method signals other plugin components to shut down gracefully and then initiates the process termination.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | Empty | The Empty request message (from grpc_controller.proto). | required |
| context | ServicerContext[Empty, Empty] | The gRPC request context. | required |

Returns:

| Type | Description |
| --- | --- |
| Empty | An Empty response message. |

Source code in pyvider/rpcplugin/protocol/service.py
@resilient(
    context={"operation": "controller_shutdown", "component": "protocol"},
    log_errors=True,
)
async def Shutdown(self, request: CEmpty, context: grpc.aio.ServicerContext[CEmpty, CEmpty]) -> CEmpty:
    """
    Handles the Shutdown RPC request from the client.

    This method signals other plugin components to shut down gracefully
    and then initiates the process termination.

    Args:
        request: The Empty request message (from grpc_controller.proto).
        context: The gRPC request context.

    Returns:
        An Empty response message.
    """
    self._stdio_service.shutdown()
    self._shutdown_event.set()

    self._shutdown_task = asyncio.create_task(self._delayed_shutdown())
    return CEmpty()
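
Shutdown only sets the shared asyncio.Event; whatever owns the server has to be waiting on it. The following is a minimal sketch of such a waiter, assuming a hypothetical `run_until_shutdown` coroutine and a placeholder sleep where a real plugin would stop its gRPC server. Neither name comes from the module documented here.

```python
import asyncio


async def run_until_shutdown(shutdown_event: asyncio.Event, grace: float = 0.1) -> str:
    """Block until the event is set, then run a placeholder cleanup step."""
    await shutdown_event.wait()
    # Hypothetical cleanup; a real plugin would stop its gRPC server here.
    await asyncio.sleep(grace)
    return "stopped"


async def demo() -> str:
    shutdown_event = asyncio.Event()
    waiter = asyncio.create_task(run_until_shutdown(shutdown_event, grace=0.01))

    await asyncio.sleep(0)  # give the waiter a chance to start
    assert not waiter.done()  # it is still blocked on the event

    # This is the step Shutdown performs via self._shutdown_event.set().
    shutdown_event.set()
    return await waiter


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the event is shared, the same object passed to register_protocol_service is the one the serving code should await.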

GRPCStdioService

GRPCStdioService()

Bases: GRPCStdioServicer

Implementation of plugin stdio streaming.

Source code in pyvider/rpcplugin/protocol/service.py
def __init__(self) -> None:
    self._message_queue: asyncio.Queue[Any] = asyncio.Queue()  # Allow Any for sentinel
    self._shutdown = False
Functions
StreamStdio async
StreamStdio(
    request: Empty,
    context: ServicerContext[Empty, StdioData],
) -> AsyncIterator[StdioData]

Streams STDOUT/STDERR lines to the caller.

Source code in pyvider/rpcplugin/protocol/service.py
async def StreamStdio(
    self, request: empty_pb2.Empty, context: grpc.aio.ServicerContext[empty_pb2.Empty, StdioData]
) -> AsyncIterator[StdioData]:
    """Streams STDOUT/STDERR lines to the caller."""

    done = asyncio.Event()

    def on_rpc_done(_: Any) -> None:
        done.set()

    context.add_done_callback(on_rpc_done)  # type: ignore[arg-type]

    async for item in self._stream_items(done):
        yield item
put_line async
put_line(line: bytes, is_stderr: bool = False) -> None

Adds a line of data (stdout or stderr) to the message queue for streaming.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| line | bytes | The bytes data of the line. | required |
| is_stderr | bool | True if the line is from stderr, False for stdout. | False |

Source code in pyvider/rpcplugin/protocol/service.py
async def put_line(self, line: bytes, is_stderr: bool = False) -> None:
    """
    Adds a line of data (stdout or stderr) to the message queue for streaming.

    Args:
        line: The bytes data of the line.
        is_stderr: True if the line is from stderr, False for stdout.
    """
    try:
        data = StdioData(channel=StdioData.STDERR if is_stderr else StdioData.STDOUT, data=line)
        await self._message_queue.put(data)
    except Exception:
        pass  # Errors are swallowed: a line that fails to enqueue is dropped silently
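
StreamStdio delegates to a `_stream_items(done)` helper that is not shown on this page. The following is a sketch of one way such a consumer could work, under the assumption that it drains the queue until either a sentinel object arrives or the RPC-done event is set. `stream_items` and `_SENTINEL` are hypothetical names, and the real helper may differ.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any

_SENTINEL = object()  # hypothetical end-of-stream marker


async def stream_items(
    queue: asyncio.Queue[Any], done: asyncio.Event
) -> AsyncIterator[bytes]:
    """Yield queued items until the sentinel arrives or `done` is set."""
    done_task = asyncio.create_task(done.wait())
    try:
        while True:
            get_task = asyncio.create_task(queue.get())
            finished, _ = await asyncio.wait(
                {get_task, done_task}, return_when=asyncio.FIRST_COMPLETED
            )
            if get_task not in finished:
                # The RPC ended first; stop waiting on the queue.
                get_task.cancel()
                return
            item = get_task.result()
            if item is _SENTINEL:
                return
            yield item
    finally:
        done_task.cancel()


async def demo() -> list[bytes]:
    queue: asyncio.Queue[Any] = asyncio.Queue()
    done = asyncio.Event()
    await queue.put(b"hello\n")
    await queue.put(b"world\n")
    await queue.put(_SENTINEL)  # what a shutdown() call might enqueue
    return [item async for item in stream_items(queue, done)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Racing the queue read against the done event keeps the generator from blocking forever on an empty queue after the client has disconnected.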

SubchannelConnection

Represents a brokered subchannel for plugin-to-plugin communication.

In the go-plugin architecture, subchannels allow plugins to establish secondary communication channels for callbacks, additional services, or plugin-to-plugin communication. Each subchannel has a unique ID and network address.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| conn_id | int | Unique identifier for this subchannel connection. |
| address | str | Network address for the subchannel (format depends on transport). |
| is_open | bool | Whether the subchannel is currently open and available. |

Example
subchannel = SubchannelConnection(
    conn_id=1,
    address="127.0.0.1:9000"
)
await subchannel.open()
# Subchannel now ready for communication
await subchannel.close()
Functions
close async
close() -> None

Close the subchannel and release resources.

This method closes the subchannel connection and marks it as unavailable. Resources associated with the subchannel are released.

Side Effects

Sets is_open to False after closing.

Source code in pyvider/rpcplugin/protocol/service.py
async def close(self) -> None:
    """
    Close the subchannel and release resources.

    This method closes the subchannel connection and marks it as
    unavailable. Resources associated with the subchannel are released.

    Side Effects:
        Sets is_open to False after closing.
    """
    await asyncio.sleep(0.05)  # simulate teardown, mirroring open()
    self.is_open = False
open async
open() -> None

Open the subchannel for communication.

This method establishes the subchannel connection and marks it as available for use. In a real implementation, this would involve network setup or IPC channel creation.

Side Effects

Sets is_open to True after successful opening.

Source code in pyvider/rpcplugin/protocol/service.py
async def open(self) -> None:
    """
    Open the subchannel for communication.

    This method establishes the subchannel connection and marks it as
    available for use. In a real implementation, this would involve
    network setup or IPC channel creation.

    Side Effects:
        Sets is_open to True after successful opening.
    """
    logger.debug("Opening subchannel", conn_id=self.conn_id, address=self.address)
    await asyncio.sleep(0.05)  # simulate
    self.is_open = True

Functions

register_protocol_service

register_protocol_service(
    server: Server, shutdown_event: Event
) -> None

Registers all standard gRPC services for the plugin.

Source code in pyvider/rpcplugin/protocol/service.py
def register_protocol_service(server: grpc.aio.Server, shutdown_event: asyncio.Event) -> None:
    """Registers all standard gRPC services for the plugin."""
    stdio_service = GRPCStdioService()
    broker_service = GRPCBrokerService()
    controller_service = GRPCControllerService(shutdown_event, stdio_service)

    add_GRPCStdioServicer_to_server(stdio_service, server)  # type: ignore[no-untyped-call]
    add_GRPCBrokerServicer_to_server(broker_service, server)  # type: ignore[no-untyped-call]
    add_GRPCControllerServicer_to_server(controller_service, server)  # type: ignore[no-untyped-call]