Tristero Docs

Spot Swaps

Execute spot swaps with Permit2

Spot Swaps

Quote, Sign, and Submit

import asyncio
import json
import os

from eth_account import Account

from tristero import get_swap_quote, sign_and_submit, make_async_w3


async def main() -> None:
    """Quote a USDC -> WETH swap on Arbitrum, then sign and submit it.

    The signing key is read from the ``TEST_ACCOUNT_PRIVKEY`` environment
    variable. The RPC endpoint is read from ``ARB_RPC_URL`` and falls back to
    the publicnode.com Arbitrum One endpoint. Both the quote and the
    submission result are printed to stdout.

    Raises:
        RuntimeError: If ``TEST_ACCOUNT_PRIVKEY`` is unset or empty.
    """
    signer_key = os.environ.get("TEST_ACCOUNT_PRIVKEY")
    if not signer_key:
        raise RuntimeError("Set TEST_ACCOUNT_PRIVKEY")

    owner = Account.from_key(signer_key).address
    rpc_url = os.environ.get("ARB_RPC_URL", "https://arbitrum-one-rpc.publicnode.com")
    provider = make_async_w3(rpc_url)

    # Source and destination are both on Arbitrum (chain id 42161).
    arbitrum = 42161
    usdc = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831"
    weth = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"

    # Step 1: request a quote for swapping USDC into WETH.
    quote_request = {
        "wallet": owner,
        "src_chain": arbitrum,
        "src_token": usdc,
        "dst_chain": arbitrum,
        "dst_token": weth,
        "amount": 1_000_000,  # 1 USDC, expressed in its 6-decimal base units
    }
    swap_quote = await get_swap_quote(**quote_request)
    print(json.dumps(swap_quote, indent=2))

    # Step 2: sign and submit the quote. The web3 handle is passed because the
    # Permit2 approval happens on the source chain.
    # NOTE(review): timeout=300 is presumably in seconds -- confirm against the
    # tristero API.
    outcome = await sign_and_submit(
        swap_quote,
        signer_key,
        w3=provider,
        wait=True,
        timeout=300,
    )
    print(outcome)


# Run the example only when this file is executed as a script, so that
# importing it does not start a swap as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

Direct Execution

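To execute a spot swap directly with `execute_permit2_swap`, without a separate quote step (this example swaps USDC on Arbitrum for USDT on Base):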

import os
import asyncio

from eth_account import Account

from tristero import ChainID, TokenSpec, execute_permit2_swap, make_async_w3


async def main() -> None:
    """Execute a USDC (Arbitrum) -> USDT (Base) swap with one call.

    The signing key is read from the ``TEST_ACCOUNT_PRIVKEY`` environment
    variable. The Arbitrum RPC endpoint is read from ``ARB_RPC_URL`` and falls
    back to the publicnode.com Arbitrum One endpoint. The result returned by
    ``execute_permit2_swap`` is printed to stdout.

    Raises:
        RuntimeError: If ``TEST_ACCOUNT_PRIVKEY`` is unset or empty.
    """
    signer_key = os.environ.get("TEST_ACCOUNT_PRIVKEY")
    if not signer_key:
        raise RuntimeError("Set TEST_ACCOUNT_PRIVKEY")

    signer = Account.from_key(signer_key)
    provider = make_async_w3(
        os.environ.get("ARB_RPC_URL", "https://arbitrum-one-rpc.publicnode.com")
    )

    # Source token: USDC on Arbitrum (chain id 42161).
    usdc_on_arbitrum = TokenSpec(
        chain_id=ChainID(42161),
        token_address="0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
    )
    # Destination token: USDT on Base (chain id 8453).
    usdt_on_base = TokenSpec(
        chain_id=ChainID(8453),
        token_address="0xfde4C96c8593536E31F229EA8f37b2ADa2699bb2",
    )
    one_usdc = 1_000_000  # 1 USDC, expressed in its 6-decimal base units

    # NOTE(review): timeout=300 is presumably in seconds -- confirm against the
    # tristero API.
    outcome = await execute_permit2_swap(
        w3=provider,
        account=signer,
        src_t=usdc_on_arbitrum,
        dst_t=usdt_on_base,
        raw_amount=one_usdc,
        timeout=300,
    )
    print(outcome)


# Run the example only when this file is executed as a script, so that
# importing it does not start a swap as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

On this page