
Data Source API Reference

Alpha Status

pyvider is in alpha. This reference covers stable APIs.

Complete API reference for data sources and the BaseDataSource class.


Base Data Source Class

from pyvider.data_sources import BaseDataSource

Class Attributes

Attribute      Type                 Required  Description
config_class   Type[attrs.define]   Yes       Configuration attrs class (inputs)
state_class    Type[attrs.define]   Yes       State attrs class (outputs)
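
For example, a minimal subclass (the class names here are illustrative) wires both attributes up like this:

import attrs
from pyvider.data_sources import BaseDataSource

@attrs.define
class ServerQueryConfig:
    """User inputs."""
    filter: str
    limit: int = 10

@attrs.define
class ServerQueryData:
    """Computed outputs."""
    id: str
    count: int

class ServerQuery(BaseDataSource):
    config_class = ServerQueryConfig   # inputs
    state_class = ServerQueryData      # outputs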

Required Methods

read()

async def read(self, ctx: ResourceContext) -> StateType | None:
    """Execute query and return data."""

Purpose: Fetch data based on user configuration.

Parameters: - ctx: ResourceContext containing configuration and context

Returns: - StateType | None: Query results, or None if config unavailable

When Called: During every terraform plan and terraform apply

Example:

async def read(self, ctx: ResourceContext) -> ServerQueryData | None:
    if not ctx.config:
        return None

    servers = await api.list_servers(
        filter=ctx.config.filter,
        limit=ctx.config.limit,
    )

    return ServerQueryData(
        id=f"{ctx.config.filter}:{ctx.config.limit}",
        servers=servers,
        count=len(servers),
    )

Important:
- Data sources use the same ResourceContext API as resources
- Access configuration via ctx.config
- Return None if config is unavailable
- Unlike resources, data sources re-fetch data on every Terraform operation


get_schema()

@classmethod
def get_schema(cls) -> PvsSchema:
    """Define Terraform schema."""

Purpose: Define input parameters and output attributes.

Returns: PvsSchema object

Example:

from pyvider.schema import s_data_source, a_str, a_num, a_list

@classmethod
def get_schema(cls) -> PvsSchema:
    return s_data_source({
        # Inputs (from user)
        "filter": a_str(required=True, description="Filter expression"),
        "limit": a_num(default=10, description="Max results"),

        # Outputs (computed by data source)
        "id": a_str(computed=True, description="Query ID"),
        "servers": a_list(a_str(), computed=True, description="Server list"),
        "count": a_num(computed=True, description="Number of servers"),
    })

Important: All outputs MUST have computed=True.


Type Signatures

Configuration Class

Input parameters from user:

import attrs

@attrs.define
class ServerQueryConfig:
    """User-provided query parameters."""
    filter: str
    limit: int = 10
    region: str | None = None

Rules:
- All fields represent Terraform inputs
- Use defaults for optional parameters
- Use str | None for nullable inputs


Data Class

Query results returned to user:

import attrs

@attrs.define
class ServerQueryData:
    """Query results."""
    id: str                    # Required: Stable query ID
    servers: list[dict]        # Query results
    count: int                 # Result metadata
    next_page: str | None      # Optional: Pagination token

Rules:
- The id field is required (must be deterministic)
- All fields are outputs (returned to Terraform)
- Use appropriate types (list, dict, str, int, bool)


Schema Attributes

Input Attributes

"filter": a_str(required=True, description="Filter expression")
"limit": a_num(default=10, description="Max results")
"region": a_str(description="AWS region")  # Optional (no default)

Modifiers:
- required=True: user must provide a value
- default=value: optional with a default
- No required or default: optional (null allowed)


Output Attributes

"id": a_str(computed=True, description="Query ID")
"results": a_list(a_map(a_str()), computed=True, description="Results")
"count": a_num(computed=True, description="Result count")
"error": a_str(computed=True, description="Error message if query failed")

Rules:
- ALL outputs must have computed=True
- Outputs cannot have required or default
- Provide clear, descriptive descriptions


ID Generation

Data source IDs must be deterministic (same inputs = same ID):

Good: Deterministic

async def read(self, config: Config) -> Data:
    # Generate stable ID from inputs
    query_id = f"{config.endpoint}:{config.filter}:{config.limit}"

    results = await api.query(...)

    return Data(
        id=query_id,  # Same config always = same ID
        results=results,
    )

Bad: Non-Deterministic

import uuid

async def read(self, config: Config) -> Data:
    results = await api.query(...)

    return Data(
        id=str(uuid.uuid4()),  # Different ID every time!
        results=results,
    )

Hash-Based IDs

For complex configurations:

import hashlib
import json

async def read(self, config: Config) -> Data:
    # Create hash of all config values
    config_str = json.dumps({
        "endpoint": config.endpoint,
        "filter": config.filter,
        "limit": config.limit,
    }, sort_keys=True)

    query_id = hashlib.md5(config_str.encode()).hexdigest()

    return Data(id=query_id, ...)
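
The specific hash function is not significant here, since the digest only serves as a stable identifier; if your tooling flags MD5, hashlib.sha256 can be substituted the same way:

    query_id = hashlib.sha256(config_str.encode()).hexdigest()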

Error Handling

Return Data With Error Field

@attrs.define
class QueryData:
    id: str
    results: list[dict]
    error: str | None = None  # Error field

async def read(self, config: Config) -> QueryData:
    try:
        results = await api.query(config.endpoint)
        return QueryData(
            id=config.endpoint,
            results=results,
            error=None,
        )
    except APIError as e:
        return QueryData(
            id=config.endpoint,
            results=[],
            error=str(e),
        )

Don't Raise Exceptions

# Good: Return data with error
async def read(self, config: Config) -> Data:
    try:
        results = await api.query(config.endpoint)
        return Data(id=config.endpoint, results=results, error=None)
    except Exception as e:
        return Data(id=config.endpoint, results=[], error=str(e))

# Bad: Raise exception
async def read(self, config: Config) -> Data:
    results = await api.query(config.endpoint)  # Might raise!
    return Data(id=config.endpoint, results=results)

Common Patterns

Handle Missing Data

async def read(self, config: Config) -> Data:
    result = await api.get(config.resource_id)

    if not result:
        # Return empty data instead of None
        return Data(
            id=config.resource_id,
            found=False,
            value=None,
        )

    return Data(
        id=result["id"],
        found=True,
        value=result["value"],
    )

Boolean Outputs

@attrs.define
class FileInfoData:
    id: str
    exists: bool      # Use bool for yes/no outputs
    readable: bool
    size: int | None  # None if doesn't exist

import os
from pathlib import Path

async def read(self, config: Config) -> FileInfoData:
    path = Path(config.path)
    exists = path.exists()

    return FileInfoData(
        id=str(path.absolute()),
        exists=exists,
        readable=exists and os.access(path, os.R_OK),
        size=path.stat().st_size if exists else None,
    )

List Outputs

from pyvider.schema import s_data_source, a_list, a_map, a_str

@classmethod
def get_schema(cls) -> PvsSchema:
    return s_data_source({
        "query": a_str(required=True),
        "id": a_str(computed=True),
        # List of strings
        "tags": a_list(a_str(), computed=True),
        # List of objects
        "servers": a_list(
            a_map(a_str()),  # Each server is a map
            computed=True
        ),
    })
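
A matching data class for that schema could look like this (field names mirror the schema above):

import attrs

@attrs.define
class QueryData:
    id: str
    tags: list[str]
    servers: list[dict[str, str]]  # each server is a map of strings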

Caching

Cache expensive queries:

class ServerQuery(BaseDataSource):
    def __init__(self) -> None:
        super().__init__()
        # functools.lru_cache doesn't work on async methods (it would cache the
        # coroutine object, not the result), so use a plain dict keyed by the inputs.
        self._cache: dict[tuple[str, int], list[dict]] = {}

    async def _fetch_servers(self, filter_str: str, limit: int) -> list[dict]:
        """Cached query."""
        key = (filter_str, limit)
        if key not in self._cache:
            self._cache[key] = await api.list_servers(filter=filter_str, limit=limit)
        return self._cache[key]

    async def read(self, config: Config) -> Data:
        # Query is cached by filter+limit
        servers = await self._fetch_servers(config.filter, config.limit)

        return Data(
            id=f"{config.filter}:{config.limit}",
            servers=servers,
            count=len(servers),
        )

Validation

Data sources don't have _validate_config(). Validate in read():

async def read(self, config: Config) -> Data:
    # Validate inputs
    errors = []

    if not config.endpoint.startswith("/"):
        errors.append("Endpoint must start with /")

    if config.limit < 1 or config.limit > 1000:
        errors.append("Limit must be between 1 and 1000")

    if errors:
        return Data(
            id="error",
            results=[],
            error="; ".join(errors),
        )

    # Proceed with query
    ...

Performance Considerations

Idempotency

Reads should be idempotent (same result every time for same inputs):

# Good: Idempotent
async def read(self, config: Config) -> Data:
    # Same query always returns same results
    servers = await api.list_servers(filter=config.filter)
    return Data(id=config.filter, servers=servers)

# Bad: Not idempotent
async def read(self, config: Config) -> Data:
    # Returns different results each time!
    servers = await api.list_recent_servers()
    return Data(id=str(uuid.uuid4()), servers=servers)

Minimize API Calls

# Good: Single API call
async def read(self, config: Config) -> Data:
    response = await api.get_user_with_posts(config.user_id)
    return Data(
        id=config.user_id,
        user=response["user"],
        posts=response["posts"],
    )

# Bad: Multiple API calls
async def read(self, config: Config) -> Data:
    user = await api.get_user(config.user_id)
    posts = await api.get_posts(config.user_id)  # Separate call
    return Data(id=config.user_id, user=user, posts=posts)
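
When no combined endpoint exists and separate calls are unavoidable, issuing them concurrently at least avoids paying for them back to back. A sketch using asyncio.gather, reusing the same illustrative api helpers:

import asyncio

async def read(self, config: Config) -> Data:
    # Two separate endpoints, fetched concurrently instead of sequentially
    user, posts = await asyncio.gather(
        api.get_user(config.user_id),
        api.get_posts(config.user_id),
    )
    return Data(id=config.user_id, user=user, posts=posts)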

Testing

import pytest
from my_provider.data_sources.server_query import ServerQuery, ServerQueryConfig

@pytest.mark.asyncio
async def test_server_query():
    ds = ServerQuery()
    config = ServerQueryConfig(filter="status=running", limit=5)

    data = await ds.read(config)

    assert data.count <= 5
    assert len(data.servers) == data.count
    assert data.id == "status=running:5"  # Deterministic ID

@pytest.mark.asyncio
async def test_server_query_error_handling():
    ds = ServerQuery()
    config = ServerQueryConfig(filter="invalid!", limit=5)

    data = await ds.read(config)

    # Should return data with error, not raise
    assert data.error is not None
    assert data.servers == []
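
If the data source calls out to a real API, stub it in tests rather than hitting the network. This sketch assumes the data source module exposes a module-level api client (adjust the patch target to your project layout):

from my_provider.data_sources import server_query

@pytest.mark.asyncio
async def test_server_query_stubbed(monkeypatch):
    async def fake_list_servers(filter, limit):
        return [{"name": "web-1"}, {"name": "web-2"}]

    # `api` is a hypothetical module-level client inside server_query
    monkeypatch.setattr(server_query.api, "list_servers", fake_list_servers)

    ds = ServerQuery()
    data = await ds.read(ServerQueryConfig(filter="status=running", limit=5))

    assert data.count == 2
    assert data.servers == [{"name": "web-1"}, {"name": "web-2"}]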

Complete Example

import attrs
import httpx
from pyvider.data_sources import register_data_source, BaseDataSource
from pyvider.resources.context import ResourceContext
from pyvider.schema import s_data_source, a_str, a_num, a_list, a_map, PvsSchema

@attrs.define
class ServerQueryConfig:
    region: str
    limit: int = 10

@attrs.define
class ServerQueryData:
    id: str
    servers: list[dict[str, str]]
    count: int

@register_data_source("servers")
class ServerQuery(BaseDataSource):
    config_class = ServerQueryConfig
    state_class = ServerQueryData

    @classmethod
    def get_schema(cls) -> PvsSchema:
        return s_data_source({
            # Inputs
            "region": a_str(required=True, description="AWS region"),
            "limit": a_num(default=10, description="Max servers"),

            # Outputs
            "id": a_str(computed=True, description="Query ID"),
            "servers": a_list(
                a_map(a_str()),
                computed=True,
                description="Server list"
            ),
            "count": a_num(computed=True, description="Server count"),
        })

    async def read(self, ctx: ResourceContext) -> ServerQueryData | None:
        if not ctx.config:
            return None

        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.example.com/servers",
                params={"region": ctx.config.region, "limit": ctx.config.limit}
            )
            servers = response.json()["servers"]

            return ServerQueryData(
                id=f"{ctx.config.region}:{ctx.config.limit}",
                servers=servers,
                count=len(servers),
            )

See Also