```
├── .flake8
├── .gitignore (700 tokens)
├── LICENSE (omitted)
├── README.md (2.2k tokens)
├── README_EN.md (3.6k tokens)
├── apex_requirements.txt (100 tokens)
├── docs/
├── ADDING_EXCHANGES.md (3.2k tokens)
├── telegram-bot-setup-en.md (400 tokens)
├── telegram-bot-setup.md (300 tokens)
├── env_example.txt (600 tokens)
├── exchanges/
├── __init__.py (100 tokens)
├── apex.py (5k tokens)
├── aster.py (6.6k tokens)
├── backpack.py (5.1k tokens)
├── base.py (800 tokens)
├── bp_client.py (4.9k tokens)
├── edgex.py (4.9k tokens)
├── ethereal.py (7.1k tokens)
├── extended.py (7.5k tokens)
├── factory.py (800 tokens)
├── grvt.py (4.8k tokens)
├── lighter.py (4.6k tokens)
├── lighter_custom_websocket.py (4.2k tokens)
├── nado.py (4.3k tokens)
├── paradex.py (5.6k tokens)
├── standx.py (6.1k tokens)
├── hedge/
├── hedge_mode_apex.py (9.8k tokens)
├── hedge_mode_bp.py (11k tokens)
├── hedge_mode_edgex.py (13.2k tokens)
├── hedge_mode_ext.py (11.3k tokens)
├── hedge_mode_grvt.py (10k tokens)
├── hedge_mode_grvt_v2.py (12.3k tokens)
├── hedge_mode_nado.py (8.9k tokens)
├── hedge_mode_standx.py (7.3k tokens)
├── hedge_mode.py (1500 tokens)
├── helpers/
├── __init__.py
├── ethereal/
├── README.md (300 tokens)
├── sign_linked_signer.py (800 tokens)
├── subaccount_eip712.html (2.7k tokens)
├── lark_bot.py (500 tokens)
├── lighter_ws.py (200 tokens)
├── logger.py (900 tokens)
├── telegram_bot.py (400 tokens)
├── para_requirements.txt (100 tokens)
├── requirements.txt (200 tokens)
├── runbot.py (1100 tokens)
├── tests/
├── test_query_retry.py (500 tokens)
├── trading_bot.py (5.1k tokens)
```
## /.flake8
```flake8 path="/.flake8"
[flake8]
max-line-length = 129
```
## /.gitignore
```gitignore path="/.gitignore"
# Dev environment
/backend/db.sqlite3
*.csv
bnbenv
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
lerna-debug.log*
node_modules
dist
dist-ssr
*.local
# Editor directories and files
.vscode/*
!.vscode/extensions.json
.idea
.DS_Store
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
*_env/
venv/
ENV/
env.bak/
venv.bak/
env_files/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# pyenv settings
.python-version
```
## /README.md
##### 关注我 **X (Twitter)**: [@yourQuantGuy](https://x.com/yourQuantGuy)
---
**English speakers**: Please read README_EN.md for the English version of this documentation.
## 📢 分享说明
**欢迎分享本项目!** 如果您要分享或修改此代码,请务必包含对原始仓库的引用。我们鼓励开源社区的发展,但请保持对原作者工作的尊重和认可。
---
## 自动交易机器人
一个支持多个交易所(目前包括 EdgeX, Backpack, Paradex, Aster, Lighter, grvt, Extended)的模块化交易机器人。该机器人实现了自动下单并在盈利时自动平仓的策略,主要目的是取得高交易量。
## 邀请链接 (获得返佣以及福利)
#### EdgeX: [https://pro.edgex.exchange/referral/QUANT](https://pro.edgex.exchange/referral/QUANT)
永久享受 VIP 1 费率;额外 10% 手续费返佣;10% 额外奖励积分
#### Backpack: [https://backpack.exchange/join/quant](https://backpack.exchange/join/quant)
使用我的推荐链接获得 35% 手续费返佣
#### Paradex: [https://app.paradex.trade/r/quant](https://app.paradex.trade/r/quant)
使用我的推荐链接获得 10% 手续费返佣以及潜在未来福利
#### Aster: [https://www.asterdex.com/zh-CN/referral/5191B1](https://www.asterdex.com/zh-CN/referral/5191B1)
使用我的推荐链接获得 30% 手续费返佣以及积分加成
#### grvt: [https://grvt.io/exchange/sign-up?ref=QUANT](https://grvt.io/exchange/sign-up?ref=QUANT)
获得 1.3x 全网最高的积分加成,未来的手续费返佣(官方预计 10 月中上线),以及即将开始的专属交易竞赛
#### Extended: [https://app.extended.exchange/join/QUANT](https://app.extended.exchange/join/QUANT)
10%的即时手续费减免;积分加成(官方未公布具体加成公式,但文档里有明确说明,通过官方大使邀请能拿到比自己小号邀请自己更多的分数)
#### ApeX: [https://join.omni.apex.exchange/quant](https://join.omni.apex.exchange/quant)
30%返佣; 5%手续费减免; 积分加成; 有资格参与 10 月 20 日至 11 月 2 日的社区专属交易竞赛,总奖金高达$5500
## 安装
Python 版本要求(最佳选项是 Python 3.10 - 3.12):
- grvt 要求 python 版本在 3.10 及以上
- Paradex 要求 python 版本在 3.9 - 3.12
- 其他交易所需要 python 版本在 3.8 及以上
1. **克隆仓库**:
```bash
git clone <repository-url>
cd perp-dex-tools
```
2. **创建并激活虚拟环境**:
首先确保你目前不在任何虚拟环境中:
```bash
deactivate
```
创建虚拟环境:
```bash
python3 -m venv env
```
激活虚拟环境(每次使用脚本时,都需要激活虚拟环境):
```bash
source env/bin/activate # Windows: env\Scripts\activate
```
3. **安装依赖**:
首先确保你目前不在任何虚拟环境中:
```bash
deactivate
```
激活虚拟环境(每次使用脚本时,都需要激活虚拟环境):
```bash
source env/bin/activate # Windows: env\Scripts\activate
```
```bash
pip install -r requirements.txt
```
**grvt 用户**:如果您想使用 grvt 交易所,需要额外安装 grvt 专用依赖:
激活虚拟环境(每次使用脚本时,都需要激活虚拟环境):
```bash
source env/bin/activate # Windows: env\Scripts\activate
```
```bash
pip install grvt-pysdk
```
**Paradex 用户**:如果您想使用 Paradex 交易所,需要额外创建一个虚拟环境并安装 Paradex 专用依赖:
首先确保你目前不在任何虚拟环境中:
```bash
deactivate
```
创建 Paradex 专用的虚拟环境(名称为 para_env):
```bash
python3 -m venv para_env
```
激活虚拟环境(每次使用脚本时,都需要激活虚拟环境):
```bash
source para_env/bin/activate # Windows: para_env\Scripts\activate
```
安装 Paradex 依赖
```bash
pip install -r para_requirements.txt
```
**apex 用户**:如果您想使用 apex 交易所,需要额外安装 apex 专用依赖:
激活虚拟环境(每次使用脚本时,都需要激活虚拟环境):
```bash
source env/bin/activate # Windows: env\Scripts\activate
```
```bash
pip install -r apex_requirements.txt
```
4. **设置环境变量**:
在项目根目录创建`.env`文件,并使用 env_example.txt 作为样本,修改为你的 api 密匙。
5. **Telegram 机器人设置(可选)**:
如需接收交易通知,请参考 [Telegram 机器人设置指南](docs/telegram-bot-setup.md) 配置 Telegram 机器人。
## 策略概述
**重要提醒**:大家一定要先理解了这个脚本的逻辑和风险,这样你就能设置更适合你自己的参数,或者你也可能觉得这不是一个好策略,根本不想用这个策略来刷交易量。我在推特也说过,我不是为了分享而写这些脚本,而是我真的在用这个脚本,所以才写了,然后才顺便分享出来。
这个脚本主要还是要看长期下来的磨损,只要脚本持续开单,如果一个月后价格到你被套的最高点,那么你这一个月的交易量就都是零磨损的了。所以我认为如果把`--quantity`和`--wait-time`设置的太小,并不是一个好的长期的策略,但确实适合短期内高强度冲交易量。我自己一般用 40 到 60 的 quantity,450 到 650 的 wait-time,以此来保证即使市场和你的判断想法,脚本依然能够持续稳定地下单,直到价格回到你的开单点,实现零磨损刷了交易量。
该机器人实现了简单的交易策略:
1. **订单下单**:在市场价格附近下限价单
2. **订单监控**:等待订单成交
3. **平仓订单**:在止盈水平自动下平仓单
4. **持仓管理**:监控持仓和活跃订单
5. **风险管理**:限制最大并发订单数
6. **网格步长控制**:通过 `--grid-step` 参数控制新订单与现有平仓订单之间的最小价格距离
7. **停止交易控制**:通过 `--stop-price` 参数控制停止交易的的价格条件
#### ⚙️ 关键参数
- **quantity**: 每笔订单的交易数量
- **direction**: 脚本交易的方向,buy 表示看多,sell 表示看空
- **take-profit**: 止盈百分比(如 0.02 表示 0.02%)
- **max-orders**: 最大同时活跃订单数(风险控制)
- **wait-time**: 订单间等待时间(避免过于频繁交易)
- **grid-step**: 网格步长控制(防止平仓订单过于密集)
- **stop-price**: 当市场价格达到该价格时退出脚本
- **pause-price**: 当市场价格达到该价格时暂停脚本
#### 网格步长功能详解
`--grid-step` 参数用于控制新订单的平仓价格与现有平仓订单之间的最小距离:
- **默认值 -100**:无网格步长限制,按原策略执行
- **正值(如 0.5)**:新订单的平仓价格必须与最近的平仓订单价格保持至少 0.5% 的距离
- **作用**:防止平仓订单过于密集,提高成交概率和风险管理
例如,当看多且 `--grid-step 0.5` 时:
- 如果现有平仓订单价格为 2000 USDT
- 新订单的平仓价格必须低于 1990 USDT(2000 × (1 - 0.5%))
- 这样可以避免平仓订单过于接近,提高整体策略效果
#### 📊 交易流程示例
假设当前 ETH 价格为 $2000,设置止盈为 0.02%:
1. **开仓**:在 $2000.40 下买单(略高于市价)
2. **成交**:订单被市场成交,获得多头仓位
3. **平仓**:立即在 $2000.80 下卖单(止盈价格)
4. **完成**:平仓单成交,获得 0.02% 利润
5. **重复**:继续下一轮交易
#### 🛡️ 风险控制
- **订单限制**:通过 `max-orders` 限制最大并发订单数
- **网格控制**:通过 `grid-step` 确保平仓订单有合理间距
- **下单频率控制**:通过 `wait-time` 确保下单的时间间隔,防止短时间内被套
- **实时监控**:持续监控持仓和订单状态
- **⚠️ 无止损机制**:此策略不包含止损功能,在不利市场条件下可能面临较大损失
## 示例命令:
### EdgeX 交易所:
ETH:
```bash
python runbot.py --exchange edgex --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450
```
ETH(带网格步长控制):
```bash
python runbot.py --exchange edgex --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450 --grid-step 0.5
```
ETH(带停止交易的价格控制):
```bash
python runbot.py --exchange edgex --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450 --stop-price 5500
```
BTC:
```bash
python runbot.py --exchange edgex --ticker BTC --quantity 0.05 --take-profit 0.02 --max-orders 40 --wait-time 450
```
### Backpack 交易所:
ETH 永续合约:
```bash
python runbot.py --exchange backpack --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450
```
ETH 永续合约(带网格步长控制):
```bash
python runbot.py --exchange backpack --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450 --grid-step 0.3
```
ETH 永续合约(启用 Boost 模式):
```bash
python runbot.py --exchange backpack --ticker ETH --direction buy --quantity 0.1 --boost
```
### Aster 交易所:
ETH:
```bash
python runbot.py --exchange aster --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450
```
ETH(启用 Boost 模式):
```bash
python runbot.py --exchange aster --ticker ETH --direction buy --quantity 0.1 --boost
```
### GRVT 交易所:
BTC:
```bash
python runbot.py --exchange grvt --ticker BTC --quantity 0.05 --take-profit 0.02 --max-orders 40 --wait-time 450
```
### Extended 交易所:
ETH:
```bash
python runbot.py --exchange extended --ticker ETH --quantity 0.1 --take-profit 0 --max-orders 40 --wait-time 450 --grid-step 0.1
```
## 🆕 对冲模式 (Hedge Mode)
新增的对冲模式 (`hedge_mode.py`) 是一个新的交易策略,通过同时在两个交易所进行对冲交易来降低风险:
### 对冲模式工作原理
1. **开仓阶段**:在选定交易所(如 Backpack)下 maker 订单
2. **对冲阶段**:订单成交后,立即在 Lighter 下市价订单进行对冲
3. **平仓阶段**:在选定交易所下另一个 maker 订单平仓
4. **对冲平仓**:在 Lighter 下市价订单平仓
### 对冲模式优势
- **风险降低**:通过同时持有相反头寸,降低单边市场风险
- **交易量提升**:在两个交易所同时产生交易量
- **套利机会**:利用两个交易所之间的价差
- **自动化执行**:全自动化的对冲交易流程
### 对冲模式使用示例
```bash
# 运行 BTC 对冲模式(Backpack)
python hedge_mode.py --exchange backpack --ticker BTC --size 0.05 --iter 20 --max-position 1
# 运行 ETH 对冲模式(Extended)
python hedge_mode.py --exchange extended --ticker ETH --size 0.1 --iter 20
# 运行 BTC 对冲模式(Apex)
python hedge_mode.py --exchange apex --ticker BTC --size 0.05 --iter 20
# 运行 BTC 对冲模式(GRVT)
python hedge_mode.py --exchange grvt --ticker BTC --size 0.05 --iter 20
# 运行 BTC 对冲模式(edgeX)
python hedge_mode.py --exchange edgex --ticker BTC --size 0.001 --iter 20
```
### 对冲模式参数
- `--exchange`: 主要交易所(支持 'backpack', 'extended', 'apex', 'grvt', 'edgex')
- `--ticker`: 交易对符号(如 BTC, ETH)
- `--size`: 每笔订单数量
- `--iter`: 交易循环次数
- `--fill-timeout`: maker 订单填充超时时间(秒,默认 5)
- `--sleep`: 每一笔交易之后的暂停时间,增加持仓时间(秒,默认 0)
- `--max-position`: 当设置了这个参数后,对冲模式会在对冲的同时逐渐建仓到设置的最大仓位,单位是币本位,比如在跑 btc 时设置 0.1,就是指逐渐建仓到 0.1btc,并逐渐建仓。达到这个最大仓位后,会逐渐建仓,以此循环。
## 配置
### 环境变量
#### 通用配置
- `ACCOUNT_NAME`: 环境变量中当前账号的名称,用于多账号日志区分,可自定义,非必须
#### Telegram 配置(可选)
- `TELEGRAM_BOT_TOKEN`: Telegram 机器人令牌
- `TELEGRAM_CHAT_ID`: Telegram 对话 ID
#### EdgeX 配置
- `EDGEX_ACCOUNT_ID`: 您的 EdgeX 账户 ID
- `EDGEX_STARK_PRIVATE_KEY`: 您的 EdgeX API 私钥
- `EDGEX_BASE_URL`: EdgeX API 基础 URL(默认:https://pro.edgex.exchange)
- `EDGEX_WS_URL`: EdgeX WebSocket URL(默认:wss://quote.edgex.exchange)
#### Backpack 配置
- `BACKPACK_PUBLIC_KEY`: 您的 Backpack API Key
- `BACKPACK_SECRET_KEY`: 您的 Backpack API Secret
#### Paradex 配置
- `PARADEX_L1_ADDRESS`: L1 钱包地址
- `PARADEX_L2_PRIVATE_KEY`: L2 钱包私钥(点击头像,钱包,"复制 paradex 私钥")
#### Aster 配置
- `ASTER_API_KEY`: 您的 Aster API Key
- `ASTER_SECRET_KEY`: 您的 Aster API Secret
#### Lighter 配置
- `API_KEY_PRIVATE_KEY`: Lighter API 私钥
- `LIGHTER_ACCOUNT_INDEX`: Lighter 账户索引
- `LIGHTER_API_KEY_INDEX`: Lighter API 密钥索引
#### GRVT 配置
- `GRVT_TRADING_ACCOUNT_ID`: 您的 GRVT 交易账户 ID
- `GRVT_PRIVATE_KEY`: 您的 GRVT 私钥
- `GRVT_API_KEY`: 您的 GRVT API 密钥
#### Extended 配置
- `EXTENDED_API_KEY`: Extended API Key
- `EXTENDED_STARK_KEY_PUBLIC`: 创建 API 后显示的 Stark 公钥
- `EXTENDED_STARK_KEY_PRIVATE`: 创建 API 后显示的 Stark 私钥
- `EXTENDED_VAULT`: 创建 API 后显示的 Extended Vault ID
#### Apex 配置
- `APEX_API_KEY`: 您的 Apex API 密钥
- `APEX_API_KEY_PASSPHRASE`: 您的 Apex API 密钥密码
- `APEX_API_KEY_SECRET`: 您的 Apex API 密钥私钥
- `APEX_OMNI_KEY_SEED`: 您的 Apex Omni 密钥种子
#### Nado 配置
- `NADO_PRIVATE_KEY`: 您的钱包私钥
- `NADO_MODE`: 网络模式(MAINNET 或 DEVNET,默认:MAINNET)
**获取 LIGHTER_ACCOUNT_INDEX 的方法**:
1. 在下面的网址最后加上你的钱包地址:
```
https://mainnet.zklighter.elliot.ai/api/v1/account?by=l1_address&value=
```
2. 在浏览器中打开这个网址
3. 在结果中搜索 "account_index" - 如果你有子账户,会有多个 account_index,短的那个是你主账户的,长的是你的子账户。
### 命令行参数
- `--exchange`: 使用的交易所:'edgex'、'backpack'、'paradex'、'aster'、'lighter'、'grvt'、'extended' 或 'nado'(默认:edgex)
- `--ticker`: 标的资产符号(例如:ETH、BTC、SOL)。合约 ID 自动解析。
- `--quantity`: 订单数量(默认:0.1)
- `--take-profit`: 止盈百分比(例如 0.02 表示 0.02%)
- `--direction`: 交易方向:'buy'或'sell'(默认:buy)
- `--env-file`: 账户配置文件 (默认:.env)
- `--max-orders`: 最大活跃订单数(默认:40)
- `--wait-time`: 订单间等待时间(秒)(默认:450)
- `--grid-step`: 与下一个平仓订单价格的最小距离百分比(默认:-100,表示无限制)
- `--stop-price`: 当 `direction` 是 'buy' 时,当 price >= stop-price 时停止交易并退出程序;'sell' 逻辑相反(默认:-1,表示不会因为价格原因停止交易),参数的目的是防止订单被挂在”你认为的开多高点或开空低点“。
- `--pause-price`: 当 `direction` 是 'buy' 时,当 price >= pause-price 时暂停交易,并在价格回到 pause-price 以下时重新开始交易;'sell' 逻辑相反(默认:-1,表示不会因为价格原因停止交易),参数的目的是防止订单被挂在”你认为的开多高点或开空低点“。
- `--boost`: 启用 Boost 模式进行交易量提升(仅适用于 aster 和 backpack 交易所)
Boost 模式的下单逻辑:下 maker 单开仓,成交后立即用 taker 单关仓,以此循环。磨损为一单 maker,一单 taker 的手续费,以及滑点。
## 日志记录
该机器人提供全面的日志记录:
- **交易日志**:包含订单详情的 CSV 文件
- **调试日志**:带时间戳的详细活动日志
- **控制台输出**:实时状态更新
- **错误处理**:全面的错误日志记录和处理
## Q & A
### 如何在同一设备配置同一交易所的多个账号?
1. 为每个账户创建一个 .env 文件,如 account_1.env, account_2.env
2. 在每个账户的 .env 文件中设置 `ACCOUNT_NAME=`, 如`ACCOUNT_NAME=MAIN`。
3. 在每个文件中配置好每个账户的 API key 或是密匙
4. 通过更改命令行中的 `--env-file` 参数来开始不同的账户,如 `python runbot.py --env-file account_1.env [其他参数...]`
### 如何在同一设备配置不同交易所的多个账号?
将不同交易所的账号都配置在同一 `.env` 文件后,通过更改命令行中的 `--exchange` 参数来开始不同的交易所,如 `python runbot.py --exchange backpack [其他参数...]`
### 如何在同一设备用同一账号配置同一交易所的多个合约?
将账号配置在 `.env` 文件后,通过更改命令行中的 `--ticker` 参数来开始不同的合约,如 `python runbot.py --ticker ETH [其他参数...]`
## 贡献
1. Fork 仓库
2. 创建功能分支
3. 进行更改
4. 如适用,添加测试
5. 提交拉取请求
## 许可证
本项目采用非商业许可证 - 详情请参阅[LICENSE](LICENSE)文件。
**重要提醒**:本软件仅供个人学习和研究使用,严禁用于任何商业用途。如需商业使用,请联系作者获取商业许可证。
## 免责声明
本软件仅供教育和研究目的。加密货币交易涉及重大风险,可能导致重大财务损失。使用风险自负,切勿用您无法承受损失的资金进行交易。
## /README_EN.md
##### Follow Me - **X (Twitter)**: [@yourQuantGuy](https://x.com/yourQuantGuy)
## 📢 Sharing Notice
**Sharing is encouraged!** If you share or modify this code, please include attribution to the original repository. We encourage the growth of the open-source community, but please maintain respect and recognition for the original author's work.
---
## Multi-Exchange Trading Bot
A modular trading bot that supports multiple exchanges including EdgeX, Backpack, Paradex, Aster, Lighter, GRVT, and Extended. The bot implements an automated strategy that places orders and automatically closes them at a profit.
## Referral Links (Enjoy fee rebates and benefits)
#### EdgeX: [https://pro.edgex.exchange/referral/QUANT](https://pro.edgex.exchange/referral/QUANT)
Instant VIP 1 Trading Fees; 10% Fee Rebate; 10% Bonus Points
#### Backpack: [https://backpack.exchange/join/quant](https://backpack.exchange/join/quant)
You will get 35% fee rebates on all your trading fees
#### Paradex: [https://app.paradex.trade/r/quant](https://app.paradex.trade/r/quant)
You will get 10% taker fee discount rebates and potential future benefits
#### Aster: [https://www.asterdex.com/zh-CN/referral/5191B1](https://www.asterdex.com/zh-CN/referral/5191B1)
You will get 30% fee rebates and points boost
#### grvt: [https://grvt.io/exchange/sign-up?ref=QUANT](https://grvt.io/exchange/sign-up?ref=QUANT)
You will get 1.3x points boost; rebates (auto rebates system is expected to be launched in mid-Oct); access to private trading competition
#### Extended: [https://app.extended.exchange/join/QUANT](https://app.extended.exchange/join/QUANT)
10% fee discount; points boost (black box, but "you will get more points from affiliate referral program than using another account to refer yourself" quoted from the official documentation from Extended); access to private trading competition
#### ApeX: [https://join.omni.apex.exchange/quant](https://join.omni.apex.exchange/quant)
30% fee rebates; 5% fee discount; points boost
## Installation
1. **Clone the repository**:
```bash
git clone <repository-url>
cd perp-dex-tools
```
2. **Create and activate virtual environment**:
First, make sure you are not currently in any virtual environment:
```bash
deactivate
```
Create virtual environment:
```bash
python3 -m venv env
```
Activate virtual environment (you need to activate the virtual environment every time you use the script):
```bash
source env/bin/activate # Windows: env\Scripts\activate
```
3. **Install dependencies**:
First, make sure you are not currently in any virtual environment:
```bash
deactivate
```
Activate virtual environment (you need to activate the virtual environment every time you use the script):
```bash
source env/bin/activate # Windows: env\Scripts\activate
```
```bash
pip install -r requirements.txt
```
**Paradex Users**: If you want to use Paradex exchange, you need to create an additional virtual environment and install Paradex-specific dependencies:
First, make sure you are not currently in any virtual environment:
```bash
deactivate
```
Create a dedicated virtual environment for Paradex (named para_env):
```bash
python3 -m venv para_env
```
Activate virtual environment (you need to activate the virtual environment every time you use the script):
```bash
source para_env/bin/activate # Windows: para_env\Scripts\activate
```
Install Paradex dependencies
```bash
pip install -r para_requirements.txt
```
**Apex Users**: If you want to use Apex exchange, you need to install Apex dependencies:
First, make sure you are not currently in any virtual environment:
```bash
source env/bin/activate # Windows: env\Scripts\activate
```
Install Apex dependencies
```bash
pip install -r apex_requirements.txt
```
4. **Set up environment variables**:
Create a `.env` file in the project root directory and use env_example.txt as a template to modify with your API keys.
5. **Telegram Bot Setup (Optional)**:
To receive trading notifications, please refer to the [Telegram Bot Setup Guide](docs/telegram-bot-setup-en.md) to configure your Telegram bot.
## Strategy Overview
**Important Notice**: Everyone must first understand the logic and risks of this script so you can set parameters that are more suitable for yourself, or you might think this is not a good strategy and don't want to use it at all. As I mentioned on Twitter, I didn't write these scripts for sharing purposes, but because I'm actually using this script myself, so I wrote it, and then shared it.
This script mainly focuses on long-term wear and tear. As long as the script continues to place orders, if the price reaches your highest trapped point after a month, then all your trading volume for that month will be zero-wear. Therefore, I believe that setting `--quantity` and `--wait-time` too small is not a good long-term strategy, but it is indeed suitable for short-term high-intensity volume trading. I usually use quantity between 40-60 and wait-time between 450-650 to ensure that even if the market goes against your judgment, the script can still place orders continuously and stably until the price returns to your entry point, achieving zero-wear volume trading.
The bot implements a simple trading strategy:
1. **Order Placement**: Places limit orders near the current market price
2. **Order Monitoring**: Waits for orders to be filled
3. **Close Order**: Automatically places close orders at the take-profit level
4. **Position Management**: Monitors positions and active orders
5. **Risk Management**: Limits maximum number of concurrent orders
6. **Grid Step Control**: Controls minimum price distance between new orders and existing close orders via `--grid-step` parameter
7. **Stop Trading Control**: Controls the price conditions for stopping transactions through the `--stop-price` parameter
#### ⚙️ Key Parameters
- **quantity**: Trading amount per order
- **take-profit**: Take-profit percentage (e.g., 0.02 means 0.02%)
- **max-orders**: Maximum concurrent active orders (risk control)
- **wait-time**: Wait time between orders (prevents overtrading)
- **grid-step**: Grid step control (prevents close orders from being too dense)
- **stop-price**: When `direction` is 'buy', exit when price >= stop-price; 'sell' logic is opposite (default: -1, no price-based termination)
- **pause-price**: When `direction` is 'buy', pause when price >= pause-price; 'sell' logic is opposite (default: -1, no price-based pausing)
#### Grid Step Feature
The `--grid-step` parameter controls the minimum distance between new order close prices and existing close order prices:
- **Default -100**: No grid step restriction, executes original strategy
- **Positive value (e.g., 0.5)**: New order close price must maintain at least 0.5% distance from the nearest close order price
- **Purpose**: Prevents close orders from being too dense, improving fill probability and risk management
For example, when Long and `--grid-step 0.5`:
- If existing close order price is 2000 USDT
- New order close price must be lower than 1990 USDT (2000 × (1 - 0.5%))
- This prevents close orders from being too close together, improving overall strategy effectiveness
#### 📊 Trading Flow Example
Assuming current ETH price is $2000 with take-profit set to 0.02%:
1. **Open Position**: Places buy order at $2000.40 (slightly above market price)
2. **Fill**: Order gets filled by the market, acquiring long position
3. **Close Position**: Immediately places sell order at $2000.80 (take-profit price)
4. **Complete**: Close order gets filled, earning 0.02% profit
5. **Repeat**: Continues to the next trading cycle
#### 🛡️ Risk Management
- **Order Limits**: Limits maximum concurrent orders via `max-orders`
- **Grid Control**: Ensures reasonable spacing between close orders via `grid-step`
- **Order Frequency Control**: Controls order timing via `wait-time` to prevent being trapped in short periods
- **Real-time Monitoring**: Continuously monitors positions and order status
- **⚠️ No Stop Loss**: This strategy does not include stop-loss functionality and may face significant losses in adverse market conditions
## Sample commands:
### EdgeX Exchange:
ETH:
```bash
python runbot.py --exchange edgex --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450
```
ETH (with grid step control):
```bash
python runbot.py --exchange edgex --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450 --grid-step 0.5
```
ETH (with stop price control):
```bash
python runbot.py --exchange edgex --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450 --stop-price 5500
```
BTC:
```bash
python runbot.py --exchange edgex --ticker BTC --quantity 0.05 --take-profit 0.02 --max-orders 40 --wait-time 450
```
### Backpack Exchange:
ETH Perpetual:
```bash
python runbot.py --exchange backpack --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450
```
ETH Perpetual (with grid step control):
```bash
python runbot.py --exchange backpack --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450 --grid-step 0.3
```
ETH Perpetual (with Boost mode enabled):
```bash
python runbot.py --exchange backpack --ticker ETH --direction buy --quantity 0.1 --boost
```
### Aster Exchange:
ETH:
```bash
python runbot.py --exchange aster --ticker ETH --quantity 0.1 --take-profit 0.02 --max-orders 40 --wait-time 450
```
ETH (with Boost mode enabled):
```bash
python runbot.py --exchange aster --ticker ETH --direction buy --quantity 0.1 --boost
```
### GRVT Exchange:
BTC:
```bash
python runbot.py --exchange grvt --ticker BTC --quantity 0.05 --take-profit 0.02 --max-orders 40 --wait-time 450
```
### Extended Exchange:
ETH:
```bash
python runbot.py --exchange extended --ticker ETH --quantity 0.1 --take-profit 0 --max-orders 40 --wait-time 450 --grid-step 0.01
```
## 🆕 Hedge Mode
The new Hedge Mode (`hedge_mode.py`) is an trading strategy that reduces risk by simultaneously hedging trades across two exchanges:
### How Hedge Mode Works
1. **Opening Phase**: Place maker order at selected exchange (e.g., Backpack)
2. **Hedging Phase**: After order fills, immediately place market order at Lighter to hedge
3. **Closing Phase**: Place another maker order at selected exchange to close position
4. **Hedge Closing**: Place market order at Lighter to close hedge position
### Hedge Mode Usage Examples
```bash
# Run BTC hedge mode with Backpack
python hedge_mode.py --exchange backpack --ticker BTC --size 0.05 --iter 20 --max-position 1
# Run ETH hedge mode with Extended
python hedge_mode.py --exchange extended --ticker ETH --size 0.1 --iter 20
# Run BTC hedge mode with Apex
python hedge_mode.py --exchange apex --ticker BTC --size 0.05 --iter 20
# Run BTC hedge mode with GRVT
python hedge_mode.py --exchange grvt --ticker BTC --size 0.05 --iter 20
# Run BTC hedge mode with edgeX
python hedge_mode.py --exchange edgex --ticker BTC --size 0.001 --iter 20
```
### Hedge Mode Parameters
- `--exchange`: Primary exchange (supports 'backpack', 'extended', 'apex', 'grvt', 'edgex')
- `--ticker`: Trading pair symbol (e.g., BTC, ETH)
- `--size`: Order quantity per trade
- `--iter`: Number of trading cycles
- `--fill-timeout`: Maker order fill timeout in seconds (default: 5)
- `--sleep`: Sleep time in seconds after each step (default: 0)
- `--max-position`: When this parameter is set, the hedging mode will gradually build a position up to the specified maximum size while performing the hedge. The unit is in the base asset. For example, when running BTC, setting it to 0.1 means it will gradually build a position up to 0.1 BTC while hedging.
## Configuration
### Environment Variables
#### General Configuration
- `ACCOUNT_NAME`: The name of the current account in the environment variable, used for distinguishing between multiple account logs, customizable, not mandatory
#### Telegram Configuration (Optional)
- `TELEGRAM_BOT_TOKEN`: Telegram bot token
- `TELEGRAM_CHAT_ID`: Telegram chat ID
#### EdgeX Configuration
- `EDGEX_ACCOUNT_ID`: Your EdgeX account ID
- `EDGEX_STARK_PRIVATE_KEY`: Your EdgeX api private key
- `EDGEX_BASE_URL`: EdgeX API base URL (default: https://pro.edgex.exchange)
- `EDGEX_WS_URL`: EdgeX WebSocket URL (default: wss://quote.edgex.exchange)
#### Backpack Configuration
- `BACKPACK_PUBLIC_KEY`: Your Backpack API key
- `BACKPACK_SECRET_KEY`: Your Backpack API Secret
#### Paradex Configuration
- `PARADEX_L1_ADDRESS`: L1 wallet address
- `PARADEX_L2_PRIVATE_KEY`: L2 wallet private key (click avatar, wallet, "copy paradex private key")
#### Aster Configuration
- `ASTER_API_KEY`: Your Aster API Key
- `ASTER_SECRET_KEY`: Your Aster API Secret
#### Lighter Configuration
- `API_KEY_PRIVATE_KEY`: Your Lighter API private key
- `LIGHTER_ACCOUNT_INDEX`: Lighter account index
- `LIGHTER_API_KEY_INDEX`: Lighter API key index
#### GRVT Configuration
- `GRVT_TRADING_ACCOUNT_ID`: Your GRVT trading account ID
- `GRVT_PRIVATE_KEY`: Your GRVT private key
- `GRVT_API_KEY`: Your GRVT API key
#### Extended Configuration
- `EXTENDED_API_KEY`: Your Extended API key
- `EXTENDED_STARK_KEY_PUBLIC`: Your Stark public key
- `EXTENDED_STARK_KEY_PRIVATE`: Your Stark private key
- `EXTENDED_VAULT`: Your Extended Vault ID
#### Apex Configuration
- `APEX_API_KEY`: Your Apex API key
- `APEX_API_KEY_PASSPHRASE`: Your Apex API key passphrase
- `APEX_API_KEY_SECRET`: Your Apex API key secret
- `APEX_OMNI_KEY_SEED`: Your Apex Omni key seed
**How to get LIGHTER_ACCOUNT_INDEX**:
1. Add your wallet address to the end of the following URL:
```
https://mainnet.zklighter.elliot.ai/api/v1/account?by=l1_address&value=
```
2. Open this URL in your browser
3. Search for "account_index" in the results - if you have subaccounts, there will be multiple account_index values. The shorter one is your main account, and the longer ones are your subaccounts.
### Command Line Arguments
- `--exchange`: Exchange to use: 'edgex', 'backpack', 'paradex', 'aster', 'lighter', 'grvt', or 'extended' (default: edgex)
- `--ticker`: Base asset symbol (e.g., ETH, BTC, SOL). Contract ID is auto-resolved.
- `--quantity`: Order quantity (default: 0.1)
- `--take-profit`: Take profit percent (e.g., 0.02 means 0.02%)
- `--direction`: Trading direction: 'buy' or 'sell' (default: buy)
- `--env-file`: Account configuration file (default: .env)
- `--max-orders`: Maximum number of active orders (default: 40)
- `--wait-time`: Wait time between orders in seconds (default: 450)
- `--grid-step`: Minimum distance in percentage to the next close order price (default: -100, means no restriction)
- `--stop-price`: When `direction` is 'buy', stop trading and exit the program when price >= stop-price; 'sell' logic is opposite (default: -1, no price-based termination). The purpose of this parameter is to prevent orders from being placed at "high points for long positions or low points for short positions that you consider".
- `--pause-price`: When `direction` is 'buy', pause trading when price >= pause-price and resume trading when price falls back below pause-price; 'sell' logic is opposite (default: -1, no price-based pausing). The purpose of this parameter is to prevent orders from being placed at "high points for long positions or low points for short positions that you consider".
- `--boost`: Enable Boost mode for volume boosting on Aster and Backpack exchanges (only available for 'aster' and 'backpack')
Boost trading logic: Place maker orders to open positions, immediately close with taker orders after fill, repeat this cycle. Wear consists of one maker order, one taker order fees, and slippage.
## Logging
The bot provides comprehensive logging:
- **Transaction Logs**: CSV files with order details
- **Debug Logs**: Detailed activity logs with timestamps
- **Console Output**: Real-time status updates
- **Error Handling**: Comprehensive error logging and handling
## Q & A
### How to configure multiple accounts for the same exchange on the same device?
1. Create a .env file for each account, such as account_1.env, account_2.env
2. Set `ACCOUNT_NAME=` in each account's .env file, such as `ACCOUNT_NAME=MAIN`.
3. Configure the API keys or secrets for each account in each file
4. Use different `--env-file` parameters in the command line to start different accounts, such as `python runbot.py --env-file account_1.env [other parameters...]`
### How to configure multiple accounts for different exchanges on the same device?
Configure all different exchange accounts in the same `.env` file, then use different `--exchange` parameters in the command line to start different exchanges, such as `python runbot.py --exchange backpack [other parameters...]`
### How to configure multiple contracts for the same account and exchange on the same device?
Configure the account in the `.env` file, then use different `--ticker` parameters in the command line to start different contracts, such as `python runbot.py --ticker ETH [other parameters...]`
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests if applicable
5. Submit a pull request
## License
This project is licensed under a Non-Commercial License - see the [LICENSE](LICENSE) file for details.
**Important Notice**: This software is for personal learning and research purposes only. Commercial use is strictly prohibited. For commercial use, please contact the author for a commercial license.
## Disclaimer
This software is for educational and research purposes only. Trading cryptocurrencies involves significant risk and can result in substantial financial losses. Use at your own risk and never trade with money you cannot afford to lose.
## /apex_requirements.txt
# Apex's SDK couldn't handle requirements properly, so install the required packages manually
# ref: https://github.com/ApeX-Protocol/apexpro-openapi/pull/28
websocket-client
websockets
dateparser==1.0.0
ecdsa==0.16.0
eth_keys
eth-account==0.13.7
mpmath
pytest>=4.4.0,<5.0.0
requests-mock==1.6.0
requests>=2.32.3,<3.0.0
setuptools>=50.3.2
sympy==1.6
tox==3.13.2
web3==6.0.0
numpy
# Apex Python SDK
apexomni
## /docs/ADDING_EXCHANGES.md
# Adding New Exchanges
This document explains how to add support for new exchanges to the modular trading bot.
## Overview
The trading bot has been modularized to support multiple exchanges through a plugin-like architecture. Each exchange is implemented as a separate client that inherits from `BaseExchangeClient`. The bot currently supports EdgeX, Backpack, Paradex, and GRVT exchanges.
## Architecture
```
exchanges/
├── __init__.py # Module initialization
├── base.py # Base exchange client interface
├── edgex.py # EdgeX exchange implementation
├── backpack.py # Backpack exchange implementation
├── paradex.py # Paradex exchange implementation
├── aster.py # Aster exchange implementation
├── factory.py # Exchange factory for dynamic selection
└── your_exchange.py # Your new exchange implementation
```
## Steps to Add a New Exchange
### 1. Create Exchange Client
Create a new file `exchanges/your_exchange.py` that implements the `BaseExchangeClient` interface:
```python
import os
from decimal import Decimal
from typing import Dict, Any, List, Optional, Tuple
from .base import BaseExchangeClient, OrderResult, OrderInfo, query_retry
from helpers.logger import TradingLogger
class YourExchangeClient(BaseExchangeClient):
"""Your exchange client implementation."""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
# Initialize your exchange-specific client here
self.api_key = os.getenv('YOUR_EXCHANGE_API_KEY')
self.secret_key = os.getenv('YOUR_EXCHANGE_SECRET_KEY')
# Initialize logger
self.logger = TradingLogger(exchange="your_exchange", ticker=self.config.ticker, log_to_console=False)
# Initialize your exchange SDK
self.client = YourExchangeSDK(self.api_key, self.secret_key)
def _validate_config(self) -> None:
"""Validate exchange-specific configuration."""
required_env_vars = ['YOUR_EXCHANGE_API_KEY', 'YOUR_EXCHANGE_SECRET_KEY']
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
async def connect(self) -> None:
"""Connect to the exchange (WebSocket, etc.)."""
# Establish connection to your exchange
await self.client.connect()
# Wait for connection to establish
await asyncio.sleep(2)
async def disconnect(self) -> None:
"""Disconnect from the exchange."""
# Clean up connections
await self.client.disconnect()
def get_exchange_name(self) -> str:
"""Get the exchange name."""
return "your_exchange"
def setup_order_update_handler(self, handler) -> None:
"""Setup order update handler for WebSocket."""
self._order_update_handler = handler
# Set up WebSocket or polling for order updates
self.client.setup_order_callback(handler)
@query_retry(default_return=(0, 0))
async def fetch_bbo_prices(self, contract_id: str) -> Tuple[Decimal, Decimal]:
"""Fetch best bid and offer prices for a contract."""
# Implement market data fetching
order_book = await self.client.get_order_book(contract_id)
best_bid = Decimal(order_book['bids'][0][0]) if order_book['bids'] else 0
best_ask = Decimal(order_book['asks'][0][0]) if order_book['asks'] else 0
return best_bid, best_ask
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place an open order."""
try:
# Get current market prices
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if direction == 'buy':
order_price = best_ask - self.config.tick_size
else:
order_price = best_bid + self.config.tick_size
# Place order with your exchange
order_result = await self.client.place_order(
symbol=contract_id,
side=direction,
quantity=float(quantity),
price=float(self.round_to_tick(order_price)),
order_type='LIMIT'
)
return OrderResult(
success=True,
order_id=order_result['id'],
side=direction,
size=quantity,
price=order_price,
status=order_result['status']
)
except Exception as e:
return OrderResult(success=False, error_message=str(e))
async def place_close_order(self, contract_id: str, quantity: Decimal, price: Decimal, side: str) -> OrderResult:
"""Place a close order."""
try:
# Adjust price to ensure maker order
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if side == 'sell' and price <= best_bid:
adjusted_price = best_bid + self.config.tick_size
elif side == 'buy' and price >= best_ask:
adjusted_price = best_ask - self.config.tick_size
else:
adjusted_price = price
order_result = await self.client.place_order(
symbol=contract_id,
side=side,
quantity=float(quantity),
price=float(self.round_to_tick(adjusted_price)),
order_type='LIMIT'
)
return OrderResult(
success=True,
order_id=order_result['id'],
side=side,
size=quantity,
price=adjusted_price,
status=order_result['status']
)
except Exception as e:
return OrderResult(success=False, error_message=str(e))
async def cancel_order(self, order_id: str) -> OrderResult:
"""Cancel an order."""
try:
await self.client.cancel_order(order_id)
return OrderResult(success=True)
except Exception as e:
return OrderResult(success=False, error_message=str(e))
@query_retry()
async def get_order_info(self, order_id: str) -> Optional[OrderInfo]:
"""Get order information."""
try:
order_data = await self.client.get_order(order_id)
return OrderInfo(
order_id=order_data['id'],
side=order_data['side'].lower(),
size=Decimal(order_data['quantity']),
price=Decimal(order_data['price']),
status=order_data['status'],
filled_size=Decimal(order_data.get('filled_quantity', 0)),
remaining_size=Decimal(order_data.get('remaining_quantity', 0))
)
except Exception as e:
self.logger.log(f"Error getting order info: {e}", "ERROR")
return None
@query_retry(default_return=[])
async def get_active_orders(self, contract_id: str) -> List[OrderInfo]:
"""Get active orders for a contract."""
try:
orders = await self.client.get_open_orders(contract_id)
order_list = []
for order in orders:
order_list.append(OrderInfo(
order_id=order['id'],
side=order['side'].lower(),
size=Decimal(order['quantity']),
price=Decimal(order['price']),
status=order['status'],
filled_size=Decimal(order.get('filled_quantity', 0)),
remaining_size=Decimal(order.get('remaining_quantity', 0))
))
return order_list
except Exception as e:
self.logger.log(f"Error getting active orders: {e}", "ERROR")
return []
@query_retry(default_return=0)
async def get_account_positions(self) -> Decimal:
"""Get account positions."""
try:
positions = await self.client.get_positions()
for position in positions:
if position['symbol'] == self.config.contract_id:
return abs(Decimal(position['size']))
return Decimal(0)
except Exception as e:
self.logger.log(f"Error getting positions: {e}", "ERROR")
return Decimal(0)
async def get_contract_attributes(self) -> Tuple[str, Decimal]:
"""Get contract ID and tick size for a ticker."""
ticker = self.config.ticker
if not ticker:
raise ValueError("Ticker is empty")
# Get market info from your exchange
markets = await self.client.get_markets()
for market in markets:
if market['base_asset'] == ticker and market['quote_asset'] == 'USDT':
self.config.contract_id = market['symbol']
self.config.tick_size = Decimal(market['tick_size'])
return self.config.contract_id, self.config.tick_size
raise ValueError(f"Contract not found for ticker: {ticker}")
```
### 2. Register the Exchange
Add your exchange to the factory in `exchanges/factory.py`:
```python
from .your_exchange import YourExchangeClient
class ExchangeFactory:
_registered_exchanges = {
'edgex': EdgeXClient,
'backpack': BackpackClient,
'paradex': ParadexClient,
'aster': AsterClient,
'your_exchange': YourExchangeClient, # Add this line
}
```
### 3. Update Module Imports
Add your exchange to `exchanges/__init__.py`:
```python
from .your_exchange import YourExchangeClient
__all__ = ['BaseExchangeClient', 'EdgeXClient', 'BackpackClient', 'ParadexClient', 'AsterClient', 'YourExchangeClient', 'ExchangeFactory']
```
### 4. Test Your Implementation
Test your exchange client:
```python
from exchanges import ExchangeFactory
# Test exchange creation
config = {
'ticker': 'BTC',
'quantity': Decimal('0.01'),
'take_profit': Decimal('0.5'),
'direction': 'buy',
'max_orders': 5,
'wait_time': 10,
'grid_step': Decimal('0.1'),
'stop_price': Decimal('-1'),
'pause_price': Decimal('-1')
}
client = ExchangeFactory.create_exchange('your_exchange', config)
print(f"Created {client.get_exchange_name()} client")
```
## Required Methods
All exchange clients must implement these methods from `BaseExchangeClient`:
### Core Methods
- `_validate_config()` - Validate exchange-specific configuration
- `connect()` - Establish connection to exchange
- `disconnect()` - Clean up connections
- `get_exchange_name()` - Return exchange name
### Order Management
- `place_open_order(contract_id, quantity, direction)` - Place opening orders
- `place_close_order(contract_id, quantity, price, side)` - Place closing orders
- `cancel_order(order_id)` - Cancel orders
- `get_order_info(order_id)` - Get order details
- `get_active_orders(contract_id)` - Get all active orders
### Data Retrieval
- `get_account_positions()` - Get account positions
- `setup_order_update_handler(handler)` - Set up real-time order updates
- `fetch_bbo_prices(contract_id)` - Get best bid/offer prices
- `get_contract_attributes()` - Get contract ID and tick size for ticker
## Data Structures
### OrderResult
```python
@dataclass
class OrderResult:
success: bool
order_id: Optional[str] = None
side: Optional[str] = None
size: Optional[Decimal] = None
price: Optional[Decimal] = None
status: Optional[str] = None
error_message: Optional[str] = None
filled_size: Optional[Decimal] = None
```
### OrderInfo
```python
@dataclass
class OrderInfo:
order_id: str
side: str
size: Decimal
price: Decimal
status: str
filled_size: Decimal = 0.0
remaining_size: Decimal = 0.0
cancel_reason: str = ''
```
## Environment Variables
Each exchange requires specific environment variables. Here are the current implementations:
### EdgeX
- `EDGEX_ACCOUNT_ID` - Your EdgeX account ID
- `EDGEX_STARK_PRIVATE_KEY` - Your Stark private key
- `EDGEX_BASE_URL` - API base URL (optional, defaults to https://pro.edgex.exchange)
- `EDGEX_WS_URL` - WebSocket URL (optional, defaults to wss://quote.edgex.exchange)
### Backpack
- `BACKPACK_PUBLIC_KEY` - Your Backpack public key
- `BACKPACK_SECRET_KEY` - Your Backpack secret key (base64 encoded)
### Paradex
- `PARADEX_L1_ADDRESS` - Your Ethereum L1 address
- `PARADEX_L2_PRIVATE_KEY` - Your L2 private key (hex format)
- `PARADEX_L2_ADDRESS` - Your L2 address (optional)
- `PARADEX_ENVIRONMENT` - Environment (prod/testnet/nightly, defaults to prod)
### GRVT
- `GRVT_TRADING_ACCOUNT_ID` - Your GRVT trading account ID
- `GRVT_PRIVATE_KEY` - Your GRVT private key
- `GRVT_API_KEY` - Your GRVT API key
- `GRVT_ENVIRONMENT` - Environment (prod/testnet/staging/dev, defaults to prod)
## Usage
Once implemented, users can select your exchange:
```bash
python runbot.py --exchange your_exchange --ticker BTC --quantity 0.01 --direction buy
```
## Best Practices
1. **Error Handling**: Always return appropriate `OrderResult` objects with error messages
2. **Async/Await**: All methods should be async for non-blocking operations
3. **Configuration**: Use environment variables for API keys and endpoints
4. **Logging**: Use the provided `TradingLogger` for consistent logging
5. **Testing**: Test thoroughly with paper trading before live trading
6. **Documentation**: Document any exchange-specific requirements
7. **Retry Logic**: Use the `@query_retry` decorator for API calls that might fail
8. **Price Rounding**: Use `self.round_to_tick()` for price adjustments
9. **WebSocket Handling**: Implement proper order update handlers for real-time updates
10. **Order Types**: Use POST_ONLY orders to avoid taker fees when possible
## Current Exchange Implementations
### EdgeX
- Uses official EdgeX SDK
- Supports WebSocket order updates
- Implements retry logic for POST_ONLY rejections
- Requires Stark private key authentication
### Backpack
- Uses official Backpack SDK (bpx)
- Custom WebSocket manager for order updates
- ED25519 signature authentication
- Supports perpetual futures
### Paradex
- Uses official Paradex SDK (paradex_py)
- L2 credentials only (no L1 private key required)
- WebSocket subscription for order updates
- Supports StarkNet-based trading
### GRVT
- Uses official GRVT SDK (grvt-pysdk)
- REST API and WebSocket support
- Private key authentication
- Supports perpetual futures trading
## Example: Binance Futures
Here's a simplified example of how you might implement Binance Futures:
```python
import ccxt
class BinanceFuturesClient(BaseExchangeClient):
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.api_key = os.getenv('BINANCE_API_KEY')
self.secret_key = os.getenv('BINANCE_SECRET_KEY')
self.client = ccxt.binance({
'apiKey': self.api_key,
'secret': self.secret_key,
'sandbox': False, # Set to True for testnet
'options': {'defaultType': 'future'}
})
def _validate_config(self) -> None:
if not self.api_key or not self.secret_key:
raise ValueError("BINANCE_API_KEY and BINANCE_SECRET_KEY required")
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
try:
order = self.client.create_order(
symbol=contract_id,
type='limit',
side=direction,
amount=float(quantity),
price=float(self.round_to_tick(await self._get_market_price(contract_id, direction))),
params={'postOnly': True}
)
return OrderResult(success=True, order_id=order['id'])
except Exception as e:
return OrderResult(success=False, error_message=str(e))
```
This modular approach makes it easy to add new exchanges while maintaining a consistent interface for the trading bot.
## /docs/telegram-bot-setup-en.md
# How to Create a Telegram Bot from Scratch
This guide will walk you through the complete process of creating a Telegram bot from scratch, including bot creation, configuration, and integration with your application.
### Step 1: Start Chatting with BotFather
1. Open Telegram and search for `@BotFather`
2. Start a chat with BotFather
3. Send `/start` to begin
### Step 2: Create a New Bot
1. Send the `/newbot` command
2. Choose a name for your bot (display name)
3. Choose a username for your bot (must end with 'bot', e.g., `my_trading_bot`)
4. BotFather will provide you with a **bot token** - save it securely!
### Step 3: Get Your Bot Token
After creating the bot, BotFather will provide a token in the following format:
```
123456789:ABCdefGHIjklMNOpqrsTUVwxyz
```
**Important Security Notes:**
- Never share this token publicly
- Store it in environment variables
- Use `.env` file for local development
- Use secure key management in production environments
### Step 4: Get Your Chat ID
Send any message to your newly created bot on Telegram.
Then replace {bot_token} in the following URL with the token provided when creating the bot, and enter the URL in your web browser:
https://api.telegram.org/bot{bot_token}/getUpdates
For example, if your token is `123456789:ABCdefGHIjklMNOpqrsTUVwxyz`, change the URL to:
`https://api.telegram.org/bot123456789:ABCdefGHIjklMNOpqrsTUVwxyz/getUpdates`
After opening the URL, you will see information similar to the following:
```
{"ok":true,"result":[{"update_id":880712345,
"message":{"message_id":1,"from":{"id":5586512345,"is_bot":false,"first_name":"yourQuantGuy","username":"yourquantguy","language_code":"en","is_premium":true},"chat":{"id":5586512345,"first_name":"yourQuantGuy","username":"yourquantguy","type":"private"},"date":1759103123,"text":"/start","entities":[{"offset":0,"length":6,"type":"bot_command"}]}}]}
```
Find the id in "chat":{"id": ... }, which is 5586512345 in the example above.
### Step 5: Update .env File
Add your Telegram bot token and chat ID to the .env file in your script:
```
TELEGRAM_BOT_TOKEN=
TELEGRAM_CHAT_ID=
```
For example:
```
TELEGRAM_BOT_TOKEN="123456789:ABCdefGHIjklMNOpqrsTUVwxyz"
TELEGRAM_CHAT_ID=5586512345
LANGUAGE=EN
```
## /docs/telegram-bot-setup.md
# 如何从零开始创建 Telegram 机器人
本指南将带您完成从零开始创建 Telegram 机器人的完整过程,包括机器人创建、配置以及与您的应用程序集成。
### 步骤 1:与 BotFather 开始聊天
1. 打开 Telegram 并搜索 `@BotFather`
2. 与 BotFather 开始聊天
3. 发送 `/start` 开始
### 步骤 2:创建新机器人
1. 发送 `/newbot` 命令
2. 为您的机器人选择一个名称(显示名称)
3. 为您的机器人选择一个用户名(必须以 'bot' 结尾,例如 `my_trading_bot`)
4. BotFather 将为您提供一个**机器人令牌** - 请安全保存!
### 步骤 3:获取您的机器人令牌
创建机器人后,BotFather 将提供一个如下格式的令牌:
```
123456789:ABCdefGHIjklMNOpqrsTUVwxyz
```
**重要安全注意事项:**
- 永远不要公开分享此令牌
- 将其存储在环境变量中
- 在本地开发中使用 `.env` 文件
- 在生产环境中使用安全的密钥管理
### 步骤 4:获取您的对话 ID
给你新创建的机器人在Telegram上发一条任意信息。
然后将以下网址中的{bot_token}用创建机器人时提供的令牌代替,在网页中输入以下网址:
https://api.telegram.org/bot{bot_token}/getUpdates
例如,如果你的令牌是 `123456789:ABCdefGHIjklMNOpqrsTUVwxyz`,则将网址改为:
`https://api.telegram.org/bot123456789:ABCdefGHIjklMNOpqrsTUVwxyz/getUpdates`
打开网之后,你会看到类似于以下的信息:
```
{"ok":true,"result":[{"update_id":880712345,
"message":{"message_id":1,"from":{"id":5586512345,"is_bot":false,"first_name":"yourQuantGuy","username":"yourquantguy","language_code":"en","is_premium":true},"chat":{"id":5586512345,"first_name":"yourQuantGuy","username":"yourquantguy","type":"private"},"date":1759103123,"text":"/start","entities":[{"offset":0,"length":6,"type":"bot_command"}]}}]}
```
找到 "chat":{"id": ... }中的 id,即示例中的 5586512345。
### 步骤 5:更新 .env 文件
在脚本的 .env 文件中加入你的Telegram机器人令牌,以及对话ID。
```
TELEGRAM_BOT_TOKEN=
TELEGRAM_CHAT_ID=
```
比如:
```
TELEGRAM_BOT_TOKEN="123456789:ABCdefGHIjklMNOpqrsTUVwxyz"
TELEGRAM_CHAT_ID=5586512345
LANGUAGE=CN
```
## /env_example.txt
# used to distinguish log files for multiple accounts, optional
ACCOUNT_NAME=
# EdgeX Account Credentials (REQUIRED for API access)
# Find these in your EdgeX account under "API Management"
EDGEX_ACCOUNT_ID=your_account_id_here
EDGEX_STARK_PRIVATE_KEY=your_stark_private_key_here
# Backpack Account Credentials (REQUIRED for API access)
# Find these in your Backpack account under "Portfolio" -> "Settings" -> "API Keys"
BACKPACK_PUBLIC_KEY=your_api_key
BACKPACK_SECRET_KEY=your_api_secret
# Paradex Configuration
# user icon -> "Wallet" -> "Ethereum Address" and "Copy Paradex Private key"
PARADEX_L1_ADDRESS=your_generated_l1_address_here
PARADEX_L2_PRIVATE_KEY=your_generated_l2_private_key_here
PARADEX_ENVIRONMENT=prod # or testnet
# GRVT Configuration
# Get these from your GRVT account settings
GRVT_TRADING_ACCOUNT_ID=your_trading_account_id_here
GRVT_PRIVATE_KEY=your_private_key_here
GRVT_API_KEY=your_api_key_here
GRVT_ENVIRONMENT=prod # or testnet/staging/dev
# Aster Configuration
ASTER_API_KEY=your_aster_api_key
ASTER_SECRET_KEY=your_aster_secret_key
# Lighter Configuration
API_KEY_PRIVATE_KEY=your_api_key_private_key_here
LIGHTER_ACCOUNT_INDEX=0
LIGHTER_API_KEY_INDEX=0
# Extended Configuration
EXTENDED_API_KEY=your_extended_api_key
EXTENDED_STARK_KEY_PUBLIC=your_stark_public_key
EXTENDED_STARK_KEY_PRIVATE=your_stark_private_key
EXTENDED_VAULT=extended_vault_id
# Apex Configuration
# Generate API_KEY, API_KEY_PASSPHRASE, API_KEY_SECRET by clicking <Generate API> on key management page
# Generate OMNI_KEY_SEED by clicking <Omni Key> on key management page
APEX_API_KEY=your_apex_api_key
APEX_API_KEY_PASSPHRASE=your_apex_api_key_passphrase
APEX_API_KEY_SECRET=your_apex_api_key_secret
APEX_OMNI_KEY_SEED=your_apex_omni_key_seed
# Nado Configuration
# Get your private key from Nado account settings
NADO_PRIVATE_KEY=your_nado_private_key_here
NADO_MODE=MAINNET # or DEVNET
# Ethereal Configuration
ETHEREAL_BASE_URL=https://api.ethereal.trade
ETHEREAL_RPC_URL=https://rpc.ethereal.trade
ETHEREAL_CHAIN_ID=5064014
ETHEREAL_PRIVATE_KEY=your_private_key # required for signing orders
ETHEREAL_SUBACCOUNT_ID=your_subaccount_id
ETHEREAL_ACCOUNT_NAME=primary
# StandX Configuration
# BSC/Solana wallet address and private key
STANDX_WALLET_ADDRESS=your_wallet_address_here
STANDX_PRIVATE_KEY=your_wallet_private_key_here
STANDX_CHAIN=bsc # or solana
# Notification (optional)
# guide: https://www.feishu.cn/hc/zh-CN/articles/185289387886-%E6%B6%88%E6%81%AF%E5%8A%A9%E6%89%8B-%E6%9C%BA%E5%99%A8%E4%BA%BA
LARK_TOKEN=
TELEGRAM_BOT_TOKEN=
TELEGRAM_CHAT_ID=
# Logging
LOG_TO_CONSOLE=true
LOG_TO_FILE=true
LOG_FILE=trading_log.csv
TIMEZONE=Asia/Shanghai
# EdgeX API Endpoints
EDGEX_BASE_URL=https://pro.edgex.exchange
EDGEX_WS_URL=wss://quote.edgex.exchange
## /exchanges/__init__.py
```py path="/exchanges/__init__.py"
"""
Exchange clients module for perp-dex-tools.
This module provides a unified interface for different exchange implementations.
"""
from .base import BaseExchangeClient, query_retry
from .factory import ExchangeFactory
__all__ = [
'BaseExchangeClient', 'EdgeXClient', 'BackpackClient', 'ParadexClient',
'GrvtClient', 'ExchangeFactory', 'query_retry'
]
```
## /exchanges/apex.py
```py path="/exchanges/apex.py"
# coding: utf-8
"""
Apex exchange client implementation.
"""
import os
import asyncio
import json
import time
import traceback
import types
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
from apexomni import constants as apex_constants, FailedRequestError
from apexomni._websocket_stream import _ApexWebSocketManager, PRIVATE_WSS
from apexomni.http_private_sign import HttpPrivateSign
from apexomni.websocket_api import WebSocket as ApexWebSocketClient
from .base import BaseExchangeClient, OrderResult, OrderInfo, query_retry
from helpers.logger import TradingLogger
class ApexClient(BaseExchangeClient):
"""Apex exchange client implementation"""
def __init__(self, config: Dict[str, any]):
"""Initialize Apex client."""
super().__init__(config)
# Apex credentials from environment
self.api_key = os.getenv('APEX_API_KEY')
self.api_key_passphrase = os.getenv('APEX_API_KEY_PASSPHRASE')
self.api_key_secret = os.getenv('APEX_API_KEY_SECRET')
self.omni_key_seed = os.getenv('APEX_OMNI_KEY_SEED')
self.api_key_credentials = {
'key': self.api_key, 'secret': self.api_key_secret,
'passphrase': self.api_key_passphrase
}
self.environment = os.getenv('APEX_ENVIRONMENT', 'prod')
assert self.environment in {'prod', 'test'}, 'Apex environment can only be prod or test.'
if self.environment == 'prod':
self.http_base_url = apex_constants.APEX_OMNI_HTTP_MAIN
self.ws_base_url = apex_constants.APEX_OMNI_WS_MAIN
self.network_id = apex_constants.NETWORKID_OMNI_MAIN_ARB
else:
self.http_base_url = apex_constants.APEX_OMNI_HTTP_TEST
self.ws_base_url = apex_constants.APEX_OMNI_WS_TEST
self.network_id = apex_constants.NETWORKID_TEST
# Initialize logger
self.logger = TradingLogger(exchange="apex", ticker=self.config.ticker, log_to_console=False)
# Initialize Apex clients
self._initialize_apex_clients()
self._order_update_handler = None
self.account_handler = None
# --- reconnection state ---
self._ws_task: Optional[asyncio.Task] = None
self._ws_stop = asyncio.Event()
self._ws_disconnected = asyncio.Event()
self._loop: Optional[asyncio.AbstractEventLoop] = None
def _initialize_apex_clients(self) -> None:
"""Initialize Apex REST and Websocket clients"""
try:
self.rest_client = HttpPrivateSign(
self.http_base_url, network_id=self.network_id,
zk_seeds=self.omni_key_seed, zk_l2Key='',
api_key_credentials=self.api_key_credentials
)
# According to official SDK, need to request for user data first:
# https://github.com/ApeX-Protocol/apexpro-openapi/blob/main/README_V3.md#zkkey-sign-create-order-method
# This will store user info in HttpPrivateSign instance
self.rest_client.configs_v3()
self.rest_client.get_account_v3()
self.ws_client = ApexWebSocketClient(
endpoint=self.ws_base_url,
api_key_credentials=self.api_key_credentials
)
except Exception as e:
raise ValueError(f"Failed to initialize Apex client: {e}")
def _validate_config(self) -> None:
"""Validate Apex configuration."""
required_env_vars = ['APEX_API_KEY', 'APEX_API_KEY_PASSPHRASE',
'APEX_API_KEY_SECRET', 'APEX_OMNI_KEY_SEED']
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
return
async def connect(self) -> None:
"""Connect to Apex Websocket with auto-reconnect."""
self._loop = asyncio.get_running_loop()
try:
# Patch _on_close/_on_open (SDK did nothing but logging)
self.ws_client._on_close = types.MethodType(
lambda _: self._loop.call_soon_threadsafe(self._ws_disconnected.set),
self.ws_client
)
self.ws_client._on_open = types.MethodType(
lambda _: self.logger.log("[WS] connected", "INFO"),
self.ws_client
)
except Exception as e:
self.logger.log(f"[WS] failed to set hooks: {e}", "ERROR")
if not self._ws_task or self._ws_task.done():
self._ws_task = asyncio.create_task(self._run_private_ws())
async def _run_private_ws(self):
"""Tiny reconnect loop with exponential backoff."""
backoff = 1.0
while not self._ws_stop.is_set():
try:
# connect
self.ws_client.ws_private = _ApexWebSocketManager(**self.ws_client.kwargs)
self.ws_client.ws_private._connect(self.ws_client.endpoint + PRIVATE_WSS)
self.ws_client.account_info_stream_v3(self.account_handler)
self.logger.log("[WS] private connected", "INFO")
backoff = 1.0
# wait until either disconnect or stop
self._ws_disconnected.clear()
done, _ = await asyncio.wait(
{asyncio.create_task(self._ws_stop.wait()),
asyncio.create_task(self._ws_disconnected.wait()),},
return_when=asyncio.FIRST_COMPLETED,
)
if self._ws_stop.is_set():
break
self.logger.log(
"[WS] disconnected; attempting to reconnect...", "WARNING"
)
except Exception as e:
self.logger.log(f"[WS] connect error: {e}", "ERROR")
finally:
# ensure socket is closed before retry
try:
self.ws_client.exit()
except Exception:
pass
# backoff and retry
await asyncio.sleep(backoff)
backoff = min(60.0, backoff * 2)
# Final cleanup (on stop)
try:
self.ws_client.exit()
except Exception:
pass
async def disconnect(self) -> None:
"""Disconnect from Apex."""
try:
self._ws_stop.set()
if self._ws_task:
await self._ws_task
except Exception:
pass
try:
if hasattr(self, "private_rest_client") and self.rest_client:
self.rest_client._exit()
if hasattr(self, "ws_client"):
self.ws_client.exit()
except Exception as e:
self.logger.log(f"Error during Apex disconnect: {e}", "ERROR")
# ---------------------------
# Utility / Name
# ---------------------------
def get_exchange_name(self) -> str:
"""Get the exchange name."""
return "apex"
# ---------------------------
# WS Handlers
# ---------------------------
def setup_order_update_handler(self, handler) -> None:
"""Setup order update handler for WebSocket."""
self._order_update_handler = handler
def order_update_handler(message):
"""Handle order updates from WebSocket."""
try:
# Parse the message structure
if isinstance(message, str):
message = json.loads(message)
# Check if this is a trade-event with ORDER_UPDATE
content = message.get("contents", {})
topic = message.get("topic", "")
if topic != "ws_zk_accounts_v3":
return
# Extract order data from the nested structure
orders = content.get('orders', [])
# on websocket starting, Apex will send the historical filled orders, could be confusing
if len(orders) > 1 and not content.get('fills'):
return
if not orders:
return
order = orders[0] # Get the first order
if order.get('symbol') != self.config.contract_id:
return
order_id = order.get('id')
status = order.get('status')
side = order.get('side', '').lower()
filled_size = order.get('cumSuccessFillSize')
remaining_size = order.get('remainingSize')
if side == self.config.close_order_side:
order_type = "CLOSE"
else:
order_type = "OPEN"
if status in ['OPEN', 'PARTIALLY_FILLED', 'FILLED', 'CANCELED']:
if self._order_update_handler:
self._order_update_handler({
'order_id': order_id,
'side': side,
'order_type': order_type,
'status': status,
'size': order.get('size'),
'price': order.get('price'),
'contract_id': order.get('symbol'),
'filled_size': filled_size
})
except Exception as e:
self.logger.log(f"Error handling order update: {e}", "ERROR")
self.logger.log(f"Traceback: {traceback.format_exc()}", "ERROR")
try:
self.account_handler = order_update_handler
except Exception as e:
self.logger.log(f"Could not add trade-event handler: {e}", "ERROR")
# ---------------------------
# REST-ish helpers
# ---------------------------
@query_retry(default_return=(0, 0))
async def fetch_bbo_prices(self, contract_id: str) -> Tuple[Decimal, Decimal]:
"""Fetch best bid and ask price using official SDK"""
order_book = self.rest_client.depth_v3(symbol=contract_id)
order_book_data = order_book['data']
# Extract bids and asks from the entry
bids = order_book_data.get('b', [])
asks = order_book_data.get('a', [])
# Best bid is the highest price someone is willing to buy at
best_bid = Decimal(max(bids, key=lambda x: Decimal(x[0]))[0])
# Best ask is the lowest price someone is willing to sell at
best_ask = Decimal(min(asks, key=lambda x: Decimal(x[0]))[0])
return best_bid, best_ask
async def get_order_price(self, direction: str) -> Decimal:
"""Get the price of an order with Apex using official SDK."""
best_bid, best_ask = await self.fetch_bbo_prices(self.config.contract_id)
if best_bid <= 0 or best_ask <= 0:
self.logger.log("Invalid bid/ask prices", "ERROR")
raise ValueError("Invalid bid/ask prices")
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
order_price = best_ask - self.config.tick_size
else:
# For sell orders, place slightly above best bid to ensure execution
order_price = best_bid + self.config.tick_size
return self.round_to_tick(order_price)
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place an open order with Apex using official SDK with retry logic for POST_ONLY rejections."""
max_retries = 15
retry_count = 0
while retry_count < max_retries:
try:
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='Invalid bid/ask prices')
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
order_price = best_ask - self.config.tick_size
side = 'buy'
else:
# For sell orders, place slightly above best bid to ensure execution
order_price = best_bid + self.config.tick_size
side = 'sell'
# Place the order using official SDK (post-only to ensure maker order)
order_result = self.rest_client.create_order_v3(
symbol=contract_id,
size=str(quantity),
price=str(self.round_to_tick(order_price)),
side=side.upper(),
type='LIMIT',
timestampSeconds=time.time(),
timeInForce='POST_ONLY',
)
if not order_result or 'data' not in order_result:
return OrderResult(success=False, error_message='Failed to place order')
# Extract order ID from response
order_id = order_result['data'].get('id')
if not order_id:
return OrderResult(success=False, error_message='No order ID in response')
# Check order status after a short delay to see if it was rejected
await asyncio.sleep(0.01)
order_info = await self.get_order_info(order_id)
if order_info:
if order_info.status == 'CANCELED':
if retry_count < max_retries - 1:
retry_count += 1
continue
else:
return OrderResult(success=False, error_message=f'Order rejected after {max_retries} attempts')
elif order_info.status in ['OPEN', 'PENDING', 'PARTIALLY_FILLED', 'FILLED']:
# Order successfully placed
return OrderResult(
success=True,
order_id=order_id,
side=side,
size=quantity,
price=order_price,
status=order_info.status
)
else:
return OrderResult(success=False, error_message=f'Unexpected order status: {order_info.status}')
else:
# Assume order is successful if we can't get info
return OrderResult(
success=True,
order_id=order_id,
side=side,
size=quantity,
price=order_price,
status='OPEN'
)
except Exception as e:
if retry_count < max_retries - 1:
retry_count += 1
await asyncio.sleep(0.1) # Wait before retry
continue
else:
return OrderResult(success=False, error_message=str(e))
return OrderResult(success=False, error_message='Max retries exceeded')
async def place_close_order(self, contract_id: str, quantity: Decimal, price: Decimal, side: str) -> OrderResult:
"""Place a close order with Apex using official SDK with retry logic for POST_ONLY rejections."""
max_retries = 15
retry_count = 0
while retry_count < max_retries:
try:
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='Invalid bid/ask prices')
# Adjust order price based on market conditions and side
adjusted_price = price
if side.lower() == 'sell':
# For sell orders, ensure price is above best bid to be a maker order
if price <= best_bid:
adjusted_price = best_bid + self.config.tick_size
elif side.lower() == 'buy':
# For buy orders, ensure price is below best ask to be a maker order
if price >= best_ask:
adjusted_price = best_ask - self.config.tick_size
adjusted_price = self.round_to_tick(adjusted_price)
# Place the order using official SDK (post-only to avoid taker fees)
order_result = self.rest_client.create_order_v3(
symbol=contract_id,
size=str(quantity),
price=str(adjusted_price),
side=side.upper(),
type='LIMIT',
timestampSeconds=time.time(),
timeInForce='POST_ONLY',
)
if not order_result or 'data' not in order_result:
return OrderResult(success=False, error_message='Failed to place order')
# Extract order ID from response
order_id = order_result['data'].get('id')
if not order_id:
return OrderResult(success=False, error_message='No order ID in response')
# Check order status after a short delay to see if it was rejected
await asyncio.sleep(0.01)
order_info = await self.get_order_info(order_id)
if order_info:
if order_info.status == 'CANCELED':
if retry_count < max_retries - 1:
retry_count += 1
continue
else:
return OrderResult(success=False, error_message=f'Close order rejected after {max_retries} attempts')
elif order_info.status in ['OPEN', 'PENDING', 'PARTIALLY_FILLED', 'FILLED']:
# Order successfully placed
return OrderResult(
success=True,
order_id=order_id,
side=side,
size=quantity,
price=adjusted_price,
status=order_info.status
)
else:
return OrderResult(success=False, error_message=f'Unexpected close order status: {order_info.status}')
else:
# Assume order is successful if we can't get info
return OrderResult(
success=True,
order_id=order_id,
side=side,
size=quantity,
price=adjusted_price
)
except Exception as e:
if retry_count < max_retries - 1:
retry_count += 1
await asyncio.sleep(0.1) # Wait before retry
continue
else:
return OrderResult(success=False, error_message=str(e))
return OrderResult(success=False, error_message='Max retries exceeded for close order')
async def cancel_order(self, order_id: str) -> OrderResult:
"""Cancel an order with Apex using official SDK."""
try:
# Cancel the order using official SDK
cancel_result = self.rest_client.delete_order_v3(id=order_id)
if not cancel_result or 'data' not in cancel_result:
return OrderResult(success=False, error_message='Failed to cancel order')
return OrderResult(success=True)
except FailedRequestError as e:
# Apex's API return non-JSON response when trying to cancel a filled order
if 'Could not decode JSON' in e.message:
return OrderResult(success=False, error_message='Order has been filled')
else:
return OrderResult(success=False, error_message=e.message)
except Exception as e:
return OrderResult(success=False, error_message=str(e))
@query_retry()
async def get_order_info(self, order_id: str) -> Optional[OrderInfo]:
"""Get order information from Apex using official SDK."""
order_result = self.rest_client.get_order_v3(id=order_id)
if not order_result or 'data' not in order_result:
return None
order_data = order_result['data']
return OrderInfo(
order_id=order_data.get('id', ''),
side=order_data.get('side', '').lower(),
size=Decimal(order_data.get('size', 0)),
price=Decimal(order_data.get('price', 0)),
status=order_data.get('status', ''),
filled_size=Decimal(order_data.get('cumSuccessFillSize', 0)),
remaining_size=Decimal(order_data.get('size', 0)) - Decimal(order_data.get('cumSuccessFillSize', 0))
)
@query_retry(default_return=[])
async def get_active_orders(self, contract_id: str) -> List[OrderInfo]:
"""Get active orders for a symbol using official SDK."""
# Get active orders using official SDK
active_orders = self.rest_client.open_orders_v3()
if not active_orders or 'data' not in active_orders:
return []
# Filter orders for the specific contract and ensure they are dictionaries
# The API returns orders under 'dataList' key, not 'orderList'
order_list = active_orders['data']
contract_orders = []
for order in order_list:
if isinstance(order, dict) and order.get('symbol') == contract_id:
contract_orders.append(OrderInfo(
order_id=order.get('id', ''),
side=order.get('side', '').lower(),
size=Decimal(order.get('size', 0)),
price=Decimal(order.get('price', 0)),
status=order.get('status', ''),
filled_size=Decimal(order.get('cumSuccessFillSize', 0)),
remaining_size=Decimal(order.get('size', 0)) - Decimal(order.get('cumSuccessFillSize', 0))
))
return contract_orders
@query_retry(default_return=0)
async def get_account_positions(self) -> Decimal:
"""Get account positions using official SDK."""
account_data = self.rest_client.get_account_v3()
if not account_data or 'positions' not in account_data:
self.logger.log("No positions or failed to get positions", "WARNING")
position_amt = 0
else:
# The API returns positions under data.positionList
positions = account_data.get('positions', [])
if positions:
# Find position for current contract
position = None
for p in positions:
if isinstance(p, dict) and p.get('symbol') == self.config.contract_id:
position = p
break
if position:
position_amt = abs(Decimal(position.get('size', 0)))
else:
position_amt = 0
else:
position_amt = 0
return position_amt
async def get_contract_attributes(self) -> Tuple[str, Decimal]:
"""Get contract ID for a ticker."""
ticker = self.config.ticker
if len(ticker) == 0:
self.logger.log("Ticker is empty", "ERROR")
raise ValueError("Ticker is empty")
response = self.rest_client.configs_v3(symbol=ticker)
data = response.get('data', {})
if not data:
self.logger.log("Failed to get metadata", "ERROR")
raise ValueError("Failed to get metadata")
contract_list = data.get('contractConfig', {}).get('perpetualContract', [])
if not contract_list:
self.logger.log("Failed to get contract list", "ERROR")
raise ValueError("Failed to get contract list")
current_contract = None
for c in contract_list:
if c.get('crossSymbolName') == ticker + 'USDT':
current_contract = c
break
if not current_contract:
self.logger.log("Failed to get contract ID for ticker", "ERROR")
raise ValueError("Failed to get contract ID for ticker")
self.config.contract_id = current_contract.get('symbol')
min_quantity = Decimal(current_contract.get('minOrderSize'))
if self.config.quantity < min_quantity:
self.logger.log(f"Order quantity is less than min quantity: {self.config.quantity} < {min_quantity}", "ERROR")
raise ValueError(f"Order quantity is less than min quantity: {self.config.quantity} < {min_quantity}")
self.config.tick_size = Decimal(current_contract.get('tickSize'))
return self.config.contract_id, self.config.tick_size
```
## /exchanges/aster.py
```py path="/exchanges/aster.py"
"""
Aster exchange client implementation.
"""
import os
import asyncio
import json
import time
import hmac
import hashlib
from decimal import Decimal
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlencode
import aiohttp
import websockets
import sys
from .base import BaseExchangeClient, OrderResult, OrderInfo, query_retry
from helpers.logger import TradingLogger
class AsterWebSocketManager:
"""WebSocket manager for Aster order updates."""
def __init__(self, config: Dict[str, Any], api_key: str, secret_key: str, order_update_callback):
self.api_key = api_key
self.secret_key = secret_key
self.order_update_callback = order_update_callback
self.websocket = None
self.running = False
self.base_url = "https://fapi.asterdex.com"
self.ws_url = "wss://fstream.asterdex.com"
self.listen_key = None
self.logger = None
self._keepalive_task = None
self._last_ping_time = None
self.config = config
def _generate_signature(self, params: Dict[str, Any]) -> str:
"""Generate HMAC SHA256 signature for Aster API authentication."""
# Use urlencode to properly format the query string
query_string = urlencode(params)
# Generate HMAC SHA256 signature
signature = hmac.new(
self.secret_key.encode('utf-8'),
query_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
async def _get_listen_key(self) -> str:
"""Get listen key for user data stream."""
params = {
'timestamp': int(time.time() * 1000)
}
signature = self._generate_signature(params)
params['signature'] = signature
headers = {
'X-MBX-APIKEY': self.api_key,
'Content-Type': 'application/x-www-form-urlencoded'
}
async with aiohttp.ClientSession() as session:
async with session.post(
'https://fapi.asterdex.com/fapi/v1/listenKey',
headers=headers,
data=params
) as response:
if response.status == 200:
result = await response.json()
return result.get('listenKey')
else:
raise Exception(f"Failed to get listen key: {response.status}")
async def _keepalive_listen_key(self) -> bool:
"""Keep alive the listen key to prevent timeout."""
try:
if not self.listen_key:
return False
params = {
'timestamp': int(time.time() * 1000)
}
signature = self._generate_signature(params)
params['signature'] = signature
headers = {
'X-MBX-APIKEY': self.api_key,
'Content-Type': 'application/x-www-form-urlencoded'
}
async with aiohttp.ClientSession() as session:
async with session.put(
f"{self.base_url}/fapi/v1/listenKey",
headers=headers,
data=params
) as response:
if response.status == 200:
if self.logger:
self.logger.log("Listen key keepalive successful", "DEBUG")
return True
else:
if self.logger:
self.logger.log(f"Failed to keepalive listen key: {response.status}", "WARNING")
return False
except Exception as e:
if self.logger:
self.logger.log(f"Error keeping alive listen key: {e}", "ERROR")
return False
async def _check_connection_health(self) -> bool:
"""Check if the WebSocket connection is healthy based on ping timing."""
if not self._last_ping_time:
return True # No pings received yet, assume healthy
# Check if we haven't received a ping in the last 10 minutes
# (server sends pings every 5 minutes, so 10 minutes indicates a problem)
time_since_last_ping = time.time() - self._last_ping_time
if time_since_last_ping > 10 * 60: # 10 minutes
if self.logger:
self.logger.log(
f"No ping received for {time_since_last_ping/60:.1f} minutes, "
"connection may be unhealthy", "WARNING"
)
return False
return True
async def _start_keepalive_task(self):
"""Start the keepalive task to extend listen key validity and monitor connection health."""
while self.running:
try:
# Check connection health every 5 minutes
await asyncio.sleep(5 * 60)
if not self.running:
break
# Check if connection is healthy
if not await self._check_connection_health():
if self.logger:
self.logger.log("Connection health check failed, reconnecting...", "WARNING")
# Try to reconnect
try:
await self.connect()
except Exception as e:
if self.logger:
self.logger.log(f"Reconnection failed: {e}", "ERROR")
# Wait before retrying
await asyncio.sleep(30)
continue
# Check if we need to keepalive the listen key (every 50 minutes)
if self.listen_key and time.time() % (50 * 60) < 5 * 60: # Within 5 minutes of 50-minute mark
success = await self._keepalive_listen_key()
if not success:
if self.logger:
self.logger.log("Listen key keepalive failed, reconnecting...", "WARNING")
# Try to reconnect
try:
await self.connect()
except Exception as e:
if self.logger:
self.logger.log(f"Reconnection failed: {e}", "ERROR")
# Wait before retrying
await asyncio.sleep(30)
except Exception as e:
if self.logger:
self.logger.log(f"Error in keepalive task: {e}", "ERROR")
# Wait a bit before retrying
await asyncio.sleep(60)
async def connect(self):
"""Connect to Aster WebSocket."""
try:
# Get listen key
self.listen_key = await self._get_listen_key()
if not self.listen_key:
raise Exception("Failed to get listen key")
# Connect to WebSocket
ws_url = f"{self.ws_url}/ws/{self.listen_key}"
self.websocket = await websockets.connect(ws_url)
self.running = True
if self.logger:
self.logger.log("Connected to Aster WebSocket with listen key", "INFO")
# Start keepalive task
self._keepalive_task = asyncio.create_task(self._start_keepalive_task())
# Start listening for messages
await self._listen()
except Exception as e:
if self.logger:
self.logger.log(f"WebSocket connection error: {e}", "ERROR")
raise
async def _listen(self):
"""Listen for WebSocket messages."""
try:
async for message in self.websocket:
if not self.running:
break
# Check if this is a ping frame (websockets library handles pong automatically)
if isinstance(message, bytes) and message == b'\x89\x00': # Ping frame
self._last_ping_time = time.time()
if self.logger:
self.logger.log("Received ping frame, sending pong", "DEBUG")
continue
try:
data = json.loads(message)
await self._handle_message(data)
except json.JSONDecodeError as e:
if self.logger:
self.logger.log(f"Failed to parse WebSocket message: {e}", "ERROR")
except Exception as e:
if self.logger:
self.logger.log(f"Error handling WebSocket message: {e}", "ERROR")
except websockets.exceptions.ConnectionClosed:
if self.logger:
self.logger.log("WebSocket connection closed", "WARNING")
except Exception as e:
if self.logger:
self.logger.log(f"WebSocket listen error: {e}", "ERROR")
async def _handle_message(self, data: Dict[str, Any]):
"""Handle incoming WebSocket messages."""
try:
event_type = data.get('e', '')
if event_type == 'ORDER_TRADE_UPDATE':
await self._handle_order_update(data)
elif event_type == 'listenKeyExpired':
if self.logger:
self.logger.log("Listen key expired, reconnecting...", "WARNING")
# Reconnect with new listen key
await self.connect()
else:
if self.logger:
self.logger.log(f"Unknown WebSocket message: {data}", "DEBUG")
except Exception as e:
if self.logger:
self.logger.log(f"Error handling WebSocket message: {e}", "ERROR")
async def _handle_order_update(self, order_data: Dict[str, Any]):
"""Handle order update messages."""
try:
order_info = order_data.get('o', {})
order_id = order_info.get('i', '')
symbol = order_info.get('s', '')
side = order_info.get('S', '')
quantity = order_info.get('q', '0')
price = order_info.get('p', '0')
executed_qty = order_info.get('z', '0')
status = order_info.get('X', '')
# Map status
status_map = {
'NEW': 'OPEN',
'PARTIALLY_FILLED': 'PARTIALLY_FILLED',
'FILLED': 'FILLED',
'CANCELED': 'CANCELED',
'REJECTED': 'REJECTED',
'EXPIRED': 'EXPIRED'
}
mapped_status = status_map.get(status, status)
# Call the order update callback if it exists
if hasattr(self, 'order_update_callback') and self.order_update_callback:
if side.lower() == self.config.close_order_side:
order_type = "CLOSE"
else:
order_type = "OPEN"
await self.order_update_callback({
'order_id': order_id,
'side': side.lower(),
'order_type': order_type,
'status': mapped_status,
'size': quantity,
'price': price,
'contract_id': symbol,
'filled_size': executed_qty
})
except Exception as e:
if self.logger:
self.logger.log(f"Error handling order update: {e}", "ERROR")
async def disconnect(self):
"""Disconnect from WebSocket."""
self.running = False
# Cancel keepalive task
if self._keepalive_task and not self._keepalive_task.done():
self._keepalive_task.cancel()
try:
await self._keepalive_task
except asyncio.CancelledError:
pass
if self.websocket:
await self.websocket.close()
if self.logger:
self.logger.log("WebSocket disconnected", "INFO")
def set_logger(self, logger):
"""Set the logger instance."""
self.logger = logger
class AsterClient(BaseExchangeClient):
"""Aster exchange client implementation."""
def __init__(self, config: Dict[str, Any]):
"""Initialize Aster client."""
super().__init__(config)
# Aster credentials from environment
self.api_key = os.getenv('ASTER_API_KEY')
self.secret_key = os.getenv('ASTER_SECRET_KEY')
self.base_url = 'https://fapi.asterdex.com'
if not self.api_key or not self.secret_key:
raise ValueError(
"ASTER_API_KEY and ASTER_SECRET_KEY must be set in environment variables"
)
# Initialize logger early
self.logger = TradingLogger(exchange="aster", ticker=self.config.ticker, log_to_console=False)
self._order_update_handler = None
def _validate_config(self) -> None:
"""Validate Aster configuration."""
required_env_vars = ['ASTER_API_KEY', 'ASTER_SECRET_KEY']
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
def _generate_signature(self, params: Dict[str, Any]) -> str:
"""Generate HMAC SHA256 signature for Aster API authentication."""
# Use urlencode to properly format the query string
query_string = urlencode(params)
# Generate HMAC SHA256 signature
signature = hmac.new(
self.secret_key.encode('utf-8'),
query_string.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
async def _make_request(
self, method: str, endpoint: str, params: Dict[str, Any] = None, data: Dict[str, Any] = None
) -> Dict[str, Any]:
"""Make authenticated request to Aster API."""
if params is None:
params = {}
if data is None:
data = {}
# Add timestamp and recvWindow
timestamp = int(time.time() * 1000)
params['timestamp'] = timestamp
params['recvWindow'] = 5000
url = f"{self.base_url}{endpoint}"
headers = {
'X-MBX-APIKEY': self.api_key,
'Content-Type': 'application/x-www-form-urlencoded'
}
async with aiohttp.ClientSession() as session:
if method.upper() == 'GET':
# For GET requests, signature is based on query parameters only
signature = self._generate_signature(params)
params['signature'] = signature
async with session.get(url, params=params, headers=headers) as response:
result = await response.json()
if response.status != 200:
raise Exception(f"API request failed: {result}")
return result
elif method.upper() == 'POST':
# For POST requests, signature must include both query string and request body
# According to Aster API docs: totalParams = queryString + requestBody
all_params = {**params, **data}
signature = self._generate_signature(all_params)
all_params['signature'] = signature
async with session.post(url, data=all_params, headers=headers) as response:
result = await response.json()
if response.status != 200:
raise Exception(f"API request failed: {result}")
return result
elif method.upper() == 'DELETE':
# For DELETE requests, signature is based on query parameters only
signature = self._generate_signature(params)
params['signature'] = signature
async with session.delete(url, params=params, headers=headers) as response:
result = await response.json()
if response.status != 200:
raise Exception(f"API request failed: {result}")
return result
async def connect(self) -> None:
"""Connect to Aster WebSocket."""
# Initialize WebSocket manager
self.ws_manager = AsterWebSocketManager(
config=self.config,
api_key=self.api_key,
secret_key=self.secret_key,
order_update_callback=self._handle_websocket_order_update
)
# Set logger for WebSocket manager
self.ws_manager.set_logger(self.logger)
try:
# Start WebSocket connection in background task
asyncio.create_task(self.ws_manager.connect())
# Wait a moment for connection to establish
await asyncio.sleep(2)
except Exception as e:
self.logger.log(f"Error connecting to Aster WebSocket: {e}", "ERROR")
raise
async def disconnect(self) -> None:
"""Disconnect from Aster."""
try:
if hasattr(self, 'ws_manager') and self.ws_manager:
await self.ws_manager.disconnect()
except Exception as e:
self.logger.log(f"Error during Aster disconnect: {e}", "ERROR")
def get_exchange_name(self) -> str:
"""Get the exchange name."""
return "aster"
def setup_order_update_handler(self, handler) -> None:
"""Setup order update handler for WebSocket."""
self._order_update_handler = handler
async def _handle_websocket_order_update(self, order_data: Dict[str, Any]):
"""Handle order updates from WebSocket."""
try:
if self._order_update_handler:
self._order_update_handler(order_data)
except Exception as e:
self.logger.log(f"Error handling WebSocket order update: {e}", "ERROR")
@query_retry(default_return=(0, 0))
async def fetch_bbo_prices(self, contract_id: str) -> Tuple[Decimal, Decimal]:
"""Fetch best bid and ask prices from Aster."""
result = await self._make_request('GET', '/fapi/v1/ticker/bookTicker', {'symbol': contract_id})
best_bid = Decimal(result.get('bidPrice', 0))
best_ask = Decimal(result.get('askPrice', 0))
return best_bid, best_ask
async def get_order_price(self, direction: str) -> Decimal:
"""Get the price of an order with Aster using official SDK."""
best_bid, best_ask = await self.fetch_bbo_prices(self.config.contract_id)
if best_bid <= 0 or best_ask <= 0:
self.logger.log("Invalid bid/ask prices", "ERROR")
raise ValueError("Invalid bid/ask prices")
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
order_price = best_ask - self.config.tick_size
else:
# For sell orders, place slightly above best bid to ensure execution
order_price = best_bid + self.config.tick_size
return order_price
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place an open order with Aster."""
attempt = 0
while True:
attempt += 1
if attempt % 5 == 0:
self.logger.log(f"[OPEN] Attempt {attempt} to place order", "INFO")
active_orders = await self.get_active_orders(contract_id)
active_open_orders = 0
for order in active_orders:
if order.side == self.config.direction:
active_open_orders += 1
if active_open_orders > 1:
self.logger.log(f"[OPEN] ERROR: Active open orders abnormal: {active_open_orders}", "ERROR")
raise Exception(f"[OPEN] ERROR: Active open orders abnormal: {active_open_orders}")
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='Invalid bid/ask prices')
# Determine order side and price
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
price = best_ask - self.config.tick_size
elif direction == 'sell':
# For sell orders, place slightly above best bid to ensure execution
price = best_bid + self.config.tick_size
else:
raise Exception(f"[OPEN] Invalid direction: {direction}")
# Place the order
order_data = {
'symbol': contract_id,
'side': direction.upper(),
'type': 'LIMIT',
'quantity': str(quantity),
'price': str(price),
'timeInForce': 'GTX' # GTX is Good Till Crossing (Post Only)
}
result = await self._make_request('POST', '/fapi/v1/order', data=order_data)
order_status = result.get('status', '')
order_id = result.get('orderId', '')
start_time = time.time()
while order_status == 'NEW' and time.time() - start_time < 2:
await asyncio.sleep(0.1)
order_info = await self.get_order_info(order_id)
if order_info is not None:
order_status = order_info.status
if order_status in ['NEW', 'PARTIALLY_FILLED']:
return OrderResult(success=True, order_id=order_id, side=direction, size=quantity, price=price, status='OPEN')
elif order_status == 'FILLED':
return OrderResult(success=True, order_id=order_id, side=direction, size=quantity, price=price, status='FILLED')
elif order_status == 'EXPIRED':
continue
else:
return OrderResult(success=False, error_message='Unknown order status: ' + order_status)
async def _get_active_close_orders(self, contract_id: str) -> int:
"""Get active close orders for a contract using official SDK."""
active_orders = await self.get_active_orders(contract_id)
active_close_orders = 0
for order in active_orders:
if order.side == self.config.close_order_side:
active_close_orders += 1
return active_close_orders
async def place_close_order(self, contract_id: str, quantity: Decimal, price: Decimal, side: str) -> OrderResult:
"""Place a close order with Aster."""
attempt = 0
active_close_orders = await self._get_active_close_orders(contract_id)
while True:
attempt += 1
if attempt % 5 == 0:
self.logger.log(f"[CLOSE] Attempt {attempt} to place order", "INFO")
current_close_orders = await self._get_active_close_orders(contract_id)
if current_close_orders - active_close_orders > 1:
self.logger.log(f"[CLOSE] ERROR: Active close orders abnormal: "
f"{active_close_orders}, {current_close_orders}", "ERROR")
raise Exception(f"[CLOSE] ERROR: Active close orders abnormal: "
f"{active_close_orders}, {current_close_orders}")
else:
active_close_orders = current_close_orders
# Get current market prices to adjust order price if needed
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='No bid/ask data available')
# Adjust order price based on market conditions and side
adjusted_price = price
if side.lower() == 'sell':
order_side = 'SELL'
# For sell orders, ensure price is above best bid to be a maker order
if price <= best_bid:
adjusted_price = best_bid + self.config.tick_size
elif side.lower() == 'buy':
order_side = 'BUY'
# For buy orders, ensure price is below best ask to be a maker order
if price >= best_ask:
adjusted_price = best_ask - self.config.tick_size
adjusted_price = self.round_to_tick(adjusted_price)
# Place the order
order_data = {
'symbol': contract_id,
'side': order_side,
'type': 'LIMIT',
'quantity': str(quantity),
'price': str(adjusted_price),
'timeInForce': 'GTX' # GTX is Good Till Crossing (Post Only)
}
result = await self._make_request('POST', '/fapi/v1/order', data=order_data)
order_status = result.get('status', '')
order_id = result.get('orderId', '')
start_time = time.time()
while order_status == 'NEW' and time.time() - start_time < 2:
await asyncio.sleep(0.1)
order_info = await self.get_order_info(order_id)
if order_info is not None:
order_status = order_info.status
if order_status in ['NEW', 'PARTIALLY_FILLED']:
return OrderResult(success=True, order_id=order_id, side=order_side.lower(),
size=quantity, price=adjusted_price, status='OPEN')
elif order_status == 'FILLED':
return OrderResult(success=True, order_id=order_id, side=order_side.lower(),
size=quantity, price=adjusted_price, status='FILLED')
elif order_status == 'EXPIRED':
continue
else:
return OrderResult(success=False, error_message='Unknown order status: ' + order_status)
async def place_market_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place a market order with Aster."""
# Validate direction
if direction.lower() not in ['buy', 'sell']:
return OrderResult(success=False, error_message=f'Invalid direction: {direction}')
# Place the market order
order_data = {
'symbol': contract_id,
'side': direction.upper(),
'type': 'MARKET',
'quantity': str(quantity)
}
result = await self._make_request('POST', '/fapi/v1/order', data=order_data)
order_status = result.get('status', '')
order_id = result.get('orderId', '')
start_time = time.time()
while order_status != 'FILLED' and time.time() - start_time < 10:
await asyncio.sleep(0.2)
order_info = await self.get_order_info(order_id)
if order_info is not None:
order_status = order_info.status
if order_status != 'FILLED':
self.logger.log(f"Market order failed with status: {order_status}", "ERROR")
sys.exit(1)
# For market orders, we expect them to be filled immediately
else:
return OrderResult(
success=True,
order_id=order_id,
side=direction.lower(),
size=quantity,
price=order_info.price,
status='FILLED'
)
async def cancel_order(self, order_id: str) -> OrderResult:
"""Cancel an order with Aster."""
try:
result = await self._make_request('DELETE', '/fapi/v1/order', {
'symbol': self.config.contract_id,
'orderId': order_id
})
if 'orderId' in result:
return OrderResult(success=True, filled_size=Decimal(result.get('executedQty', 0)))
else:
return OrderResult(success=False, error_message=result.get('msg', 'Unknown error'))
except Exception as e:
return OrderResult(success=False, error_message=str(e))
@query_retry()
async def get_order_info(self, order_id: str) -> Optional[OrderInfo]:
"""Get order information from Aster."""
result = await self._make_request('GET', '/fapi/v1/order', {
'symbol': self.config.contract_id,
'orderId': order_id
})
order_type = result.get('type', '')
if order_type == 'MARKET':
price = Decimal(result.get('avgPrice', 0))
else:
price = Decimal(result.get('price', 0))
if 'orderId' in result:
return OrderInfo(
order_id=str(result['orderId']),
side=result.get('side', '').lower(),
size=Decimal(result.get('origQty', 0)),
price=price,
status=result.get('status', ''),
filled_size=Decimal(result.get('executedQty', 0)),
remaining_size=Decimal(result.get('origQty', 0)) - Decimal(result.get('executedQty', 0))
)
return None
@query_retry(default_return=[])
async def get_active_orders(self, contract_id: str) -> List[OrderInfo]:
"""Get active orders for a contract from Aster."""
result = await self._make_request('GET', '/fapi/v1/openOrders', {'symbol': contract_id})
orders = []
for order in result:
orders.append(OrderInfo(
order_id=str(order['orderId']),
side=order.get('side', '').lower(),
size=Decimal(order.get('origQty', 0)) - Decimal(order.get('executedQty', 0)),
price=Decimal(order.get('price', 0)),
status=order.get('status', ''),
filled_size=Decimal(order.get('executedQty', 0)),
remaining_size=Decimal(order.get('origQty', 0)) - Decimal(order.get('executedQty', 0))
))
return orders
@query_retry(reraise=True)
async def get_account_positions(self) -> Decimal:
"""Get account positions from Aster."""
result = await self._make_request('GET', '/fapi/v2/positionRisk', {'symbol': self.config.contract_id})
for position in result:
if position.get('symbol') == self.config.contract_id:
position_amt = abs(Decimal(position.get('positionAmt', 0)))
return position_amt
return Decimal(0)
async def get_contract_attributes(self) -> Tuple[str, Decimal]:
"""Get contract ID and tick size for a ticker."""
ticker = self.config.ticker
if len(ticker) == 0:
self.logger.log("Ticker is empty", "ERROR")
raise ValueError("Ticker is empty")
try:
result = await self._make_request('GET', '/fapi/v1/exchangeInfo')
for symbol_info in result['symbols']:
if (symbol_info.get('status') == 'TRADING' and
symbol_info.get('baseAsset') == ticker and
symbol_info.get('quoteAsset') == 'USDT'):
self.config.contract_id = symbol_info.get('symbol', '')
# Get tick size from filters
for filter_info in symbol_info.get('filters', []):
if filter_info.get('filterType') == 'PRICE_FILTER':
self.config.tick_size = Decimal(filter_info['tickSize'].strip('0'))
break
# Get minimum quantity
min_quantity = Decimal(0)
for filter_info in symbol_info.get('filters', []):
if filter_info.get('filterType') == 'LOT_SIZE':
min_quantity = Decimal(filter_info.get('minQty', 0))
break
if self.config.quantity < min_quantity:
self.logger.log(
f"Order quantity is less than min quantity: "
f"{self.config.quantity} < {min_quantity}", "ERROR"
)
raise ValueError(
f"Order quantity is less than min quantity: "
f"{self.config.quantity} < {min_quantity}"
)
if self.config.tick_size == 0:
self.logger.log("Failed to get tick size for ticker", "ERROR")
raise ValueError("Failed to get tick size for ticker")
return self.config.contract_id, self.config.tick_size
self.logger.log("Failed to get contract ID for ticker", "ERROR")
raise ValueError("Failed to get contract ID for ticker")
except Exception as e:
self.logger.log(f"Error getting contract attributes: {e}", "ERROR")
raise
```
## /exchanges/backpack.py
```py path="/exchanges/backpack.py"
"""
Backpack exchange client implementation.
"""
import os
import asyncio
import json
import time
import base64
import sys
from decimal import Decimal
from typing import Dict, Any, List, Optional, Tuple
from cryptography.hazmat.primitives.asymmetric import ed25519
import websockets
from bpx.public import Public
from .bp_client import Account, KeepAliveHttpClient
from bpx.constants.enums import OrderTypeEnum, TimeInForceEnum
from .base import BaseExchangeClient, OrderResult, OrderInfo, query_retry
from helpers.logger import TradingLogger
class BackpackWebSocketManager:
"""WebSocket manager for Backpack order updates."""
def __init__(self, public_key: str, secret_key: str, symbol: str, order_update_callback):
self.public_key = public_key
self.secret_key = secret_key
self.symbol = symbol
self.order_update_callback = order_update_callback
self.websocket = None
self.running = False
self.ws_url = "wss://ws.backpack.exchange"
self.logger = None
# Initialize ED25519 private key from base64 decoded secret
self.private_key = ed25519.Ed25519PrivateKey.from_private_bytes(
base64.b64decode(secret_key)
)
def _generate_signature(self, instruction: str, timestamp: int, window: int = 5000) -> str:
"""Generate ED25519 signature for WebSocket authentication."""
# Create the message string in the same format as BPX package
message = f"instruction={instruction}×tamp={timestamp}&window={window}"
# Sign the message using ED25519 private key
signature_bytes = self.private_key.sign(message.encode())
# Return base64 encoded signature
return base64.b64encode(signature_bytes).decode()
async def connect(self):
"""Connect to Backpack WebSocket."""
while True:
try:
self.logger.log("Connecting to Backpack WebSocket", "INFO")
self.websocket = await websockets.connect(self.ws_url)
self.running = True
# Subscribe to order updates for the specific symbol
timestamp = int(time.time() * 1000)
signature = self._generate_signature("subscribe", timestamp)
subscribe_message = {
"method": "SUBSCRIBE",
"params": [f"account.orderUpdate.{self.symbol}"],
"signature": [
self.public_key,
signature,
str(timestamp),
"5000"
]
}
await self.websocket.send(json.dumps(subscribe_message))
if self.logger:
self.logger.log(f"Subscribed to order updates for {self.symbol}", "INFO")
# Start listening for messages
await self._listen()
except Exception as e:
if self.logger:
self.logger.log(f"WebSocket connection error: {e}", "ERROR")
async def _listen(self):
"""Listen for WebSocket messages."""
try:
async for message in self.websocket:
if not self.running:
break
try:
data = json.loads(message)
await self._handle_message(data)
except json.JSONDecodeError as e:
if self.logger:
self.logger.log(f"Failed to parse WebSocket message: {e}", "ERROR")
except Exception as e:
if self.logger:
self.logger.log(f"Error handling WebSocket message: {e}", "ERROR")
except websockets.exceptions.ConnectionClosed:
if self.logger:
self.logger.log("WebSocket connection closed", "WARNING")
except Exception as e:
if self.logger:
self.logger.log(f"WebSocket listen error: {e}", "ERROR")
async def _handle_message(self, data: Dict[str, Any]):
"""Handle incoming WebSocket messages."""
try:
stream = data.get('stream', '')
payload = data.get('data', {})
if 'orderUpdate' in stream:
await self._handle_order_update(payload)
else:
self.logger.log(f"Unknown WebSocket message: {data}", "ERROR")
except Exception as e:
if self.logger:
self.logger.log(f"Error handling WebSocket message: {e}", "ERROR")
async def _handle_order_update(self, order_data: Dict[str, Any]):
"""Handle order update messages."""
try:
# Call the order update callback if it exists
if hasattr(self, 'order_update_callback') and self.order_update_callback:
await self.order_update_callback(order_data)
except Exception as e:
if self.logger:
self.logger.log(f"Error handling order update: {e}", "ERROR")
async def disconnect(self):
"""Disconnect from WebSocket."""
self.running = False
if self.websocket:
await self.websocket.close()
if self.logger:
self.logger.log("WebSocket disconnected", "INFO")
def set_logger(self, logger):
"""Set the logger instance."""
self.logger = logger
def set_order_filled_event(self, event):
"""Set the order filled event for synchronization."""
self.order_filled_event = event
class BackpackClient(BaseExchangeClient):
"""Backpack exchange client implementation."""
def __init__(self, config: Dict[str, Any]):
"""Initialize Backpack client."""
super().__init__(config)
# Backpack credentials from environment
self.public_key = os.getenv('BACKPACK_PUBLIC_KEY')
self.secret_key = os.getenv('BACKPACK_SECRET_KEY')
if not self.public_key or not self.secret_key:
raise ValueError("BACKPACK_PUBLIC_KEY and BACKPACK_SECRET_KEY must be set in environment variables")
# Initialize Backpack clients using official SDK.
# Use KeepAliveHttpClient so all API calls reuse the same TCP connection (lower latency).
self._http_client = KeepAliveHttpClient(timeout=15)
self.public_client = Public()
self.account_client = Account(
public_key=self.public_key,
secret_key=self.secret_key,
default_http_client=self._http_client,
)
self._order_update_handler = None
def _validate_config(self) -> None:
"""Validate Backpack configuration."""
required_env_vars = ['BACKPACK_PUBLIC_KEY', 'BACKPACK_SECRET_KEY']
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
async def connect(self) -> None:
"""Connect to Backpack WebSocket."""
# Initialize WebSocket manager
self.ws_manager = BackpackWebSocketManager(
public_key=self.public_key,
secret_key=self.secret_key,
symbol=self.config.contract_id, # Use contract_id as symbol for Backpack
order_update_callback=self._handle_websocket_order_update
)
# Pass config to WebSocket manager for order type determination
self.ws_manager.config = self.config
# Initialize logger using the same format as helpers
self.logger = TradingLogger(exchange="backpack", ticker=self.config.ticker, log_to_console=False)
self.ws_manager.set_logger(self.logger)
try:
# Start WebSocket connection in background task
asyncio.create_task(self.ws_manager.connect())
# Wait a moment for connection to establish
await asyncio.sleep(2)
except Exception as e:
self.logger.log(f"Error connecting to Backpack WebSocket: {e}", "ERROR")
raise
async def disconnect(self) -> None:
"""Disconnect from Backpack."""
try:
if hasattr(self, 'ws_manager') and self.ws_manager:
await self.ws_manager.disconnect()
except Exception as e:
self.logger.log(f"Error during Backpack disconnect: {e}", "ERROR")
def get_exchange_name(self) -> str:
"""Get the exchange name."""
return "backpack"
def setup_order_update_handler(self, handler) -> None:
"""Setup order update handler for WebSocket."""
self._order_update_handler = handler
async def _handle_websocket_order_update(self, order_data: Dict[str, Any]):
"""Handle order updates from WebSocket."""
try:
event_type = order_data.get('e', '')
order_id = order_data.get('i', '')
symbol = order_data.get('s', '')
side = order_data.get('S', '')
quantity = order_data.get('q', '0')
price = order_data.get('p', '0')
if price == '0':
price = order_data.get('L', '0')
fill_quantity = order_data.get('z', '0')
# Only process orders for our symbol
if symbol != self.config.contract_id:
return
# Determine order side
if side.upper() == 'BID':
order_side = 'buy'
elif side.upper() == 'ASK':
order_side = 'sell'
else:
self.logger.log(f"Unexpected order side: {side}", "ERROR")
sys.exit(1)
# Check if this is a close order (opposite side from bot direction)
is_close_order = (order_side == self.config.close_order_side)
order_type = "CLOSE" if is_close_order else "OPEN"
if event_type == 'orderFill' and quantity == fill_quantity:
if self._order_update_handler:
self._order_update_handler({
'order_id': order_id,
'side': order_side,
'order_type': order_type,
'status': 'FILLED',
'size': quantity,
'price': price,
'contract_id': symbol,
'filled_size': fill_quantity
})
elif event_type in ['orderFill', 'orderAccepted', 'orderCancelled', 'orderExpired']:
if event_type == 'orderFill':
status = 'PARTIALLY_FILLED'
elif event_type == 'orderAccepted':
status = 'OPEN'
elif event_type in ['orderCancelled', 'orderExpired']:
status = 'CANCELED'
if self._order_update_handler:
self._order_update_handler({
'order_id': order_id,
'side': order_side,
'order_type': order_type,
'status': status,
'size': quantity,
'price': price,
'contract_id': symbol,
'filled_size': fill_quantity
})
except Exception as e:
self.logger.log(f"Error handling WebSocket order update: {e}", "ERROR")
async def get_order_price(self, direction: str) -> Decimal:
"""Get the price of an order with Backpack using official SDK."""
best_bid, best_ask = await self.fetch_bbo_prices(self.config.contract_id)
if best_bid <= 0 or best_ask <= 0:
self.logger.log("Invalid bid/ask prices", "ERROR")
raise ValueError("Invalid bid/ask prices")
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
order_price = best_ask - self.config.tick_size
else:
# For sell orders, place slightly above best bid to ensure execution
order_price = best_bid + self.config.tick_size
return self.round_to_tick(order_price)
@query_retry(default_return=(0, 0))
async def fetch_bbo_prices(self, contract_id: str) -> Tuple[Decimal, Decimal]:
# Get order book depth from Backpack
order_book = self.public_client.get_depth(contract_id)
# Extract bids and asks directly from Backpack response
bids = order_book.get('bids', [])
asks = order_book.get('asks', [])
# Sort bids and asks
bids = sorted(bids, key=lambda x: Decimal(x[0]), reverse=True) # (highest price first)
asks = sorted(asks, key=lambda x: Decimal(x[0])) # (lowest price first)
# Best bid is the highest price someone is willing to buy at
best_bid = Decimal(bids[0][0]) if bids and len(bids) > 0 else 0
# Best ask is the lowest price someone is willing to sell at
best_ask = Decimal(asks[0][0]) if asks and len(asks) > 0 else 0
return best_bid, best_ask
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place an open order with Backpack using official SDK with retry logic for POST_ONLY rejections."""
max_retries = 15
retry_count = 0
while retry_count < max_retries:
retry_count += 1
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='Invalid bid/ask prices')
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
order_price = best_ask - self.config.tick_size
side = 'Bid'
else:
# For sell orders, place slightly above best bid to ensure execution
order_price = best_bid + self.config.tick_size
side = 'Ask'
# Place the order using Backpack SDK (post-only to ensure maker order)
order_result = self.account_client.execute_order(
symbol=contract_id,
side=side,
order_type=OrderTypeEnum.LIMIT,
quantity=str(quantity),
price=str(self.round_to_tick(order_price)),
post_only=True,
time_in_force=TimeInForceEnum.GTC
)
if not order_result:
return OrderResult(success=False, error_message='Failed to place order')
if 'code' in order_result:
message = order_result.get('message', 'Unknown error')
self.logger.log(f"[OPEN] Order rejected: {message}", "WARNING")
continue
# Extract order ID from response
order_id = order_result.get('id')
if not order_id:
self.logger.log(f"[OPEN] No order ID in response: {order_result}", "ERROR")
return OrderResult(success=False, error_message='No order ID in response')
# Order successfully placed
return OrderResult(
success=True,
order_id=order_id,
side=side.lower(),
size=quantity,
price=order_price,
status='New'
)
return OrderResult(success=False, error_message='Max retries exceeded')
async def place_market_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place a market order with Backpack."""
# Validate direction
if direction == 'buy':
side = 'Bid'
elif direction == 'sell':
side = 'Ask'
else:
raise Exception(f"[OPEN] Invalid direction: {direction}")
# Run sync SDK call in thread pool to avoid blocking the event loop and reduce latency
result = await asyncio.to_thread(
self.account_client.execute_order,
symbol=contract_id,
side=side,
order_type=OrderTypeEnum.MARKET,
quantity=str(quantity),
)
if not isinstance(result, dict):
error_msg = str(result) if result else "Unknown error"
self.logger.log(f"Market order API returned non-dict: {error_msg}", "ERROR")
raise Exception(f"Backpack API error: {error_msg}")
if '_raw_error' in result:
error_msg = result.get('_raw_error', 'Unknown error')
self.logger.log(f"Market order API error (non-JSON response): {error_msg}", "ERROR")
raise Exception(f"Backpack API error: {error_msg}")
order_id = result.get('id')
order_status = result.get('status', '').upper()
if order_status != 'FILLED':
self.logger.log(f"Market order failed with status: {order_status}", "ERROR")
sys.exit(1)
# For market orders, we expect them to be filled immediately
else:
price = Decimal(result.get('executedQuoteQuantity', '0'))/Decimal(result.get('executedQuantity'))
return OrderResult(
success=True,
order_id=order_id,
side=direction.lower(),
size=quantity,
price=price,
status='FILLED'
)
async def place_close_order(self, contract_id: str, quantity: Decimal, price: Decimal, side: str) -> OrderResult:
"""Place a close order with Backpack using official SDK with retry logic for POST_ONLY rejections."""
max_retries = 15
retry_count = 0
while retry_count < max_retries:
retry_count += 1
# Get current market prices to adjust order price if needed
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='No bid/ask data available')
# Adjust order price based on market conditions and side
adjusted_price = price
if side.lower() == 'sell':
order_side = 'Ask'
# For sell orders, ensure price is above best bid to be a maker order
if price <= best_bid:
adjusted_price = best_bid + self.config.tick_size
elif side.lower() == 'buy':
order_side = 'Bid'
# For buy orders, ensure price is below best ask to be a maker order
if price >= best_ask:
adjusted_price = best_ask - self.config.tick_size
adjusted_price = self.round_to_tick(adjusted_price)
# Place the order using Backpack SDK (post-only to avoid taker fees)
order_result = self.account_client.execute_order(
symbol=contract_id,
side=order_side,
order_type=OrderTypeEnum.LIMIT,
quantity=str(quantity),
price=str(adjusted_price),
post_only=True,
time_in_force=TimeInForceEnum.GTC
)
if not order_result:
return OrderResult(success=False, error_message='Failed to place order')
if 'code' in order_result:
message = order_result.get('message', 'Unknown error')
self.logger.log(f"[CLOSE] Error placing order: {message}", "ERROR")
continue
# Extract order ID from response
order_id = order_result.get('id')
if not order_id:
self.logger.log(f"[CLOSE] No order ID in response: {order_result}", "ERROR")
return OrderResult(success=False, error_message='No order ID in response')
# Order successfully placed
return OrderResult(
success=True,
order_id=order_id,
side=side.lower(),
size=quantity,
price=adjusted_price,
status='New'
)
return OrderResult(success=False, error_message='Max retries exceeded for close order')
async def cancel_order(self, order_id: str) -> OrderResult:
"""Cancel an order with Backpack using official SDK."""
try:
# Cancel the order using Backpack SDK
cancel_result = self.account_client.cancel_order(
symbol=self.config.contract_id,
order_id=order_id
)
if not cancel_result:
return OrderResult(success=False, error_message='Failed to cancel order')
if 'code' in cancel_result:
self.logger.log(
f"[CLOSE] Failed to cancel order {order_id}: {cancel_result.get('message', 'Unknown error')}", "ERROR")
filled_size = self.config.quantity
else:
filled_size = Decimal(cancel_result.get('executedQuantity', 0))
return OrderResult(success=True, filled_size=filled_size)
except Exception as e:
return OrderResult(success=False, error_message=str(e))
@query_retry()
async def get_order_info(self, order_id: str) -> Optional[OrderInfo]:
"""Get order information from Backpack using official SDK."""
# Get order information using Backpack SDK
order_result = self.account_client.get_open_order(
symbol=self.config.contract_id,
order_id=order_id
)
if not order_result:
return None
# Return the order data as OrderInfo
return OrderInfo(
order_id=order_result.get('id', ''),
side=order_result.get('side', '').lower(),
size=Decimal(order_result.get('quantity', 0)),
price=Decimal(order_result.get('price', 0)),
status=order_result.get('status', ''),
filled_size=Decimal(order_result.get('executedQuantity', 0)),
remaining_size=Decimal(order_result.get('quantity', 0)) - Decimal(order_result.get('executedQuantity', 0))
)
@query_retry(default_return=[])
async def get_active_orders(self, contract_id: str) -> List[OrderInfo]:
"""Get active orders for a contract using official SDK."""
# Get active orders using Backpack SDK
active_orders = self.account_client.get_open_orders(symbol=contract_id)
if not active_orders:
return []
# Return the orders list as OrderInfo objects
order_list = active_orders if isinstance(active_orders, list) else active_orders.get('orders', [])
orders = []
for order in order_list:
if isinstance(order, dict):
if order.get('side', '') == 'Bid':
side = 'buy'
elif order.get('side', '') == 'Ask':
side = 'sell'
orders.append(OrderInfo(
order_id=order.get('id', ''),
side=side,
size=Decimal(order.get('quantity', 0)),
price=Decimal(order.get('price', 0)),
status=order.get('status', ''),
filled_size=Decimal(order.get('executedQuantity', 0)),
remaining_size=Decimal(order.get('quantity', 0)) - Decimal(order.get('executedQuantity', 0))
))
return orders
@query_retry(default_return=0)
async def get_account_positions(self) -> Decimal:
"""Get account positions using official SDK."""
positions_data = self.account_client.get_open_positions()
position_amt = 0
for position in positions_data:
if position.get('symbol', '') == self.config.contract_id:
position_amt = Decimal(position.get('netQuantity', 0))
break
return position_amt
async def get_contract_attributes(self) -> Tuple[str, Decimal]:
"""Get contract ID for a ticker."""
ticker = self.config.ticker
if len(ticker) == 0:
self.logger.log("Ticker is empty", "ERROR")
raise ValueError("Ticker is empty")
markets = self.public_client.get_markets()
for market in markets:
if (market.get('marketType', '') == 'PERP' and market.get('baseSymbol', '') == ticker and
market.get('quoteSymbol', '') == 'USDC'):
self.config.contract_id = market.get('symbol', '')
min_quantity = Decimal(market.get('filters', {}).get('quantity', {}).get('minQuantity', 0))
self.config.tick_size = Decimal(market.get('filters', {}).get('price', {}).get('tickSize', 0))
break
if self.config.contract_id == '':
self.logger.log("Failed to get contract ID for ticker", "ERROR")
raise ValueError("Failed to get contract ID for ticker")
if self.config.quantity < min_quantity:
self.logger.log(f"Order quantity is less than min quantity: {self.config.quantity} < {min_quantity}", "ERROR")
raise ValueError(f"Order quantity is less than min quantity: {self.config.quantity} < {min_quantity}")
if self.config.tick_size == 0:
self.logger.log("Failed to get tick size for ticker", "ERROR")
raise ValueError("Failed to get tick size for ticker")
return self.config.contract_id, self.config.tick_size
```
## /exchanges/base.py
```py path="/exchanges/base.py"
"""
Base exchange client interface.
All exchange implementations should inherit from this class.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Tuple, Type, Union
from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential
def query_retry(
default_return: Any = None,
exception_type: Union[Type[Exception], Tuple[Type[Exception], ...]] = (Exception,),
max_attempts: int = 5,
min_wait: float = 1,
max_wait: float = 10,
reraise: bool = False
):
def retry_error_callback(retry_state: RetryCallState):
print(f"Operation: [{retry_state.fn.__name__}] failed after {retry_state.attempt_number} retries, "
f"exception: {str(retry_state.outcome.exception())}")
return default_return
return retry(
stop=stop_after_attempt(max_attempts),
wait=wait_exponential(multiplier=1, min=min_wait, max=max_wait),
retry=retry_if_exception_type(exception_type),
retry_error_callback=retry_error_callback,
reraise=reraise
)
@dataclass
class OrderResult:
"""Standardized order result structure."""
success: bool
order_id: Optional[str] = None
side: Optional[str] = None
size: Optional[Decimal] = None
price: Optional[Decimal] = None
status: Optional[str] = None
error_message: Optional[str] = None
filled_size: Optional[Decimal] = None
@dataclass
class OrderInfo:
"""Standardized order information structure."""
order_id: str
side: str
size: Decimal
price: Decimal
status: str
filled_size: Decimal = 0.0
remaining_size: Decimal = 0.0
cancel_reason: str = ''
class BaseExchangeClient(ABC):
"""Base class for all exchange clients."""
def __init__(self, config: Dict[str, Any]):
"""Initialize the exchange client with configuration."""
self.config = config
self._validate_config()
def round_to_tick(self, price) -> Decimal:
price = Decimal(price)
tick = self.config.tick_size
# quantize forces price to be a multiple of tick
return price.quantize(tick, rounding=ROUND_HALF_UP)
@abstractmethod
def _validate_config(self) -> None:
"""Validate the exchange-specific configuration."""
pass
@abstractmethod
async def connect(self) -> None:
"""Connect to the exchange (WebSocket, etc.)."""
pass
@abstractmethod
async def disconnect(self) -> None:
"""Disconnect from the exchange."""
pass
@abstractmethod
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place an open order."""
pass
@abstractmethod
async def place_close_order(self, contract_id: str, quantity: Decimal, price: Decimal, side: str) -> OrderResult:
"""Place a close order."""
pass
@abstractmethod
async def cancel_order(self, order_id: str) -> OrderResult:
"""Cancel an order."""
pass
@abstractmethod
async def get_order_info(self, order_id: str) -> Optional[OrderInfo]:
"""Get order information."""
pass
@abstractmethod
async def get_active_orders(self, contract_id: str) -> List[OrderInfo]:
"""Get active orders for a contract."""
pass
@abstractmethod
async def get_account_positions(self) -> Decimal:
"""Get account positions."""
pass
@abstractmethod
def setup_order_update_handler(self, handler) -> None:
"""Setup order update handler for WebSocket."""
pass
@abstractmethod
def get_exchange_name(self) -> str:
"""Get the exchange name."""
pass
```
## /exchanges/bp_client.py
```py path="/exchanges/bp_client.py"
import requests
from bpx.base.base_account import BaseAccount
from bpx.http_client.sync_http_client import SyncHttpClient
from typing import Optional, Union, Dict, Any, List
from bpx.constants.enums import *
class KeepAliveHttpClient:
"""
HTTP client that uses a persistent requests.Session() for connection keep-alive.
Reusing the same TCP connection reduces latency for repeated API calls (e.g. order placement).
Drop-in compatible with SyncHttpClient for get/post/patch/delete.
"""
def __init__(self, timeout: int = 30):
self._session = requests.Session()
self._timeout = timeout
self.proxies: Optional[dict] = None
def _request(self, method: str, url: str, **kwargs) -> Union[Dict[str, Any], List[Any], str]:
kwargs.setdefault("timeout", self._timeout)
if self.proxies is not None:
kwargs["proxies"] = self.proxies
resp = self._session.request(method, url, **kwargs)
if not resp.content:
return {}
try:
return resp.json()
except Exception:
# API returned non-JSON (e.g. error page); return dict so callers get .get() instead of str
return {"_raw_error": resp.text, "statusCode": resp.status_code}
def get(
self,
url: str,
headers: Optional[dict] = None,
params: Optional[dict] = None,
) -> Union[Dict[str, Any], List[Any], str]:
return self._request("GET", url, headers=headers, params=params)
def post(
self,
url: str,
headers: Optional[dict] = None,
data: Optional[dict] = None,
) -> Union[Dict[str, Any], List[Any], str]:
# Backpack API expects JSON body for execute_order and other mutations
body = data if data is not None else {}
return self._request("POST", url, headers=headers, json=body)
def patch(
self,
url: str,
headers: Optional[dict] = None,
data: Optional[dict] = None,
) -> Union[Dict[str, Any], List[Any], str]:
body = data if data is not None else {}
return self._request("PATCH", url, headers=headers, json=body)
def delete(
self,
url: str,
headers: Optional[dict] = None,
data: Optional[dict] = None,
) -> Union[Dict[str, Any], List[Any], str]:
body = data if data is not None else {}
return self._request("DELETE", url, headers=headers, json=body)
http_client = SyncHttpClient()
class Account(BaseAccount):
def __init__(
self,
public_key: str,
secret_key: str,
window: int = 5000,
proxy: Optional[dict] = None,
debug: bool = False,
default_http_client: SyncHttpClient = http_client,
):
super().__init__(public_key, secret_key, window, debug)
self.http_client = default_http_client
self.http_client.proxies = proxy
def get_account(
self, window: Optional[int] = None
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account information
https://docs.backpack.exchange/#tag/Account/operation/get_account
"""
request_config = super().get_account(window=window)
return self.http_client.get(
url=request_config.url, headers=request_config.headers
)
def update_account(
self,
auto_borrow_settlements: Optional[bool] = None,
auto_lend: Optional[bool] = None,
auto_repay_borrows: Optional[bool] = None,
leverage_limit: Optional[str] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Updates the account information
https://docs.backpack.exchange/#tag/Account/operation/update_account_settings
"""
request_config = super().update_account(
auto_borrow_settlements=auto_borrow_settlements,
auto_lend=auto_lend,
auto_repay_borrows=auto_repay_borrows,
leverage_limit=leverage_limit,
window=window,
)
return self.http_client.patch(
url=request_config.url,
headers=request_config.headers,
data=request_config.data,
)
def get_max_borrow_quantity(
self,
symbol: str,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
request_config = super().get_max_borrow_quantity(symbol=symbol, window=window)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_max_order_quantity(
self,
symbol: str,
side: str,
price: Optional[str] = None,
reduce_only: Optional[bool] = None,
auto_borrow: Optional[bool] = None,
auto_borrow_repay: Optional[bool] = None,
auto_lend_redeem: Optional[bool] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
request_config = super().get_max_order_quantity(
symbol=symbol,
side=side,
price=price,
reduce_only=reduce_only,
auto_borrow=auto_borrow,
auto_borrow_repay=auto_borrow_repay,
auto_lend_redeem=auto_lend_redeem,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_max_withdrawal_quantity(
self,
symbol: str,
auto_borrow: Optional[bool] = None,
auto_lend_redeem: Optional[bool] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
request_config = super().get_max_withdrawal_quantity(
symbol=symbol,
auto_borrow=auto_borrow,
auto_lend_redeem=auto_lend_redeem,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_borrow_lend_positions(
self, window: Optional[int] = None
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the borrow lend positions
https://docs.backpack.exchange/#tag/Borrow-Lend/operation/get_borrow_lend_positions
"""
request_config = super().get_borrow_lend_positions(window=window)
return self.http_client.get(
url=request_config.url, headers=request_config.headers
)
def execute_borrow_lend(
self,
quantity: str,
side: Union[BorrowLendSideType, BorrowLendSideEnum],
symbol: str,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Posts borrow lend and returns borrow lend status
https://docs.backpack.exchange/#tag/Borrow-Lend/operation/execute_borrow_lend
"""
request_config = super().execute_borrow_lend(
quantity=quantity, side=side, symbol=symbol, window=window
)
return self.http_client.post(
url=request_config.url,
headers=request_config.headers,
data=request_config.data,
)
def get_balances(
self, window: Optional[int] = None
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account balances
https://docs.backpack.exchange/#tag/Capital/operation/get_balances
"""
request_config = super().get_balances(window=window)
return self.http_client.get(
url=request_config.url, headers=request_config.headers
)
def get_collateral(
self, subaccount_id: Optional[int] = None, window: Optional[int] = None
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account collateral
https://docs.backpack.exchange/#tag/Capital/operation/get_collateral
"""
request_config = super().get_collateral(
subaccount_id=subaccount_id, window=window
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_deposits(
self,
limit: int = 100,
offset: int = 0,
from_: Optional[int] = None,
to: Optional[int] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account deposits
https://docs.backpack.exchange/#tag/Capital/operation/get_deposits
"""
request_config = super().get_deposits(
limit=limit, offset=offset, window=window, from_=from_, to=to
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_deposit_address(
self, blockchain: str, window: Optional[int] = None
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the deposit address for a specified blockchain
https://docs.backpack.exchange/#tag/Capital/operation/get_deposit_address
"""
request_config = super().get_deposit_address(
blockchain=blockchain, window=window
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_withdrawals(
self,
limit: int = 100,
offset: int = 0,
from_: Optional[int] = None,
to: Optional[int] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account withdrawals
https://docs.backpack.exchange/#tag/Capital/operation/get_withdrawals
"""
request_config = super().get_withdrawals(
limit=limit, offset=offset, from_=from_, to=to, window=window
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def withdrawal(
self,
address: str,
symbol: str,
blockchain: str,
quantity: str,
two_factor_code: Optional[str] = None,
auto_borrow: Optional[bool] = None,
auto_lend_redeem: Optional[bool] = None,
client_id: Optional[int] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Posts withdrawal and returns withdrawal status
https://docs.backpack.exchange/#tag/Capital/operation/request_withdrawal
"""
request_config = super().withdrawal(
address=address,
blockchain=blockchain,
quantity=quantity,
symbol=symbol,
two_factor_token=two_factor_code,
auto_borrow=auto_borrow,
auto_lend_redeem=auto_lend_redeem,
client_id=client_id,
window=window,
)
return self.http_client.post(
url=request_config.url,
headers=request_config.headers,
data=request_config.data,
)
def get_open_positions(
self, window: Optional[int] = None
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account open positions
https://docs.backpack.exchange/#tag/Futures/operation/get_positions
"""
request_config = super().get_open_positions(window=window)
return self.http_client.get(
url=request_config.url, headers=request_config.headers
)
def get_borrow_history(
self,
borrow_lend_event_type: Optional[
Union[BorrowLendEventEnum, BorrowLendEventType]
] = None,
sources: Optional[str] = None,
position_id: Optional[str] = None,
symbol: Optional[str] = None,
limit: int = 100,
offset: int = 0,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account borrow history
https://docs.backpack.exchange/#tag/History/operation/get_borrow_lend_history
"""
request_config = super().get_borrow_history(
borrow_lend_event_type=borrow_lend_event_type,
sources=sources,
position_id=position_id,
symbol=symbol,
limit=limit,
offset=offset,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_interest_history(
self,
asset: Optional[str] = None,
symbol: Optional[str] = None,
position_id: Optional[str] = None,
limit: int = 100,
offset: int = 0,
source: Optional[
Union[InterestPaymentSourceType, InterestPaymentSourceEnum]
] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account interest history
https://docs.backpack.exchange/#tag/History/operation/get_interest_history
"""
request_config = super().get_interest_history(
asset=asset,
symbol=symbol,
position_id=position_id,
limit=limit,
offset=offset,
source=source,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_order_history(
self,
symbol: Optional[str] = None,
order_id: Optional[str] = None,
limit: int = 100,
offset: int = 0,
market_type: Optional[Union[MarketTypeEnum, MarketTypeType]] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns orders history of a specified symbol
https://docs.backpack.exchange/#tag/History/operation/get_order_history
"""
request_config = super().get_order_history(
limit=limit,
offset=offset,
order_id=order_id,
symbol=symbol,
market_type=market_type,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_fill_history(
self,
symbol: Optional[str] = None,
limit: int = 100,
offset: int = 0,
from_: Optional[int] = None,
to: Optional[int] = None,
fill_type: Optional[Union[FillTypeEnum, FillTypeType]] = None,
market_type: Optional[Union[MarketTypeType, MarketTypeEnum]] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns fills history of a specified symbol
https://docs.backpack.exchange/#tag/History/operation/get_fills
"""
request_config = super().get_fill_history(
symbol=symbol,
limit=limit,
offset=offset,
from_=from_,
to=to,
fill_type=fill_type,
market_type=market_type,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_funding_payments(
self,
subaccount_id: Optional[int] = None,
symbol: Optional[str] = None,
limit: Optional[int] = 100,
offset: Optional[int] = 0,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account funding payments
https://docs.backpack.exchange/#tag/History/operation/get_funding_payments
"""
request_config = super().get_funding_payments(
subaccount_id=subaccount_id,
symbol=symbol,
limit=limit,
offset=offset,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_profit_and_loss_history(
self,
subaccount_id: Optional[int] = None,
symbol: Optional[str] = None,
limit: int = 100,
offset: int = 0,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account profit and loss history
https://docs.backpack.exchange/#tag/History/operation/get_pnl_payments
"""
request_config = super().get_profit_and_loss_history(
subaccount_id=subaccount_id,
symbol=symbol,
limit=limit,
offset=offset,
window=window,
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_settlements_history(
self,
limit: Optional[int] = 100,
offset: Optional[int] = 0,
source: Optional[
Union[SettlementSourceFilterEnum, SettlementSourceFilterType]
] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns the account settlements history
https://docs.backpack.exchange/#tag/History/operation/get_settlement_history
"""
request_config = super().get_settlements_history(
limit=limit, offset=offset, source=source, window=window
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def get_open_order(
self,
symbol: str,
order_id: Optional[str] = None,
client_id: Optional[int] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns open orders of a specified symbol
https://docs.backpack.exchange/#tag/Order/operation/get_order
"""
request_config = super().get_open_order(
symbol=symbol, order_id=order_id, client_id=client_id, window=window
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def execute_order(
self,
symbol: str,
side: str,
order_type: Union[OrderTypeEnum, OrderTypeType],
time_in_force: Optional[Union[TimeInForceEnum, TimeInForceType]] = None,
quantity: Optional[str] = None,
price: Optional[str] = None,
trigger_price: Optional[str] = None,
self_trade_prevention: Optional[
Union[SelfTradePreventionEnum, SelfTradePreventionType]
] = None,
quote_quantity: Optional[str] = None,
client_id: Optional[int] = None,
post_only: Optional[bool] = None,
reduce_only: Optional[bool] = None,
auto_borrow: Optional[bool] = None,
auto_borrow_repay: Optional[bool] = None,
auto_lend: Optional[bool] = None,
auto_lend_redeem: Optional[bool] = None,
stop_loss_limit_price: Optional[str] = None,
stop_loss_trigger_by: Optional[str] = None,
stop_loss_trigger_price: Optional[str] = None,
take_profit_limit_price: Optional[str] = None,
take_profit_trigger_by: Optional[str] = None,
take_profit_trigger_price: Optional[str] = None,
triggered_by: Optional[str] = None,
trigger_quantity: Optional[str] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Posts an order and returns order status
https://docs.backpack.exchange/#tag/Order/operation/execute_order
"""
request_config = super().execute_order(
symbol=symbol,
side=side,
order_type=order_type,
time_in_force=time_in_force,
quantity=quantity,
price=price,
trigger_price=trigger_price,
self_trade_prevention=self_trade_prevention,
quote_quantity=quote_quantity,
client_id=client_id,
post_only=post_only,
reduce_only=reduce_only,
auto_borrow=auto_borrow,
auto_borrow_repay=auto_borrow_repay,
auto_lend=auto_lend,
auto_lend_redeem=auto_lend_redeem,
stop_loss_limit_price=stop_loss_limit_price,
stop_loss_trigger_by=stop_loss_trigger_by,
stop_loss_trigger_price=stop_loss_trigger_price,
take_profit_limit_price=take_profit_limit_price,
take_profit_trigger_by=take_profit_trigger_by,
take_profit_trigger_price=take_profit_trigger_price,
triggered_by=triggered_by,
trigger_quantity=trigger_quantity,
window=window,
)
request_config.headers['X-Broker-Id'] = '1800'
return self.http_client.post(
url=request_config.url,
headers=request_config.headers,
data=request_config.data,
)
def cancel_order(
self,
symbol: str,
order_id: Optional[str] = None,
client_id: Optional[int] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Cancels an existing order
https://docs.backpack.exchange/#tag/Order/operation/cancel_order
"""
request_config = super().cancel_order(
symbol=symbol, order_id=order_id, client_id=client_id, window=window
)
return self.http_client.delete(
url=request_config.url,
headers=request_config.headers,
data=request_config.data,
)
def get_open_orders(
self,
market_type: Optional[str] = None,
symbol: Optional[str] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Returns open orders of a specified symbol
https://docs.backpack.exchange/#tag/Order/operation/get_open_orders
"""
request_config = super().get_open_orders(
market_type=market_type, symbol=symbol, window=window
)
return self.http_client.get(
url=request_config.url,
headers=request_config.headers,
params=request_config.params,
)
def cancel_all_orders(
self, symbol: str, window: Optional[int] = None
) -> Union[Dict[str, Any], List[Any], str]:
"""
Cancels all existing orders of a specified symbol
https://docs.backpack.exchange/#tag/Order/operation/cancel_open_orders
"""
request_config = super().cancel_all_orders(symbol=symbol, window=window)
return self.http_client.delete(
url=request_config.url,
headers=request_config.headers,
data=request_config.data,
)
def submit_quote(
self,
rfq_id: str,
bid_price: str,
ask_price: str,
client_id: Optional[int] = None,
window: Optional[int] = None,
) -> Union[Dict[str, Any], List[Any], str]:
"""
Submits a quote for a specified RFQ
https://docs.backpack.exchange/#tag/Request-For-Quote/operation/submit_quote
"""
request_config = super().submit_quote(
rfq_id=rfq_id,
bid_price=bid_price,
ask_price=ask_price,
client_id=client_id,
window=window,
)
return self.http_client.post(
url=request_config.url,
headers=request_config.headers,
data=request_config.data,
)
```
## /exchanges/edgex.py
```py path="/exchanges/edgex.py"
"""
EdgeX exchange client implementation.
"""
import os
import asyncio
import json
import traceback
from decimal import Decimal
from typing import Dict, Any, List, Optional, Tuple
from edgex_sdk import Client, OrderSide, WebSocketManager, CancelOrderParams, GetOrderBookDepthParams, GetActiveOrderParams
from .base import BaseExchangeClient, OrderResult, OrderInfo, query_retry
from helpers.logger import TradingLogger
class EdgeXClient(BaseExchangeClient):
"""EdgeX exchange client implementation."""
def __init__(self, config: Dict[str, Any]):
"""Initialize EdgeX client."""
super().__init__(config)
# EdgeX credentials from environment
self.account_id = os.getenv('EDGEX_ACCOUNT_ID')
self.stark_private_key = os.getenv('EDGEX_STARK_PRIVATE_KEY')
self.base_url = os.getenv('EDGEX_BASE_URL', 'https://pro.edgex.exchange')
self.ws_url = os.getenv('EDGEX_WS_URL', 'wss://quote.edgex.exchange')
if not self.account_id or not self.stark_private_key:
raise ValueError("EDGEX_ACCOUNT_ID and EDGEX_STARK_PRIVATE_KEY must be set in environment variables")
# Initialize EdgeX client using official SDK
self.client = Client(
base_url=self.base_url,
account_id=int(self.account_id),
stark_private_key=self.stark_private_key
)
# Initialize WebSocket manager using official SDK
self.ws_manager = WebSocketManager(
base_url=self.ws_url,
account_id=int(self.account_id),
stark_pri_key=self.stark_private_key
)
# Initialize logger
self.logger = TradingLogger(exchange="edgex", ticker=self.config.ticker, log_to_console=False)
self._order_update_handler = None
# --- reconnection state ---
self._ws_task: Optional[asyncio.Task] = None
self._ws_stop = asyncio.Event()
self._ws_disconnected = asyncio.Event()
self._loop: Optional[asyncio.AbstractEventLoop] = None
def _validate_config(self) -> None:
"""Validate EdgeX configuration."""
required_env_vars = ['EDGEX_ACCOUNT_ID', 'EDGEX_STARK_PRIVATE_KEY']
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
# ---------------------------
# Connection / Reconnect
# ---------------------------
async def connect(self) -> None:
"""Connect private WS and keep it alive with auto-reconnect."""
self._loop = asyncio.get_running_loop()
# Hook disconnect/connect once (SDK calls these from threads)
try:
private_client = self.ws_manager.get_private_client()
private_client.on_disconnect(
lambda exc: self._loop.call_soon_threadsafe(self._ws_disconnected.set)
)
private_client.on_connect(
lambda: self.logger.log("[WS] private connected", "INFO")
)
except Exception as e:
self.logger.log(f"[WS] failed to set hooks: {e}", "ERROR")
if not self._ws_task or self._ws_task.done():
self._ws_task = asyncio.create_task(self._run_private_ws())
# give first connection a moment (optional)
await asyncio.sleep(0.5)
async def _run_private_ws(self):
"""Tiny reconnect loop with exponential backoff."""
backoff = 1.0
while not self._ws_stop.is_set():
try:
# connect
self.ws_manager.connect_private()
self.logger.log("[WS] connected", "INFO")
backoff = 1.0
# wait until either disconnect or stop
self._ws_disconnected.clear()
done, _ = await asyncio.wait(
{asyncio.create_task(self._ws_stop.wait()),
asyncio.create_task(self._ws_disconnected.wait()),},
return_when=asyncio.FIRST_COMPLETED,
)
if self._ws_stop.is_set():
break
self.logger.log(
"[WS] disconnected; attempting to reconnect…", "WARNING"
)
except Exception as e:
self.logger.log(f"[WS] connect error: {e}", "ERROR")
finally:
# ensure socket is closed before retry
try:
self.ws_manager.disconnect_private()
except Exception:
pass
# backoff and retry
await asyncio.sleep(backoff)
backoff = min(60.0, backoff * 2)
# Final cleanup (on stop)
try:
self.ws_manager.disconnect_private()
except Exception:
pass
async def disconnect(self) -> None:
"""Disconnect from EdgeX."""
try:
self._ws_stop.set()
if self._ws_task:
await self._ws_task
except Exception:
pass
try:
if hasattr(self, "client") and self.client:
await self.client.close()
if hasattr(self, "ws_manager"):
self.ws_manager.disconnect_all()
except Exception as e:
self.logger.log(f"Error during EdgeX disconnect: {e}", "ERROR")
# ---------------------------
# Utility / Name
# ---------------------------
def get_exchange_name(self) -> str:
"""Get the exchange name."""
return "edgex"
# ---------------------------
# WS Handlers
# ---------------------------
def setup_order_update_handler(self, handler) -> None:
"""Setup order update handler for WebSocket."""
self._order_update_handler = handler
def order_update_handler(message):
"""Handle order updates from WebSocket."""
try:
# Parse the message structure
if isinstance(message, str):
message = json.loads(message)
# Check if this is a trade-event with ORDER_UPDATE
content = message.get("content", {})
event = content.get("event", "")
if event == "ORDER_UPDATE":
# Extract order data from the nested structure
data = content.get('data', {})
orders = data.get('order', [])
if orders and len(orders) > 0:
order = orders[0] # Get the first order
if order.get('contractId') != self.config.contract_id:
return
order_id = order.get('id')
status = order.get('status')
side = order.get('side', '').lower()
filled_size = order.get('cumMatchSize')
if side == self.config.close_order_side:
order_type = "CLOSE"
else:
order_type = "OPEN"
# edgex returns TWO filled events for the same order; take the first one
if status == "FILLED" and len(data.get('collateral', [])):
return
# ignore canceled close orders
if status == "CANCELED" and order_type == "CLOSE":
return
# edgex returns partially filled events as "OPEN" orders
if status == "OPEN" and Decimal(filled_size) > 0:
status = "PARTIALLY_FILLED"
if status in ['OPEN', 'PARTIALLY_FILLED', 'FILLED', 'CANCELED']:
if self._order_update_handler:
self._order_update_handler({
'order_id': order_id,
'side': side,
'order_type': order_type,
'status': status,
'size': order.get('size'),
'price': order.get('price'),
'contract_id': order.get('contractId'),
'filled_size': filled_size
})
except Exception as e:
self.logger.log(f"Error handling order update: {e}", "ERROR")
self.logger.log(f"Traceback: {traceback.format_exc()}", "ERROR")
try:
private_client = self.ws_manager.get_private_client()
private_client.on_message("trade-event", order_update_handler)
except Exception as e:
self.logger.log(f"Could not add trade-event handler: {e}", "ERROR")
# ---------------------------
# REST-ish helpers
# ---------------------------
@query_retry(default_return=(0, 0))
async def fetch_bbo_prices(self, contract_id: str) -> Tuple[Decimal, Decimal]:
depth_params = GetOrderBookDepthParams(contract_id=contract_id, limit=15)
order_book = await self.client.quote.get_order_book_depth(depth_params)
order_book_data = order_book['data']
# Get the first (and should be only) order book entry
order_book_entry = order_book_data[0]
# Extract bids and asks from the entry
bids = order_book_entry.get('bids', [])
asks = order_book_entry.get('asks', [])
# Best bid is the highest price someone is willing to buy at
best_bid = Decimal(bids[0]['price']) if bids and len(bids) > 0 else 0
# Best ask is the lowest price someone is willing to sell at
best_ask = Decimal(asks[0]['price']) if asks and len(asks) > 0 else 0
return best_bid, best_ask
async def get_order_price(self, direction: str) -> Decimal:
"""Get the price of an order with EdgeX using official SDK."""
best_bid, best_ask = await self.fetch_bbo_prices(self.config.contract_id)
if best_bid <= 0 or best_ask <= 0:
self.logger.log("Invalid bid/ask prices", "ERROR")
raise ValueError("Invalid bid/ask prices")
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
order_price = best_ask - self.config.tick_size
else:
# For sell orders, place slightly above best bid to ensure execution
order_price = best_bid + self.config.tick_size
return self.round_to_tick(order_price)
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""Place an open order with EdgeX using official SDK with retry logic for POST_ONLY rejections."""
max_retries = 15
retry_count = 0
while retry_count < max_retries:
try:
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='Invalid bid/ask prices')
if direction == 'buy':
# For buy orders, place slightly below best ask to ensure execution
order_price = best_ask - self.config.tick_size
side = OrderSide.BUY
else:
# For sell orders, place slightly above best bid to ensure execution
order_price = best_bid + self.config.tick_size
side = OrderSide.SELL
# Place the order using official SDK (post-only to ensure maker order)
order_result = await self.client.create_limit_order(
contract_id=contract_id,
size=str(quantity),
price=str(self.round_to_tick(order_price)),
side=side,
post_only=True
)
if not order_result or 'data' not in order_result:
return OrderResult(success=False, error_message='Failed to place order')
# Extract order ID from response
order_id = order_result['data'].get('orderId')
if not order_id:
return OrderResult(success=False, error_message='No order ID in response')
# Check order status after a short delay to see if it was rejected
await asyncio.sleep(0.01)
order_info = await self.get_order_info(order_id)
if order_info:
if order_info.status == 'CANCELED':
if retry_count < max_retries - 1:
retry_count += 1
continue
else:
return OrderResult(success=False, error_message=f'Order rejected after {max_retries} attempts')
elif order_info.status in ['OPEN', 'PARTIALLY_FILLED', 'FILLED']:
# Order successfully placed
return OrderResult(
success=True,
order_id=order_id,
side=side.value,
size=quantity,
price=order_price,
status=order_info.status
)
else:
return OrderResult(success=False, error_message=f'Unexpected order status: {order_info.status}')
else:
# Assume order is successful if we can't get info
return OrderResult(
success=True,
order_id=order_id,
side=side.value,
size=quantity,
price=order_price,
status='OPEN'
)
except Exception as e:
if retry_count < max_retries - 1:
retry_count += 1
await asyncio.sleep(0.1) # Wait before retry
continue
else:
return OrderResult(success=False, error_message=str(e))
return OrderResult(success=False, error_message='Max retries exceeded')
async def place_close_order(self, contract_id: str, quantity: Decimal, price: Decimal, side: str) -> OrderResult:
"""Place a close order with EdgeX using official SDK with retry logic for POST_ONLY rejections."""
max_retries = 15
retry_count = 0
while retry_count < max_retries:
try:
best_bid, best_ask = await self.fetch_bbo_prices(contract_id)
if best_bid <= 0 or best_ask <= 0:
return OrderResult(success=False, error_message='Invalid bid/ask prices')
# Convert side string to OrderSide enum
order_side = OrderSide.BUY if side.lower() == 'buy' else OrderSide.SELL
# Adjust order price based on market conditions and side
adjusted_price = price
if side.lower() == 'sell':
# For sell orders, ensure price is above best bid to be a maker order
if price <= best_bid:
adjusted_price = best_bid + self.config.tick_size
elif side.lower() == 'buy':
# For buy orders, ensure price is below best ask to be a maker order
if price >= best_ask:
adjusted_price = best_ask - self.config.tick_size
adjusted_price = self.round_to_tick(adjusted_price)
# Place the order using official SDK (post-only to avoid taker fees)
order_result = await self.client.create_limit_order(
contract_id=contract_id,
size=str(quantity),
price=str(adjusted_price),
side=order_side,
post_only=True
)
if not order_result or 'data' not in order_result:
return OrderResult(success=False, error_message='Failed to place order')
# Extract order ID from response
order_id = order_result['data'].get('orderId')
if not order_id:
return OrderResult(success=False, error_message='No order ID in response')
# Check order status after a short delay to see if it was rejected
await asyncio.sleep(0.01)
order_info = await self.get_order_info(order_id)
if order_info:
if order_info.status == 'CANCELED':
if retry_count < max_retries - 1:
retry_count += 1
continue
else:
return OrderResult(success=False, error_message=f'Close order rejected after {max_retries} attempts')
elif order_info.status in ['OPEN', 'PARTIALLY_FILLED', 'FILLED']:
# Order successfully placed
return OrderResult(
success=True,
order_id=order_id,
side=side,
size=quantity,
price=adjusted_price,
status=order_info.status
)
else:
return OrderResult(success=False, error_message=f'Unexpected close order status: {order_info.status}')
else:
# Assume order is successful if we can't get info
return OrderResult(
success=True,
order_id=order_id,
side=side,
size=quantity,
price=adjusted_price
)
except Exception as e:
if retry_count < max_retries - 1:
retry_count += 1
await asyncio.sleep(0.1) # Wait before retry
continue
else:
return OrderResult(success=False, error_message=str(e))
return OrderResult(success=False, error_message='Max retries exceeded for close order')
async def cancel_order(self, order_id: str) -> OrderResult:
"""Cancel an order with EdgeX using official SDK."""
try:
# Create cancel parameters using official SDK
cancel_params = CancelOrderParams(order_id=order_id)
# Cancel the order using official SDK
cancel_result = await self.client.cancel_order(cancel_params)
if not cancel_result or 'data' not in cancel_result:
return OrderResult(success=False, error_message='Failed to cancel order')
return OrderResult(success=True)
except Exception as e:
return OrderResult(success=False, error_message=str(e))
@query_retry()
async def get_order_info(self, order_id: str) -> Optional[OrderInfo]:
"""Get order information from EdgeX using official SDK."""
# Use the newly created get_order_by_id method
order_result = await self.client.order.get_order_by_id(order_id_list=[order_id])
if not order_result or 'data' not in order_result:
return None
# The API returns a list of orders, get the first (and should be only) one
order_list = order_result['data']
if order_list and len(order_list) > 0:
order_data = order_list[0]
return OrderInfo(
order_id=order_data.get('id', ''),
side=order_data.get('side', '').lower(),
size=Decimal(order_data.get('size', 0)),
price=Decimal(order_data.get('price', 0)),
status=order_data.get('status', ''),
filled_size=Decimal(order_data.get('cumMatchSize', 0)),
remaining_size=Decimal(order_data.get('size', 0)) - Decimal(order_data.get('cumMatchSize', 0))
)
return None
@query_retry(default_return=[])
async def get_active_orders(self, contract_id: str) -> List[OrderInfo]:
"""Get active orders for a contract using official SDK."""
# Get active orders using official SDK
params = GetActiveOrderParams(size="200", offset_data="", filter_contract_id_list=[contract_id])
active_orders = await self.client.get_active_orders(params)
if not active_orders or 'data' not in active_orders:
return []
# Filter orders for the specific contract and ensure they are dictionaries
# The API returns orders under 'dataList' key, not 'orderList'
order_list = active_orders['data'].get('dataList', [])
contract_orders = []
for order in order_list:
if isinstance(order, dict) and order.get('contractId') == contract_id:
contract_orders.append(OrderInfo(
order_id=order.get('id', ''),
side=order.get('side', '').lower(),
size=Decimal(order.get('size', 0)),
price=Decimal(order.get('price', 0)),
status=order.get('status', ''),
filled_size=Decimal(order.get('cumMatchSize', 0)),
remaining_size=Decimal(order.get('size', 0)) - Decimal(order.get('cumMatchSize', 0))
))
return contract_orders
@query_retry(default_return=0)
async def get_account_positions(self) -> Decimal:
"""Get account positions using official SDK."""
positions_data = await self.client.get_account_positions()
if not positions_data or 'data' not in positions_data:
self.logger.log("No positions or failed to get positions", "WARNING")
position_amt = 0
else:
# The API returns positions under data.positionList
positions = positions_data.get('data', {}).get('positionList', [])
if positions:
# Find position for current contract
position = None
for p in positions:
if isinstance(p, dict) and p.get('contractId') == self.config.contract_id:
position = p
break
if position:
position_amt = abs(Decimal(position.get('openSize', 0)))
else:
position_amt = 0
else:
position_amt = 0
return position_amt
async def get_contract_attributes(self) -> Tuple[str, Decimal]:
"""Get contract ID for a ticker."""
ticker = self.config.ticker
if len(ticker) == 0:
self.logger.log("Ticker is empty", "ERROR")
raise ValueError("Ticker is empty")
response = await self.client.get_metadata()
data = response.get('data', {})
if not data:
self.logger.log("Failed to get metadata", "ERROR")
raise ValueError("Failed to get metadata")
contract_list = data.get('contractList', [])
if not contract_list:
self.logger.log("Failed to get contract list", "ERROR")
raise ValueError("Failed to get contract list")
current_contract = None
for c in contract_list:
if c.get('contractName') == ticker+'USD':
current_contract = c
break
if not current_contract:
self.logger.log("Failed to get contract ID for ticker", "ERROR")
raise ValueError("Failed to get contract ID for ticker")
self.config.contract_id = current_contract.get('contractId')
min_quantity = Decimal(current_contract.get('minOrderSize'))
if self.config.quantity < min_quantity:
self.logger.log(f"Order quantity is less than min quantity: {self.config.quantity} < {min_quantity}", "ERROR")
raise ValueError(f"Order quantity is less than min quantity: {self.config.quantity} < {min_quantity}")
self.config.tick_size = Decimal(current_contract.get('tickSize'))
return self.config.contract_id, self.config.tick_size
```
## /exchanges/ethereal.py
```py path="/exchanges/ethereal.py"
"""
Ethereal exchange client skeleton built on the official ethereal-sdk.
"""
import os
import asyncio
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID
from .base import BaseExchangeClient, OrderResult, OrderInfo, query_retry
from helpers.logger import TradingLogger
from ethereal import AsyncRESTClient, AsyncWSClient
class EtherealClient(BaseExchangeClient):
"""
Ethereal exchange client wired to ethereal-sdk.
"""
def __init__(self, config: Dict[str, Any]):
"""Initialize Ethereal client with environment-driven defaults."""
# REST + chain config (all optional; defaults follow the SDK docs)
self.base_url = os.getenv("ETHEREAL_BASE_URL", "https://api.ethereal.trade")
self.rpc_url = os.getenv("ETHEREAL_RPC_URL", "https://rpc.ethereal.trade")
self.private_key = os.getenv("ETHEREAL_PRIVATE_KEY") # required for signing
self.chain_id = int(os.getenv("ETHEREAL_CHAIN_ID", "5064014"))
self.ws_url = os.getenv("ETHEREAL_WS_URL", "wss://ws.ethereal.trade")
# Account / trading context
self.subaccount_id = os.getenv("ETHEREAL_SUBACCOUNT_ID")
self.account_name = os.getenv("ETHEREAL_ACCOUNT_NAME", "primary")
self.subaccount_hex = os.getenv("ETHEREAL_SUBACCOUNT")
if not self.subaccount_hex and self.account_name:
try:
self.subaccount_hex = self._encode_subaccount_name(self.account_name)
except Exception:
self.subaccount_hex = None
self._warned_missing_subaccount = False
# Clients are created lazily in connect()
self._rest_client: Optional["AsyncRESTClient"] = None
self._ws_client: Optional["AsyncWSClient"] = None
self._order_update_handler = None
self._product_cache: Dict[str, Any] = {}
self._contract_to_product_id: Dict[str, UUID] = {}
self._product_id_to_contract: Dict[UUID, str] = {}
self._ws_task: Optional[asyncio.Task] = None
self._ws_stop: Optional[asyncio.Event] = None
self.logger = TradingLogger(
exchange="ethereal",
ticker=getattr(config, "ticker", ""),
log_to_console=False
)
# Base init last: sets self.config and runs _validate_config
super().__init__(config)
@staticmethod
def _normalize_contract_id(value: Optional[str]) -> str:
"""Normalize contract id to uppercase ticker+USD form."""
raw = value or ""
try:
return str(UUID(str(raw)))
except Exception:
pass
cid = str(raw).upper()
if cid and not cid.endswith("USD"):
cid = f"{cid}USD"
return cid
@staticmethod
def _as_uuid(value: Any) -> Optional[UUID]:
"""Convert value to UUID if possible."""
if isinstance(value, UUID):
return value
try:
return UUID(str(value))
except Exception:
return None
def _validate_config(self) -> None:
"""Validate Ethereal configuration."""
# Ticker is required; contract_id always normalized to ticker+USD.
if not getattr(self.config, "ticker", None):
raise ValueError("Missing required config field: ticker")
ticker_val = str(self.config.ticker).upper()
self.config.ticker = ticker_val
contract_val = getattr(self.config, "contract_id", None)
normalized_contract = (
self._normalize_contract_id(str(contract_val))
if contract_val
else self._normalize_contract_id(ticker_val)
)
self.config.contract_id = normalized_contract
# Signing key is optional for read-only calls, but required for trading.
if not getattr(self, "private_key", None):
self.logger.log("ETHEREAL_PRIVATE_KEY not set; trading calls will fail.", "WARN")
def _build_rest_config(self) -> Dict[str, Any]:
"""
Assemble the config dict expected by AsyncRESTClient.create().
Example from the SDK docs:
await AsyncRESTClient.create({
"base_url": "https://api.ethereal.trade",
"chain_config": {
"rpc_url": "https://rpc.ethereal.trade",
"private_key": "your_private_key",
}
})
"""
config: Dict[str, Any] = {
"base_url": self.base_url,
"chain_config": {
"rpc_url": self.rpc_url,
"chain_id": self.chain_id,
},
}
if self.private_key:
config["chain_config"]["private_key"] = self.private_key
return config
async def _ensure_rest_client(self) -> None:
"""Create the AsyncRESTClient on-demand."""
if self._rest_client is not None:
return
if AsyncRESTClient is None:
raise ImportError("ethereal-sdk not installed or not available in this environment")
try:
self._rest_client = await AsyncRESTClient.create(self._build_rest_config())
except Exception as exc:
self.logger.log(f"[ethereal] failed to create REST client: {exc}", "ERROR")
raise
async def _ensure_products(self) -> Dict[str, Any]:
"""Cache products by ticker for quick lookups."""
if self._product_cache:
return self._product_cache
await self._ensure_rest_client()
try:
products = await self._rest_client.products_by_ticker()
# store uppercase keys
cache = {k.upper(): v for k, v in products.items()}
self._product_cache = cache
self._contract_to_product_id = {}
self._product_id_to_contract = {}
for ticker_key, prod in cache.items():
pid = getattr(prod, "id", None)
pid_uuid = self._as_uuid(pid)
if pid_uuid:
contract_key = self._normalize_contract_id(ticker_key)
self._contract_to_product_id[contract_key] = pid_uuid
self._product_id_to_contract[pid_uuid] = contract_key
except Exception as exc:
self.logger.log(f"[ethereal] products_by_ticker failed: {exc}", "WARNING")
self._product_cache = {}
self._contract_to_product_id = {}
self._product_id_to_contract = {}
return self._product_cache
async def _start_ws(self) -> None:
"""Start AsyncWSClient and subscribe to order stream."""
if AsyncWSClient is None:
raise ImportError("ethereal-sdk websocket client not available")
self._ws_client = AsyncWSClient({"base_url": self.ws_url})
# Set callbacks
self._ws_client.callbacks["OrderUpdate"] = [self._handle_ws_message_order_update]
await self._ws_client.open(namespaces=["/", "/v1/stream"])
# connected flag kept for future use
self._ws_connected = True
# Best-effort subscribe
try:
if self.subaccount_id:
await self._ws_client.subscribe(
stream_type="OrderUpdate",
subaccount_id=self.subaccount_id,
)
except Exception as exc: # pylint: disable=broad-except
self.logger.log(f"[ethereal] WS subscribe failed: {exc}", "WARNING")
return True
async def _ws_reconnect_loop(self) -> None:
"""Keep WS alive; reconnect proactively to handle 12h disconnects."""
backoff = 1.0
# Reconnect a bit before the 12h window (11h) to avoid server closes.
max_uptime = 60 * 60 * 11
while self._ws_stop and not self._ws_stop.is_set():
try:
await self._start_ws()
self.logger.log("[ethereal] WS connected", "INFO")
backoff = 1.0
# Wait for either stop signal or proactive reconnect timeout
try:
await asyncio.wait_for(self._ws_stop.wait(), timeout=max_uptime)
break
except asyncio.TimeoutError:
# proactive reconnect
self.logger.log("[ethereal] WS proactive reconnect", "INFO")
except Exception:
break
except Exception as exc:
self.logger.log(f"[ethereal] WS connect error: {exc}", "ERROR")
finally:
try:
if self._ws_client:
await self._ws_client.close()
except Exception:
pass
await asyncio.sleep(backoff)
backoff = min(60.0, backoff * 2)
self._ws_connected = False
async def _get_product_by_contract_id(self, contract_id: Any) -> Optional[Any]:
"""Resolve a product using contract_id (ticker+USD) or product_id."""
products = await self._ensure_products()
if contract_id is None:
return None
contract_key = self._normalize_contract_id(str(contract_id))
if contract_key in products:
return products[contract_key]
pid = self._as_uuid(contract_id)
if pid:
for item in products.values():
if getattr(item, "id", None) == pid:
return item
return None
async def _get_product_id_for_contract_id(self, contract_id: Any) -> Optional[UUID]:
"""Map normalized contract_id to product_id (UUID)."""
await self._ensure_products()
if contract_id is None:
return None
contract_key = self._normalize_contract_id(str(contract_id))
pid = self._contract_to_product_id.get(contract_key)
if pid:
return pid
product = await self._get_product_by_contract_id(contract_key)
if product:
pid_val = self._as_uuid(getattr(product, "id", None))
resolved_contract = self._normalize_contract_id(getattr(product, "ticker", contract_key))
if pid_val:
self._contract_to_product_id[resolved_contract] = pid_val
self._product_id_to_contract[pid_val] = resolved_contract
return pid_val
fallback = self._as_uuid(contract_id)
return fallback
async def get_ticker_by_product_id(self, product_id: Any) -> Optional[str]:
"""Look up ticker string from a product id."""
if product_id is None:
return None
await self._ensure_products()
pid = self._as_uuid(product_id)
if pid is None:
return None
ticker_cached = self._product_id_to_contract.get(pid)
if ticker_cached:
return ticker_cached
products = await self._ensure_products()
for ticker_key, prod in products.items():
if getattr(prod, "id", None) == pid:
contract_key = self._normalize_contract_id(ticker_key)
self._contract_to_product_id[contract_key] = pid
self._product_id_to_contract[pid] = contract_key
return contract_key
return None
def _encode_subaccount_name(self, name: str) -> str:
"""Encode subaccount name to bytes32 hex (0x...)."""
if not name:
raise ValueError("Subaccount name is empty")
b = name.encode("utf-8")
if len(b) > 32:
raise ValueError("Subaccount name too long for bytes32")
return "0x" + b.ljust(32, b"\x00").hex()
async def _handle_ws_message_order_update(self, data: Dict[str, Any]):
"""Bridge WS order events to the trading bot handler."""
if not self._order_update_handler:
return
try:
# Stream data type:
# https://docs.ethereal.trade/developer-guides/trading-api/websocket-gateway#subscribe
payload = data.get("data") if isinstance(data, dict) and "data" in data else data
if not isinstance(payload, list):
return
for order in payload:
if not isinstance(order, dict):
continue
order_id = order.get("id")
status = self._normalize_status(order.get("status") or order.get("state"))
side_val = order.get("side")
side = "buy" if side_val in (0, "buy", "BUY") else "sell"
filled_size = order.get("filled") or 0
size = order.get("quantity") or order.get("availableQuantity") or filled_size or 0
price = order.get("price") or 0
product_id_val = order.get("productId")
contract_id = None
if product_id_val is not None:
pid_uuid = self._as_uuid(product_id_val)
if pid_uuid:
contract_id = self._product_id_to_contract.get(pid_uuid)
if not contract_id:
try:
contract_id = await self.get_ticker_by_product_id(product_id_val)
except Exception:
contract_id = None
contract_id = contract_id or str(self.config.contract_id)
order_type = "CLOSE" if side == self.config.close_order_side else "OPEN"
if not order_id:
continue
self._order_update_handler(
{
"contract_id": contract_id,
"order_id": str(order_id),
"status": status or "",
"side": side,
"order_type": order_type,
"filled_size": str(filled_size),
"size": str(size),
"price": str(price),
}
)
except Exception:
pass
async def connect(self) -> None:
"""
Initialize REST client and prepare WS hooks.
The ethereal-sdk exposes AsyncRESTClient (for trading / queries) and
AsyncWSClient (for streaming order updates).
"""
await self._ensure_rest_client()
try:
await self._ensure_products()
except Exception as exc:
self.logger.log(f"Failed to preload products: {exc}", "WARNING")
# Try to open WS for order updates; no REST polling fallback to avoid delays.
if self._order_update_handler:
if not self._ws_stop:
self._ws_stop = asyncio.Event()
if not self._ws_task or self._ws_task.done():
self._ws_task = asyncio.create_task(self._ws_reconnect_loop())
async def disconnect(self) -> None:
"""Tear down REST/WS clients."""
try:
if self._ws_stop:
self._ws_stop.set()
if self._ws_task:
await self._ws_task
if self._ws_client:
await self._ws_client.close()
except Exception as exc:
self.logger.log(f"Error closing Ethereal WS client: {exc}", "ERROR")
self._ws_connected = False
self._ws_client = None
self._ws_task = None
self._ws_stop = None
try:
if self._rest_client and hasattr(self._rest_client, "close"):
await self._rest_client.close()
except Exception as exc:
self.logger.log(f"Error closing Ethereal REST client: {exc}", "ERROR")
self._ws_client = None
self._rest_client = None
def get_exchange_name(self) -> str:
"""Get the exchange name."""
return "ethereal"
def setup_order_update_handler(self, handler) -> None:
"""
Register a callback for order updates.
AsyncWSClient supports a callbacks dict keyed by stream type
(see docs for subscribe/unsubscribe). Once WS wiring is added,
self._order_update_handler will be invoked from the WS message handler.
"""
self._order_update_handler = handler
async def place_open_order(self, contract_id: str, quantity: Decimal, direction: str) -> OrderResult:
"""
Place an open order using AsyncRESTClient.create_order().
"""
try:
await self._ensure_rest_client()
except ImportError as exc:
return OrderResult(success=False, error_message=str(exc))
if not self.private_key or not self.subaccount_id:
return OrderResult(
success=False,
error_message="Missing ETHEREAL_PRIVATE_KEY or ETHEREAL_SUBACCOUNT_ID"
)
product_id = await self._get_product_id_for_contract_id(contract_id or self.config.contract_id)
if not product_id:
return OrderResult(success=False, error_message="Product not found for ticker")
best_bid, best_ask = await self._fetch_bbo(product_id)
if best_bid == 0 and best_ask == 0:
return OrderResult(success=False, error_message="Failed to fetch order book")
# Determine order side and price
side = 0 if direction == "buy" else 1
if direction == 'buy':
order_price = best_ask - self.config.tick_size
elif direction == 'sell':
order_price = best_bid + self.config.tick_size
else:
raise Exception(f"[OPEN] Invalid direction: {direction}")
try:
order = await self._rest_client.create_order(
order_type="LIMIT",
quantity=float(quantity),
side=side,
price=float(order_price),
product_id=product_id,
subaccount=self.subaccount_hex,
time_in_force="GTD",
post_only=True,
)
except Exception as exc: # pylint: disable=broad-except
return OrderResult(success=False, error_message=str(exc))
order_id = getattr(order, "id", None) or (order.get("id") if isinstance(order, dict) else None)
status = self._normalize_status(getattr(order, "status", "OPEN") if order_id else "FAILED")
# Check order status after a short delay to see if it was rejected
order_info: Optional[OrderInfo] = None
if order_id:
try:
await asyncio.sleep(0.01)
order_info = await self.get_order_info(str(order_id))
if order_info:
status = order_info.status or status
if status == "CANCELED":
return OrderResult(
success=False,
order_id=str(order_id),
side="buy" if side == 0 else "sell",
size=Decimal(str(quantity)),
price=Decimal(str(order_price)),
status=status,
error_message="Order rejected after placement",
filled_size=order_info.filled_size,
)
except Exception:
pass
return OrderResult(
success=bool(order_id),
order_id=str(order_id) if order_id else None,
side="buy" if side == 0 else "sell",
size=Decimal(str(quantity)),
price=Decimal(str(order_price)),
status=status,
filled_size=order_info.filled_size if order_info else None,
)
async def place_close_order(self, contract_id: str, quantity: Decimal, price: Decimal, side: str) -> OrderResult:
"""
Place a close/reduce order using AsyncRESTClient.create_order().
"""
try:
await self._ensure_rest_client()
except ImportError as exc:
return OrderResult(success=False, error_message=str(exc))
if not self.private_key or not self.subaccount_id:
return OrderResult(
success=False,
error_message="Missing ETHEREAL_PRIVATE_KEY or ETHEREAL_SUBACCOUNT_ID"
)
product_id = await self._get_product_id_for_contract_id(contract_id or self.config.contract_id)
if not product_id:
return OrderResult(success=False, error_message="Product not found for ticker")
# Ensure price respects tick size to avoid 400 from the API
tick = self.config.tick_size or Decimal(0)
price = self.round_to_tick(price) if tick > 0 else Decimal(str(price))
side_val = 0 if side.lower() == "buy" else 1
try:
order = await self._rest_client.create_order(
order_type="LIMIT",
quantity=float(quantity),
side=side_val,
price=float(price),
product_id=product_id,
subaccount=self.subaccount_hex,
time_in_force="GTD",
reduce_only=True,
post_only=True,
)
except Exception as exc: # pylint: disable=broad-except
return OrderResult(success=False, error_message=str(exc))
order_id = getattr(order, "id", None) or (order.get("id") if isinstance(order, dict) else None)
status = self._normalize_status(getattr(order, "status", "OPEN") if order_id else "FAILED")
# Check order status after a short delay to see if it was rejected
order_info: Optional[OrderInfo] = None
if order_id:
try:
await asyncio.sleep(0.01)
order_info = await self.get_order_info(str(order_id))
if order_info:
status = order_info.status or status
if status == "CANCELED":
return OrderResult(
success=False,
order_id=str(order_id),
side="buy" if side_val == 0 else "sell",
size=Decimal(str(quantity)),
price=Decimal(str(price)),
status=status,
error_message="Close order rejected after placement",
filled_size=order_info.filled_size,
)
except Exception:
pass
return OrderResult(
success=bool(order_id),
order_id=str(order_id) if order_id else None,
side="buy" if side_val == 0 else "sell",
size=Decimal(str(quantity)),
price=Decimal(str(price)),
status=status,
filled_size=order_info.filled_size if order_info else None,
)
async def cancel_order(self, order_id: str) -> OrderResult:
"""
Cancel an order.
The SDK exposes AsyncRESTClient.cancel_orders([...]) / cancel_all_orders().
"""
try:
await self._ensure_rest_client()
except ImportError as exc:
return OrderResult(success=False, error_message=str(exc))
if not self.private_key or not self.subaccount_id:
return OrderResult(
success=False,
error_message="Missing ETHEREAL_PRIVATE_KEY or ETHEREAL_SUBACCOUNT_ID"
)
try:
await self._rest_client.cancel_orders(
order_ids=[order_id],
sender=self._rest_client.chain.address if hasattr(self._rest_client, "chain") else None,
subaccount=self.subaccount_hex,
)
return OrderResult(success=True, order_id=order_id)
except Exception as exc: # pylint: disable=broad-except
return OrderResult(success=False, error_message=str(exc))
async def get_order_info(self, order_id: str) -> Optional[OrderInfo]:
"""
Fetch order details via AsyncRESTClient.get_order().
"""
try:
await self._ensure_rest_client()
except ImportError:
return None
try:
order = await self._rest_client.get_order(id=order_id)
except Exception:
order = None
if not order:
return None
side = getattr(order, "side", None)
size = getattr(order, "quantity", None)
price = getattr(order, "price", None)
status = self._normalize_status(getattr(order, "status", "UNKNOWN"))
filled = getattr(order, "filled", None)
remaining = getattr(order, "available_quantity", None)
return OrderInfo(
order_id=order_id,
side="buy" if side in (0, "buy", "BUY") else "sell",
size=Decimal(str(size or "0")),
price=Decimal(str(price or "0")),
status=status,
filled_size=Decimal(str(filled or "0")),
remaining_size=Decimal(str(remaining or "0")),
)
async def get_active_orders(self, contract_id: str) -> List[OrderInfo]:
"""
List open orders for the configured contract.
The SDK provides AsyncRESTClient.list_orders() / list_trades().
"""
try:
await self._ensure_rest_client()
except ImportError:
return []
if not self.subaccount_id:
self.logger.log("ETHEREAL_SUBACCOUNT_ID not set; cannot fetch active orders.", "WARNING")
return []
product_id = await self._get_product_id_for_contract_id(contract_id)
if not product_id:
return []
try:
orders = await self._rest_client.list_orders(
subaccount_id=self.subaccount_id,
product_ids=[product_id] if product_id else None,
statuses=["NEW", "PENDING", "FILLED_PARTIAL"],
is_working=True,
)
except Exception:
orders = []
order_infos: List[OrderInfo] = []
for order in orders or []:
try:
status = self._normalize_status(getattr(order, "status", ""))
# Only keep active orders
if status not in ("OPEN", "PARTIALLY_FILLED"):
continue
order_infos.append(
OrderInfo(
order_id=str(getattr(order, "id", "")),
side="buy" if getattr(order, "side", 0) in (0, "buy", "BUY") else "sell",
size=Decimal(str(getattr(order, "quantity", "0"))),
price=Decimal(str(getattr(order, "price", "0"))),
status=status,
filled_size=Decimal(str(getattr(order, "filled", "0"))),
remaining_size=Decimal(str(getattr(order, "available_quantity", getattr(order, "quantity", "0")))),
)
)
except Exception:
continue
return order_infos
async def get_order_price(self, direction: str) -> Decimal:
"""
Provide a placeholder price used by TradingBot when pacing orders.
Uses current BBO to place near-touching maker orders.
"""
product_id = await self._get_product_id_for_contract_id(self.config.contract_id or self.config.ticker)
if not product_id:
raise ValueError("Product not found for ticker")
best_bid, best_ask = await self._fetch_bbo(product_id)
if best_bid <= 0 or best_ask <= 0:
raise ValueError("Invalid bid/ask prices")
if direction == 'buy':
return best_ask - self.config.tick_size
elif direction == 'sell':
return best_bid + self.config.tick_size
else:
raise ValueError("Invalid direction")
async def fetch_bbo_prices(self, contract_id: str) -> Tuple[Decimal, Decimal]:
"""Expose BBO for TradingBot stop/price checks."""
product_id = await self._get_product_id_for_contract_id(contract_id or self.config.contract_id)
if not product_id:
return Decimal(0), Decimal(0)
return await self._fetch_bbo(product_id)
async def _fetch_bbo(self, product_id: Any) -> Tuple[Decimal, Decimal]:
"""Fetch best bid/ask via get_market_liquidity with manual retries."""
if product_id is None:
raise ValueError("product_id is None for BBO fetch")
max_attempts = 15
wait = 0.2
for attempt in range(max_attempts):
try:
liquidity = await self._rest_client.get_market_liquidity(product_id=product_id)
bids = getattr(liquidity, "bids", None) or []
asks = getattr(liquidity, "asks", None) or []
best_bid = Decimal(str(bids[0][0])) if bids else Decimal(0)
best_ask = Decimal(str(asks[0][0])) if asks else Decimal(0)
return best_bid, best_ask
except Exception as exc:
status = None
retry_after = None
try:
status = getattr(getattr(exc, "response", None), "status_code", None)
retry_after = getattr(getattr(exc, "response", None), "headers", {}).get("Retry-After")
except Exception:
pass
if status == 429 and retry_after:
try:
wait = max(wait, float(retry_after))
except Exception:
wait = max(wait, 1.0)
if attempt + 1 % 5 == 0:
self.logger.log(
f"[ethereal] get_market_liquidity failed for {product_id} "
f"(attempt {attempt + 1}/{max_attempts}): {type(exc).__name__}: {exc}",
"WARNING",
)
if attempt < max_attempts - 1:
await asyncio.sleep(wait)
if status == 429:
wait = min(5.0, wait * 2)
else:
wait = min(1.0, wait * 2)
continue
return Decimal(0), Decimal(0)
async def list_positions(self, subaccount_id: Optional[str] = None) -> List[Any]:
"""
Fetch positions for a subaccount using any available SDK method.
- subaccount_id: overrides the default from ENV if provided.
Returns a list (can be empty).
"""
try:
await self._ensure_rest_client()
except ImportError as exc:
self.logger.log(f"ethereal-sdk not installed: {exc}", "ERROR")
return []
rest = self._rest_client
sid = subaccount_id or self.subaccount_id
if not sid:
if not self._warned_missing_subaccount:
self.logger.log("ETHEREAL_SUBACCOUNT_ID not set; cannot fetch positions.", "WARN")
self._warned_missing_subaccount = True
return []
if hasattr(rest, "list_positions"):
try:
resp = await rest.list_positions(subaccount_id=sid)
return self._extract_positions(resp)
except Exception as exc: # pylint: disable=broad-except
self.logger.log(f"[ethereal] list_positions failed: {exc}", "WARNING")
self.logger.log("No position endpoint available on Ethereal client", "ERROR")
return []
def _extract_positions(self, resp: Any) -> List[Any]:
"""Normalize common Ethereal SDK response shapes to a list."""
if resp is None:
return []
if isinstance(resp, list):
return resp
if isinstance(resp, dict):
for key in ("positions", "data", "result", "results"):
if key in resp and isinstance(resp[key], list):
return resp[key]
if hasattr(resp, "positions"):
maybe = getattr(resp, "positions")
if isinstance(maybe, list):
return maybe
return []
def _normalize_status(self, status_raw: Any) -> str:
"""Normalize SDK status to bot-friendly keywords."""
if status_raw is None:
return ""
s = str(status_raw)
if "." in s:
s = s.split(".")[-1]
s_up = s.upper()
if s_up in ("NEW", "PENDING", "OPEN"):
return "OPEN"
if s_up in ("FILLED_PARTIAL", "FILLED_PARTIALLY", "PARTIALLY_FILLED"):
return "PARTIALLY_FILLED"
if s_up == "FILLED":
return "FILLED"
if s_up in ("CANCELED", "CANCELLED", "EXPIRED"):
return "CANCELED"
return s_up
async def get_account_positions(self) -> Decimal:
"""
Retrieve current position size for the configured contract.
The SDK provides AsyncRESTClient.list_positions() and get_position().
"""
await self._ensure_rest_client()
positions = await self.list_positions()
target_pid = await self._get_product_id_for_contract_id(self.config.contract_id)
for pos in positions:
product_id = (
pos.get("product_id")
if isinstance(pos, dict)
else getattr(pos, "product_id", None)
)
if not product_id:
product_id = (
pos.get("productId")
if isinstance(pos, dict)
else getattr(pos, "productId", None)
)
pid_uuid = self._as_uuid(product_id)
if target_pid and pid_uuid and pid_uuid == target_pid:
size_val = (
pos.get("size")
if isinstance(pos, dict)
else getattr(pos, "size", None)
)
try:
return abs(Decimal(str(size_val or "0")))
except Exception:
return Decimal(0)
return Decimal(0)
async def get_contract_attributes(self) -> Tuple[str, Decimal]:
"""
Retrieve contract identifier and tick size for a ticker.
Uses products_by_ticker; falls back to ticker if lookup fails.
"""
ticker = self.config.ticker
if not ticker:
raise ValueError("Ticker is empty")
product = await self._get_product_by_contract_id(self.config.contract_id) or await self._get_product_by_contract_id(ticker)
if product:
contract_key = self._normalize_contract_id(getattr(product, "ticker", ticker))
pid = self._as_uuid(getattr(product, "id", None))
self.config.contract_id = contract_key
tick_size_val = getattr(product, "tick_size", None)
self.config.tick_size = Decimal(str(tick_size_val)) if tick_size_val else Decimal(0)
if contract_key and pid:
self._contract_to_product_id[contract_key] = pid
self._product_id_to_contract[pid] = contract_key
else:
self.config.contract_id = self._normalize_contract_id(ticker)
self.config.tick_size = Decimal(0)
return self.config.contract_id, self.config.tick_size
```
## /exchanges/factory.py
```py path="/exchanges/factory.py"
"""
Exchange factory for creating exchange clients dynamically.
"""
from typing import Dict, Any, Type
from .base import BaseExchangeClient
class ExchangeFactory:
"""Factory class for creating exchange clients."""
_registered_exchanges = {
'edgex': 'exchanges.edgex.EdgeXClient',
'backpack': 'exchanges.backpack.BackpackClient',
'paradex': 'exchanges.paradex.ParadexClient',
'aster': 'exchanges.aster.AsterClient',
'lighter': 'exchanges.lighter.LighterClient',
'grvt': 'exchanges.grvt.GrvtClient',
'extended': 'exchanges.extended.ExtendedClient',
'apex': 'exchanges.apex.ApexClient',
'nado': 'exchanges.nado.NadoClient',
'ethereal': 'exchanges.ethereal.EtherealClient',
'standx': 'exchanges.standx.StandXClient',
}
@classmethod
def create_exchange(cls, exchange_name: str, config: Dict[str, Any]) -> BaseExchangeClient:
"""Create an exchange client instance.
Args:
exchange_name: Name of the exchange (e.g., 'edgex')
config: Configuration dictionary for the exchange
Returns:
Exchange client instance
Raises:
ValueError: If the exchange is not supported
"""
exchange_name = exchange_name.lower()
if exchange_name not in cls._registered_exchanges:
available_exchanges = ', '.join(cls._registered_exchanges.keys())
raise ValueError(f"Unsupported exchange: {exchange_name}. Available exchanges: {available_exchanges}")
# Dynamically import the exchange class only when needed
exchange_class_path = cls._registered_exchanges[exchange_name]
exchange_class = cls._import_exchange_class(exchange_class_path)
return exchange_class(config)
@classmethod
def _import_exchange_class(cls, class_path: str) -> Type[BaseExchangeClient]:
"""Dynamically import an exchange class.
Args:
class_path: Full module path to the exchange class (e.g., 'exchanges.edgex.EdgeXClient')
Returns:
The exchange class
Raises:
ImportError: If the class cannot be imported
ValueError: If the class does not inherit from BaseExchangeClient
"""
try:
module_path, class_name = class_path.rsplit('.', 1)
module = __import__(module_path, fromlist=[class_name])
exchange_class = getattr(module, class_name)
if not issubclass(exchange_class, BaseExchangeClient):
raise ValueError(f"Exchange class {class_name} must inherit from BaseExchangeClient")
return exchange_class
except (ImportError, AttributeError) as e:
raise ImportError(f"Failed to import exchange class {class_path}: {e}")
@classmethod
def get_supported_exchanges(cls) -> list:
"""Get list of supported exchanges.
Returns:
List of supported exchange names
"""
return list(cls._registered_exchanges.keys())
@classmethod
def register_exchange(cls, name: str, exchange_class: type) -> None:
"""Register a new exchange client.
Args:
name: Exchange name
exchange_class: Exchange client class that inherits from BaseExchangeClient
"""
if not issubclass(exchange_class, BaseExchangeClient):
raise ValueError("Exchange class must inherit from BaseExchangeClient")
# Convert class to module path for lazy loading
module_name = exchange_class.__module__
class_name = exchange_class.__name__
class_path = f"{module_name}.{class_name}"
cls._registered_exchanges[name.lower()] = class_path
```
## /hedge_mode.py
```py path="/hedge_mode.py"
#!/usr/bin/env python3
"""
Hedge Mode Entry Point
This script serves as the main entry point for hedge mode trading.
It imports and runs the appropriate hedge mode implementation based on the exchange parameter.
Usage:
python hedge_mode.py --exchange <exchange> [other arguments]
Supported exchanges:
- backpack: Uses HedgeBot from hedge_mode_bp.py (Backpack + Lighter)
- extended: Uses HedgeBot from hedge_mode_ext.py (Extended + Lighter)
- apex: Uses HedgeBot from hedge_mode_apex.py (Apex + Lighter)
- grvt: Uses HedgeBot from hedge_mode_grvt.py (GRVT + Lighter)
Use --v2 flag to use hedge_mode_grvt_v2.py instead
- edgex: Uses HedgeBot from hedge_mode_edgex.py (edgeX + Lighter)
- nado: Uses HedgeBot from hedge_mode_nado.py (Nado + Lighter)
- standx: Uses HedgeBot from hedge_mode_standx.py (StandX + Lighter)
Cross-platform compatibility:
- Works on Linux, macOS, and Windows
- Direct imports instead of subprocess calls for better performance
"""
import asyncio
import sys
import argparse
from decimal import Decimal
from pathlib import Path
import dotenv
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description='Hedge Mode Trading Bot Entry Point',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python hedge_mode.py --exchange backpack --ticker BTC --size 0.002 --iter 10
python hedge_mode.py --exchange extended --ticker ETH --size 0.1 --iter 5
python hedge_mode.py --exchange apex --ticker BTC --size 0.002 --iter 10
python hedge_mode.py --exchange grvt --ticker BTC --size 0.05 --iter 10 --max-position 0.1
python hedge_mode.py --exchange grvt --v2 --ticker BTC --size 0.05 --iter 10 --max-position 0.1
python hedge_mode.py --exchange edgex --ticker BTC --size 0.001 --iter 20
python hedge_mode.py --exchange nado --ticker BTC --size 0.003 --iter 20 --max-position 0.05
python hedge_mode.py --exchange standx --ticker BTC --size 0.003 --iter 20 --max-position 0.05
"""
)
parser.add_argument('--exchange', type=str, required=True,
help='Exchange to use (backpack, extended, apex, grvt, or edgex)')
parser.add_argument('--ticker', type=str, default='BTC',
help='Ticker symbol (default: BTC)')
parser.add_argument('--size', type=str, required=True,
help='Number of tokens to buy/sell per order')
parser.add_argument('--iter', type=int, required=True,
help='Number of iterations to run')
parser.add_argument('--fill-timeout', type=int, default=5,
help='Timeout in seconds for maker order fills (default: 5)')
parser.add_argument('--sleep', type=int, default=0,
help='Sleep time in seconds after each step (default: 0)')
parser.add_argument('--env-file', type=str, default=".env",
help=".env file path (default: .env)")
parser.add_argument('--max-position', type=Decimal, default=Decimal('0'),
help='Maximum position to hold (default: 0)')
parser.add_argument('--v2', action='store_true',
help='Use v2 implementation (currently only supported for grvt exchange)')
return parser.parse_args()
def validate_exchange(exchange):
"""Validate that the exchange is supported."""
supported_exchanges = ['backpack', 'extended', 'apex', 'grvt', 'edgex', 'nado', 'standx']
if exchange.lower() not in supported_exchanges:
print(f"Error: Unsupported exchange '{exchange}'")
print(f"Supported exchanges: {', '.join(supported_exchanges)}")
sys.exit(1)
def get_hedge_bot_class(exchange, v2=False):
"""Import and return the appropriate HedgeBot class."""
try:
if exchange.lower() == 'backpack':
from hedge.hedge_mode_bp import HedgeBot
return HedgeBot
elif exchange.lower() == 'extended':
from hedge.hedge_mode_ext import HedgeBot
return HedgeBot
elif exchange.lower() == 'apex':
from hedge.hedge_mode_apex import HedgeBot
return HedgeBot
elif exchange.lower() == 'grvt':
if v2:
from hedge.hedge_mode_grvt_v2 import HedgeBot
else:
from hedge.hedge_mode_grvt import HedgeBot
return HedgeBot
elif exchange.lower() == 'edgex':
from hedge.hedge_mode_edgex import HedgeBot
return HedgeBot
elif exchange.lower() == 'nado':
from hedge.hedge_mode_nado import HedgeBot
return HedgeBot
elif exchange.lower() == 'standx':
from hedge.hedge_mode_standx import HedgeBot
return HedgeBot
else:
raise ValueError(f"Unsupported exchange: {exchange}")
except ImportError as e:
print(f"Error importing hedge mode implementation: {e}")
sys.exit(1)
async def main():
"""Main entry point that creates and runs the appropriate hedge bot."""
args = parse_arguments()
env_path = Path(args.env_file)
if not env_path.exists():
print(f"Env file not find: {env_path.resolve()}")
sys.exit(1)
dotenv.load_dotenv(args.env_file)
# Validate exchange
validate_exchange(args.exchange)
# Validate v2 flag usage
if args.v2 and args.exchange.lower() != 'grvt':
print(f"Error: --v2 flag is only supported for grvt exchange")
sys.exit(1)
# Get the appropriate HedgeBot class
try:
HedgeBotClass = get_hedge_bot_class(args.exchange, v2=args.v2)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
version_str = " v2" if args.v2 else ""
print(f"Starting hedge mode for {args.exchange} exchange{version_str}...")
print(f"Ticker: {args.ticker}, Size: {args.size}, Iterations: {args.iter}")
print("-" * 50)
try:
# v2 bot has different constructor signature (no iterations/sleep_time)
if args.v2 and args.exchange.lower() == 'grvt':
bot = HedgeBotClass(
ticker=args.ticker.upper(),
order_quantity=Decimal(args.size),
fill_timeout=args.fill_timeout,
max_position=args.max_position
)
elif args.exchange in ['backpack', 'edgex', 'nado', 'grvt', 'standx']:
bot = HedgeBotClass(
ticker=args.ticker.upper(),
order_quantity=Decimal(args.size),
fill_timeout=args.fill_timeout,
iterations=args.iter,
sleep_time=args.sleep,
max_position=args.max_position
)
else:
bot = HedgeBotClass(
ticker=args.ticker.upper(),
order_quantity=Decimal(args.size),
fill_timeout=args.fill_timeout,
iterations=args.iter,
sleep_time=args.sleep
)
# Run the bot
await bot.run()
except KeyboardInterrupt:
print("\nHedge mode interrupted by user")
return 1
except Exception as e:
print(f"Error running hedge mode: {e}")
import traceback
print(f"Full traceback: {traceback.format_exc()}")
return 1
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(main()))
```
## /helpers/__init__.py
```py path="/helpers/__init__.py"
"""
Helper modules for perp-dex-tools.
"""
from .logger import TradingLogger
__all__ = ['TradingLogger']
```
## /helpers/ethereal/README.md
# Ethereal Subaccount 绑定工具
## 使用方法
本目录包含两个辅助工具,用于完成 Ethereal “linked signer” 绑定所需的双重签名流程:
1. `subaccount_eip712.html`
- 作用:用浏览器 + MetaMask 生成主钱包的 EIP-712 `signature`(授权把 linked signer 绑定到子账户),并生成需要的请求体/命令。
- 用法:
- 用浏览器打开该文件,连接主钱包。
- 填入 `signer`(待绑定的 linked signer 地址),保持 `subaccount label` 默认 `primary`。
- 点击 “Sign Typed Data” 生成 `signature`,页面会自动生成 `signerSignature` 的 Python 命令。
- 将 linked signer 私钥放在 `.env` 的 `ETHEREAL_PRIVATE_KEY` 后,运行生成的命令,得到 `signerSignature`,粘贴回页面。
- 点击 “Link Subaccount” 发送 `/v1/linked-signer/link` 请求,状态区会显示成功/失败。
- 安全性:主钱包签名在浏览器/钱包内完成,避免以明文暴露主钱包私钥;官方 UI 暂未提供该绑定流程,因此用本地页面完成交互。
2. `sign_linked_signer.py`
- 作用:用 linked signer 私钥生成 `signerSignature`(证明你控制待绑定的 signer 地址)。
- 前置:在 `.env` 配置 `ETHEREAL_PRIVATE_KEY` 为 linked signer 私钥。
- 用法示例(参数来自页面提示):
```bash
python helpers/ethereal/sign_linked_signer.py \
--sender <主钱包地址> \
--subaccount <bytes32 子账户名> \
--nonce <页面给出的 nonce> \
--signed-at <页面给出的 signedAt>
```
仅输出签名 hex,将其填入 HTML 的 `signerSignature` 输入框即可。
## 为什么需要用工具绑定子账户?
- Ethereal 官方界面暂未提供子账户绑定 linked signer 的前端;需要用户自己完成 EIP-712 授权。
- 主钱包私钥不应离开钱包/浏览器环境,HTML + MetaMask 可避免在本地脚本中明文处理主钱包私钥。
- linked signer 私钥只用于生成 `signerSignature`,可放在 `.env` 后由 Python 脚本离线签名。
## 可能遇到的问题
1. subaccount label 目前只支持 'primary' 的 bytes32 编码,使用其他编码会报错(暂时不知道原因)
2. subaccount address 如果之前添加过,然后 revoke 了,不能再次添加,需要换一个新的钱包来作为 subaccount 绑定
3. subaccount 不需要 gas ,临时生成一个即可(例如 `cast wallet new`)
---
请在安全环境中使用以上工具,并确认网络指向官方 API(默认 `https://api.ethereal.trade`)。\*\*\*
## /helpers/ethereal/sign_linked_signer.py
```py path="/helpers/ethereal/sign_linked_signer.py"
"""
Utility to produce signerSignature for Ethereal linked signer flow.
Minimal use:
python sign_linked_signer.py \
--sender 0xYourEOA \
--subaccount 0xYourBytes32Label
Other fields auto-generated:
- signer = address derived from ETHEREAL_PRIVATE_KEY (read from .env)
- nonce = time.time_ns()
- signedAt= current seconds
- chainId / verifyingContract / domain name+version use Ethereal defaults (override flags if needed)
Install requirement if missing: pip install eth_account python-dotenv
"""
import argparse
import json
import os
import sys
import time
from typing import Any, Dict
from dotenv import load_dotenv
from eth_account import Account
from eth_account.messages import encode_typed_data
from eth_utils import to_checksum_address
TYPES = {
"EIP712Domain": [
{"name": "name", "type": "string"},
{"name": "version", "type": "string"},
{"name": "chainId", "type": "uint256"},
{"name": "verifyingContract", "type": "address"},
],
"LinkSigner": [
{"name": "sender", "type": "address"},
{"name": "signer", "type": "address"},
{"name": "subaccount", "type": "bytes32"},
{"name": "nonce", "type": "uint64"},
{"name": "signedAt", "type": "uint64"},
],
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Create signerSignature for Ethereal linked signer")
parser.add_argument("--sender", required=True, help="EOA that owns the subaccount")
parser.add_argument("--subaccount", required=True, help="bytes32 subaccount label (0x...)")
parser.add_argument("--nonce", help="uint64 nonce as string; default: time.time_ns()")
parser.add_argument("--signed-at", dest="signed_at", type=int, help="Signed at timestamp (seconds); default: now")
parser.add_argument("--chain-id", dest="chain_id", type=int, default=5064014, help="Chain ID (default: 5064014)")
parser.add_argument(
"--verifying-contract",
dest="verifying_contract",
default="0xb3cdc82035c495c484c9ff11ed5f3ff6d342e3cc",
help="Domain verifyingContract (default: Ethereal mainnet)",
)
parser.add_argument("--name", default="Ethereal", help="Domain name (default: Ethereal)")
parser.add_argument("--version", default="1", help="Domain version (default: 1)")
return parser.parse_args()
def build_typed_data(args: argparse.Namespace) -> Dict[str, Any]:
domain = {
"name": args.name,
"version": args.version,
"chainId": args.chain_id,
"verifyingContract": args.verifying_contract,
}
message = {
"sender": args.sender,
"signer": args.signer,
"subaccount": args.subaccount,
"nonce": str(args.nonce),
"signedAt": int(args.signed_at),
}
return {
"types": TYPES,
"primaryType": "LinkSigner",
"domain": domain,
"message": message,
}
def main():
load_dotenv()
args = parse_args()
priv_key = os.getenv("ETHEREAL_PRIVATE_KEY")
if not priv_key:
sys.stderr.write("ETHEREAL_PRIVATE_KEY not set in environment or .env\n")
sys.exit(1)
account = Account.from_key(priv_key)
# Autofill signer, nonce, signedAt if missing
args.signer = to_checksum_address(account.address)
args.sender = to_checksum_address(args.sender)
args.subaccount = args.subaccount
if not args.nonce:
args.nonce = str(time.time_ns())
if not args.signed_at:
args.signed_at = int(time.time())
typed_data = build_typed_data(args)
signable = encode_typed_data(full_message=typed_data)
signed = account.sign_message(signable)
# Output only the signature hex (signerSignature)
sys.stdout.write("\n0x" + signed.signature.hex() + "\n")
if __name__ == "__main__":
main()
```
## /helpers/lark_bot.py
```py path="/helpers/lark_bot.py"
import os
import ssl
import aiohttp
from typing import Dict, Any, Optional
import certifi
BASE_URL = "https://www.feishu.cn/flow/api/trigger-webhook/"
class LarkBot:
def __init__(self, token: str, base_url: Optional[str]=None):
self.token = token
self.base_url = base_url if base_url else BASE_URL
self.webhook_url = f"{self.base_url.rstrip('/')}/{self.token}"
self.ssl_context = ssl.create_default_context(cafile=certifi.where())
self.connector = aiohttp.TCPConnector(limit=5, ssl=self.ssl_context)
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=aiohttp.ClientTimeout(total=5),
trust_env=True
)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def close(self):
"""close ClientSession"""
if self.session:
await self.session.close()
async def send_text(self, content: str) -> Dict[str, Any]:
payload = {
"msg_type": "text",
"content": {
"text": content
}
}
return await self._send_message(payload)
async def _send_message(self, payload: Dict[str, Any]) -> Dict[str, Any]:
if not self.session:
self.session = aiohttp.ClientSession()
try:
async with self.session.post(self.webhook_url, json=payload) as response:
response_data = await response.json()
if response.status != 200 or response_data.get("code", 0) != 0:
print(f"Lark send message failed: {response_data}")
return response_data
except Exception as e:
print(f"Lark send message failed: {e}");
return {"code": -1, "error": str(e)}
# example
async def main():
lark_token = os.getenv("LARK_TOKEN")
if not lark_token:
print("LARK_TOKEN is not set")
return
async with LarkBot(lark_token) as bot:
text_response = await bot.send_text("This is a test message!")
print("Text response:", text_response)
if __name__ == "__main__":
import asyncio
import dotenv
dotenv.load_dotenv()
asyncio.run(main())
```
## /helpers/lighter_ws.py
```py path="/helpers/lighter_ws.py"
import os
def _is_truthy(value: str) -> bool:
return value.strip().lower() in {"1", "true", "yes", "on"}
def build_lighter_ws_url() -> str:
"""Build Lighter WebSocket URL with optional legacy server ping fallback."""
base_url = os.getenv("LIGHTER_WS_URL", "wss://mainnet.zklighter.elliot.ai/stream").strip()
if not base_url:
base_url = "wss://mainnet.zklighter.elliot.ai/stream"
use_server_pings = _is_truthy(os.getenv("LIGHTER_WS_SERVER_PINGS", "false"))
if not use_server_pings:
return base_url
separator = "&" if "?" in base_url else "?"
return f"{base_url}{separator}server_pings=true"
def lighter_ws_connect_kwargs() -> dict:
"""
WebSocket settings compatible with both old and new Lighter WS behavior.
- Send client ping frames regularly (required by new server behavior).
- Keep permessage-deflate enabled.
"""
return {
"ping_interval": 50,
"ping_timeout": 20,
"compression": "deflate",
"max_queue": 1024,
}
```
## /para_requirements.txt
python-dotenv>=1.0.0
pytz>=2025.2
asyncio==4.0.0
aiohttp>=3.8.0
websocket-client>=1.6.0
pydantic>=1.8.0
pycryptodome>=3.15.0
ecdsa>=0.17.0
requests==2.32.5
eval_type_backport
tenacity>=9.1.2
# Paradex Python SDK
git+https://github.com/tradeparadex/paradex-py.git@7eb7aa3825d466b2f14abd3e94f2ce6b002d6a63
The content has been capped at 50000 tokens. The user could consider applying other filters to refine the result. The better and more specific the context, the better the LLM can follow instructions. If the context seems verbose, the user can refine the filter using uithub. Thank you for using https://uithub.com - Perfect LLM context for any GitHub repo.