Transport and protocol negotiation for the RPC plugin handshake.
This module handles transport negotiation, protocol version negotiation,
and I/O operations for handshake processes.
Classes
Functions
create_stderr_relay
async
create_stderr_relay(
process: Popen[bytes],
) -> asyncio.Task[None] | None
Creates a background task that continuously reads and logs stderr from the
plugin process. Essential for debugging handshake issues, especially with Go
plugins.
Parameters:
| Name | Type | Description | Default |
|------|------|-------------|---------|
| process | Popen[bytes] | The subprocess.Popen instance with stderr pipe. | required |
Returns:
| Type | Description |
|------|-------------|
| Task[None] \| None | The asyncio.Task managing the stderr relay, or None if stderr is not available. |
Source code in pyvider/rpcplugin/handshake/negotiation.py
async def create_stderr_relay(
    process: subprocess.Popen[bytes],
) -> asyncio.Task[None] | None:
    """
    Creates a background task that continuously reads and logs stderr from the
    plugin process. Essential for debugging handshake issues, especially with Go
    plugins.

    Args:
        process: The subprocess.Popen instance with stderr pipe.

    Returns:
        The asyncio.Task managing the stderr relay, or None if stderr is not available.
    """
    if not process or not process.stderr:
        logger.debug("🤝📤⚠️ No process or stderr stream available for relay")
        return None

    # Bind the stream once: the guard above proved it is not None, and the
    # local name keeps that fact visible to the nested reader.
    stderr_stream = process.stderr

    async def _stderr_reader() -> None:
        """Background task to continuously read stderr"""
        logger.debug("🤝📤🚀 Starting stderr relay task")

        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the supported way to obtain it, and fetching
        # it once avoids repeating the lookup on every iteration.
        loop = asyncio.get_running_loop()

        # NOTE: the loop stops as soon as poll() reports that the process has
        # exited, so output still buffered in the pipe at that moment is not
        # relayed by this task.
        while process.poll() is None:
            try:
                # readline() blocks, so run it on the default executor to keep
                # the event loop responsive.
                line = await loop.run_in_executor(None, stderr_stream.readline)
                if not line:
                    # Nothing was read; back off briefly before polling again.
                    await asyncio.sleep(DEFAULT_PROCESS_WAIT_TIME)
                    continue
                text = line.decode("utf-8", errors="replace").rstrip()
                if text:
                    logger.debug(f"🤝📤📝 Plugin stderr: {text}")
            except Exception as e:
                # Best-effort relay: report the failure and stop reading.
                logger.error(f"🤝📤❌ Error in stderr relay: {e}")
                break
        logger.debug("🤝📤🛑 Stderr relay task ended")

    relay_task = asyncio.create_task(_stderr_reader())
    return relay_task
|
negotiate_protocol_version
negotiate_protocol_version(
server_versions: list[int],
) -> int
🤝🔄 Selects the highest mutually supported protocol version.
Compares the server-provided versions against the client's supported versions
from the configuration.
Returns:
| Type | Description |
|------|-------------|
| int | The highest mutually supported protocol version. |
Raises:
| Type | Description |
|------|-------------|
| ProtocolError | If no mutually supported version is found. |
Source code in pyvider/rpcplugin/handshake/negotiation.py
def negotiate_protocol_version(server_versions: list[int]) -> int:
    """
    🤝🔄 Selects the highest mutually supported protocol version.

    The versions offered by the server are matched against the client's
    supported versions, which are read from the configuration.

    Args:
        server_versions: Protocol versions offered by the server.

    Returns:
        The highest mutually supported protocol version.

    Raises:
        ProtocolError: If no mutually supported version is found.
    """
    logger.debug(f"🤝🔄 Negotiating protocol version. Server supports: {server_versions}")
    client_versions = rpcplugin_config.supported_protocol_versions

    # Newest-first list of server versions the client also supports; the first
    # entry, when present, is the highest shared version.
    shared_versions = [
        candidate for candidate in sorted(server_versions, reverse=True) if candidate in client_versions
    ]
    if shared_versions:
        return shared_versions[0]

    logger.error(
        "🤝❌ Protocol negotiation failed: No compatible version found. "
        f"Server supports: {server_versions}, Client supports: "
        f"{client_versions}"
    )
    failure_message = (
        "No mutually supported protocol version. Server supports: "
        f"{server_versions}, Client supports: {client_versions}"
    )
    failure_hint = (
        "Ensure client and server configurations for 'PLUGIN_PROTOCOL_VERSIONS' "
        "and 'SUPPORTED_PROTOCOL_VERSIONS' have at least one common version."
    )
    raise ProtocolError(message=failure_message, hint=failure_hint)
|
negotiate_transport
async
negotiate_transport(
server_transports: list[str],
) -> tuple[str, TransportT]
(🗣️🚊 Transport Negotiation) Negotiates the transport type with the server and
creates the appropriate transport instance.
Returns:
| Type | Description |
|------|-------------|
| tuple[str, TransportT] | A tuple of (transport_name, transport_instance). |
Raises:
| Type | Description |
|------|-------------|
| TransportError | If no compatible transport can be negotiated or an error occurs during negotiation. |
Source code in pyvider/rpcplugin/handshake/negotiation.py
async def negotiate_transport(server_transports: list[str]) -> tuple[str, TransportT]:
    """
    (🗣️🚊 Transport Negotiation) Negotiates the transport type with the server and
    creates the appropriate transport instance.

    "unix" is preferred when offered; "tcp" is used otherwise.

    Args:
        server_transports: Transport names offered by the server.

    Returns:
        A tuple of (transport_name, transport_instance).

    Raises:
        TransportError: If no compatible transport can be negotiated or an error
            occurs during negotiation.
    """
    logger.debug(f"(🗣️🚊 Transport Negotiation: Starting) => Available transports: {server_transports}")
    if not server_transports:
        logger.error("🗣️🚊❌ (Transport Negotiation: Failed) => No transport options provided")
        raise TransportError(
            message="No transport options were provided by the server for negotiation.",
            hint=(
                "Ensure the server configuration specifies at least one supported "
                "transport (e.g., 'unix', 'tcp')."
            ),
        )

    try:
        if "unix" in server_transports:
            logger.debug("🗣️🚊🧦 (Transport Negotiation: Selected Unix) => Unix socket transport is available")
            # Socket path lives in TEMP_DIR (if set) or the system temp dir and
            # is keyed by this process's PID.
            temp_dir = os.environ.get("TEMP_DIR") or tempfile.gettempdir()
            transport_path = str(Path(temp_dir) / f"pyvider-{os.getpid()}.sock")
            from pyvider.rpcplugin.transport import UnixSocketTransport

            return "unix", cast(TransportT, UnixSocketTransport(path=transport_path))
        elif "tcp" in server_transports:
            logger.debug("🗣️🚊👥 (Transport Negotiation: Selected TCP) => TCP transport is available")
            from pyvider.rpcplugin.transport import TCPSocketTransport

            return "tcp", cast(TransportT, TCPSocketTransport())
        else:
            logger.error(
                "🗣️🚊❌ (Transport Negotiation: Failed) => No supported transport found",
                extra={"server_transports": server_transports},
            )
            client_supported = (
                rpcplugin_config.plugin_client_transports if rpcplugin_config else "config not loaded"
            )
            raise TransportError(
                message=(f"No compatible transport found. Server offered: {server_transports}."),
                hint=(
                    "Ensure the client supports at least one of the server's "
                    f"offered transports. Client supports: {client_supported}."
                ),
            )
    except TransportError:
        # A TransportError here is already a deliberate, fully described
        # negotiation failure. Let it propagate untouched so it is not logged a
        # second time and re-wrapped as an "unexpected error", which would
        # discard its specific hint.
        raise
    except Exception as e:
        logger.error(
            "🗣️🚊❌ (Transport Negotiation: Exception) => Error during transport negotiation",
            extra={"error": str(e)},
        )
        raise TransportError(
            message=f"An unexpected error occurred during transport negotiation: {e}",
            hint="Check server logs for more details on transport setup.",
        ) from e
|
read_handshake_response
async
read_handshake_response(process: Popen[bytes]) -> str
Robust handshake response reader with multiple strategies to handle
different Go-Python interop challenges.
The handshake response is a pipe-delimited string with format:
CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT
Parameters:
| Name | Type | Description | Default |
|------|------|-------------|---------|
| process | Popen[bytes] | The subprocess.Popen instance representing the plugin. | required |
Returns:
| Type | Description |
|------|-------------|
| str | The complete handshake response string. |
Raises:
| Type | Description |
|------|-------------|
| HandshakeError | If handshake fails (e.g. process exits early) or times out. |
Source code in pyvider/rpcplugin/handshake/negotiation.py
async def read_handshake_response(process: subprocess.Popen[bytes]) -> str:
    """
    Robust handshake response reader with multiple strategies to handle
    different Go-Python interop challenges.

    The handshake response is a pipe-delimited string with format:
    CORE_VERSION|PLUGIN_VERSION|NETWORK|ADDRESS|PROTOCOL|TLS_CERT

    The wait is bounded by the configured ``plugin_handshake_timeout`` and is
    measured with a monotonic clock, so system clock adjustments cannot
    shorten or extend the deadline.

    Args:
        process: The subprocess.Popen instance representing the plugin.

    Returns:
        The complete handshake response string.

    Raises:
        HandshakeError: If handshake fails (e.g. process exits early) or times out.
    """
    if not process or not process.stdout:
        raise HandshakeError(
            message=("Plugin process or its stdout stream is not available for handshake."),
            hint="Ensure the plugin process started correctly and is accessible.",
        )

    logger.debug("🤝📥🚀 Reading handshake response from plugin process...")
    timeout = rpcplugin_config.plugin_handshake_timeout
    # Monotonic clock: immune to wall-clock jumps (NTP sync, manual changes).
    deadline = time.monotonic() + timeout
    buffer = ""

    while time.monotonic() < deadline:
        # Return value is unused; presumably this raises when the plugin has
        # already exited (see "Raises" above) — verify against the helper.
        _process_has_exited(process, buffer)

        completed, buffer, had_data = await _read_with_fallback(process, buffer)
        if completed:
            return completed

        # Wait before retrying when nothing was read, or when data arrived but
        # the buffer does not yet hold a complete handshake line.
        if not had_data or not _buffer_has_complete_handshake(buffer):
            await asyncio.sleep(DEFAULT_HANDSHAKE_RETRY_WAIT)

    # Timed out: gather stderr for diagnostics, capped to keep the hint short.
    stderr_output = _collect_process_stderr(process)
    stderr_output_truncated = (stderr_output[:200] + "...") if len(stderr_output) > 200 else stderr_output

    hint = f"Ensure plugin starts and prints handshake to stdout promptly. Last buffer: '{buffer}'."
    if stderr_output_truncated:
        hint += f" Stderr: '{stderr_output_truncated}'"

    raise HandshakeError(
        message=(f"Timed out waiting for handshake response from plugin after {timeout} seconds."),
        hint=hint,
    )
|