Python SDK

A lightweight Python client for Conduit's governed MLS data access. It uses httpx for async HTTP requests; the client itself does not retry failed calls, so see "Error handling with retry" below for a retry pattern.

Requirements

Requires Python 3.10 or newer (the client uses "str | None" type annotations) and httpx: pip install httpx

ConduitClient

python
import httpx
import time
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class GovernanceMetadata:
    """Governance signals that the client reads from a response's HTTP headers."""

    # Boolean flags: each is True only when its header equals the string "true".
    governed: bool  # X-Conduit-Governed
    anti_training: bool  # X-Conduit-Anti-Training
    stateless: bool  # X-Conduit-Stateless

    # Rate-limit headers parsed as integers; None when missing or not numeric.
    rate_limit_remaining: Optional[int]  # X-RateLimit-Remaining
    rate_limit_reset: Optional[int]  # X-RateLimit-Reset

    # Raw X-Conduit-Billing header value; None when the header is absent.
    billing_status: Optional[str]


@dataclass
class ConduitResponse:
    """Result of a successful gateway call: body, governance info and status."""

    # Decoded JSON body of the HTTP response.
    data: Any

    # Governance flags and rate-limit values taken from the response headers.
    governance: GovernanceMetadata

    # HTTP status code of the response.
    status: int


class ConduitError(Exception):
    """Error raised for a failed gateway request.

    Attributes:
        status: HTTP status code associated with the failure.
        details: Optional extra payload describing the failure (defaults to
            ``None``).
    """

    def __init__(self, status: int, message: str, details: Any = None):
        # Store the extra context first, then hand the human-readable message
        # to Exception so that str(error) yields it.
        self.status, self.details = status, details
        super().__init__(message)


class ConduitClient:
    """Async JSON-RPC 2.0 client for a single Conduit gateway endpoint.

    Every call is an HTTP POST to ``gateway_url`` with the API key sent as a
    bearer token. Release the underlying connection pool either by calling
    :meth:`close` or by using the client as an async context manager::

        async with ConduitClient(url, key) as client:
            await client.list_tools()
    """

    def __init__(self, gateway_url: str, api_key: str):
        """Create the client and its underlying ``httpx.AsyncClient``.

        Args:
            gateway_url: URL that every request is POSTed to.
            api_key: Key sent as ``Authorization: Bearer <api_key>``.
        """
        self.gateway_url = gateway_url
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            timeout=30.0,
        )

    async def __aenter__(self) -> "ConduitClient":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when the ``async with`` block exits."""
        await self.close()

    async def call(
        self,
        method: str = "tools/call",
        tool_name: str | None = None,
        arguments: dict | None = None,
        resource_uri: str | None = None,
    ) -> ConduitResponse:
        """Send one JSON-RPC request and wrap the reply.

        Args:
            method: JSON-RPC method name. Ignored when ``resource_uri`` is
                given, in which case ``resources/read`` is used.
            tool_name: Tool to invoke; when set, ``params`` carries the tool
                name and ``arguments``.
            arguments: Tool arguments; ``None`` is sent as an empty object.
            resource_uri: When set, the request reads this resource instead
                of calling a tool.

        Returns:
            A ``ConduitResponse`` with the decoded JSON body, the governance
            metadata parsed from the response headers, and the HTTP status.

        Raises:
            ConduitError: If the HTTP status code is 400 or above.
        """
        if resource_uri:
            rpc_method = "resources/read"
            params = {"uri": resource_uri}
        elif tool_name:
            rpc_method = method
            params = {"name": tool_name, "arguments": arguments or {}}
        else:
            rpc_method = method
            params = None

        body: dict[str, Any] = {"jsonrpc": "2.0", "method": rpc_method}
        if params is not None:
            # JSON-RPC 2.0 requires "params" to be an object/array or to be
            # left out entirely; an explicit null is not a valid value.
            body["params"] = params
        # Millisecond timestamp used as the request id.
        body["id"] = int(time.time() * 1000)

        res = await self.client.post(self.gateway_url, json=body)

        if res.status_code >= 400:
            raise self._error_from_response(res)

        headers = res.headers
        return ConduitResponse(
            data=res.json(),
            governance=GovernanceMetadata(
                governed=headers.get("X-Conduit-Governed") == "true",
                anti_training=headers.get("X-Conduit-Anti-Training") == "true",
                stateless=headers.get("X-Conduit-Stateless") == "true",
                rate_limit_remaining=_parse_int(headers.get("X-RateLimit-Remaining")),
                rate_limit_reset=_parse_int(headers.get("X-RateLimit-Reset")),
                billing_status=headers.get("X-Conduit-Billing"),
            ),
            status=res.status_code,
        )

    @staticmethod
    def _error_from_response(res) -> ConduitError:
        """Build a ``ConduitError`` from an HTTP response with status >= 400.

        ``details`` is always a dict so callers can safely use ``.get()`` on
        it, and the message is always a string.
        """
        try:
            payload = res.json()
        except ValueError:
            # Body is not valid JSON (JSONDecodeError is a ValueError).
            payload = None
        if not isinstance(payload, dict):
            # Non-JSON bodies and JSON that is not an object (list, string,
            # number) are both reported through their raw text.
            payload = {"error": res.text}

        message = payload.get("error") or "Request failed"
        if isinstance(message, dict):
            # NOTE(review): assumes a JSON-RPC style error object with a
            # "message" member; confirm against the gateway's error format.
            message = message.get("message") or "Request failed"
        return ConduitError(res.status_code, str(message), payload)

    async def list_tools(self) -> ConduitResponse:
        """Send a ``tools/list`` request."""
        return await self.call(method="tools/list")

    async def search_properties(self, **kwargs) -> ConduitResponse:
        """Call the ``search_properties`` tool with ``kwargs`` as arguments."""
        return await self.call(tool_name="search_properties", arguments=kwargs)

    async def read_listing(self, listing_uri: str) -> ConduitResponse:
        """Read the resource identified by ``listing_uri``."""
        return await self.call(resource_uri=listing_uri)

    async def close(self):
        """Close the underlying ``httpx.AsyncClient``."""
        await self.client.aclose()


def _parse_int(val: str | None) -> int | None:
    if val is None:
        return None
    try:
        return int(val)
    except ValueError:
        return None

Usage examples

Basic usage

python
import asyncio

async def main():
    """List the gateway's tools, run one property search, then close the client."""
    client = ConduitClient(
        gateway_url="https://gateway.conduitapi.dev/s/austin-mls/reso-feed",
        api_key="cnd_live_xxxxxxxxxxxxxxxxxxxx",
    )

    try:
        # Step 1: discover which tools the gateway exposes.
        tools = await client.list_tools()
        print("Tools:", tools.data)
        print("Governed:", tools.governance.governed)

        # Step 2: run a filtered property search.
        search_filters = {
            "city": "Austin",
            "propertyType": "Residential",
            "minPrice": 300000,
            "maxPrice": 500000,
        }
        listings = await client.search_properties(**search_filters)
        print("Results:", listings.data)
        print("Rate limit remaining:", listings.governance.rate_limit_remaining)
    finally:
        # Close the HTTP client even if one of the calls above raised.
        await client.close()

asyncio.run(main())

Error handling with retry

python
import asyncio

async def query_with_retry(client: ConduitClient, max_retries: int = 3):
    """Run a sample property search, retrying on rate limits and upstream errors.

    Retries on HTTP 429 (waiting the ``retry_after`` value from the error
    details, or 60 seconds if it is missing or not a number) and on 502/504
    (waiting ``2 ** attempt`` seconds). Any other ``ConduitError`` is
    re-raised immediately.

    Args:
        client: Client used to perform the search.
        max_retries: Maximum number of attempts.

    Returns:
        The response of the first successful search.

    Raises:
        ConduitError: For statuses that are not retried (e.g. 401, 403, 404).
        Exception: "Max retries exceeded" once every attempt has failed; the
            last ``ConduitError`` is attached as ``__cause__``.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return await client.search_properties(city="Austin")
        except ConduitError as e:
            if e.status == 429:
                # details defaults to None on ConduitError, so guard the lookup.
                details = e.details if isinstance(e.details, dict) else {}
                wait = details.get("retry_after", 60)
                if isinstance(wait, bool) or not isinstance(wait, (int, float)):
                    # asyncio.sleep needs a number; fall back to the default.
                    wait = 60
                notice = f"Rate limited. Waiting {wait}s..."
            elif e.status in (502, 504):
                wait = 2 ** attempt
                notice = f"Upstream error {e.status}. Retrying in {wait}s..."
            else:
                # 401, 403, 404 — don't retry
                raise
            last_error = e

        # Only wait when another attempt will follow; sleeping after the
        # final failure would just delay the error below.
        if attempt + 1 < max_retries:
            print(notice)
            await asyncio.sleep(wait)

    raise Exception("Max retries exceeded") from last_error

Checking governance metadata

python
result = await client.search_properties(city="Austin")

if result.governance.anti_training:
    print("WARNING: Data must not be used for model training")

if result.governance.stateless:
    print("INFO: Do not persist query results")

if result.governance.billing_status == "past_due":
    print("WARNING: Billing is past due — resolve to avoid access loss")