Skip to content

How to Handle Pagination

Alpha Status

pyvider is in alpha. This guide covers stable functionality.

Handle paginated API responses in data sources to fetch large result sets efficiently.


Basic Pagination Pattern

import attrs
import httpx
from pyvider.data_sources import register_data_source, BaseDataSource
from pyvider.schema import s_data_source, a_str, a_num, a_list, PvsSchema

@attrs.define
class PaginatedQueryConfig:
    endpoint: str
    limit: int = 100  # Max results to fetch

@attrs.define
class PaginatedQueryData:
    id: str
    items: list[dict]
    total_fetched: int
    has_more: bool

@register_data_source("paginated_query")
class PaginatedQuery(BaseDataSource):
    config_class = PaginatedQueryConfig
    state_class = PaginatedQueryData

    @classmethod
    def get_schema(cls) -> PvsSchema:
        return s_data_source({
            "endpoint": a_str(required=True),
            "limit": a_num(default=100),
            "id": a_str(computed=True),
            "items": a_list(a_map(a_str()), computed=True),
            "total_fetched": a_num(computed=True),
            "has_more": a_bool(computed=True),
        })

    async def read(self, config: PaginatedQueryConfig) -> PaginatedQueryData:
        all_items = []
        page = 1
        has_more = True

        async with httpx.AsyncClient() as client:
            while has_more and len(all_items) < config.limit:
                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100}
                )
                data = response.json()

                all_items.extend(data["items"])
                has_more = data.get("has_more", False)
                page += 1

                # Stop if we have enough
                if len(all_items) >= config.limit:
                    break

        return PaginatedQueryData(
            id=f"{config.endpoint}:{config.limit}",
            items=all_items[:config.limit],
            total_fetched=len(all_items[:config.limit]),
            has_more=has_more,
        )

Token-Based Pagination

Many APIs use cursor/token-based pagination:

async def read(self, config: Config) -> Data:
    all_items = []
    next_token = None

    async with httpx.AsyncClient() as client:
        while len(all_items) < config.limit:
            params = {"limit": min(100, config.limit - len(all_items))}
            if next_token:
                params["next_token"] = next_token

            response = await client.get(
                f"https://api.example.com{config.endpoint}",
                params=params
            )
            data = response.json()

            all_items.extend(data["items"])
            next_token = data.get("next_token")

            # No more pages
            if not next_token:
                break

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items,
        total_fetched=len(all_items),
    )

Offset-Based Pagination

Traditional offset/limit pagination:

async def read(self, config: Config) -> Data:
    all_items = []
    offset = 0
    page_size = 100

    async with httpx.AsyncClient() as client:
        while len(all_items) < config.limit:
            fetch_size = min(page_size, config.limit - len(all_items))

            response = await client.get(
                f"https://api.example.com{config.endpoint}",
                params={"offset": offset, "limit": fetch_size}
            )
            data = response.json()

            items = data.get("items", [])
            if not items:
                break  # No more results

            all_items.extend(items)
            offset += len(items)

            # Got fewer than requested = last page
            if len(items) < fetch_size:
                break

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items,
        total_fetched=len(all_items),
    )

Some APIs use HTTP Link headers (like GitHub):

import httpx
from urllib.parse import parse_qs, urlparse

async def read(self, config: Config) -> Data:
    all_items = []
    url = f"https://api.example.com{config.endpoint}"

    async with httpx.AsyncClient() as client:
        while url and len(all_items) < config.limit:
            response = await client.get(url, params={"per_page": 100})
            data = response.json()

            all_items.extend(data)

            # Parse Link header for next page
            link_header = response.headers.get("Link", "")
            url = None
            for link in link_header.split(","):
                if 'rel="next"' in link:
                    url = link.split(";")[0].strip("<> ")
                    break

            if len(all_items) >= config.limit:
                break

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items[:config.limit],
        total_fetched=len(all_items[:config.limit]),
    )

Parallel Pagination

Fetch multiple pages concurrently (be careful with rate limits):

import asyncio

async def read(self, config: Config) -> Data:
    # Determine total pages needed
    pages_needed = (config.limit + 99) // 100  # Round up

    async with httpx.AsyncClient() as client:
        # Create tasks for each page
        tasks = [
            self._fetch_page(client, config.endpoint, page)
            for page in range(1, pages_needed + 1)
        ]

        # Fetch all pages concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Combine results
        all_items = []
        for result in results:
            if isinstance(result, Exception):
                continue  # Skip failed pages
            all_items.extend(result)

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items[:config.limit],
        total_fetched=len(all_items[:config.limit]),
    )

async def _fetch_page(
    self,
    client: httpx.AsyncClient,
    endpoint: str,
    page: int
) -> list[dict]:
    """Fetch a single page."""
    response = await client.get(
        f"https://api.example.com{endpoint}",
        params={"page": page, "per_page": 100}
    )
    data = response.json()
    return data.get("items", [])

Rate Limiting

Handle API rate limits during pagination:

import asyncio
from datetime import datetime, timedelta

class RateLimitedQuery(BaseDataSource):
    def __init__(self):
        super().__init__()
        self._last_request = None
        self._min_interval = timedelta(milliseconds=100)  # 10 req/sec

    async def _wait_for_rate_limit(self):
        """Ensure we don't exceed rate limit."""
        if self._last_request:
            elapsed = datetime.now() - self._last_request
            if elapsed < self._min_interval:
                wait_time = (self._min_interval - elapsed).total_seconds()
                await asyncio.sleep(wait_time)
        self._last_request = datetime.now()

    async def read(self, config: Config) -> Data:
        all_items = []
        page = 1

        async with httpx.AsyncClient() as client:
            while len(all_items) < config.limit:
                # Respect rate limit
                await self._wait_for_rate_limit()

                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100}
                )

                # Handle 429 Too Many Requests
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", "60"))
                    await asyncio.sleep(retry_after)
                    continue  # Retry same page

                data = response.json()
                all_items.extend(data["items"])

                if not data.get("has_more"):
                    break

                page += 1

        return Data(
            id=f"{config.endpoint}:{config.limit}",
            items=all_items[:config.limit],
        )

Error Handling

Handle pagination errors gracefully:

async def read(self, config: Config) -> Data:
    all_items = []
    page = 1
    errors = []

    async with httpx.AsyncClient() as client:
        while page <= 10 and len(all_items) < config.limit:  # Max 10 pages
            try:
                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100},
                    timeout=30.0  # Add timeout
                )
                response.raise_for_status()

                data = response.json()
                all_items.extend(data.get("items", []))

                if not data.get("has_more"):
                    break

                page += 1

            except httpx.HTTPError as e:
                errors.append(f"Page {page} failed: {e}")
                # Continue to next page instead of failing completely
                page += 1
                continue

            except Exception as e:
                errors.append(f"Unexpected error on page {page}: {e}")
                break  # Stop on unexpected errors

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items[:config.limit],
        total_fetched=len(all_items),
        errors=errors if errors else None,
    )

Best Practices

1. Set Maximum Pages

Prevent infinite loops:

async def read(self, config: Config) -> Data:
    max_pages = 100  # Safety limit
    page = 0

    while page < max_pages and len(all_items) < config.limit:
        # Fetch page
        page += 1

2. Respect User Limits

Don't fetch more than requested:

while len(all_items) < config.limit:
    # Fetch only what's needed
    fetch_size = min(100, config.limit - len(all_items))
    ...

3. Return Partial Results

Don't fail if some pages error:

async def read(self, config: Config) -> Data:
    all_items = []
    try:
        # Fetch pages
        ...
    except Exception as e:
        # Return what we got
        return Data(
            items=all_items,
            error=f"Partial results due to: {e}"
        )

4. Cache Expensive Queries

from functools import lru_cache

@lru_cache(maxsize=32)
async def read(self, config: Config) -> Data:
    # Pagination results cached by config
    ...

5. Add Progress Logging

import logging

async def read(self, config: Config) -> Data:
    logger = logging.getLogger(__name__)
    page = 1

    while ...:
        logger.info(f"Fetching page {page}, got {len(all_items)} items so far")
        page += 1

See Also