Skip to content

Negotiation

pyvider.rpcplugin.handshake.negotiation

Transport and protocol negotiation for the RPC plugin handshake.

This module handles transport negotiation, protocol version negotiation, and I/O operations for handshake processes.

Classes

Functions

create_stderr_relay async

create_stderr_relay(
    process: Popen[bytes],
) -> asyncio.Task[None] | None

Creates a background task that continuously reads and logs stderr from the plugin process. Essential for debugging handshake issues, especially with Go plugins.

Parameters:

Name Type Description Default
process Popen[bytes]

The subprocess.Popen instance with stderr pipe.

required

Returns:

Type Description
Task[None] | None

The asyncio.Task managing the stderr relay, or None if stderr is not available.

Source code in pyvider/rpcplugin/handshake/negotiation.py
async def create_stderr_relay(
    process: subprocess.Popen[bytes],
) -> asyncio.Task[None] | None:
    """
    Creates a background task that continuously reads and logs stderr from the
    plugin process. Essential for debugging handshake issues, especially with Go
    plugins.

    Args:
        process: The subprocess.Popen instance with stderr pipe.

    Returns:
        The asyncio.Task managing the stderr relay, or None if stderr is not available.
    """
    if not process or not process.stderr:
        logger.debug("🤝📤⚠️ No process or stderr stream available for relay")
        return None

    async def _stderr_reader() -> None:
        """Background task to continuously read stderr"""
        logger.debug("🤝📤🚀 Starting stderr relay task")
        # Ensure stderr is not None before accessing readline
        if process.stderr is None:
            logger.error("🤝📤❌ Stderr became None unexpectedly in relay task.")
            return

        while process.poll() is None:
            try:
                line = await asyncio.get_event_loop().run_in_executor(None, process.stderr.readline)
                if not line:
                    await asyncio.sleep(DEFAULT_PROCESS_WAIT_TIME)
                    continue

                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    logger.debug(f"🤝📤📝 Plugin stderr: {text}")
            except Exception as e:
                logger.error(f"🤝📤❌ Error in stderr relay: {e}")
                break

        logger.debug("🤝📤🛑 Stderr relay task ended")

    relay_task = asyncio.create_task(_stderr_reader())
    return relay_task

negotiate_protocol_version

negotiate_protocol_version(
    server_versions: list[int],
) -> int

🤝🔄 Selects the highest mutually supported protocol version.

Compares the server-provided versions against the client's supported versions from the configuration.

Returns:

Type Description
int

The highest mutually supported protocol version.

Raises:

Type Description
ProtocolError

If no mutually supported version is found.

Source code in pyvider/rpcplugin/handshake/negotiation.py
def negotiate_protocol_version(server_versions: list[int]) -> int:
    """
    🤝🔄 Selects the highest mutually supported protocol version.

    Compares the server-provided versions against the client's supported versions
    from the configuration.

    Returns:
      The highest mutually supported protocol version.

    Raises:
      ProtocolError: If no mutually supported version is found.
    """
    logger.debug(f"🤝🔄 Negotiating protocol version. Server supports: {server_versions}")
    supported_versions_config = rpcplugin_config.supported_protocol_versions
    for version in sorted(server_versions, reverse=True):
        if version in supported_versions_config:
            return version

    logger.error(
        "🤝❌ Protocol negotiation failed: No compatible version found. "
        f"Server supports: {server_versions}, Client supports: "
        f"{supported_versions_config}"
    )
    raise ProtocolError(
        message=(
            "No mutually supported protocol version. Server supports: "
            f"{server_versions}, Client supports: {supported_versions_config}"
        ),
        hint=(
            "Ensure client and server configurations for 'PLUGIN_PROTOCOL_VERSIONS' "
            "and 'SUPPORTED_PROTOCOL_VERSIONS' have at least one common version."
        ),
    )

negotiate_transport async

negotiate_transport(
    server_transports: list[str],
) -> tuple[str, TransportT]

(🗣️🚊 Transport Negotiation) Negotiates the transport type with the server and creates the appropriate transport instance.

Returns:

Type Description
tuple[str, TransportT]

A tuple of (transport_name, transport_instance).

Raises:

Type Description
TransportError

If no compatible transport can be negotiated or an error occurs during negotiation.

Source code in pyvider/rpcplugin/handshake/negotiation.py
async def negotiate_transport(server_transports: list[str]) -> tuple[str, TransportT]:
    """
    (🗣️🚊 Transport Negotiation) Negotiates the transport type with the server and
    creates the appropriate transport instance.

    Returns:
      A tuple of (transport_name, transport_instance).

    Raises:
      TransportError: If no compatible transport can be negotiated or an error
                      occurs during negotiation.
    """
    logger.debug(f"(🗣️🚊 Transport Negotiation: Starting) => Available transports: {server_transports}")
    if not server_transports:
        logger.error("🗣️🚊❌ (Transport Negotiation: Failed) => No transport options provided")
        raise TransportError(
            message="No transport options were provided by the server for negotiation.",
            hint=(
                "Ensure the server configuration specifies at least one supported "
                "transport (e.g., 'unix', 'tcp')."
            ),
        )
    try:
        if "unix" in server_transports:
            logger.debug("🗣️🚊🧦 (Transport Negotiation: Selected Unix) => Unix socket transport is available")
            temp_dir = os.environ.get("TEMP_DIR") or tempfile.gettempdir()
            transport_path = str(Path(temp_dir) / f"pyvider-{os.getpid()}.sock")
            from pyvider.rpcplugin.transport import UnixSocketTransport

            return "unix", cast(TransportT, UnixSocketTransport(path=transport_path))

        elif "tcp" in server_transports:
            logger.debug("🗣️🚊👥 (Transport Negotiation: Selected TCP) => TCP transport is available")
            from pyvider.rpcplugin.transport import TCPSocketTransport

            return "tcp", cast(TransportT, TCPSocketTransport())
        else:
            logger.error(
                "🗣️🚊❌ (Transport Negotiation: Failed) => No supported transport found",
                extra={"server_transports": server_transports},
            )
            client_supported = (
                rpcplugin_config.plugin_client_transports if rpcplugin_config else "config not loaded"
            )
            raise TransportError(
                message=(f"No compatible transport found. Server offered: {server_transports}."),
                hint=(
                    "Ensure the client supports at least one of the server's "
                    f"offered transports. Client supports: {client_supported}."
                ),
            )
    except Exception as e:
        logger.error(
            "🗣️🚊❌ (Transport Negotiation: Exception) => Error during transport negotiation",
            extra={"error": str(e)},
        )
        raise TransportError(
            message=f"An unexpected error occurred during transport negotiation: {e}",
            hint="Check server logs for more details on transport setup.",
        ) from e

read_handshake_response async

read_handshake_response(process: Popen[bytes]) -> str

Robust handshake response reader with multiple strategies to handle different Go-Python interop challenges.

The handshake response is a pipe-delimited string with format: CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

Parameters:

Name Type Description Default
process Popen[bytes]

The subprocess.Popen instance representing the plugin.

required

Returns:

Type Description
str

The complete handshake response string.

Raises:

Type Description
HandshakeError

If handshake fails (e.g. process exits early) or times out.

Source code in pyvider/rpcplugin/handshake/negotiation.py
async def read_handshake_response(process: subprocess.Popen[bytes]) -> str:
    """
    Robust handshake response reader with multiple strategies to handle
    different Go-Python interop challenges.

    The handshake response is a pipe-delimited string with format:
    CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

    Args:
        process: The subprocess.Popen instance representing the plugin.

    Returns:
        The complete handshake response string.

    Raises:
        HandshakeError: If handshake fails (e.g. process exits early) or times out.
    """
    if not process or not process.stdout:
        raise HandshakeError(
            message=("Plugin process or its stdout stream is not available for handshake."),
            hint="Ensure the plugin process started correctly and is accessible.",
        )

    logger.debug("🤝📥🚀 Reading handshake response from plugin process...")

    timeout = rpcplugin_config.plugin_handshake_timeout
    start_time = time.time()
    buffer = ""

    while (time.time() - start_time) < timeout:
        _process_has_exited(process, buffer)

        completed, buffer, had_data = await _read_with_fallback(process, buffer)
        if completed:
            return completed

        if not had_data:
            await asyncio.sleep(DEFAULT_HANDSHAKE_RETRY_WAIT)
            continue

        if not _buffer_has_complete_handshake(buffer):
            await asyncio.sleep(DEFAULT_HANDSHAKE_RETRY_WAIT)

    stderr_output = _collect_process_stderr(process)
    stderr_output_truncated = (stderr_output[:200] + "...") if len(stderr_output) > 200 else stderr_output
    raise HandshakeError(
        message=(f"Timed out waiting for handshake response from plugin after {timeout} seconds."),
        hint=(
            f"Ensure plugin starts and prints handshake to stdout promptly. "
            f"Last buffer: '{buffer}'. Stderr: '{stderr_output_truncated}'"
            if stderr_output_truncated
            else (f"Ensure plugin starts and prints handshake to stdout promptly. Last buffer: '{buffer}'.")
        ),
    )