CrewAI

This guide shows how to add Epanya marketplace access to a CrewAI crew. You'll define @tool-decorated functions for discovery and purchase, assign them to a dedicated procurement agent, and wire everything into a crew that can autonomously source compute resources for other agents to use.

What you'll build: A two-agent crew — a researcher that determines what compute is needed and a procurement agent that discovers, evaluates, and purchases it from Epanya.

Installation

pip install epanya crewai crewai-tools langchain-openai

Tool definitions

CrewAI uses the @tool decorator from crewai.tools. The docstring becomes the tool description visible to the LLM, so write it carefully.

import asyncio, json, os
from crewai.tools import tool
from epanya import EpanyaClient, create_test_signer

# ── Shared Epanya client ────────────────────────────────────────────────────────

# Wallet address the agent uses; taken from AGENT_WALLET_ADDRESS when that
# environment variable is set, otherwise the hard-coded address below.
AGENT_WALLET = os.environ.get(
    "AGENT_WALLET_ADDRESS",
    "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266",
)

# Module-level client used by all three tool functions in this listing.
_client = EpanyaClient(
    api_url="https://api.epanya.ai",
    wallet_address=AGENT_WALLET,
    # replace with real EIP-3009 signer in production
    signer=create_test_signer(AGENT_WALLET),
)

def _run(coro):
    """Run an async coroutine from sync context and return its result.

    Three situations are handled:

    * The current thread has an idle, open event loop: the coroutine is run
      on that loop with ``run_until_complete`` (the loop is reused between
      calls).
    * The current thread's loop is already running (the caller is itself
      inside async code): that loop cannot be blocked on, so the coroutine is
      run with ``asyncio.run`` on a short-lived worker thread.
    * The thread has no event loop, or its loop is closed: the coroutine is
      run with ``asyncio.run``.

    Exceptions raised by the coroutine itself, including ``RuntimeError``,
    propagate to the caller unchanged.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.
    """
    # Guard ONLY the loop lookup. Catching RuntimeError around the whole body
    # would also swallow a RuntimeError raised by the coroutine and then try
    # to run the same, already-consumed coroutine a second time.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None

    if loop is None or loop.is_closed():
        return asyncio.run(coro)

    if loop.is_running():
        import concurrent.futures

        # One worker is enough: exactly one coroutine is submitted.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    return loop.run_until_complete(coro)


# ── discover_products ─────────────────────────────────────────────────────────

@tool("discover_products")
def discover_products(query: str, category: str = "", max_price: float = 0, limit: int = 10) -> str:
    """Search the Epanya marketplace for products.

    Use this tool to find compute resources, APIs, datasets, AI models, tools,
    or agent labor services available for purchase. Returns a numbered list of
    matching products with their names, USDC prices, pricing models, and IDs.

    Args:
        query:     Free-text search, e.g. 'GPU inference' or 'translation API'.
        category:  Filter by: compute, datasets, apis, models, tools, physical, agent_labor.
        max_price: Maximum price in USDC (0 = no limit).
        limit:     Maximum results to return (default 10).
    """
    # The docstring is left word-for-word: it is the tool description shown
    # to the LLM, so its text is part of the tool's behaviour.

    # Falsy filters ("" or 0) are handed to the client as None.
    search = _client.discover(
        q=query if query else None,
        category=category if category else None,
        max_price=max_price if max_price else None,
        limit=limit,
    )
    matches = _run(search)
    if not matches:
        return "No products found matching those criteria."

    def _render(rank, item):
        # One numbered, four-line entry per product.
        return (
            f"{rank}. {item['name']} — ${item['priceUsdc']} USDC ({item['pricingModel']})\n"
            f"   ID: {item['id']}\n"
            f"   Category: {item.get('category','')}\n"
            f"   {item['description']}"
        )

    entries = [_render(rank, item) for rank, item in enumerate(matches, 1)]
    header = f"Found {len(matches)} product(s):\n\n"
    return header + "\n\n".join(entries)


# ── purchase_product ──────────────────────────────────────────────────────────

@tool("purchase_product")
def purchase_product(product_id: str) -> str:
    """Purchase a product from the Epanya marketplace.

    Use this tool after discovering and evaluating products with discover_products.
    Payment is made automatically in USDC via the x402 protocol — no manual
    payment step is required. Returns the transaction ID and the service endpoint
    URL that your agent should use to call the purchased service.

    Args:
        product_id: The product UUID from discover_products results.
    """
    # The docstring is left word-for-word: it is the tool description shown
    # to the LLM, so its text is part of the tool's behaviour.
    receipt = _run(_client.purchase(product_id))

    # Pull the fields out in the order they appear in the JSON report.
    transaction_id = receipt["transactionId"]
    endpoint_url = receipt["product"]["endpointUrl"]
    amount_paid = f"{receipt['amount']!s} USDC"
    product_name = receipt["product"].get("name", "")

    report = {
        "success": True,
        "transaction_id": transaction_id,
        "endpoint_url": endpoint_url,
        "amount_paid": amount_paid,
        "product_name": product_name,
    }
    return json.dumps(report, indent=2)


# ── check_budget ──────────────────────────────────────────────────────────────

@tool("check_budget")
def check_budget() -> str:
    """Check this agent's current USDC spend and remaining budget.

    Call this before making purchases if you are unsure whether sufficient
    budget remains. Returns the total spent, the budget limit (if set), the
    remaining budget, and whether the budget has been exceeded.
    """
    # Implementation notes live in comments because the docstring is the tool
    # description shown to the LLM.
    budget = _run(_client.check_budget())

    def _amount_or_unlimited(value):
        # Only an absent value means "no limit". A plain truthiness test
        # (`value or "unlimited"`) would also turn a remaining balance of 0
        # into "unlimited", which is the opposite of what the agent must see
        # when the budget is used up.
        if value is None or value == "":
            return "unlimited"
        return str(value)

    return json.dumps({
        "spent":           str(budget["spent"]) + " USDC",
        "budget_limit":    _amount_or_unlimited(budget.get("budgetLimit")),
        "remaining":       _amount_or_unlimited(budget.get("remaining")),
        "budget_exceeded": budget["budgetExceeded"],
    }, indent=2)

Crew setup

Define two agents — a researcher and a procurement specialist — then compose them into a crew with two sequential tasks.

from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI

# Single chat-model instance (gpt-4o, temperature 0); the same object is
# passed as `llm=` to both agents defined below.
llm = ChatOpenAI(model="gpt-4o", temperature=0)


# ── Agents ────────────────────────────────────────────────────────────────────

# First agent of the crew: turns the free-text workload request into a
# procurement brief. It is constructed without a `tools=` argument; only the
# procurement agent receives the Epanya tools.
# role / goal / backstory are prompt text for the LLM, so edit them with care.
researcher = Agent(
    role="Compute Requirements Analyst",
    goal=(
        "Analyze the workload requirements and determine exactly what type of "
        "compute resource, API, or service is needed. Specify the category, "
        "required capabilities, and maximum acceptable price in USDC."
    ),
    backstory=(
        "You are an expert at translating high-level workload descriptions into "
        "precise technical requirements. You understand GPU specs, API rate limits, "
        "and cost/performance tradeoffs for AI workloads."
    ),
    llm=llm,
    verbose=True,
)

# Second agent of the crew: the only one given the three Epanya tools
# (discover_products, purchase_product, check_budget).
# Its goal text tells the LLM to check the budget before purchasing; that is
# an instruction in the prompt, not something enforced in code here.
procurement_agent = Agent(
    role="Autonomous Procurement Specialist",
    goal=(
        "Source and purchase the compute resource or service specified by the "
        "Compute Requirements Analyst. Always check budget before purchasing. "
        "Select the best value option and return the purchased endpoint URL."
    ),
    backstory=(
        "You are a specialist in programmatic procurement on the Epanya marketplace. "
        "You know how to search effectively, evaluate product quality and pricing, "
        "and execute purchases securely via the x402 payment protocol."
    ),
    tools=[discover_products, purchase_product, check_budget],
    llm=llm,
    verbose=True,
)


# ── Tasks ─────────────────────────────────────────────────────────────────────

# Task for the researcher agent: produce the procurement brief.
# "{workload_request}" is a placeholder in the description; the matching
# "workload_request" key is supplied through crew.kickoff(inputs=...).
requirements_task = Task(
    description=(
        "Analyze the following workload request and produce a precise procurement brief:\n\n"
        "{workload_request}\n\n"
        "Your brief must include:\n"
        "- The Epanya product category to search (compute/apis/models/tools/datasets/agent_labor)\n"
        "- Key search terms\n"
        "- Maximum acceptable price per request/unit in USDC\n"
        "- Minimum quality bar (e.g. rating, throughput, latency)\n"
        "- Any hard requirements (e.g. must support specific model, region, format)"
    ),
    expected_output=(
        "A structured procurement brief with: category, search_query, max_price_usdc, "
        "quality_requirements, and hard_requirements fields."
    ),
    agent=researcher,
)

# Task for the procurement agent: budget check, search, evaluate, purchase,
# report. `context=[requirements_task]` links it to the first task, which is
# what the description means by "the procurement brief from the previous task".
procurement_task = Task(
    description=(
        "Using the procurement brief from the previous task, find and purchase the best "
        "matching product on the Epanya marketplace.\n\n"
        "Steps:\n"
        "1. Check the budget first.\n"
        "2. Search for products using the category and query from the brief.\n"
        "3. Evaluate the results against the quality and price requirements.\n"
        "4. Purchase the best matching product.\n"
        "5. Return a final report with: product name, price paid, transaction ID, "
        "and the endpoint URL to use."
    ),
    expected_output=(
        "A procurement report containing: product_name, price_paid_usdc, "
        "transaction_id, endpoint_url, and a brief explanation of why this product was chosen."
    ),
    agent=procurement_agent,
    context=[requirements_task],
)


# ── Crew ──────────────────────────────────────────────────────────────────────

# Assemble the crew: both agents, and the two tasks listed in the order
# requirements -> procurement, using the sequential process.
crew = Crew(
    agents=[researcher, procurement_agent],
    tasks=[requirements_task, procurement_task],
    process=Process.sequential,
    verbose=True,
)

Run the crew with a natural-language workload description:

# Start the crew. The "workload_request" key matches the {workload_request}
# placeholder used in requirements_task's description.
result = crew.kickoff(inputs={
    "workload_request": (
        "We need to run Stable Diffusion XL inference for a batch of 500 images. "
        "Each image takes roughly 3 seconds of GPU time. We need low latency and "
        "high throughput. Our budget is $2.50 per request maximum."
    )
})

# Print whatever kickoff returned.
print(result)

Full code

import asyncio, json, os, concurrent.futures
from crewai import Agent, Task, Crew, Process
from crewai.tools import tool
from langchain_openai import ChatOpenAI
from epanya import EpanyaClient, create_test_signer

# Wallet address: AGENT_WALLET_ADDRESS from the environment when set,
# otherwise the hard-coded address below.
WALLET = os.environ.get(
    "AGENT_WALLET_ADDRESS",
    "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266",
)

# Module-level client used by the three tool functions in this listing.
_client = EpanyaClient(
    api_url="https://api.epanya.ai",
    wallet_address=WALLET,
    signer=create_test_signer(WALLET),
)

def _run(coro):
    """Run an async coroutine from sync context and return its result.

    Uses the current thread's event loop when it is open and idle, a worker
    thread with ``asyncio.run`` when that loop is already running, and
    ``asyncio.run`` directly when the thread has no loop or its loop is
    closed. Exceptions raised by the coroutine itself, including
    ``RuntimeError``, propagate to the caller unchanged.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.
    """
    # Guard ONLY the loop lookup: a blanket `except RuntimeError` around the
    # whole body would also catch a RuntimeError raised by the coroutine and
    # then retry the same, already-consumed coroutine.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None

    if loop is None or loop.is_closed():
        return asyncio.run(coro)

    if loop.is_running():
        # The running loop cannot be blocked on from inside itself.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    return loop.run_until_complete(coro)

@tool("discover_products")
def discover_products(query: str, category: str = "", max_price: float = 0, limit: int = 10) -> str:
    """Search the Epanya marketplace. Args: query, category, max_price (USDC), limit."""
    # Docstring kept word-for-word: it is the tool description the LLM sees.
    # Falsy filters ("" or 0) are handed to the client as None.
    found = _run(
        _client.discover(
            q=query if query else None,
            category=category if category else None,
            max_price=max_price if max_price else None,
            limit=limit,
        )
    )
    if not found:
        return "No products found."

    # One numbered, three-line entry per product, separated by blank lines.
    entries = []
    for rank, item in enumerate(found, 1):
        entries.append(
            f"{rank}. {item['name']} ${item['priceUsdc']} ({item['pricingModel']})\n   ID: {item['id']}\n   {item['description']}"
        )
    return "\n\n".join(entries)

@tool("purchase_product")
def purchase_product(product_id: str) -> str:
    """Purchase a product by its ID. Returns transaction ID and endpoint URL."""
    # Docstring kept word-for-word: it is the tool description the LLM sees.
    receipt = _run(_client.purchase(product_id))

    # Read the fields in the order they appear in the JSON result.
    transaction_id = receipt["transactionId"]
    endpoint_url = receipt["product"]["endpointUrl"]
    paid = f"{receipt['amount']!s} USDC"

    summary = {
        "transaction_id": transaction_id,
        "endpoint_url": endpoint_url,
        "paid": paid,
    }
    return json.dumps(summary)

@tool("check_budget")
def check_budget() -> str:
    """Check USDC spend vs budget limit."""
    # Docstring kept as-is: it is the tool description the LLM sees.
    b = _run(_client.check_budget())
    # `b.get("budgetLimit", "unlimited")` only covers a MISSING key; a key
    # that is present with a None value would be emitted as JSON null.
    # Treat both as "unlimited", while keeping a real limit of 0 intact.
    limit = b.get("budgetLimit")
    return json.dumps({
        "spent": b["spent"],
        "limit": "unlimited" if limit is None else limit,
        "exceeded": b["budgetExceeded"],
    })

# Single chat-model instance (gpt-4o, temperature 0) passed to both agents below.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Analyst agent: writes the procurement brief. Constructed without `tools=`.
# role / goal / backstory are prompt text for the LLM.
researcher = Agent(
    role="Compute Requirements Analyst",
    goal="Translate workload descriptions into precise Epanya procurement briefs.",
    backstory="Expert at matching AI workloads to marketplace product categories and specs.",
    llm=llm, verbose=True,
)
# Procurement agent: the only agent given the three Epanya tools.
procurement_agent = Agent(
    role="Procurement Specialist",
    goal="Find and purchase the best matching product on Epanya within budget.",
    backstory="Specialist in programmatic procurement via the Epanya marketplace and x402 payments.",
    tools=[discover_products, purchase_product, check_budget],
    llm=llm, verbose=True,
)

# Researcher's task. "{workload_request}" is a placeholder; the matching key
# is supplied through crew.kickoff(inputs=...).
requirements_task = Task(
    description="Analyze: {workload_request}\nProduce: category, search_query, max_price_usdc, quality_requirements.",
    expected_output="Structured procurement brief.",
    agent=researcher,
)
# Procurement agent's task; `context=[requirements_task]` links it to the
# brief produced by the first task.
procurement_task = Task(
    description="Using the brief: check budget, discover products, purchase the best match, return endpoint URL.",
    expected_output="Procurement report: product_name, price_paid_usdc, transaction_id, endpoint_url.",
    agent=procurement_agent,
    context=[requirements_task],
)

# Both agents and both tasks (requirements first, then procurement), sequential process.
crew = Crew(agents=[researcher, procurement_agent], tasks=[requirements_task, procurement_task], process=Process.sequential, verbose=True)

# Script entry point: run the crew once with a sample request and print the
# result. The "workload_request" key matches the placeholder in
# requirements_task's description.
if __name__ == "__main__":
    result = crew.kickoff(inputs={"workload_request": "Need a fast GPU inference endpoint for image generation, max $3/request."})
    print(result)

Multi-agent patterns

The procurement agent pattern composes with larger crews. Some examples:

| Pattern | Description |
| --- | --- |
| Research → Procure → Execute | A third agent receives the purchased endpoint URL and actually calls the service, then reports results back to a supervisor. |
| Multi-vendor comparison | Run discover_products with several different queries, have an evaluator agent score each option, then purchase only the winner. |
| Hierarchical crew | Use Process.hierarchical with a manager LLM. The manager dispatches to the procurement agent only when a tool or service is needed mid-task. |
| Recurring procurement | Wrap the crew in a scheduler. Each run checks if the previous transaction is still valid; if not, re-purchase. Useful for per-request priced products. |

Production signer: Replace create_test_signer with a real EIP-3009 signer backed by your agent's private key before deploying. See the Python signers guide for a web3.py / eth_account example.