Skip to content

Index

pyvider.rpcplugin.handshake

RPC Plugin handshake functionality.

This package provides handshake configuration, validation, building, parsing, and negotiation capabilities for the RPC plugin system.

Classes

HandshakeConfig

Attributes:

Name Type Description
magic_cookie_key str

The expected environment key for the handshake cookie.

magic_cookie_value str

The expected handshake cookie value.

protocol_versions list[int]

A list of protocol versions supported by the server.

supported_transports list[str]

A list of supported transport types (e.g. "tcp", "unix").

Functions

build_handshake_response async

build_handshake_response(
    plugin_version: int,
    transport_name: str,
    transport: TransportT,
    server_cert: Certificate | None = None,
    port: int | None = None,
) -> str

Builds the handshake response string in the format: CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

Note: For TCP transport, the ADDRESS 127.0.0.1 is standard for same-host plugin communication, ensuring the plugin host connects to the plugin locally. The actual listening interface might be broader (e.g., 0.0.0.0), but the handshake communicates 127.0.0.1 for the host to connect to.

Parameters:

Name Type Description Default
plugin_version int

The version of the plugin.

required
transport_name str

The name of the transport ("tcp" or "unix").

required
transport TransportT

The transport instance.

required
server_cert Certificate | None

Optional server certificate for TLS.

None
port int | None

Optional port number, required for TCP transport.

None

Returns:

Type Description
str

The constructed handshake response string.

Raises:

Type Description
ValueError

If required parameters are missing (e.g., port for TCP).

TransportError

If an unsupported transport type is given.

Exception

Propagates exceptions from underlying operations.

Source code in pyvider/rpcplugin/handshake/core.py
@resilient(
    context={"operation": "build_handshake_response", "component": "handshake"},
    log_errors=True,
)
async def build_handshake_response(
    plugin_version: int,
    transport_name: str,
    transport: TransportT,
    server_cert: Certificate | None = None,
    port: int | None = None,
) -> str:
    """
    Build the handshake response string, formatted as
    CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

    The actual construction is delegated to `_build_handshake_response_impl`;
    this entry point only adds an optional tracing span around that call.

    Note: For TCP transport, the ADDRESS `127.0.0.1` is standard for same-host
    plugin communication, ensuring the plugin host connects to the plugin
    locally. The actual listening interface might be broader (e.g., `0.0.0.0`),
    but the handshake communicates `127.0.0.1` for the host to connect to.

    Args:
        plugin_version: The version of the plugin.
        transport_name: The name of the transport ("tcp" or "unix").
        transport: The transport instance.
        server_cert: Optional server certificate for TLS.
        port: Optional port number, required for TCP transport.

    Returns:
        The constructed handshake response string.

    Raises:
        ValueError: If required parameters are missing (e.g., port for TCP).
        TransportError: If an unsupported transport type is given.
        Exception: Propagates exceptions from underlying operations.
    """
    impl_args = (plugin_version, transport_name, transport, server_cert, port)

    # Without a tracer there is nothing to record: call the implementation directly.
    if not _tracer:
        return await _build_handshake_response_impl(*impl_args)

    with _tracer.start_as_current_span("rpc.handshake.build_response") as span:
        span_attributes = (
            ("transport", transport_name),
            ("plugin_version", plugin_version),
            ("has_cert", server_cert is not None),
        )
        for attr_name, attr_value in span_attributes:
            span.set_attribute(attr_name, attr_value)
        return await _build_handshake_response_impl(*impl_args)

create_stderr_relay async

create_stderr_relay(
    process: Popen[bytes],
) -> asyncio.Task[None] | None

Creates a background task that continuously reads and logs stderr from the plugin process. Essential for debugging handshake issues, especially with Go plugins.

Parameters:

Name Type Description Default
process Popen[bytes]

The subprocess.Popen instance with stderr pipe.

required

Returns:

Type Description
Task[None] | None

The asyncio.Task managing the stderr relay, or None if stderr is not available.

Source code in pyvider/rpcplugin/handshake/negotiation.py
async def create_stderr_relay(
    process: subprocess.Popen[bytes],
) -> asyncio.Task[None] | None:
    """
    Creates a background task that continuously reads and logs stderr from the
    plugin process. Essential for debugging handshake issues, especially with Go
    plugins.

    The relay reads until stderr reports end-of-file *and* the process has
    exited, so lines the plugin wrote immediately before terminating (typically
    the most useful ones when diagnosing a crash) are still logged.

    Args:
        process: The subprocess.Popen instance with stderr pipe.

    Returns:
        The asyncio.Task managing the stderr relay, or None if stderr is not available.
    """
    if not process or not process.stderr:
        logger.debug("🤝📤⚠️ No process or stderr stream available for relay")
        return None

    async def _stderr_reader() -> None:
        """Background task to continuously read stderr"""
        logger.debug("🤝📤🚀 Starting stderr relay task")
        # Ensure stderr is not None before accessing readline
        stderr_stream = process.stderr
        if stderr_stream is None:
            logger.error("🤝📤❌ Stderr became None unexpectedly in relay task.")
            return

        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the documented way to obtain it here.
        loop = asyncio.get_running_loop()

        while True:
            try:
                # readline() blocks, so run it in the default executor to keep
                # the event loop responsive.
                line = await loop.run_in_executor(None, stderr_stream.readline)
                if not line:
                    # Empty read: stop only once the process is gone. Checking
                    # poll() *after* the read (instead of before it) lets any
                    # output still buffered at exit be drained first.
                    if process.poll() is not None:
                        break
                    await asyncio.sleep(DEFAULT_PROCESS_WAIT_TIME)
                    continue

                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    logger.debug(f"🤝📤📝 Plugin stderr: {text}")
            except Exception as e:
                logger.error(f"🤝📤❌ Error in stderr relay: {e}")
                break

        logger.debug("🤝📤🛑 Stderr relay task ended")

    relay_task = asyncio.create_task(_stderr_reader())
    return relay_task

is_valid_handshake_parts

is_valid_handshake_parts(
    parts: list[str],
) -> TypeGuard[list[str]]

Checks that the list of handshake parts contains exactly 6 entries and that the first two entries consist only of digits.

Source code in pyvider/rpcplugin/handshake/core.py
def is_valid_handshake_parts(parts: list[str]) -> TypeGuard[list[str]]:
    """
    Ensures it contains exactly 6 parts and the first two parts are digits.
    """
    return len(parts) == 6 and parts[0].isdigit() and parts[1].isdigit()

negotiate_protocol_version

negotiate_protocol_version(
    server_versions: list[int],
) -> int

🤝🔄 Selects the highest mutually supported protocol version.

Compares the server-provided versions against the client's supported versions from the configuration.

Returns:

Type Description
int

The highest mutually supported protocol version.

Raises:

Type Description
ProtocolError

If no mutually supported version is found.

Source code in pyvider/rpcplugin/handshake/negotiation.py
def negotiate_protocol_version(server_versions: list[int]) -> int:
    """
    🤝🔄 Picks the highest protocol version supported by both sides.

    The versions offered by the server are matched against
    `rpcplugin_config.supported_protocol_versions` (the client's list).

    Args:
      server_versions: Protocol versions offered by the server.

    Returns:
      The highest mutually supported protocol version.

    Raises:
      ProtocolError: If no mutually supported version is found.
    """
    logger.debug(f"🤝🔄 Negotiating protocol version. Server supports: {server_versions}")
    client_versions = rpcplugin_config.supported_protocol_versions

    # Highest first, so the first shared entry is the best match.
    shared_versions = [
        candidate
        for candidate in sorted(server_versions, reverse=True)
        if candidate in client_versions
    ]
    if shared_versions:
        return shared_versions[0]

    both_sides = f"Server supports: {server_versions}, Client supports: {client_versions}"
    logger.error(f"🤝❌ Protocol negotiation failed: No compatible version found. {both_sides}")
    raise ProtocolError(
        message=f"No mutually supported protocol version. {both_sides}",
        hint=(
            "Ensure client and server configurations for 'PLUGIN_PROTOCOL_VERSIONS' "
            "and 'SUPPORTED_PROTOCOL_VERSIONS' have at least one common version."
        ),
    )

negotiate_transport async

negotiate_transport(
    server_transports: list[str],
) -> tuple[str, TransportT]

(🗣️🚊 Transport Negotiation) Negotiates the transport type with the server and creates the appropriate transport instance.

Returns:

Type Description
tuple[str, TransportT]

A tuple of (transport_name, transport_instance).

Raises:

Type Description
TransportError

If no compatible transport can be negotiated or an error occurs during negotiation.

Source code in pyvider/rpcplugin/handshake/negotiation.py
async def negotiate_transport(server_transports: list[str]) -> tuple[str, TransportT]:
    """
    (🗣️🚊 Transport Negotiation) Negotiates the transport type with the server and
    creates the appropriate transport instance.

    "unix" is preferred over "tcp" when the server offers both. For "unix" the
    socket path is `pyvider-<pid>.sock` inside `$TEMP_DIR` (falling back to the
    system temp directory).

    Args:
      server_transports: Transport names offered by the server.

    Returns:
      A tuple of (transport_name, transport_instance).

    Raises:
      TransportError: If no compatible transport can be negotiated or an error
                      occurs during negotiation.
    """
    logger.debug(f"(🗣️🚊 Transport Negotiation: Starting) => Available transports: {server_transports}")
    if not server_transports:
        logger.error("🗣️🚊❌ (Transport Negotiation: Failed) => No transport options provided")
        raise TransportError(
            message="No transport options were provided by the server for negotiation.",
            hint=(
                "Ensure the server configuration specifies at least one supported "
                "transport (e.g., 'unix', 'tcp')."
            ),
        )
    try:
        if "unix" in server_transports:
            logger.debug("🗣️🚊🧦 (Transport Negotiation: Selected Unix) => Unix socket transport is available")
            temp_dir = os.environ.get("TEMP_DIR") or tempfile.gettempdir()
            transport_path = str(Path(temp_dir) / f"pyvider-{os.getpid()}.sock")
            from pyvider.rpcplugin.transport import UnixSocketTransport

            return "unix", cast(TransportT, UnixSocketTransport(path=transport_path))

        elif "tcp" in server_transports:
            logger.debug("🗣️🚊👥 (Transport Negotiation: Selected TCP) => TCP transport is available")
            from pyvider.rpcplugin.transport import TCPSocketTransport

            return "tcp", cast(TransportT, TCPSocketTransport())
        else:
            logger.error(
                "🗣️🚊❌ (Transport Negotiation: Failed) => No supported transport found",
                extra={"server_transports": server_transports},
            )
            client_supported = (
                rpcplugin_config.plugin_client_transports if rpcplugin_config else "config not loaded"
            )
            raise TransportError(
                message=(f"No compatible transport found. Server offered: {server_transports}."),
                hint=(
                    "Ensure the client supports at least one of the server's "
                    f"offered transports. Client supports: {client_supported}."
                ),
            )
    except TransportError:
        # Already a deliberate, well-described negotiation failure (and already
        # logged above). Let it through untouched so its specific message and
        # hint are not buried inside a generic "unexpected error" wrapper.
        raise
    except Exception as e:
        logger.error(
            "🗣️🚊❌ (Transport Negotiation: Exception) => Error during transport negotiation",
            extra={"error": str(e)},
        )
        raise TransportError(
            message=f"An unexpected error occurred during transport negotiation: {e}",
            hint="Check server logs for more details on transport setup.",
        ) from e

parse_handshake_response

parse_handshake_response(
    response: str,
) -> tuple[int, int, str, str, str, str | None]

(📡🔍 Handshake Parsing) Parses the handshake response string. Expected Format: CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

Parameters:

Name Type Description Default
response str

The handshake response string to parse.

required

Returns:

Type Description
tuple[int, int, str, str, str, str | None]

A tuple containing, in order: core_version (int), plugin_version (int), network (str), address (str), protocol (str), server_cert (str | None).

Raises:

Type Description
HandshakeError

If parsing fails or the format is invalid.

ValueError

If parts of the handshake string are invalid (e.g., non-integer versions).

Source code in pyvider/rpcplugin/handshake/core.py
@resilient(
    context={"operation": "parse_handshake_response", "component": "handshake"},
    log_errors=True,
)
def parse_handshake_response(
    response: str,
) -> tuple[int, int, str, str, str, str | None]:
    """
    (📡🔍 Handshake Parsing) Parse a handshake response string.
    Expected Format: CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

    Parsing itself is delegated to `_parse_handshake_response_impl`; this
    entry point only adds an optional tracing span around that call.

    Args:
        response: The handshake response string to parse.

    Returns:
        A tuple containing:
            - core_version (int)
            - plugin_version (int)
            - network (str)
            - address (str)
            - protocol (str)
            - server_cert (str | None)

    Raises:
        HandshakeError: If parsing fails or the format is invalid.
        ValueError: If parts of the handshake string are invalid
                    (e.g., non-integer versions).
    """
    # No tracer configured: parse without a span.
    if not _tracer:
        return _parse_handshake_response_impl(response, None)

    with _tracer.start_as_current_span("rpc.handshake.parse_response") as active_span:
        # Callers may pass a non-string by mistake; len() is only recorded for
        # real strings so that the implementation gets to report the bad input.
        if isinstance(response, str):
            active_span.set_attribute("response_length", len(response))
        return _parse_handshake_response_impl(response, active_span)

read_handshake_response async

read_handshake_response(process: Popen[bytes]) -> str

Robust handshake response reader with multiple strategies to handle different Go-Python interop challenges.

The handshake response is a pipe-delimited string with format: CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

Parameters:

Name Type Description Default
process Popen[bytes]

The subprocess.Popen instance representing the plugin.

required

Returns:

Type Description
str

The complete handshake response string.

Raises:

Type Description
HandshakeError

If handshake fails (e.g. process exits early) or times out.

Source code in pyvider/rpcplugin/handshake/negotiation.py
async def read_handshake_response(process: subprocess.Popen[bytes]) -> str:
    """
    Robust handshake response reader with multiple strategies to handle
    different Go-Python interop challenges.

    The handshake response is a pipe-delimited string with format:
    CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

    The read is retried until `rpcplugin_config.plugin_handshake_timeout`
    seconds have elapsed, measured on the monotonic clock so that wall-clock
    adjustments cannot shorten or extend the wait.

    Args:
        process: The subprocess.Popen instance representing the plugin.

    Returns:
        The complete handshake response string.

    Raises:
        HandshakeError: If handshake fails (e.g. process exits early) or times out.
    """
    if not process or not process.stdout:
        raise HandshakeError(
            message=("Plugin process or its stdout stream is not available for handshake."),
            hint="Ensure the plugin process started correctly and is accessible.",
        )

    logger.debug("🤝📥🚀 Reading handshake response from plugin process...")

    timeout = rpcplugin_config.plugin_handshake_timeout
    deadline = time.monotonic() + timeout
    buffer = ""

    while time.monotonic() < deadline:
        # Return value is intentionally unused; presumably this raises
        # HandshakeError when the process has exited early — confirm against
        # the helper's definition.
        _process_has_exited(process, buffer)

        completed, buffer, had_data = await _read_with_fallback(process, buffer)
        if completed:
            return completed

        # Back off before retrying when nothing arrived, or when what arrived
        # is not yet a complete handshake line.
        if not had_data or not _buffer_has_complete_handshake(buffer):
            await asyncio.sleep(DEFAULT_HANDSHAKE_RETRY_WAIT)

    stderr_output = _collect_process_stderr(process)
    # Keep the hint readable: include at most the first 200 characters of stderr.
    stderr_output_truncated = (stderr_output[:200] + "...") if len(stderr_output) > 200 else stderr_output

    hint = f"Ensure plugin starts and prints handshake to stdout promptly. Last buffer: '{buffer}'."
    if stderr_output_truncated:
        hint += f" Stderr: '{stderr_output_truncated}'"

    raise HandshakeError(
        message=(f"Timed out waiting for handshake response from plugin after {timeout} seconds."),
        hint=hint,
    )

validate_magic_cookie

validate_magic_cookie(
    magic_cookie_key: (
        str | None | _SentinelType
    ) = _SENTINEL_INSTANCE,
    magic_cookie_value: (
        str | None | _SentinelType
    ) = _SENTINEL_INSTANCE,
    magic_cookie: (
        str | None | _SentinelType
    ) = _SENTINEL_INSTANCE,
) -> None

Validates the magic cookie.

If a parameter is omitted (i.e. remains as the sentinel), its value is read from rpcplugin_config. However, if the caller explicitly passes None, that is treated as missing and an error is raised.

Parameters:

Name Type Description Default
magic_cookie_key str | None | _SentinelType

The environment key for the magic cookie.

_SENTINEL_INSTANCE
magic_cookie_value str | None | _SentinelType

The expected value of the magic cookie.

_SENTINEL_INSTANCE
magic_cookie str | None | _SentinelType

The actual cookie value provided.

_SENTINEL_INSTANCE

Raises:

Type Description
HandshakeError

If cookie validation fails.

Source code in pyvider/rpcplugin/handshake/core.py
@resilient(
    context={"operation": "validate_magic_cookie", "component": "handshake"},
    log_errors=True,
)
def validate_magic_cookie(
    magic_cookie_key: str | None | _SentinelType = _SENTINEL_INSTANCE,
    magic_cookie_value: str | None | _SentinelType = _SENTINEL_INSTANCE,
    magic_cookie: str | None | _SentinelType = _SENTINEL_INSTANCE,
) -> None:
    """
    Validate the magic cookie.

    Any parameter left at its sentinel default is looked up in
    rpcplugin_config. Passing None explicitly is different: it counts
    as a missing value and an error is raised.

    The check itself is delegated to `_validate_magic_cookie_impl`; this entry
    point only adds an optional tracing span around that call.

    Args:
        magic_cookie_key: The environment key for the magic cookie.
        magic_cookie_value: The expected value of the magic cookie.
        magic_cookie: The actual cookie value provided.

    Raises:
        HandshakeError: If cookie validation fails.
    """
    # No tracer configured: validate without a span.
    if not _tracer:
        _validate_magic_cookie_impl(magic_cookie_key, magic_cookie_value, magic_cookie)
        return

    with _tracer.start_as_current_span("rpc.handshake.validate_cookie") as active_span:
        active_span.set_attribute("component", "handshake")
        _validate_magic_cookie_impl(magic_cookie_key, magic_cookie_value, magic_cookie)