Skip to content

OTLP helpers

provide.foundation.integrations.openobserve.otlp_helpers

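Helper functions for the OpenObserve integration: attaching trace context to log entries, building the bulk API URL and log entry payloads, configuring the OTLP exporter endpoint and headers, creating the OTLP resource, and mapping log levels to OTLP severity numbers.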

Functions

add_trace_attributes

add_trace_attributes(
    attributes: dict[str, Any], trace_module: Any
) -> None

Add trace context to attributes if available.

Parameters:

Name Type Description Default
attributes dict[str, Any]

Dictionary to update with trace context

required
trace_module Any

OpenTelemetry trace module

required
Source code in provide/foundation/integrations/openobserve/otlp_helpers.py
def add_trace_attributes(attributes: dict[str, Any], trace_module: Any) -> None:
    """Copy the active span's identifiers into ``attributes``.

    Nothing is written when ``trace_module`` reports no current span, or when
    the current span is not recording.

    Args:
        attributes: Dictionary to update with trace context; mutated in place
        trace_module: OpenTelemetry trace module
    """
    active_span = trace_module.get_current_span()
    if not active_span or not active_span.is_recording():
        return

    context = active_span.get_span_context()
    # IDs are rendered as zero-padded lowercase hex: 32 digits for the trace
    # ID, 16 digits for the span ID.
    attributes["trace_id"] = format(context.trace_id, "032x")
    attributes["span_id"] = format(context.span_id, "016x")

add_trace_context_to_log_entry

add_trace_context_to_log_entry(
    log_entry: dict[str, Any],
) -> None

Add trace context to log entry if available.

Tries OpenTelemetry trace context first, then Foundation's tracer context.

Parameters:

Name Type Description Default
log_entry dict[str, Any]

Log entry dictionary to update with trace context

required
Source code in provide/foundation/integrations/openobserve/otlp_helpers.py
def add_trace_context_to_log_entry(log_entry: dict[str, Any]) -> None:
    """Attach trace identifiers to ``log_entry`` when a trace is active.

    Two sources are consulted in order: the OpenTelemetry trace API, then
    Foundation's own tracer context. A source whose import fails is skipped
    silently, so the entry may be left untouched.

    Args:
        log_entry: Log entry dictionary to update with trace context; mutated
            in place
    """
    # Source 1: OpenTelemetry. Only a recording span counts; otherwise fall
    # through to the Foundation tracer.
    try:
        from opentelemetry import trace as otel_trace

        otel_span = otel_trace.get_current_span()
        if otel_span and otel_span.is_recording():
            otel_context = otel_span.get_span_context()
            log_entry["trace_id"] = format(otel_context.trace_id, "032x")
            log_entry["span_id"] = format(otel_context.span_id, "016x")
            return
    except ImportError:
        pass

    # Source 2: Foundation's tracer context.
    try:
        from provide.foundation.tracer.context import (
            get_current_span as foundation_current_span,
            get_current_trace_id as foundation_trace_id,
        )

        foundation_span = foundation_current_span()
        if not foundation_span:
            # No span object: a bare trace ID may still be available.
            bare_trace_id = foundation_trace_id()
            if bare_trace_id:
                log_entry["trace_id"] = bare_trace_id
        else:
            log_entry["trace_id"] = foundation_span.trace_id
            log_entry["span_id"] = foundation_span.span_id
    except ImportError:
        return

build_bulk_url

build_bulk_url(client: Any) -> str

Build the bulk API URL for the client.

Parameters:

Name Type Description Default
client Any

OpenObserve client instance

required

Returns:

Type Description
str

Bulk API URL

Source code in provide/foundation/integrations/openobserve/otlp_helpers.py
def build_bulk_url(client: Any) -> str:
    """Build the bulk API URL for the client.

    The ``/api/<organization>`` path is appended only when the client's URL
    does not already contain it. Trailing slashes on the URL are dropped
    before joining, so a base URL such as ``http://host:5080/`` does not
    produce ``//`` in the result.

    Args:
        client: OpenObserve client instance; its ``url`` and ``organization``
            attributes are read

    Returns:
        Bulk API URL
    """
    org_path = f"/api/{client.organization}"
    # Normalise only the joining point; the containment check below still
    # looks at the URL exactly as configured.
    base_url = client.url.rstrip("/")
    if org_path in client.url:
        return f"{base_url}/_bulk"
    return f"{base_url}{org_path}/_bulk"

build_log_entry

build_log_entry(
    message: str,
    level: str,
    service: str | None,
    attributes: dict[str, Any] | None,
    config: Any,
) -> dict[str, Any]

Build the log entry dictionary.

Parameters:

Name Type Description Default
message str

Log message

required
level str

Log level

required
service str | None

Service name (optional)

required
attributes dict[str, Any] | None

Additional attributes (optional)

required
config Any

Telemetry configuration

required

Returns:

Type Description
dict[str, Any]

Complete log entry dictionary, with trace context added when it is available

Source code in provide/foundation/integrations/openobserve/otlp_helpers.py
def build_log_entry(
    message: str,
    level: str,
    service: str | None,
    attributes: dict[str, Any] | None,
    config: Any,
) -> dict[str, Any]:
    """Assemble the dictionary that represents one log record.

    The base fields are ``_timestamp``, ``level``, ``message`` and
    ``service``. Caller-supplied attributes are merged on top of them (and so
    can replace a base field), after which trace context is attached if any
    is available.

    Args:
        message: Log message
        level: Log level; stored upper-cased
        service: Service name (optional); falls back to
            ``config.service_name`` and then to ``"foundation"``
        attributes: Additional attributes (optional)
        config: Telemetry configuration

    Returns:
        Complete log entry dictionary, with trace context when available
    """
    from datetime import datetime

    # Current time as an integer number of microseconds since the epoch.
    timestamp_us = int(datetime.now().timestamp() * 1_000_000)
    level_name = level.upper()
    service_name = service or config.service_name or "foundation"

    entry: dict[str, Any] = dict(
        _timestamp=timestamp_us,
        level=level_name,
        message=message,
        service=service_name,
    )
    if attributes:
        entry |= attributes

    add_trace_context_to_log_entry(entry)
    return entry

configure_otlp_exporter

configure_otlp_exporter(
    config: Any, oo_config: Any
) -> tuple[str, dict[str, str]]

Configure OTLP exporter endpoint and headers.

Parameters:

Name Type Description Default
config Any

Telemetry configuration

required
oo_config Any

OpenObserve configuration

required

Returns:

Type Description
tuple[str, dict[str, str]]

Tuple of (logs_endpoint, headers)

Source code in provide/foundation/integrations/openobserve/otlp_helpers.py
def configure_otlp_exporter(config: Any, oo_config: Any) -> tuple[str, dict[str, str]]:
    """Work out where OTLP logs are sent and which headers accompany them.

    Headers start from ``config.get_otlp_headers_dict()`` and gain
    ``organization`` / ``stream-name`` entries when the OpenObserve
    configuration provides those values. The logs endpoint is derived from the
    traces endpoint when one is set, otherwise from the base OTLP endpoint.

    Args:
        config: Telemetry configuration
        oo_config: OpenObserve configuration

    Returns:
        Tuple of (logs_endpoint, headers)
    """
    exporter_headers = config.get_otlp_headers_dict()
    optional_headers = (
        ("organization", oo_config.org),
        ("stream-name", oo_config.stream),
    )
    for header_name, header_value in optional_headers:
        if header_value:
            exporter_headers[header_name] = header_value

    traces_endpoint = config.otlp_traces_endpoint
    if not traces_endpoint:
        # No dedicated traces endpoint: append the logs path to the base one.
        return f"{config.otlp_endpoint}/v1/logs", exporter_headers

    # Reuse the traces endpoint, swapping its signal path for the logs path.
    return traces_endpoint.replace("/v1/traces", "/v1/logs"), exporter_headers

create_otlp_resource

create_otlp_resource(
    service_name: str,
    service_version: str | None,
    resource_class: Any,
    resource_attrs_class: Any,
) -> Any

Create OTLP resource with service information.

Parameters:

Name Type Description Default
service_name str

Service name

required
service_version str | None

Optional service version

required
resource_class Any

Resource class from OpenTelemetry

required
resource_attrs_class Any

ResourceAttributes class from OpenTelemetry

required

Returns:

Type Description
Any

Resource instance

Source code in provide/foundation/integrations/openobserve/otlp_helpers.py
def create_otlp_resource(
    service_name: str,
    service_version: str | None,
    resource_class: Any,
    resource_attrs_class: Any,
) -> Any:
    """Create OTLP resource with service information.

    Args:
        service_name: Service name
        service_version: Optional service version
        resource_class: Resource class from OpenTelemetry
        resource_attrs_class: ResourceAttributes class from OpenTelemetry

    Returns:
        Resource instance
    """
    resource_attrs = {
        resource_attrs_class.SERVICE_NAME: service_name,
    }
    if service_version:
        resource_attrs[resource_attrs_class.SERVICE_VERSION] = service_version

    return resource_class.create(resource_attrs)

map_level_to_severity

map_level_to_severity(level: str) -> int

Map log level string to OTLP severity number.

Parameters:

Name Type Description Default
level str

Log level string (e.g., "INFO", "ERROR")

required

Returns:

Type Description
int

OTLP severity number (1-21)

Source code in provide/foundation/integrations/openobserve/otlp_helpers.py
def map_level_to_severity(level: str) -> int:
    """Map log level string to OTLP severity number.

    Args:
        level: Log level string (e.g., "INFO", "ERROR")

    Returns:
        OTLP severity number (1-21)
    """
    severity_map = {
        "TRACE": 1,
        "DEBUG": 5,
        "INFO": 9,
        "WARN": 13,
        "WARNING": 13,
        "ERROR": 17,
        "FATAL": 21,
        "CRITICAL": 21,
    }
    return severity_map.get(level.upper(), 9)