Skip to content

Otlp adapter

provide.foundation.integrations.openobserve.otlp_adapter

OpenObserve-specific OTLP adapter extending generic client.

Provides OpenObserveOTLPClient that extends OTLPLogClient with OpenObserve-specific configuration and customizations.

Classes

OpenObserveOTLPClient

OpenObserveOTLPClient(
    endpoint: str,
    headers: dict[str, str] | None = None,
    service_name: str = "foundation",
    service_version: str | None = None,
    environment: str | None = None,
    timeout: float = 30.0,
    use_circuit_breaker: bool = True,
)

Bases: OTLPLogClient

OpenObserve-specific OTLP client with vendor customizations.

Extends the generic OTLPLogClient with OpenObserve-specific configuration and header management.

Examples:

>>> from provide.foundation.integrations.openobserve.config import OpenObserveConfig
>>> from provide.foundation.logger.config.telemetry import TelemetryConfig
>>> oo_config = OpenObserveConfig.from_env()
>>> telemetry_config = TelemetryConfig.from_env()
>>> client = OpenObserveOTLPClient.from_openobserve_config(
...     oo_config, telemetry_config
... )
>>> client.send_log("Hello OpenObserve!")
True
Source code in provide/foundation/logger/otlp/client.py
def __init__(
    self,
    endpoint: str,
    headers: dict[str, str] | None = None,
    service_name: str = "foundation",
    service_version: str | None = None,
    environment: str | None = None,
    timeout: float = 30.0,
    use_circuit_breaker: bool = True,
) -> None:
    """Initialize OTLP client.

    Args:
        endpoint: OTLP endpoint URL (e.g., "https://api.example.com/v1/logs")
        headers: Optional custom headers (auth, organization, etc.)
        service_name: Service name for resource attributes
        service_version: Optional service version
        environment: Optional environment (dev, staging, prod)
        timeout: Request timeout in seconds
        use_circuit_breaker: Enable circuit breaker pattern
    """
    self.endpoint = build_otlp_endpoint(endpoint, signal_type="logs")
    self.headers = headers or {}
    self.service_name = service_name
    self.service_version = service_version
    self.environment = environment
    self.timeout = timeout
    self.use_circuit_breaker = use_circuit_breaker

    # Check if OpenTelemetry SDK is available
    self._otlp_available = self._check_otlp_availability()
Functions
from_env classmethod
from_env() -> OpenObserveOTLPClient | None

Create client from environment variables.

Returns:

Type Description
OpenObserveOTLPClient | None

Configured client if OpenObserve is configured, None otherwise

Examples:

>>> # With env vars set:
>>> # OPENOBSERVE_URL=https://api.openobserve.ai
>>> # OPENOBSERVE_ORG=my-org
>>> # OPENOBSERVE_USER=admin
>>> # OPENOBSERVE_PASSWORD=secret
>>> client = OpenObserveOTLPClient.from_env()
>>> if client:
...     client.send_log("Hello!")
Source code in provide/foundation/integrations/openobserve/otlp_adapter.py
@classmethod
def from_env(cls) -> OpenObserveOTLPClient | None:
    """Create client from environment variables.

    Returns:
        Configured client if OpenObserve is configured, None otherwise

    Examples:
        >>> # With env vars set:
        >>> # OPENOBSERVE_URL=https://api.openobserve.ai
        >>> # OPENOBSERVE_ORG=my-org
        >>> # OPENOBSERVE_USER=admin
        >>> # OPENOBSERVE_PASSWORD=secret
        >>> client = OpenObserveOTLPClient.from_env()
        >>> if client:
        ...     client.send_log("Hello!")
    """
    try:
        oo_config = OpenObserveConfig.from_env()
        if not oo_config.is_configured():
            return None

        telemetry_config = TelemetryConfig.from_env()

        return cls.from_openobserve_config(oo_config, telemetry_config)
    except Exception:
        return None
from_openobserve_config classmethod
from_openobserve_config(
    oo_config: OpenObserveConfig,
    telemetry_config: TelemetryConfig,
) -> OpenObserveOTLPClient

Create OTLP client configured for OpenObserve.

Derives OTLP settings from OpenObserve configuration: - Builds OTLP endpoint from OpenObserve URL - Adds OpenObserve headers (organization, stream) - Configures Basic auth from credentials

Parameters:

Name Type Description Default
oo_config OpenObserveConfig

OpenObserve configuration

required
telemetry_config TelemetryConfig

Telemetry configuration

required

Returns:

Type Description
OpenObserveOTLPClient

Configured OpenObserveOTLPClient

Raises:

Type Description
ValueError

If OpenObserve URL is not set

Examples:

>>> oo_config = OpenObserveConfig(
...     url="https://api.openobserve.ai",
...     org="my-org",
...     user="admin",
...     password="secret",
... )
>>> telemetry_config = TelemetryConfig(service_name="my-service")
>>> client = OpenObserveOTLPClient.from_openobserve_config(
...     oo_config, telemetry_config
... )
Source code in provide/foundation/integrations/openobserve/otlp_adapter.py
@classmethod
def from_openobserve_config(
    cls,
    oo_config: OpenObserveConfig,
    telemetry_config: TelemetryConfig,
) -> OpenObserveOTLPClient:
    """Create OTLP client configured for OpenObserve.

    Derives OTLP settings from OpenObserve configuration:
    - Builds OTLP endpoint from OpenObserve URL
    - Adds OpenObserve headers (organization, stream)
    - Configures Basic auth from credentials

    Args:
        oo_config: OpenObserve configuration
        telemetry_config: Telemetry configuration

    Returns:
        Configured OpenObserveOTLPClient

    Raises:
        ValueError: If OpenObserve URL is not set

    Examples:
        >>> oo_config = OpenObserveConfig(
        ...     url="https://api.openobserve.ai",
        ...     org="my-org",
        ...     user="admin",
        ...     password="secret",
        ... )
        >>> telemetry_config = TelemetryConfig(service_name="my-service")
        >>> client = OpenObserveOTLPClient.from_openobserve_config(
        ...     oo_config, telemetry_config
        ... )
    """
    if not oo_config.url:
        msg = "OpenObserve URL must be set"
        raise ValueError(msg)

    # Build OTLP endpoint from OpenObserve URL
    endpoint = get_openobserve_otlp_endpoint(oo_config.url, oo_config.org)

    # Build headers with OpenObserve metadata
    headers = build_openobserve_headers(oo_config)

    # Merge with telemetry config headers
    if telemetry_config.otlp_headers:
        headers.update(telemetry_config.otlp_headers)

    return cls(
        endpoint=endpoint,
        headers=headers,
        service_name=telemetry_config.service_name or "foundation",
        service_version=telemetry_config.service_version,
    )

Functions

build_openobserve_headers

build_openobserve_headers(
    oo_config: OpenObserveConfig,
    base_headers: dict[str, str] | None = None,
) -> dict[str, str]

Build headers with OpenObserve-specific metadata.

Adds: - organization header - stream-name header - Basic auth header (from user/password)

Parameters:

Name Type Description Default
oo_config OpenObserveConfig

OpenObserve configuration

required
base_headers dict[str, str] | None

Base headers to include

None

Returns:

Type Description
dict[str, str]

Complete headers dict with OpenObserve metadata

Examples:

>>> config = OpenObserveConfig(
...     org="my-org",
...     stream="logs",
...     user="admin",
...     password="secret"
... )
>>> headers = build_openobserve_headers(config)
>>> "authorization" in headers
True
Source code in provide/foundation/integrations/openobserve/otlp_adapter.py
def build_openobserve_headers(
    oo_config: OpenObserveConfig,
    base_headers: dict[str, str] | None = None,
) -> dict[str, str]:
    """Build headers with OpenObserve-specific metadata.

    Adds:
    - organization header
    - stream-name header
    - Basic auth header (from user/password)

    Args:
        oo_config: OpenObserve configuration
        base_headers: Base headers to include

    Returns:
        Complete headers dict with OpenObserve metadata

    Examples:
        >>> config = OpenObserveConfig(
        ...     org="my-org",
        ...     stream="logs",
        ...     user="admin",
        ...     password="secret"
        ... )
        >>> headers = build_openobserve_headers(config)
        >>> "authorization" in headers
        True
    """
    headers: dict[str, str] = {}

    if base_headers:
        headers.update(base_headers)

    # Add OpenObserve-specific headers
    if oo_config.org:
        headers["organization"] = oo_config.org

    if oo_config.stream:
        headers["stream-name"] = oo_config.stream

    # Add Basic auth
    if oo_config.user and oo_config.password:
        credentials = f"{oo_config.user}:{oo_config.password}"
        encoded = base64.b64encode(credentials.encode()).decode("ascii")
        headers["authorization"] = f"Basic {encoded}"

    return headers

get_openobserve_otlp_endpoint

get_openobserve_otlp_endpoint(
    base_url: str,
    org: str | None = None,
    endpoint_type: str = "logs",
) -> str

Derive OTLP endpoint from OpenObserve base URL.

Handles: - URLs with /api/{org}/ path - URLs without /api/ path - Trailing slashes - Both logs and metrics endpoints

Parameters:

Name Type Description Default
base_url str

OpenObserve base URL

required
org str | None

Organization name (defaults to "default")

None
endpoint_type str

Type of endpoint ("logs" or "metrics", defaults to "logs")

'logs'

Returns:

Type Description
str

OTLP endpoint URL

Examples:

>>> get_openobserve_otlp_endpoint("https://api.openobserve.ai", "my-org")
'https://api.openobserve.ai/api/my-org/v1/logs'
>>> get_openobserve_otlp_endpoint("https://api.openobserve.ai/api/my-org/")
'https://api.openobserve.ai/api/my-org/v1/logs'
>>> get_openobserve_otlp_endpoint("https://api.openobserve.ai", "my-org", "metrics")
'https://api.openobserve.ai/api/my-org/v1/metrics'
Source code in provide/foundation/integrations/openobserve/otlp_adapter.py
def get_openobserve_otlp_endpoint(base_url: str, org: str | None = None, endpoint_type: str = "logs") -> str:
    """Derive OTLP endpoint from OpenObserve base URL.

    Handles:
    - URLs with /api/{org}/ path
    - URLs without /api/ path
    - Trailing slashes
    - Both logs and metrics endpoints

    Args:
        base_url: OpenObserve base URL
        org: Organization name (defaults to "default")
        endpoint_type: Type of endpoint ("logs" or "metrics", defaults to "logs")

    Returns:
        OTLP endpoint URL

    Examples:
        >>> get_openobserve_otlp_endpoint("https://api.openobserve.ai", "my-org")
        'https://api.openobserve.ai/api/my-org/v1/logs'

        >>> get_openobserve_otlp_endpoint("https://api.openobserve.ai/api/my-org/")
        'https://api.openobserve.ai/api/my-org/v1/logs'

        >>> get_openobserve_otlp_endpoint("https://api.openobserve.ai", "my-org", "metrics")
        'https://api.openobserve.ai/api/my-org/v1/metrics'
    """
    # Remove trailing slash
    url = base_url.rstrip("/")

    # Extract base URL if /api/ present
    if "/api/" in url:
        url = url.split("/api/")[0]

    # Build OTLP endpoint
    org_name = org or "default"
    return f"{url}/api/{org_name}/v1/{endpoint_type}"

get_openobserve_otlp_metrics_endpoint

get_openobserve_otlp_metrics_endpoint(
    base_url: str, org: str | None = None
) -> str

Derive OTLP metrics endpoint from OpenObserve base URL.

Convenience function that calls get_openobserve_otlp_endpoint with endpoint_type="metrics".

Parameters:

Name Type Description Default
base_url str

OpenObserve base URL

required
org str | None

Organization name (defaults to "default")

None

Returns:

Type Description
str

OTLP metrics endpoint

Examples:

>>> get_openobserve_otlp_metrics_endpoint("https://api.openobserve.ai", "my-org")
'https://api.openobserve.ai/api/my-org/v1/metrics'
Source code in provide/foundation/integrations/openobserve/otlp_adapter.py
def get_openobserve_otlp_metrics_endpoint(base_url: str, org: str | None = None) -> str:
    """Derive OTLP metrics endpoint from OpenObserve base URL.

    Convenience function that calls get_openobserve_otlp_endpoint with endpoint_type="metrics".

    Args:
        base_url: OpenObserve base URL
        org: Organization name (defaults to "default")

    Returns:
        OTLP metrics endpoint

    Examples:
        >>> get_openobserve_otlp_metrics_endpoint("https://api.openobserve.ai", "my-org")
        'https://api.openobserve.ai/api/my-org/v1/metrics'
    """
    return get_openobserve_otlp_endpoint(base_url, org, endpoint_type="metrics")