Skip to content

Index

provide.foundation.integrations.openobserve

OpenObserve integration for provide.foundation: client, configuration, data models, error types, search helpers, and output formatters for the OpenObserve search and Prometheus-metrics APIs.

Classes

MetricMetadata

Metadata for a metric.

Attributes:

Name Type Description
name str

Metric name

type str

Metric type (counter, gauge, histogram, summary, etc.)

help str

Help text describing the metric

unit str

Unit of measurement

Functions
from_dict classmethod
from_dict(data: dict[str, Any]) -> MetricMetadata

Create from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with metric metadata

required

Returns:

Type Description
MetricMetadata

MetricMetadata instance

Source code in provide/foundation/integrations/openobserve/metrics_models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> MetricMetadata:
    """Create from dictionary.

    Args:
        data: Dictionary with metric metadata

    Returns:
        MetricMetadata instance
    """
    # Every field defaults to an empty string when the key is absent.
    fields = ("name", "type", "help", "unit")
    return cls(**{field: data.get(field, "") for field in fields})
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary representation

Source code in provide/foundation/integrations/openobserve/metrics_models.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary.

    Returns:
        Dictionary representation
    """
    # Mirror the attrs fields one-to-one into a plain dict.
    return {key: getattr(self, key) for key in ("name", "type", "help", "unit")}

MetricQueryResult

Result from a Prometheus query.

Attributes:

Name Type Description
result_type str

Type of result (vector, matrix, scalar, string)

result list[MetricSample]

List of metric samples

status str

Query status (success, error)

error_type str

Error type if query failed

error str

Error message if query failed

warnings list[str]

List of warnings

Attributes
is_success property
is_success: bool

Check if query was successful.

Returns:

Type Description
bool

True if status is "success"

sample_count property
sample_count: int

Get number of metric samples.

Returns:

Type Description
int

Number of samples in result

Functions
from_dict classmethod
from_dict(data: dict[str, Any]) -> MetricQueryResult

Create from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with query result from Prometheus API

required

Returns:

Type Description
MetricQueryResult

MetricQueryResult instance

Source code in provide/foundation/integrations/openobserve/metrics_models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> MetricQueryResult:
    """Create from dictionary.

    Args:
        data: Dictionary with query result from Prometheus API

    Returns:
        MetricQueryResult instance
    """
    # Fallback for payloads without a "data" section (e.g. bare error
    # responses): no samples, everything read off the top level.
    if "data" not in data:
        return cls(
            result_type=data.get("resultType", ""),
            result=[],
            status=data.get("status", "success"),
            error_type=data.get("errorType", ""),
            error=data.get("error", ""),
            warnings=data.get("warnings", []),
        )

    # Standard OpenObserve/Prometheus envelope: status/warnings/errors sit
    # at the top level, resultType and the samples live under "data".
    data_section = data["data"]
    samples = [MetricSample.from_dict(raw) for raw in data_section.get("result", [])]

    return cls(
        result_type=data_section.get("resultType", ""),
        result=samples,
        status=data.get("status", "success"),
        error_type=data.get("errorType", ""),
        error=data.get("error", ""),
        warnings=data.get("warnings", []),
    )
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary representation

Source code in provide/foundation/integrations/openobserve/metrics_models.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary.

    Returns:
        Dictionary representation
    """
    # Rebuild the Prometheus-style envelope: samples nested under "data".
    payload: dict[str, Any] = {
        "status": self.status,
        "data": {
            "resultType": self.result_type,
            "result": [sample.to_dict() for sample in self.result],
        },
    }

    # Warnings and error details are emitted only when present.
    if self.warnings:
        payload["warnings"] = self.warnings
    if self.error:
        payload["errorType"] = self.error_type
        payload["error"] = self.error

    return payload

MetricSample

A single metric sample with labels and value.

Attributes:

Name Type Description
metric dict[str, str]

Label name-value pairs

value tuple[float, str] | None

Metric value (single value for instant queries)

values list[tuple[float, str]]

List of [timestamp, value] pairs for range queries

Functions
from_dict classmethod
from_dict(data: dict[str, Any]) -> MetricSample

Create from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with sample data from Prometheus API

required

Returns:

Type Description
MetricSample

MetricSample instance

Source code in provide/foundation/integrations/openobserve/metrics_models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> MetricSample:
    """Create from dictionary.

    Args:
        data: Dictionary with sample data from Prometheus API

    Returns:
        MetricSample instance
    """
    raw_value = data.get("value")

    # Instant-query value arrives as a [timestamp, "value"] pair;
    # anything malformed is dropped (left as None).
    value_tuple = None
    if raw_value and isinstance(raw_value, list) and len(raw_value) == 2:
        value_tuple = (float(raw_value[0]), str(raw_value[1]))

    # Range-query values: keep only well-formed two-element pairs.
    values_list = [
        (float(pair[0]), str(pair[1]))
        for pair in (data.get("values") or [])
        if isinstance(pair, list) and len(pair) == 2
    ]

    return cls(
        metric=data.get("metric", {}),
        value=value_tuple,
        values=values_list,
    )
to_dict
to_dict() -> dict[str, Any]

Convert to dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary representation

Source code in provide/foundation/integrations/openobserve/metrics_models.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary.

    Returns:
        Dictionary representation
    """
    payload: dict[str, Any] = {"metric": self.metric}

    # Tuples are emitted as [timestamp, value] lists, matching the
    # Prometheus wire format; empty/absent entries are omitted.
    if self.value:
        payload["value"] = [self.value[0], self.value[1]]
    if self.values:
        payload["values"] = [[ts, val] for ts, val in self.values]

    return payload

OpenObserveAuthenticationError

OpenObserveAuthenticationError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: OpenObserveError

Authentication failed with OpenObserve.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error.

    Args:
        message: Human-readable error description.
        code: Error code; when falsy, falls back to ``self._default_code()``.
        context: Base context mapping; extra keyword arguments are merged in.
        cause: Underlying exception, also chained via ``__cause__``.
        **extra_context: Additional context key/value pairs.
    """
    self.message = message
    # A falsy code (None or "") falls back to the subclass default.
    self.code = code or self._default_code()
    # NOTE: a non-empty ``context`` dict is aliased (not copied) and is
    # mutated by the extra_context merge — same behavior as before.
    merged = context if context else {}
    merged.update(extra_context)
    self.context = merged
    self.cause = cause
    if cause:
        # Populate explicit exception chaining ("raised from").
        self.__cause__ = cause
    super().__init__(message)

OpenObserveClient

OpenObserveClient(
    url: str,
    username: str,
    password: str,
    organization: str = "default",
    timeout: int = 30,
)

Bases: SearchOperationsMixin, MetricsOperationsMixin, OpenObserveClientBase

Async client for interacting with OpenObserve API.

Uses Foundation's transport system for all HTTP operations. Combines search/streams operations and Prometheus metrics API.

Source code in provide/foundation/integrations/openobserve/client_base.py
def __init__(
    self,
    url: str,
    username: str,
    password: str,
    organization: str = "default",
    timeout: int = 30,
) -> None:
    """Initialize OpenObserve client.

    Args:
        url: Base URL for OpenObserve API
        username: Username for authentication
        password: Password for authentication
        organization: Organization name (default: "default")
        timeout: Request timeout in seconds

    Note:
        Retry logic is handled automatically by UniversalClient's middleware.

    """
    # Normalize the base URL and validate credentials up front.
    self.url = url.rstrip("/")
    self.username, self.password = validate_credentials(username, password)
    self.organization = organization

    # All HTTP goes through UniversalClient, which carries the auth
    # headers and default timeout on every request.
    auth_headers = get_auth_headers(self.username, self.password)
    self._client = UniversalClient(
        hub=get_hub(),
        default_headers=auth_headers,
        default_timeout=float(timeout),
    )

OpenObserveConfig

Bases: RuntimeConfig

Configuration for OpenObserve integration.

Functions
get_otlp_endpoint
get_otlp_endpoint() -> str | None

Get OTLP endpoint derived from OpenObserve URL.

Returns:

Type Description
str | None

OTLP endpoint URL or None if not configured

Source code in provide/foundation/integrations/openobserve/config.py
def get_otlp_endpoint(self) -> str | None:
    """Get OTLP endpoint derived from OpenObserve URL.

    Returns:
        OTLP endpoint URL or None if not configured

    """
    if not self.url:
        return None

    # Strip any "/api/{org}" suffix so the endpoint is rebuilt from the
    # bare host URL; partition leaves the URL untouched when "/api/" is
    # absent.
    base_url, _, _ = self.url.partition("/api/")

    return f"{base_url}/api/{self.org or 'default'}"
is_available
is_available() -> bool

Test if OpenObserve is available and reachable.

Returns:

Type Description
bool

True if connection test succeeds

Source code in provide/foundation/integrations/openobserve/config.py
def is_available(self) -> bool:
    """Test if OpenObserve is available and reachable.

    Returns:
        True if connection test succeeds

    """
    if not self.is_configured():
        return False

    try:
        # Imported lazily (inside the method) to avoid a circular
        # dependency between the config and client modules.
        import asyncio

        from provide.foundation.integrations.openobserve.client import OpenObserveClient

        probe = OpenObserveClient(
            url=self.url,  # type: ignore[arg-type]
            username=self.user,  # type: ignore[arg-type]
            password=self.password,  # type: ignore[arg-type]
            organization=self.org or "default",
        )
        return asyncio.run(probe.test_connection())
    except Exception:
        # Any failure (import, network, auth, event loop) means the
        # service is treated as unavailable rather than raising.
        return False
is_configured
is_configured() -> bool

Check if OpenObserve is configured with required settings.

Returns:

Type Description
bool

True if URL, user, and password are all set

Source code in provide/foundation/integrations/openobserve/config.py
def is_configured(self) -> bool:
    """Check if OpenObserve is configured with required settings.

    Returns:
        True if URL, user, and password are all set

    """
    # All three credentials must be non-empty for a usable configuration.
    required = (self.url, self.user, self.password)
    return all(bool(value) for value in required)

OpenObserveConfigError

OpenObserveConfigError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: OpenObserveError

Configuration error for OpenObserve.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error.

    Args:
        message: Human-readable error description.
        code: Error code; when falsy, falls back to ``self._default_code()``.
        context: Base context mapping; extra keyword arguments are merged in.
        cause: Underlying exception, also chained via ``__cause__``.
        **extra_context: Additional context key/value pairs.
    """
    self.message = message
    # A falsy code (None or "") falls back to the subclass default.
    self.code = code or self._default_code()
    # NOTE: a non-empty ``context`` dict is aliased (not copied) and is
    # mutated by the extra_context merge — same behavior as before.
    merged = context if context else {}
    merged.update(extra_context)
    self.context = merged
    self.cause = cause
    if cause:
        # Populate explicit exception chaining ("raised from").
        self.__cause__ = cause
    super().__init__(message)

OpenObserveConnectionError

OpenObserveConnectionError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: OpenObserveError

Error connecting to OpenObserve API.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error.

    Args:
        message: Human-readable error description.
        code: Error code; when falsy, falls back to ``self._default_code()``.
        context: Base context mapping; extra keyword arguments are merged in.
        cause: Underlying exception, also chained via ``__cause__``.
        **extra_context: Additional context key/value pairs.
    """
    self.message = message
    # A falsy code (None or "") falls back to the subclass default.
    self.code = code or self._default_code()
    # NOTE: a non-empty ``context`` dict is aliased (not copied) and is
    # mutated by the extra_context merge — same behavior as before.
    merged = context if context else {}
    merged.update(extra_context)
    self.context = merged
    self.cause = cause
    if cause:
        # Populate explicit exception chaining ("raised from").
        self.__cause__ = cause
    super().__init__(message)

OpenObserveError

OpenObserveError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: FoundationError

Base exception for OpenObserve-related errors.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error.

    Args:
        message: Human-readable error description.
        code: Error code; when falsy, falls back to ``self._default_code()``.
        context: Base context mapping; extra keyword arguments are merged in.
        cause: Underlying exception, also chained via ``__cause__``.
        **extra_context: Additional context key/value pairs.
    """
    self.message = message
    # A falsy code (None or "") falls back to the subclass default.
    self.code = code or self._default_code()
    # NOTE: a non-empty ``context`` dict is aliased (not copied) and is
    # mutated by the extra_context merge — same behavior as before.
    merged = context if context else {}
    merged.update(extra_context)
    self.context = merged
    self.cause = cause
    if cause:
        # Populate explicit exception chaining ("raised from").
        self.__cause__ = cause
    super().__init__(message)

OpenObserveQueryError

OpenObserveQueryError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: OpenObserveError

Error executing query in OpenObserve.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error.

    Args:
        message: Human-readable error description.
        code: Error code; when falsy, falls back to ``self._default_code()``.
        context: Base context mapping; extra keyword arguments are merged in.
        cause: Underlying exception, also chained via ``__cause__``.
        **extra_context: Additional context key/value pairs.
    """
    self.message = message
    # A falsy code (None or "") falls back to the subclass default.
    self.code = code or self._default_code()
    # NOTE: a non-empty ``context`` dict is aliased (not copied) and is
    # mutated by the extra_context merge — same behavior as before.
    merged = context if context else {}
    merged.update(extra_context)
    self.context = merged
    self.cause = cause
    if cause:
        # Populate explicit exception chaining ("raised from").
        self.__cause__ = cause
    super().__init__(message)

OpenObserveStreamingError

OpenObserveStreamingError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: OpenObserveError

Error during streaming operations.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error.

    Args:
        message: Human-readable error description.
        code: Error code; when falsy, falls back to ``self._default_code()``.
        context: Base context mapping; extra keyword arguments are merged in.
        cause: Underlying exception, also chained via ``__cause__``.
        **extra_context: Additional context key/value pairs.
    """
    self.message = message
    # A falsy code (None or "") falls back to the subclass default.
    self.code = code or self._default_code()
    # NOTE: a non-empty ``context`` dict is aliased (not copied) and is
    # mutated by the extra_context merge — same behavior as before.
    merged = context if context else {}
    merged.update(extra_context)
    self.context = merged
    self.cause = cause
    if cause:
        # Populate explicit exception chaining ("raised from").
        self.__cause__ = cause
    super().__init__(message)

SearchQuery

Search query parameters for OpenObserve.

Functions
to_dict
to_dict() -> dict[str, Any]

Convert to API request format.

Source code in provide/foundation/integrations/openobserve/models.py
def to_dict(self) -> dict[str, Any]:
    """Convert to API request format."""
    # The API expects all query parameters nested under a "query" key;
    # note the offset field maps to the API's "from" name.
    query: dict[str, Any] = {
        "sql": self.sql,
        "start_time": self.start_time,
        "end_time": self.end_time,
        "from": self.from_offset,
        "size": self.size,
    }
    return {"query": query}

SearchResponse

Response from OpenObserve search API.

Functions
from_dict classmethod
from_dict(data: dict[str, Any]) -> SearchResponse

Create from API response.

Source code in provide/foundation/integrations/openobserve/models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> SearchResponse:
    """Create from API response."""
    # Bind the lookup once; every field has a safe default so partial
    # responses still build a valid object.
    get = data.get
    return cls(
        hits=get("hits", []),
        total=get("total", 0),
        took=get("took", 0),
        scan_size=get("scan_size", 0),
        trace_id=get("trace_id"),
        from_offset=get("from", 0),
        size=get("size", 0),
        is_partial=get("is_partial", False),
        function_error=get("function_error", []),
    )

StreamInfo

Information about an OpenObserve stream.

Functions
from_dict classmethod
from_dict(data: dict[str, Any]) -> StreamInfo

Create from API response.

Source code in provide/foundation/integrations/openobserve/models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> StreamInfo:
    """Create from API response."""
    # Size/count figures live in a nested "stats" object; look it up once.
    stats = data.get("stats", {})
    return cls(
        name=data.get("name", ""),
        storage_type=data.get("storage_type", ""),
        stream_type=data.get("stream_type", ""),
        doc_count=stats.get("doc_count", 0),
        compressed_size=stats.get("compressed_size", 0),
        original_size=stats.get("original_size", 0),
    )

Functions

aggregate_by_level async

aggregate_by_level(
    stream: str = "default",
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    client: OpenObserveClient | None = None,
) -> dict[str, int]

Get count of logs by level.

Parameters:

Name Type Description Default
stream str

Stream name to search in

'default'
start_time str | int | None

Start time

None
end_time str | int | None

End time

None
client OpenObserveClient | None

OpenObserve client

None

Returns:

Type Description
dict[str, int]

Dictionary mapping level to count

Source code in provide/foundation/integrations/openobserve/search.py
async def aggregate_by_level(
    stream: str = "default",
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    client: OpenObserveClient | None = None,
) -> dict[str, int]:
    """Get count of logs by level.

    Args:
        stream: Stream name to search in
        start_time: Start time
        end_time: End time
        client: OpenObserve client

    Returns:
        Dictionary mapping level to count

    """
    # The stream name is sanitized before interpolation to block SQL injection.
    safe_stream = _sanitize_stream_name(stream)
    sql = f"SELECT level, COUNT(*) as count FROM {safe_stream} GROUP BY level"  # nosec B608 - Inputs sanitized via _sanitize_* functions

    response = await search_logs(
        sql=sql,
        start_time=start_time,
        end_time=end_time,
        size=1000,
        client=client,
    )

    # One row per level; missing fields fall back to UNKNOWN / 0.
    return {hit.get("level", "UNKNOWN"): hit.get("count", 0) for hit in response.hits}

format_csv

format_csv(
    response: SearchResponse,
    columns: list[str] | None = None,
) -> str

Format response as CSV.

Parameters:

Name Type Description Default
response SearchResponse

Search response

required
columns list[str] | None

Specific columns to include (None for all)

None

Returns:

Type Description
str

CSV string

Source code in provide/foundation/integrations/openobserve/formatters.py
def format_csv(response: SearchResponse, columns: list[str] | None = None) -> str:
    """Format response as CSV.

    Args:
        response: Search response
        columns: Specific columns to include (None for all)

    Returns:
        CSV string

    """
    hits = response.hits
    if not hits:
        return ""

    # Default to the union of all keys seen across hits, sorted so the
    # column order is deterministic.
    if columns is None:
        seen: set[str] = set()
        for row in hits:
            seen.update(row.keys())
        columns = sorted(seen)

    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()

    for row in hits:
        if "_timestamp" in row:
            # Copy before mutating so the caller's data stays intact;
            # timestamps are microseconds since epoch, rendered as ISO-8601.
            row = row.copy()
            ts = row["_timestamp"]
            if ts:
                row["_timestamp"] = datetime.fromtimestamp(ts / 1_000_000).isoformat()
        writer.writerow(row)

    return buffer.getvalue()

format_json

format_json(
    response: SearchResponse | dict[str, Any],
    pretty: bool = True,
) -> str

Format response as JSON.

Parameters:

Name Type Description Default
response SearchResponse | dict[str, Any]

Search response or log entry

required
pretty bool

If True, use pretty printing

True

Returns:

Type Description
str

JSON string

Source code in provide/foundation/integrations/openobserve/formatters.py
def format_json(response: SearchResponse | dict[str, Any], pretty: bool = True) -> str:
    """Format response as JSON.

    Args:
        response: Search response or log entry
        pretty: If True, use pretty printing

    Returns:
        JSON string

    """
    # A SearchResponse is projected onto the four summary fields; a raw
    # dict (single log entry) is serialized as-is.
    if isinstance(response, SearchResponse):
        payload: dict[str, Any] = {
            "hits": response.hits,
            "total": response.total,
            "took": response.took,
            "scan_size": response.scan_size,
        }
    else:
        payload = response

    if not pretty:
        return json_dumps(payload)
    return json_dumps(payload, indent=2, sort_keys=False)

format_log_line

format_log_line(entry: dict[str, Any]) -> str

Format a log entry as a traditional log line.

Parameters:

Name Type Description Default
entry dict[str, Any]

Log entry dictionary

required

Returns:

Type Description
str

Formatted log line

Source code in provide/foundation/integrations/openobserve/formatters.py
def format_log_line(entry: dict[str, Any]) -> str:
    """Format a log entry as a traditional log line.

    Args:
        entry: Log entry dictionary

    Returns:
        Formatted log line

    """
    timestamp = entry.get("_timestamp", 0)
    level = entry.get("level", "INFO")
    message = entry.get("message", "")
    service = entry.get("service", "")

    # Timestamps are microseconds since epoch; render with millisecond
    # precision, or "unknown" when absent/zero.
    if timestamp:
        dt = datetime.fromtimestamp(timestamp / 1_000_000)
        time_str = dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    else:
        time_str = "unknown"

    # Assemble: time [LEVEL] [service] message (extras...)
    parts = [time_str, f"[{level:5s}]"]
    if service:
        parts.append(f"[{service}]")
    parts.append(message)

    # Remaining fields (beyond the well-known ones) become key=value pairs.
    known = {"_timestamp", "level", "message", "service", "_p"}
    extras = [f"{key}={value}" for key, value in entry.items() if key not in known]
    if extras:
        parts.append(f"({', '.join(extras)})")

    return " ".join(parts)

format_metric_output

format_metric_output(
    result: MetricQueryResult | list[str] | dict[str, Any],
    format_type: str = "table",
    pretty: bool = False,
) -> str

Format metrics output in specified format.

Parameters:

Name Type Description Default
result MetricQueryResult | list[str] | dict[str, Any]

Data to format (query result, list of metrics, or metadata)

required
format_type str

Output format ("table", "json", "csv", "chart", "summary")

'table'
pretty bool

Whether to pretty-print (for JSON)

False

Returns:

Type Description
str

Formatted string

Source code in provide/foundation/integrations/openobserve/metrics_formatters.py
def format_metric_output(
    result: MetricQueryResult | list[str] | dict[str, Any],
    format_type: str = "table",
    pretty: bool = False,
) -> str:
    """Format metrics output in specified format.

    Args:
        result: Data to format (query result, list of metrics, or metadata)
        format_type: Output format ("table", "json", "csv", "chart", "summary")
        pretty: Whether to pretty-print (for JSON)

    Returns:
        Formatted string
    """
    # Handle different input types
    if isinstance(result, list):
        # List of metric names
        return format_metrics_list(result)

    if isinstance(result, dict) and not isinstance(result, MetricQueryResult):
        # Metadata dictionary
        return format_metric_metadata(result)

    if not isinstance(result, MetricQueryResult):
        return str(result)

    # Format query result based on type
    if format_type == "json":
        return format_query_result_json(result, pretty=pretty)
    if format_type == "csv":
        return format_query_result_csv(result)
    if format_type == "chart":
        return format_time_series_chart(result)
    if format_type == "summary":
        return format_metric_summary(result)

    # Default to table
    return format_query_result_table(result)

format_metrics_list

format_metrics_list(
    metrics: list[str], show_count: bool = True
) -> str

Format list of metrics as a simple list.

Parameters:

Name Type Description Default
metrics list[str]

List of metric names

required
show_count bool

Whether to show count at the end

True

Returns:

Type Description
str

Formatted string

Source code in provide/foundation/integrations/openobserve/metrics_formatters.py
def format_metrics_list(metrics: list[str], show_count: bool = True) -> str:
    """Format list of metrics as a simple list.

    Args:
        metrics: List of metric names
        show_count: Whether to show count at the end

    Returns:
        Formatted string
    """
    if not metrics:
        return "No metrics found."

    # One indented metric per line, sorted alphabetically.
    output = "\n".join(f"  {name}" for name in sorted(metrics))

    if show_count:
        output += f"\n\nTotal: {len(metrics)} metrics"

    return output

format_output

format_output(
    response: SearchResponse | dict[str, Any],
    format_type: str = "log",
    **kwargs: Any
) -> str

Format output based on specified type.

Parameters:

Name Type Description Default
response SearchResponse | dict[str, Any]

Search response or log entry

required
format_type str

Output format (json, log, table, csv, summary)

'log'
**kwargs Any

Additional format-specific options

{}

Returns:

Type Description
str

Formatted string

Source code in provide/foundation/integrations/openobserve/formatters.py
def format_output(
    response: SearchResponse | dict[str, Any],
    format_type: str = "log",
    **kwargs: Any,
) -> str:
    """Format output based on specified type.

    Args:
        response: Search response or log entry
        format_type: Output format (json, log, table, csv, summary)
        **kwargs: Additional format-specific options

    Returns:
        Formatted string

    """
    normalized = format_type.lower()
    if normalized == "json":
        return format_json(response, **kwargs)
    if normalized == "table":
        return _format_as_table(response, **kwargs)
    if normalized == "csv":
        return _format_as_csv(response, **kwargs)
    if normalized == "summary":
        return _format_as_summary(response)
    # "log" and any unrecognized value fall back to log-line format.
    return _format_as_log(response)

format_summary

format_summary(response: SearchResponse) -> str

Format a summary of the search response.

Parameters:

Name Type Description Default
response SearchResponse

Search response

required

Returns:

Type Description
str

Summary string

Source code in provide/foundation/integrations/openobserve/formatters.py
def format_summary(response: SearchResponse) -> str:
    """Format a summary of the search response.

    Args:
        response: Search response

    Returns:
        Summary string

    """
    lines = [
        f"Total hits: {response.total}",
        f"Returned: {len(response.hits)}",
        f"Query time: {response.took}ms",
        f"Scan size: {response.scan_size:,} bytes",
    ]

    if response.trace_id:
        lines.append(f"Trace ID: {response.trace_id}")

    if response.is_partial:
        lines.append("⚠️  Results are partial")

    if response.function_error:
        lines.append("Errors:")
        lines.extend(f"  - {err}" for err in response.function_error)

    # Tally hits per level for a quick distribution overview.
    level_counts: dict[str, int] = {}
    for hit in response.hits:
        lvl = hit.get("level", "UNKNOWN")
        level_counts[lvl] = level_counts.get(lvl, 0) + 1

    if level_counts:
        lines.append("\nLevel distribution:")
        lines.extend(f"  {lvl}: {cnt}" for lvl, cnt in sorted(level_counts.items()))

    return "\n".join(lines)

format_table

format_table(
    response: SearchResponse,
    columns: list[str] | None = None,
) -> str

Format response as a table.

Parameters:

Name Type Description Default
response SearchResponse

Search response

required
columns list[str] | None

Specific columns to include (None for all)

None

Returns:

Type Description
str

Table string

Source code in provide/foundation/integrations/openobserve/formatters.py
def format_table(response: SearchResponse, columns: list[str] | None = None) -> str:
    """Format response as a table.

    Args:
        response: Search response
        columns: Specific columns to include (None for all)

    Returns:
        Table string

    """
    if not response.hits:
        return "No results found"

    # Derive displayable columns when the caller didn't pin them,
    # dropping internal/bookkeeping fields.
    if columns is None:
        columns = _filter_internal_columns(_determine_columns(response.hits))

    # Prefer tabulate's rendering; fall back to a plain-text table when
    # the optional dependency isn't installed.
    try:
        return _format_with_tabulate(response.hits, columns)
    except ImportError:
        return _format_simple_table(response.hits, columns)

get_current_trace_logs async

get_current_trace_logs(
    stream: str = "default",
    client: OpenObserveClient | None = None,
) -> SearchResponse | None

Get logs for the current active trace.

Parameters:

Name Type Description Default
stream str

Stream name to search in

'default'
client OpenObserveClient | None

OpenObserve client

None

Returns:

Type Description
SearchResponse | None

SearchResponse with logs for current trace, or None if no active trace

Source code in provide/foundation/integrations/openobserve/search.py
async def get_current_trace_logs(
    stream: str = "default",
    client: OpenObserveClient | None = None,
) -> SearchResponse | None:
    """Get logs for the current active trace.

    Args:
        stream: Stream name to search in
        client: OpenObserve client

    Returns:
        SearchResponse with logs for current trace, or None if no active trace

    """
    # Prefer the OpenTelemetry trace context when the SDK is installed.
    try:
        from opentelemetry import trace

        span = trace.get_current_span()
        if span and span.is_recording():
            ctx = span.get_span_context()
            # Trace IDs are rendered as 32-char zero-padded hex.
            return await search_by_trace_id(f"{ctx.trace_id:032x}", stream=stream, client=client)
    except ImportError:
        pass

    # Otherwise fall back to Foundation's own tracer context.
    try:
        from provide.foundation.tracer.context import get_current_trace_id

        trace_id = get_current_trace_id()  # type: ignore[assignment]
        if trace_id:
            return await search_by_trace_id(trace_id, stream=stream, client=client)
    except ImportError:
        pass

    # No tracing backend available or no active trace.
    return None

parse_relative_time

parse_relative_time(
    time_str: str, now: datetime | None = None
) -> int

Parse relative time strings like '-1h', '-30m' to microseconds since epoch.

Parameters:

Name Type Description Default
time_str str

Time string (e.g., '-1h', '-30m', 'now')

required
now datetime | None

Current time (for testing), defaults to datetime.now()

None

Returns:

Type Description
int

Microseconds since epoch

Source code in provide/foundation/integrations/openobserve/models.py
def parse_relative_time(time_str: str, now: datetime | None = None) -> int:
    """Parse relative time strings like '-1h', '-30m' to microseconds since epoch.

    Args:
        time_str: Time string (e.g., '-1h', '-30m', 'now')
        now: Current time (for testing), defaults to datetime.now()

    Returns:
        Microseconds since epoch

    """
    from datetime import timedelta

    if now is None:
        now = datetime.now()

    if time_str == "now":
        return int(now.timestamp() * 1_000_000)

    if time_str.startswith("-"):
        # Parse relative time
        value = time_str[1:]
        if value.endswith("h"):
            delta = timedelta(hours=int(value[:-1]))
        elif value.endswith("m"):
            delta = timedelta(minutes=int(value[:-1]))
        elif value.endswith("s"):
            delta = timedelta(seconds=int(value[:-1]))
        elif value.endswith("d"):
            delta = timedelta(days=int(value[:-1]))
        else:
            # Assume seconds if no unit
            delta = timedelta(seconds=int(value))

        target_time = now - delta
        return int(target_time.timestamp() * 1_000_000)

    # Try to parse as timestamp
    try:
        timestamp = int(time_str)
        # If it's already in microseconds (large number), return as-is
        if timestamp > 1_000_000_000_000:
            return timestamp
        # Otherwise assume seconds and convert
        return timestamp * 1_000_000
    except ValueError:
        # Try to parse as ISO datetime
        dt = datetime.fromisoformat(time_str)
        return int(dt.timestamp() * 1_000_000)

search_by_level async

search_by_level(
    level: str,
    stream: str = "default",
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse

Search for logs by level.

Parameters:

Name Type Description Default
level str

Log level to filter (ERROR, WARN, INFO, DEBUG, etc.)

required
stream str

Stream name to search in

'default'
start_time str | int | None

Start time

None
end_time str | int | None

End time

None
size int

Number of results

100
client OpenObserveClient | None

OpenObserve client

None

Returns:

Type Description
SearchResponse

SearchResponse with matching logs

Source code in provide/foundation/integrations/openobserve/search.py
async def search_by_level(
    level: str,
    stream: str = "default",
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse:
    """Search for logs by level.

    Args:
        level: Log level to filter (ERROR, WARN, INFO, DEBUG, etc.)
        stream: Stream name to search in
        start_time: Start time
        end_time: End time
        size: Number of results
        client: OpenObserve client

    Returns:
        SearchResponse with matching logs

    """
    # Both identifiers are sanitized before interpolation to block SQL injection.
    query = (
        f"SELECT * FROM {_sanitize_stream_name(stream)} "
        f"WHERE level = '{_sanitize_log_level(level)}' "
        f"ORDER BY _timestamp DESC"
    )  # nosec B608 - Inputs sanitized via _sanitize_* functions
    return await search_logs(
        sql=query,
        start_time=start_time,
        end_time=end_time,
        size=size,
        client=client,
    )

search_by_service async

search_by_service(
    service: str,
    stream: str = "default",
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse

Search for logs by service name.

Parameters:

Name Type Description Default
service str

Service name to filter

required
stream str

Stream name to search in

'default'
start_time str | int | None

Start time

None
end_time str | int | None

End time

None
size int

Number of results

100
client OpenObserveClient | None

OpenObserve client

None

Returns:

Type Description
SearchResponse

SearchResponse with matching logs

Source code in provide/foundation/integrations/openobserve/search.py
async def search_by_service(
    service: str,
    stream: str = "default",
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse:
    """Search for logs by service name.

    Args:
        service: Service name to filter
        stream: Stream name to search in
        start_time: Start time
        end_time: End time
        size: Number of results
        client: OpenObserve client

    Returns:
        SearchResponse with matching logs

    """
    # Both identifiers are sanitized before interpolation to block SQL injection.
    query = (
        f"SELECT * FROM {_sanitize_stream_name(stream)} "
        f"WHERE service_name = '{_sanitize_service_name(service)}' "
        f"ORDER BY _timestamp DESC"
    )  # nosec B608 - Inputs sanitized via _sanitize_* functions
    return await search_logs(
        sql=query,
        start_time=start_time,
        end_time=end_time,
        size=size,
        client=client,
    )

search_by_trace_id async

search_by_trace_id(
    trace_id: str,
    stream: str = "default",
    client: OpenObserveClient | None = None,
) -> SearchResponse

Search for logs by trace ID.

Parameters:

Name Type Description Default
trace_id str

Trace ID to search for

required
stream str

Stream name to search in

'default'
client OpenObserveClient | None

OpenObserve client (creates new if not provided)

None

Returns:

Type Description
SearchResponse

SearchResponse with matching logs

Source code in provide/foundation/integrations/openobserve/search.py
async def search_by_trace_id(
    trace_id: str,
    stream: str = "default",
    client: OpenObserveClient | None = None,
) -> SearchResponse:
    """Search for logs by trace ID.

    Args:
        trace_id: Trace ID to search for
        stream: Stream name to search in
        client: OpenObserve client (creates new if not provided)

    Returns:
        SearchResponse with matching logs

    """
    # Sanitize both interpolated values to prevent SQL injection.
    safe_stream = _sanitize_stream_name(stream)
    safe_trace = _sanitize_trace_id(trace_id)
    # Ascending order so the trace reads chronologically; window the last 24h.
    query = (
        f"SELECT * FROM {safe_stream} "
        f"WHERE trace_id = '{safe_trace}' "
        f"ORDER BY _timestamp ASC"
    )  # nosec B608 - Inputs sanitized via _sanitize_* functions
    return await search_logs(sql=query, start_time="-24h", client=client)

search_errors async

search_errors(
    stream: str = "default",
    start_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse

Search for error logs.

Parameters:

Name Type Description Default
stream str

Stream name to search in

'default'
start_time str | int | None

Start time

None
size int

Number of results

100
client OpenObserveClient | None

OpenObserve client

None

Returns:

Type Description
SearchResponse

SearchResponse with error logs

Source code in provide/foundation/integrations/openobserve/search.py
async def search_errors(
    stream: str = "default",
    start_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
    end_time: str | int | None = None,
) -> SearchResponse:
    """Search for error logs.

    Convenience wrapper around search_by_level(level="ERROR").

    Args:
        stream: Stream name to search in
        start_time: Start time
        size: Number of results
        client: OpenObserve client
        end_time: End time (appended last to preserve the positional
            signature; siblings like search_by_level already expose it)

    Returns:
        SearchResponse with error logs

    """
    return await search_by_level(
        level="ERROR",
        stream=stream,
        start_time=start_time,
        end_time=end_time,
        size=size,
        client=client,
    )

search_logs async

search_logs(
    sql: str,
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse

Search logs in OpenObserve.

Parameters:

Name Type Description Default
sql str

SQL query to execute

required
start_time str | int | None

Start time (relative like "-1h" or microseconds)

None
end_time str | int | None

End time (relative like "now" or microseconds)

None
size int

Number of results to return

100
client OpenObserveClient | None

OpenObserve client (creates new if not provided)

None

Returns:

Type Description
SearchResponse

SearchResponse with results

Source code in provide/foundation/integrations/openobserve/search.py
async def search_logs(
    sql: str,
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    size: int = 100,
    client: OpenObserveClient | None = None,
) -> SearchResponse:
    """Search logs in OpenObserve.

    Args:
        sql: SQL query to execute
        start_time: Start time (relative like "-1h" or microseconds)
        end_time: End time (relative like "now" or microseconds)
        size: Number of results to return
        client: OpenObserve client (creates new if not provided)

    Returns:
        SearchResponse with results

    """
    # Lazily build a client from configuration when the caller didn't pass one.
    active_client = OpenObserveClient.from_config() if client is None else client
    return await active_client.search(sql=sql, start_time=start_time, end_time=end_time, size=size)

stream_logs

stream_logs(
    sql: str,
    start_time: str | int | None = None,
    poll_interval: int = 5,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]

Stream logs from OpenObserve with polling.

Continuously polls for new logs and yields them as they arrive.

Parameters:

Name Type Description Default
sql str

SQL query to execute

required
start_time str | int | None

Initial start time

None
poll_interval int

Seconds between polls

5
client OpenObserveClient | None

OpenObserve client

None

Yields:

Type Description
dict[str, Any]

Log entries as they arrive

Source code in provide/foundation/integrations/openobserve/streaming.py
def stream_logs(
    sql: str,
    start_time: str | int | None = None,
    poll_interval: int = 5,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Stream logs from OpenObserve with polling.

    Continuously polls for new logs and yields them as they arrive.

    Args:
        sql: SQL query to execute
        start_time: Initial start time
        poll_interval: Seconds between polls
        client: OpenObserve client

    Yields:
        Log entries as they arrive

    """
    if client is None:
        client = OpenObserveClient.from_config()

    # Resolve the initial watermark (microseconds since epoch).
    if start_time is None:
        watermark = parse_relative_time("-1m")
    elif isinstance(start_time, str):
        watermark = parse_relative_time(start_time)
    else:
        # Already an int (microseconds)
        watermark = start_time
    seen: set[str] = set()

    while True:
        try:
            # Poll for anything newer than the watermark via the async client.
            batch = run_async(
                client.search(
                    sql=sql,
                    start_time=watermark,
                    end_time="now",
                    size=1000,
                )
            )

            for hit in batch.hits:
                ts = hit.get("_timestamp", 0)
                # Timestamp plus content hash deduplicates identical records.
                key = f"{ts}:{hash(json_dumps(hit, sort_keys=True))}"
                if key in seen:
                    continue
                seen.add(key)
                yield hit
                if ts > watermark:
                    # Advance past this record so it is not re-fetched.
                    watermark = ts + 1

            # Drop dedup keys older than one minute to bound memory growth.
            floor = parse_relative_time("-1m")
            seen = {k for k in seen if int(k.split(":")[0]) > floor}

            # Wait before the next poll.
            time.sleep(poll_interval)

        except KeyboardInterrupt:
            break
        except Exception as e:
            perr(f"Error during streaming: {e}")
            raise OpenObserveStreamingError(f"Streaming failed: {e}") from e

stream_search_http2

stream_search_http2(
    sql: str,
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]

Stream search results using HTTP/2 streaming endpoint (sync wrapper).

This is a sync wrapper around the async streaming function for CLI use.

Parameters:

Name Type Description Default
sql str

SQL query to execute

required
start_time str | int | None

Start time

None
end_time str | int | None

End time

None
client OpenObserveClient | None

OpenObserve client

None

Yields:

Type Description
dict[str, Any]

Log entries as they stream

Source code in provide/foundation/integrations/openobserve/streaming.py
def stream_search_http2(
    sql: str,
    start_time: str | int | None = None,
    end_time: str | int | None = None,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Stream search results using HTTP/2 streaming endpoint (sync wrapper).

    This is a sync wrapper around the async streaming function for CLI use.
    Note it collects all async results before yielding them synchronously.

    Args:
        sql: SQL query to execute
        start_time: Start time
        end_time: End time
        client: OpenObserve client

    Yields:
        Log entries as they stream

    """

    async def _collect() -> list[dict[str, Any]]:
        # Drain the async generator into a list so it can be replayed sync.
        return [
            entry
            async for entry in stream_search_http2_async(
                sql=sql, start_time=start_time, end_time=end_time, client=client
            )
        ]

    yield from run_async(_collect())

tail_logs

tail_logs(
    stream: str = "default",
    filters: dict[str, str] | None = None,
    follow: bool = True,
    lines: int = 10,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]

Tail logs similar to 'tail -f' command.

Parameters:

Name Type Description Default
stream str

Stream name to tail

'default'
filters dict[str, str] | None

Dictionary of key-value pairs for filtering

None
follow bool

If True, continue streaming new logs

True
lines int

Number of initial lines to show

10
client OpenObserveClient | None

OpenObserve client

None

Yields:

Type Description
dict[str, Any]

Log entries

Source code in provide/foundation/integrations/openobserve/streaming.py
def tail_logs(
    stream: str = "default",
    filters: dict[str, str] | None = None,
    follow: bool = True,
    lines: int = 10,
    client: OpenObserveClient | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Tail logs similar to 'tail -f' command.

    Args:
        stream: Stream name to tail
        filters: Dictionary of key-value pairs for filtering
        follow: If True, continue streaming new logs
        lines: Number of initial lines to show
        client: OpenObserve client

    Yields:
        Log entries

    """
    # Guard: the stream name is interpolated into SQL, so restrict it strictly.
    if re.match(r"^[a-zA-Z0-9_]+$", stream) is None:
        raise ValidationError(
            "Invalid stream name", code="INVALID_STREAM_NAME", stream=stream, allowed_pattern="^[a-zA-Z0-9_]+$"
        )

    # Guard: keep the SQL LIMIT within a sane, positive range.
    if not (isinstance(lines, int) and 0 < lines <= 10000):
        raise ValidationError(
            "Invalid lines parameter", code="INVALID_LINES_PARAM", lines=lines, expected_range="1-10000"
        )

    # Filters are converted to a WHERE clause by a sanitizing helper.
    where_clause = _build_where_clause_from_filters(filters or {})

    active_client = OpenObserveClient.from_config() if client is None else client

    # Fetch the most recent window of logs via the async client.
    initial = run_async(
        active_client.search(
            sql=f"SELECT * FROM {stream} {where_clause} ORDER BY _timestamp DESC LIMIT {lines}",
            start_time="-1h",
        )
    )

    # Emit the initial window oldest-first, like `tail`.
    yield from reversed(initial.hits)

    if not follow:
        return

    # Resume streaming just past the newest record already shown.
    if initial.hits:
        newest = max(hit.get("_timestamp", 0) for hit in initial.hits)
    else:
        newest = parse_relative_time("-1s")

    yield from stream_logs(
        sql=f"SELECT * FROM {stream} {where_clause} ORDER BY _timestamp ASC",
        start_time=newest + 1,
        client=active_client,
    )