Service

🤖 AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate.

pyvider.rpcplugin.protocol.service

gRPC Service Implementations for Pyvider RPC Plugin.

This module provides the Python implementations for the standard gRPC services defined in the common go-plugin protocol:

- GRPCBrokerService: For managing brokered subchannels.
- GRPCStdioService: For streaming stdin/stdout/stderr.
- GRPCControllerService: For controlling the plugin lifecycle (e.g., shutdown).

It also includes helper classes like SubchannelConnection and a registration function to add these services to a gRPC server.

Classes

GRPCBrokerService

GRPCBrokerService()

Bases: GRPCBrokerServicer

Implementation of the gRPC Broker logic.

This implements the StartStream RPC defined in grpc_broker.proto, a bidirectional stream of ConnInfo messages. In go-plugin the RPC is declared as 'StartStream(stream ConnInfo) returns (stream ConnInfo)' and is used to set up subchannels for callbacks or bridging. This class is a simplified version of that logic.

Source code in pyvider/rpcplugin/protocol/service.py
def __init__(self) -> None:
    # We hold subchannel references here.
    self._subchannels: dict[int, SubchannelConnection] = {}
Functions
StartStream async
StartStream(
    request_iterator: AsyncIterator[ConnInfo],
    context: ServicerContext[ConnInfo, ConnInfo],
) -> AsyncIterator[ConnInfo]

Handles the bidirectional stream for broker connections.

This gRPC method allows the client and server to exchange ConnInfo messages to manage subchannels for additional services or callbacks.

Parameters:

Name | Type | Description | Default
request_iterator | AsyncIterator[ConnInfo] | An async iterator yielding incoming ConnInfo messages from the client. | required
context | ServicerContext[ConnInfo, ConnInfo] | The gRPC request context. | required

Yields:

Type | Description
AsyncIterator[ConnInfo] | Outgoing ConnInfo messages to the client.

Source code in pyvider/rpcplugin/protocol/service.py
async def StartStream(
    self,
    request_iterator: AsyncIterator[ConnInfo],
    context: grpc.aio.ServicerContext[ConnInfo, ConnInfo],
) -> AsyncIterator[ConnInfo]:
    """
    Handles the bidirectional stream for broker connections.

    This gRPC method allows the client and server to exchange `ConnInfo`
    messages to manage subchannels for additional services or callbacks.

    Args:
        request_iterator: An async iterator yielding incoming `ConnInfo`
                          messages from the client.
        context: The gRPC request context.

    Yields:
        Outgoing `ConnInfo` messages to the client.
    """
    incoming: ConnInfo | None = None  # Initialize to avoid unbound variable in exception handler
    try:  # Outer try for iterator errors
        async for incoming in request_iterator:
            sub_id = incoming.service_id
            try:  # Inner try for processing each item
                logger.debug(
                    "Broker received subchannel request",
                    sub_id=sub_id,
                    network=incoming.network,
                    address=incoming.address,
                )

                if incoming.knock.knock:  # Request to open/ensure channel
                    if sub_id in self._subchannels and self._subchannels[sub_id].is_open:
                        yield ConnInfo(
                            service_id=sub_id,
                            network=incoming.network,
                            address=incoming.address,
                            knock=ConnInfo.Knock(knock=False, ack=True, error=""),
                        )
                    else:  # New subchannel request or existing but not open
                        subchan = SubchannelConnection(sub_id, incoming.address)
                        await subchan.open()
                        self._subchannels[sub_id] = subchan
                        yield ConnInfo(
                            service_id=sub_id,
                            network=incoming.network,
                            address=incoming.address,
                            knock=ConnInfo.Knock(knock=False, ack=True, error=""),
                        )
                else:  # Request to close channel (knock=False)
                    if sub_id in self._subchannels:
                        await self._subchannels[sub_id].close()
                        del self._subchannels[sub_id]
                        yield ConnInfo(  # Ack the close
                            service_id=sub_id,
                            knock=ConnInfo.Knock(knock=False, ack=True, error=""),
                        )
                    else:
                        yield ConnInfo(
                            service_id=sub_id,
                            knock=ConnInfo.Knock(knock=False, ack=True, error="Channel not found"),
                        )
            except Exception as ex_inner:
                err_str_inner = f"Broker error processing item for sub_id {sub_id}: {ex_inner}"
                logger.error(
                    "Broker error processing subchannel item",
                    sub_id=sub_id,
                    error=str(ex_inner),
                    trace=traceback.format_exc(),
                )
                yield ConnInfo(
                    service_id=sub_id,
                    knock=ConnInfo.Knock(knock=False, ack=False, error=err_str_inner),
                )
                # Crucial: process next item, don't fall into ex_outer
                continue
    except Exception as ex_outer:
        outer_error_sub_id = getattr(incoming, "service_id", 0) if incoming is not None else 0
        err_str_outer = (
            "Broker stream error from client iterator for sub_id "
            f"{outer_error_sub_id} (outer loop): {ex_outer}"
        )
        try:
            yield ConnInfo(
                service_id=0,
                knock=ConnInfo.Knock(knock=False, ack=False, error=err_str_outer),
            )
        except Exception as ex_yield:
            logger.error("Failed to yield broker error response", error=str(ex_yield))
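
The knock/ack decision logic in StartStream can be hard to follow through the nested try blocks. Below is a minimal sketch of just that decision table, assuming plain dataclasses (`Knock`, `Info`) as hypothetical stand-ins for the generated `ConnInfo` protobuf types and a set of IDs in place of `SubchannelConnection` objects. It illustrates the request/reply pattern only and is not the library's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class Knock:
    """Hypothetical stand-in for ConnInfo.Knock (not the generated protobuf class)."""
    knock: bool = False
    ack: bool = False
    error: str = ""


@dataclass
class Info:
    """Hypothetical stand-in for ConnInfo, carrying only the fields used here."""
    service_id: int
    knock: Knock = field(default_factory=Knock)


def handle(open_ids: set[int], msg: Info) -> Info:
    """Apply one incoming message to the set of open subchannel IDs and build the reply."""
    sub_id = msg.service_id
    if msg.knock.knock:
        # Open request: an already-open channel is simply acknowledged again.
        open_ids.add(sub_id)
        return Info(sub_id, Knock(ack=True))
    if sub_id in open_ids:
        # Close request for a known channel: drop it and acknowledge.
        open_ids.remove(sub_id)
        return Info(sub_id, Knock(ack=True))
    # Close request for an unknown channel: acknowledged, but with an error string,
    # mirroring the "Channel not found" branch in the listing above.
    return Info(sub_id, Knock(ack=True, error="Channel not found"))


open_ids: set[int] = set()
first = handle(open_ids, Info(7, Knock(knock=True)))    # open
second = handle(open_ids, Info(7, Knock(knock=False)))  # close
third = handle(open_ids, Info(7, Knock(knock=False)))   # close again, now unknown
print(first.knock, second.knock, third.knock)
```

Note that in the listing above a failure while processing a single message is reported with ack=False and the loop continues, which this sketch does not model.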

GRPCControllerService

GRPCControllerService(
    shutdown_event: Event, stdio_service: GRPCStdioService
)

Bases: GRPCControllerServicer

Implements the GRPCController service for plugin lifecycle management. Specifically, it handles the Shutdown RPC to gracefully terminate the plugin.

Initializes the GRPCControllerService.

Parameters:

Name | Type | Description | Default
shutdown_event | Event | An asyncio.Event to signal plugin shutdown. | required
stdio_service | GRPCStdioService | The GRPCStdioService instance to also shut down. | required
Source code in pyvider/rpcplugin/protocol/service.py
def __init__(self, shutdown_event: asyncio.Event, stdio_service: GRPCStdioService) -> None:
    """
    Initializes the GRPCControllerService.

    Args:
        shutdown_event: An asyncio.Event to signal plugin shutdown.
        stdio_service: The GRPCStdioService instance to also shut down.
    """
    self._shutdown_event = shutdown_event or asyncio.Event()
    self._stdio_service = stdio_service
Functions
Shutdown async
Shutdown(
    request: Empty, context: ServicerContext[Empty, Empty]
) -> Empty

Handles the Shutdown RPC request from the client.

This method signals other plugin components to shut down gracefully and then initiates the process termination.

Parameters:

Name | Type | Description | Default
request | Empty | The Empty request message (from grpc_controller.proto). | required
context | ServicerContext[Empty, Empty] | The gRPC request context. | required

Returns:

Type | Description
Empty | An Empty response message.

Source code in pyvider/rpcplugin/protocol/service.py
@resilient(
    context={"operation": "controller_shutdown", "component": "protocol"},
    log_errors=True,
)
async def Shutdown(self, request: CEmpty, context: grpc.aio.ServicerContext[CEmpty, CEmpty]) -> CEmpty:
    """
    Handles the Shutdown RPC request from the client.

    This method signals other plugin components to shut down gracefully
    and then initiates the process termination.

    Args:
        request: The Empty request message (from grpc_controller.proto).
        context: The gRPC request context.

    Returns:
        An Empty response message.
    """
    self._stdio_service.shutdown()
    self._shutdown_event.set()

    self._shutdown_task = asyncio.create_task(self._delayed_shutdown())
    return CEmpty()
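
The Shutdown handler sets the event and returns immediately, leaving termination to a background task. Here is a minimal sketch of that "signal now, terminate shortly after" pattern without gRPC. The `Controller` class and the `terminated` flag are hypothetical, and the body of `_delayed_shutdown` is an assumption, since the real helper is not shown on this page.

```python
import asyncio


class Controller:
    """Hypothetical stand-in for the controller service, without gRPC."""

    def __init__(self, shutdown_event: asyncio.Event) -> None:
        self._shutdown_event = shutdown_event
        self._shutdown_task = None
        self.terminated = False

    async def _delayed_shutdown(self) -> None:
        # Assumption: the real helper waits briefly so the RPC reply can be
        # delivered before the process exits. Here we only flip a flag.
        await asyncio.sleep(0.01)
        self.terminated = True

    async def shutdown(self) -> None:
        # Signal listeners first, then schedule termination in the background
        # so this call can return right away.
        self._shutdown_event.set()
        self._shutdown_task = asyncio.create_task(self._delayed_shutdown())


async def demo() -> tuple[bool, bool, bool]:
    event = asyncio.Event()
    controller = Controller(event)
    await controller.shutdown()
    signalled = event.is_set()
    terminated_on_return = controller.terminated
    await controller._shutdown_task  # let the background task finish
    return signalled, terminated_on_return, controller.terminated


result = asyncio.run(demo())
print(result)
```

Keeping a reference to the task on the instance, as the listing above does with `self._shutdown_task`, prevents it from being garbage-collected before it runs.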

GRPCStdioService

GRPCStdioService()

Bases: GRPCStdioServicer

Implementation of plugin stdio streaming.

Source code in pyvider/rpcplugin/protocol/service.py
def __init__(self) -> None:
    self._message_queue: asyncio.Queue[Any] = asyncio.Queue()  # Allow Any for sentinel
    self._shutdown = False
Functions
StreamStdio async
StreamStdio(
    request: Empty,
    context: ServicerContext[Empty, StdioData],
) -> AsyncIterator[StdioData]

Streams STDOUT/STDERR lines to the caller.

Source code in pyvider/rpcplugin/protocol/service.py
async def StreamStdio(
    self, request: empty_pb2.Empty, context: grpc.aio.ServicerContext[empty_pb2.Empty, StdioData]
) -> AsyncIterator[StdioData]:
    """Streams STDOUT/STDERR lines to the caller."""

    done = asyncio.Event()

    def on_rpc_done(_: Any) -> None:
        done.set()

    context.add_done_callback(on_rpc_done)

    async for item in self._stream_items(done):
        yield item
put_line async
put_line(line: bytes, is_stderr: bool = False) -> None

Adds a line of data (stdout or stderr) to the message queue for streaming.

Parameters:

Name | Type | Description | Default
line | bytes | The bytes data of the line. | required
is_stderr | bool | True if the line is from stderr, False for stdout. | False
Source code in pyvider/rpcplugin/protocol/service.py
async def put_line(self, line: bytes, is_stderr: bool = False) -> None:
    """
    Adds a line of data (stdout or stderr) to the message queue for streaming.

    Args:
        line: The bytes data of the line.
        is_stderr: True if the line is from stderr, False for stdout.
    """
    try:
        data = StdioData(channel=StdioData.STDERR if is_stderr else StdioData.STDOUT, data=line)
        await self._message_queue.put(data)
    except Exception:
        pass  # Empty block
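
The interplay between put_line, the message queue, and the streaming generator can be sketched as a producer/consumer pair. This is a minimal sketch, assuming a sentinel object ends the stream (suggested by the "Allow Any for sentinel" comment, but not confirmed by the code shown here). `StdioBuffer`, `_SENTINEL`, and the tuple payload are hypothetical stand-ins, and the RPC done-callback handling in StreamStdio is left out.

```python
import asyncio
from typing import Any, AsyncIterator

_SENTINEL = object()  # hypothetical end-of-stream marker


class StdioBuffer:
    """Hypothetical stand-in for the queue handling in GRPCStdioService."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[Any] = asyncio.Queue()

    async def put_line(self, line: bytes, is_stderr: bool = False) -> None:
        # A tuple stands in for the StdioData protobuf message.
        await self._queue.put(("stderr" if is_stderr else "stdout", line))

    def shutdown(self) -> None:
        # Wake up the consumer and tell it to stop.
        self._queue.put_nowait(_SENTINEL)

    async def stream(self) -> AsyncIterator[tuple[str, bytes]]:
        while True:
            item = await self._queue.get()
            if item is _SENTINEL:
                break
            yield item


async def demo() -> list[tuple[str, bytes]]:
    buf = StdioBuffer()
    await buf.put_line(b"hello\n")
    await buf.put_line(b"oops\n", is_stderr=True)
    buf.shutdown()
    return [item async for item in buf.stream()]


lines = asyncio.run(demo())
print(lines)
```

Because the queue is FIFO, lines queued before shutdown are still delivered in order before the stream ends.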

SubchannelConnection

Represents a brokered subchannel for plugin-to-plugin communication.

In the go-plugin architecture, subchannels allow plugins to establish secondary communication channels for callbacks, additional services, or plugin-to-plugin communication. Each subchannel has a unique ID and network address.

Attributes:

Name | Type | Description
conn_id | int | Unique identifier for this subchannel connection.
address | str | Network address for the subchannel (format depends on transport).
is_open | bool | Whether the subchannel is currently open and available.

Example
subchannel = SubchannelConnection(
    conn_id=1,
    address="127.0.0.1:9000"
)
await subchannel.open()
# Subchannel now ready for communication
await subchannel.close()
Functions
close async
close() -> None

Close the subchannel and release resources.

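This method closes the subchannel connection and marks it as unavailable. As with open(), the current implementation only simulates this with a short delay; a real implementation would release the underlying network or IPC resources here.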

Side Effects

Sets is_open to False after closing.

Source code in pyvider/rpcplugin/protocol/service.py
async def close(self) -> None:
    """
    Close the subchannel and release resources.

    This method closes the subchannel connection and marks it as
    unavailable. Resources associated with the subchannel are released.

    Side Effects:
        Sets is_open to False after closing.
    """
    await asyncio.sleep(0.05)
    self.is_open = False
open async
open() -> None

Open the subchannel for communication.

This method establishes the subchannel connection and marks it as available for use. In a real implementation, this would involve network setup or IPC channel creation.

Side Effects

Sets is_open to True after successful opening.

Source code in pyvider/rpcplugin/protocol/service.py
async def open(self) -> None:
    """
    Open the subchannel for communication.

    This method establishes the subchannel connection and marks it as
    available for use. In a real implementation, this would involve
    network setup or IPC channel creation.

    Side Effects:
        Sets is_open to True after successful opening.
    """
    logger.debug("Opening subchannel", conn_id=self.conn_id, address=self.address)
    await asyncio.sleep(0.05)  # simulate
    self.is_open = True
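
The Example for this class uses top-level await, which only works inside a coroutine or an async REPL. The sketch below wraps the same open/close lifecycle in a script that runs as-is. `DemoSubchannel` is a hypothetical stand-in for `SubchannelConnection`, and its initial `is_open = False` is an assumption, since the constructor is not shown on this page.

```python
import asyncio


class DemoSubchannel:
    """Hypothetical stand-in mirroring the documented attributes."""

    def __init__(self, conn_id: int, address: str) -> None:
        self.conn_id = conn_id
        self.address = address
        self.is_open = False  # assumption: a new subchannel starts closed

    async def open(self) -> None:
        await asyncio.sleep(0.01)  # simulated setup delay
        self.is_open = True

    async def close(self) -> None:
        await asyncio.sleep(0.01)  # simulated teardown delay
        self.is_open = False


async def demo() -> list[bool]:
    states = []
    sub = DemoSubchannel(conn_id=1, address="127.0.0.1:9000")
    states.append(sub.is_open)   # before open()
    await sub.open()
    states.append(sub.is_open)   # after open()
    await sub.close()
    states.append(sub.is_open)   # after close()
    return states


states = asyncio.run(demo())
print(states)
```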

Functions

register_protocol_service

register_protocol_service(
    server: Server, shutdown_event: Event
) -> None

Registers all standard gRPC services for the plugin.

Source code in pyvider/rpcplugin/protocol/service.py
def register_protocol_service(server: grpc.aio.Server, shutdown_event: asyncio.Event) -> None:
    """Registers all standard gRPC services for the plugin."""
    stdio_service = GRPCStdioService()
    broker_service = GRPCBrokerService()
    controller_service = GRPCControllerService(shutdown_event, stdio_service)

    add_GRPCStdioServicer_to_server(stdio_service, server)  # type: ignore[no-untyped-call]
    add_GRPCBrokerServicer_to_server(broker_service, server)  # type: ignore[no-untyped-call]
    add_GRPCControllerServicer_to_server(controller_service, server)  # type: ignore[no-untyped-call]
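
A usage sketch for register_protocol_service follows. It assumes grpcio and this package are installed. It deliberately skips the go-plugin handshake, TLS, and any application services, and the bind address and the name `serve` are illustrative. The imports sit inside the function so that the module can be loaded without those dependencies.

```python
import asyncio


async def serve(port: int = 0) -> None:
    """Start a bare gRPC server with the standard plugin services registered."""
    import grpc
    from pyvider.rpcplugin.protocol.service import register_protocol_service

    shutdown_event = asyncio.Event()
    server = grpc.aio.server()

    # Adds the stdio, broker, and controller servicers to the server.
    register_protocol_service(server, shutdown_event)

    # Port 0 asks the OS for a free port; the bound port is returned.
    bound_port = server.add_insecure_port(f"127.0.0.1:{port}")
    await server.start()
    print(f"listening on 127.0.0.1:{bound_port}")

    # The controller's Shutdown RPC sets this event.
    await shutdown_event.wait()
    await server.stop(grace=1.0)
```

Run it with `asyncio.run(serve())`. The Shutdown handler also schedules its own delayed termination, so the explicit `server.stop` here may be cut short in practice.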