Skip to content

Observability

provide.foundation.observability

Observability module for Foundation.

Provides integration with observability platforms like OpenObserve. Only available when OpenTelemetry dependencies are installed.

Classes

OpenObserveClient

OpenObserveClient(
    url: str,
    username: str,
    password: str,
    organization: str = "default",
    timeout: int = 30,
)

Bases: SearchOperationsMixin, MetricsOperationsMixin, OpenObserveClientBase

Async client for interacting with OpenObserve API.

Uses Foundation's transport system for all HTTP operations. Combines search/streams operations and Prometheus metrics API.

Source code in provide/foundation/integrations/openobserve/client_base.py
def __init__(
    self,
    url: str,
    username: str,
    password: str,
    organization: str = "default",
    timeout: int = 30,
) -> None:
    """Set up an OpenObserve API client.

    Args:
        url: Base URL of the OpenObserve instance
        username: Authentication username
        password: Authentication password
        organization: Target organization (default: "default")
        timeout: Per-request timeout in seconds

    Note:
        Retries are delegated to UniversalClient's middleware; no retry
        logic lives in this class.

    """
    # Normalize the base URL so path joins never double the slash.
    self.url = url.rstrip("/")
    validated_user, validated_pass = validate_credentials(username, password)
    self.username = validated_user
    self.password = validated_pass
    self.organization = organization

    # All HTTP traffic goes through Foundation's UniversalClient, which
    # carries the auth headers and timeout on every request it makes.
    auth_headers = get_auth_headers(validated_user, validated_pass)
    self._client = UniversalClient(
        hub=get_hub(),
        default_headers=auth_headers,
        default_timeout=float(timeout),
    )

Functions

is_openobserve_available

is_openobserve_available() -> bool

Check if OpenObserve integration is available.

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if OpenTelemetry and OpenObserve are available |

Source code in provide/foundation/observability/__init__.py
def is_openobserve_available() -> bool:
    """Report whether the OpenObserve integration can be used.

    Returns:
        True when OpenTelemetry is installed and OpenObserveClient was
        successfully imported into this module's namespace.

    """
    # Guard clause: without the OpenTelemetry extra there is nothing to check.
    if not _HAS_OTEL:
        return False
    return "OpenObserveClient" in globals()

search_logs async

search_logs(
    sql: str,
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse

Search logs in OpenObserve.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sql` | `str` | SQL query to execute | *required* |
| `start_time` | `str \| int \| None` | Start time (relative like "-1h" or microseconds) | `None` |
| `end_time` | `str \| int \| None` | End time (relative like "now" or microseconds) | `None` |
| `size` | `int` | Number of results to return | `100` |
| `client` | `OpenObserveClient \| None` | OpenObserve client (creates new if not provided) | `None` |

Returns:

| Type | Description |
| --- | --- |
| `SearchResponse` | SearchResponse with results |

Source code in provide/foundation/integrations/openobserve/search.py
async def search_logs(
    sql: str,
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse:
    """Run a log search query against OpenObserve.

    Args:
        sql: SQL query to execute
        start_time: Start time (relative like "-1h" or microseconds)
        end_time: End time (relative like "now" or microseconds)
        size: Number of results to return
        client: Existing client to reuse; one is built from config if omitted

    Returns:
        SearchResponse with results

    """
    # Lazily construct a client from configuration when none was supplied.
    active_client = client if client is not None else OpenObserveClient.from_config()

    return await active_client.search(
        sql=sql,
        start_time=start_time,
        end_time=end_time,
        size=size,
    )

stream_logs

stream_logs(
    sql: str,
    start_time: str | int | None = None,
    poll_interval: int = 5,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]

Stream logs from OpenObserve with polling.

Continuously polls for new logs and yields them as they arrive.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sql` | `str` | SQL query to execute | *required* |
| `start_time` | `str \| int \| None` | Initial start time | `None` |
| `poll_interval` | `int` | Seconds between polls | `5` |
| `client` | `OpenObserveClient \| None` | OpenObserve client | `None` |

Yields:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Log entries as they arrive |

Source code in provide/foundation/integrations/openobserve/streaming.py
def stream_logs(
    sql: str,
    start_time: str | int | None = None,
    poll_interval: int = 5,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Stream logs from OpenObserve with polling.

    Continuously polls for new logs and yields them as they arrive.

    Args:
        sql: SQL query to execute
        start_time: Initial start time
        poll_interval: Seconds between polls
        client: OpenObserve client

    Yields:
        Log entries as they arrive

    Raises:
        OpenObserveStreamingError: If a search/poll cycle fails.

    """
    if client is None:
        client = OpenObserveClient.from_config()

    # Track the last seen timestamp to avoid duplicates
    if start_time is None:
        # Default lookback: start one minute in the past.
        last_timestamp = parse_relative_time("-1m")
    elif isinstance(start_time, str):
        last_timestamp = parse_relative_time(start_time)
    else:
        # Already an int (microseconds)
        last_timestamp = start_time
    # IDs of hits already yielded; guards against re-yielding rows whose
    # timestamps fall inside overlapping poll windows.
    seen_ids = set()

    while True:
        try:
            # Search for new logs since last timestamp using async client
            # (this is a sync generator, so the async search is bridged
            # through run_async).
            response = run_async(
                client.search(
                    sql=sql,
                    start_time=last_timestamp,
                    end_time="now",
                    size=1000,
                )
            )

            # Process new logs
            for hit in response.hits:
                # Create a unique ID for deduplication: timestamp plus a
                # hash of the full record, since several records can share
                # one timestamp.
                timestamp = hit.get("_timestamp", 0)
                log_id = f"{timestamp}:{hash(json_dumps(hit, sort_keys=True))}"

                if log_id not in seen_ids:
                    seen_ids.add(log_id)
                    yield hit

                    # Update last timestamp
                    if timestamp > last_timestamp:
                        last_timestamp = timestamp + 1  # Add 1 microsecond to avoid duplicates

            # Clean up old seen IDs to prevent memory growth
            # NOTE(review): assumes IDs older than one minute cannot recur in
            # the next window; if last_timestamp lags more than a minute
            # behind "now", a dropped ID could reappear — confirm intent.
            cutoff_time = parse_relative_time("-1m")
            seen_ids = {lid for lid in seen_ids if int(lid.split(":")[0]) > cutoff_time}

            # Wait before next poll
            time.sleep(poll_interval)

        except KeyboardInterrupt:
            # Allow Ctrl-C to end the stream cleanly rather than erroring.
            break
        except Exception as e:
            perr(f"Error during streaming: {e}")
            raise OpenObserveStreamingError(f"Streaming failed: {e}") from e