Skip to content

Generator

provide.foundation.cli.commands.logs.generator

Log generator for the `logs` CLI commands: builds synthetic log entries and emits them for testing OpenObserve integration.

Classes

LogGenerator

Generates log entries for testing OpenObserve integration.

This class encapsulates log generation logic with support for:

- Multiple message styles (normal, Burroughs-inspired)
- Configurable error rates
- Thread-safe trace and span ID generation
- Rate-controlled log emission

Functions
generate_continuous
generate_continuous(
    rate: float,
    enable_rate_limit: bool,
    logs_rate_limited: int,
) -> tuple[int, int, int]

Generate logs in continuous mode.

Parameters:

Name Type Description Default
rate float

Target logs per second

required
enable_rate_limit bool

Whether rate limiting is enabled

required
logs_rate_limited int

Initial rate-limited counter

required

Returns:

Type Description
tuple[int, int, int]

Final counters tuple (logs_sent, logs_failed, logs_rate_limited)

Source code in provide/foundation/cli/commands/logs/generator.py
def generate_continuous(
    self, rate: float, enable_rate_limit: bool, logs_rate_limited: int
) -> tuple[int, int, int]:
    """Generate logs in continuous mode.

    The loop has no exit condition of its own: it keeps generating and
    sending entries until an exception (for example ``KeyboardInterrupt``)
    propagates out of it.

    Args:
        rate: Target logs per second. A value of 0 or less disables pacing,
            so entries are emitted back to back.
        enable_rate_limit: Whether rate limiting is enabled (forwarded to
            the stats printer)
        logs_rate_limited: Initial rate-limited counter

    Returns:
        Final counters tuple (logs_sent, logs_failed, logs_rate_limited)

    """
    # Function-scope import, resolved once instead of on every iteration.
    from provide.foundation.cli.commands.logs.stats import print_stats

    logs_sent = 0
    logs_failed = 0
    start_time = time.time()
    last_stats_time = start_time
    last_stats_sent = 0
    index = 0

    while True:
        current_time = time.time()

        # Generate and send log entry
        entry = self.generate_log_entry(index)
        index += 1
        logs_sent, logs_failed, logs_rate_limited = self.send_log_entry(
            entry, logs_sent, logs_failed, logs_rate_limited
        )

        # Control rate. Skipped for rate <= 0: a zero rate would otherwise
        # divide by zero below, and a negative rate never yields a positive
        # sleep time anyway.
        if rate > 0:
            elapsed = current_time - start_time
            expected_count = int(elapsed * rate)

            if logs_sent >= expected_count:
                # Sleep until the moment the next successful send is due.
                next_time = start_time + (logs_sent / rate)
                sleep_time = next_time - time.time()
                if sleep_time > 0:
                    time.sleep(sleep_time)

        # Print stats; the printer returns the updated bookkeeping values.
        last_stats_time, last_stats_sent = print_stats(
            current_time,
            last_stats_time,
            logs_sent,
            last_stats_sent,
            logs_failed,
            enable_rate_limit,
            logs_rate_limited,
        )
generate_fixed_count
generate_fixed_count(
    count: int, rate: float
) -> tuple[int, int, int]

Generate a fixed number of logs.

Parameters:

Name Type Description Default
count int

Number of logs to generate

required
rate float

Target logs per second (0 for unlimited)

required

Returns:

Type Description
tuple[int, int, int]

Final counters tuple (logs_sent, logs_failed, logs_rate_limited)

Source code in provide/foundation/cli/commands/logs/generator.py
def generate_fixed_count(self, count: int, rate: float) -> tuple[int, int, int]:
    """Generate a fixed number of logs.

    Each entry is generated, sent, optionally followed by a fixed pause of
    ``1 / rate`` seconds, and then reported to the progress printer.

    Args:
        count: Number of logs to generate
        rate: Target logs per second (0 for unlimited)

    Returns:
        Final counters tuple (logs_sent, logs_failed, logs_rate_limited)

    """
    # Function-scope import, resolved once instead of on every iteration.
    from provide.foundation.cli.commands.logs.stats import print_progress

    logs_sent = 0
    logs_failed = 0
    logs_rate_limited = 0

    # The pause between entries is loop-invariant; 0.0 means "no pacing".
    pause = 1.0 / rate if rate > 0 else 0.0

    for i in range(count):
        entry = self.generate_log_entry(i)
        logs_sent, logs_failed, logs_rate_limited = self.send_log_entry(
            entry, logs_sent, logs_failed, logs_rate_limited
        )

        # Control rate
        if rate > 0:
            time.sleep(pause)

        # Print progress
        print_progress(i, count)

    return logs_sent, logs_failed, logs_rate_limited
generate_log_entry
generate_log_entry(index: int) -> dict[str, Any]

Generate a single log entry with optional error simulation.

Parameters:

Name Type Description Default
index int

Log entry index

required

Returns:

Type Description
dict[str, Any]

Dict containing log entry data

Source code in provide/foundation/cli/commands/logs/generator.py
def generate_log_entry(self, index: int) -> dict[str, Any]:
    """Generate a single log entry with optional error simulation.

    Args:
        index: Log entry index. An index divisible by 10 starts a new trace;
            any other index reuses the most recently issued trace ID (or
            starts the first trace if none has been issued yet).

    Returns:
        Dict containing log entry data: ``message``, ``service``,
        ``operation``, ``iteration``, ``trace_id``, ``span_id``,
        ``duration_ms``, ``level``, ``domain``, ``action`` and ``status``,
        plus ``error_code`` and ``error_type`` when the entry simulates an
        error.

    """
    message = self._generate_message()
    is_error = random.random() < self.error_rate  # nosec B311 - Test data generation

    # Determine trace ID (new trace every 10 entries). If no trace has been
    # issued yet, start one instead of formatting counter - 1 == -1, which
    # would produce the malformed ID "trace_-0000001".
    last_issued = self._trace_counter - 1
    if index % 10 == 0 or last_issued < 0:
        trace_id = self.generate_trace_id()
    else:
        trace_id = f"trace_{last_issued:08d}"

    # Base entry
    entry = {
        "message": message,
        "service": random.choice(SERVICE_NAMES),  # nosec B311 - Test data
        "operation": random.choice(OPERATIONS),  # nosec B311 - Test data
        "iteration": index,
        "trace_id": trace_id,
        "span_id": self.generate_span_id(),
        "duration_ms": random.randint(MIN_DURATION_MS, MAX_DURATION_MS),  # nosec B311 - Test data
    }

    # Add error fields if this is an error
    if is_error:
        entry["level"] = "error"
        entry["error_code"] = random.choice(ERROR_CODES)  # nosec B311 - Test data
        entry["error_type"] = random.choice(ERROR_TYPES)  # nosec B311 - Test data
    else:
        # Random log level for non-errors
        entry["level"] = random.choice(NON_ERROR_LEVELS)  # nosec B311 - Test data

    # Add domain/action/status for DAS emoji system
    entry["domain"] = random.choice(DOMAINS)  # nosec B311 - Test data
    entry["action"] = random.choice(ACTIONS)  # nosec B311 - Test data
    entry["status"] = "error" if is_error else random.choice(STATUSES)  # nosec B311 - Test data

    return entry
generate_span_id
generate_span_id() -> str

Generate a unique span ID.

Returns:

Type Description
str

Formatted span ID string

Source code in provide/foundation/cli/commands/logs/generator.py
def generate_span_id(self) -> str:
    """Return the next span ID and advance the span counter.

    The counter is read and incremented while holding ``self._trace_lock``.

    Returns:
        Span ID of the form ``span_`` followed by the counter value,
        zero-padded to at least eight digits

    """
    with self._trace_lock:
        current = self._span_counter
        self._span_counter = current + 1
    return f"span_{current:08d}"
generate_trace_id
generate_trace_id() -> str

Generate a unique trace ID.

Returns:

Type Description
str

Formatted trace ID string

Source code in provide/foundation/cli/commands/logs/generator.py
def generate_trace_id(self) -> str:
    """Return the next trace ID and advance the trace counter.

    The counter is read and incremented while holding ``self._trace_lock``.

    Returns:
        Trace ID of the form ``trace_`` followed by the counter value,
        zero-padded to at least eight digits

    """
    with self._trace_lock:
        current = self._trace_counter
        self._trace_counter = current + 1
    return f"trace_{current:08d}"
send_log_entry
send_log_entry(
    entry: dict[str, Any],
    logs_sent: int,
    logs_failed: int,
    logs_rate_limited: int,
) -> tuple[int, int, int]

Send a log entry and update counters.

Parameters:

Name Type Description Default
entry dict[str, Any]

Log entry dictionary to send

required
logs_sent int

Current count of successfully sent logs

required
logs_failed int

Current count of failed logs

required
logs_rate_limited int

Current count of rate-limited logs

required

Returns:

Type Description
tuple[int, int, int]

Updated counters tuple (logs_sent, logs_failed, logs_rate_limited)

Source code in provide/foundation/cli/commands/logs/generator.py
def send_log_entry(
    self, entry: dict[str, Any], logs_sent: int, logs_failed: int, logs_rate_limited: int
) -> tuple[int, int, int]:
    """Send a log entry and update counters.

    The entry is emitted through a logger named ``generated.<service>``,
    using the logger method named by the entry's ``level`` (``info`` when
    absent). The ``message`` value is passed positionally and every
    remaining key as a keyword argument. The caller's dict is not modified.

    Any exception raised while sending is swallowed and counted as a
    failure; a ``RateLimitExceededError`` is additionally counted as
    rate-limited.

    Args:
        entry: Log entry dictionary to send
        logs_sent: Current count of successfully sent logs
        logs_failed: Current count of failed logs
        logs_rate_limited: Current count of rate-limited logs

    Returns:
        Updated counters tuple (logs_sent, logs_failed, logs_rate_limited)

    """
    try:
        # Shallow copy: popping "level"/"message" must not strip those keys
        # from the dict the caller handed in.
        fields = dict(entry)
        service_logger = get_logger(f"generated.{fields['service']}")
        level = fields.pop("level", "info")
        message = fields.pop("message")
        getattr(service_logger, level)(message, **fields)
        logs_sent += 1
    except Exception as e:
        from provide.foundation.errors import RateLimitExceededError

        # Best-effort by design: a failed send only updates the counters.
        logs_failed += 1
        if isinstance(e, RateLimitExceededError):
            logs_rate_limited += 1
    return logs_sent, logs_failed, logs_rate_limited

Functions