Bulk api
provide.foundation.integrations.openobserve.bulk_api
¶
OpenObserve bulk API (non-OTLP fallback).
Provides functions for sending logs via OpenObserve's proprietary bulk ingestion API. This is used as a fallback when OTLP is unavailable or when the circuit breaker is open.
Classes¶
Functions¶
build_bulk_request
¶
build_bulk_request(
message: str,
level: str,
service_name: str | None,
attributes: dict[str, Any] | None,
config: TelemetryConfig,
stream: str,
) -> str
Build NDJSON bulk request payload.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
str
|
Log message |
required |
level
|
str
|
Log level |
required |
service_name
|
str | None
|
Service name (follows OTLP standard) |
required |
attributes
|
dict[str, Any] | None
|
Log attributes |
required |
config
|
TelemetryConfig
|
Telemetry configuration |
required |
stream
|
str
|
OpenObserve stream name |
required |
Returns:
| Type | Description |
|---|---|
str
|
NDJSON-formatted bulk request payload |
Examples:
>>> config = TelemetryConfig(service_name="test")
>>> bulk = build_bulk_request("Hello", "INFO", None, None, config, "logs")
>>> "\n" in bulk
True
Source code in provide/foundation/integrations/openobserve/bulk_api.py
build_bulk_url
¶
Build the bulk API URL for the client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
client
|
OpenObserveClient
|
OpenObserve client instance |
required |
Returns:
| Type | Description |
|---|---|
str
|
Bulk API URL |
Examples:
>>> client = OpenObserveClient(
... url="https://api.openobserve.ai",
... username="admin",
... password="secret",
... organization="my-org",
... )
>>> build_bulk_url(client)
'https://api.openobserve.ai/api/my-org/_bulk'
Source code in provide/foundation/integrations/openobserve/bulk_api.py
build_log_entry
¶
build_log_entry(
message: str,
level: str,
service_name: str | None,
attributes: dict[str, Any] | None,
config: TelemetryConfig,
) -> dict[str, Any]
Build the log entry dictionary for bulk API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
str
|
Log message |
required |
level
|
str
|
Log level |
required |
service_name
|
str | None
|
Service name (optional, follows OTLP standard) |
required |
attributes
|
dict[str, Any] | None
|
Additional attributes (optional) |
required |
config
|
TelemetryConfig
|
Telemetry configuration |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Complete log entry dictionary with trace context |
Examples:
>>> config = TelemetryConfig(service_name="my-service")
>>> entry = build_log_entry("Hello", "INFO", None, {"key": "value"}, config)
>>> entry["message"]
'Hello'
Source code in provide/foundation/integrations/openobserve/bulk_api.py
send_log_bulk
¶
send_log_bulk(
message: str,
level: str = "INFO",
service_name: str | None = None,
attributes: dict[str, Any] | None = None,
client: OpenObserveClient | None = None,
) -> bool
Send log via OpenObserve bulk API (non-OTLP).
This is OpenObserve's proprietary bulk ingestion API, not OTLP. Used as fallback when OTLP is unavailable or circuit is open.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
str
|
Log message |
required |
level
|
str
|
Log level |
'INFO'
|
service_name
|
str | None
|
Service name (follows OTLP standard) |
None
|
attributes
|
dict[str, Any] | None
|
Additional attributes |
None
|
client
|
OpenObserveClient | None
|
OpenObserve client (creates new if not provided) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if sent successfully |
Examples: