年化50%跨所价差套利实战教学
本人管理目前10m资金做套利
这是我一个实盘的资金曲线
所以接下来的内容绝对干货

什么是跨所价差套利?
某品种,在A交易所:Ask1报价为99,而在B交易所:Bid1报价为101。
此时,你在A交易所买入,与卖一撮合,成交价为99
在B交易所卖出,与买一撮合,成交价为101
你就锚定了这101-99的差价,在扣除A和B的手续费后还有利可图,那么这就是个套利机会
不少现货交易所的价差可以达到20-30%
每次来黑天鹅你总能听到某某搬价差赚到了多少多少钱
是不是让你很心动?
听起来很简单,其实要解决的问题,很多:
- 如何知道最快的行情,如何最快的执行下单
- 订单类型如何选择:市价?Maker?IOC?
- 手续费如何最低
- 价差不回归怎么办
- 价差盈利资金费率亏损怎么办
- 单边爆仓怎么办
- 怎么避免ADL
- 交易所拉闸变成单机币怎么办
- 交易所改规则Tick精度变化怎么办
等等等等,都是跨所需要考虑的事情,这些细节风控,在我的团队我已经列出来40多条需要做的风控尽调。
由于篇幅太长,本文会讲解第一条和第二条如何实现的实战教学。
关注我,我以后会更新教学如何现货不提币变相套利的两招
以及Maker三角套利等等
我做过的套利策略:站内期限、三角套利、跨所价差、跨所Maker后Taker、精度套利
当年我用20万,日化1%,做套利半年滚到了100万,那个时代已经过去了
现在只有流动性特别低的小所和黑天鹅才会有这么高的收益
不过据我所知,在现货、链上,还是有人可以做到1m以上资金年化100%
实战
工具:OpenQuant高频交易系统(openquant.cc)
(Rust写的交易系统,可以最快的实现事件行情订阅驱动策略、Ws下单、Cookie下单、绑定机构id、自动化处理交易所价格精度、步长、面值等工程问题。使用最少的代码即可写出一个可行的价差套利策略)
服务器:东京Aws服务器 500MB内存足矣
(这个不用解释,大部分交易所都在Aws东京)
1. 极速行情的获取:Tick 级驱动
高频策略的本质是数据流处理。传统轮询行情太慢,但完全依赖推送又容易处理不过来。OpenQuant 允许我们针对不同类型的数据,配置不同的处理模式。
在 subscribes 方法中,我们首先要解决这三种数据流的订阅:
-
极速行情(BBO):使用 Latest 模式。高频场景下,100ms 前的报价是垃圾。如果策略处理不过来,系统自动丢弃积压包,永远只喂给策略最新的 Tick。
-
订单回报(Order):使用 Async 模式。订单状态绝对不能丢,否则会导致状态机错乱。
-
资金查询(Timer):使用 Timer 轮询。依靠 WS 推送余额往往不稳定。为了确保下单前保证金校验(Risk Check)准确,我们单独开一个定时器,每 3 秒主动查一次余额,更新本地状态。
def subscribes(self):
"""
配置数据流订阅
"""
subs = []
account_ids = {exchange['exchange']: idx for idx, exchange in enumerate(self.cex_configs)}
# 1. 行情流:BBO -> Latest 模式 (只看最新,丢弃过时包)
for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
subs.append({
"account_id": account_ids[exchange],
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [{"Bbo": [self.symbol]}]
}
})
# 2. 交易流:Order -> Async 模式 (队列处理,确保不漏单)
for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
subs.append({
"account_id": account_ids[exchange],
"event_handle_mode": "Async",
"sub": {
"SubscribeWs": [{"Order": [self.symbol]}]
}
})
# 3. 风控流:Balance -> Timer 模式 (每3秒主动轮询)
subs.append({
"account_id": 0,
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 3, "nanos": 0},
"name": "QueryBalance"
}
}
})
return subs
2. 核心逻辑:计算与触发
当行情推送到 on_bbo 回调时,我们不做任何复杂的预测,只做最简单的数学题。
这里我们定义了两个角色:
- 高流动性所 :如 Binance 等,深度好,价格稳,滑点低,作为行情的锚点。
- 低流动性所 :中小交易所流动性更差,更容易出现短暂的价差机会。
代码逻辑非常直观:
def _check_and_trade(self):
# 计算方向:低流动性所开仓,高流动性所平仓
# Spread = (Bid_H - Ask_L) / Ask_L
buy_low_spread = (self.high_liq_bid - self.low_liq_ask) / self.low_liq_ask
# 扣除双边 Taker 手续费
total_fee = self.low_liq_taker_fee + self.high_liq_taker_fee
net_spread = buy_low_spread - total_fee
# 只有当净价差大于我们设定的阈值(例如万2)时,才触发交易
if net_spread > self.spread_threshold:
# 触发低流所开仓流程
self._execute_arbitrage(is_buy_low=True, spread=net_spread)
3. 订单与先后顺序
如果你用普通的限价单,可能永远挂在盘口吃不到单,或者吃单之后早已错过了套利时机;如果你用市价单,滑点可能会吃掉你所有的利润。
所以对于开仓而言的最优解是使用 IOC 订单:要么立即成交,要么取消,绝不挂单等待。
此外,下单顺序至关重要。低流动性所的机会转瞬即逝,且一般是跟随、落后于高流动性所的,因此先打低流动性所的 IOC 单,在保证滑点可控的情况下吃到货,然后立刻去高流动性所对冲。
def _execute_arbitrage(self, is_buy_low: bool, spread: float):
# 价格计算:买单加滑点,卖单减滑点,保证成交率
# ioc_slippage 设置为 0.0001 (万1)
if is_buy_low:
low_liq_price = self.low_liq_ask * (1 + self.ioc_slippage)
else:
low_liq_price = self.low_liq_bid * (1 - self.ioc_slippage)
# 构造 IOC 订单
order = {
"symbol": self.symbol,
"side": "Buy" if is_buy_low else "Sell",
"order_type": "Limit", # 限价单配合IOC模式
"amount": self.trade_amount,
"price": low_liq_price,
"time_in_force": "IOC", # 立即成交或取消,不挂单
"cid": cid
}
# 异步发送,不阻塞策略线程
self.trader.place_order(..., order=order, sync=False)
# 打开对冲开关
self.pending_high_liq_order = True
self.pending_high_liq_side = "Sell" if is_buy_low else "Buy"
在这个逻辑中,我们利用 OpenQuant 自动处理了最麻烦的精度对齐和最小下单量检查,代码里只需要关注“买卖”本身。
4. 闭环:极速回调对冲
这才是 OpenQuant 相比轮询框架最大的优势。我们不在发单后死等,而是通过 on_order 回调监听。
一旦收到低流动性所的 Filled 状态,毫秒级内自动触发高流动性所(Binance)的对冲单。
def on_order(self, account_id, order):
"""
订单状态推送回调
"""
# ... 解析 order_info ...
# 核心逻辑:如果是低流所成交,且处于等待对冲状态
if (order_info["exchange"] == self.low_liq_exchange and
filled > 0 and
self.pending_high_liq_order):
# 立即执行对冲逻辑
self._place_high_liq_order(filled)
def _place_high_liq_order(self, amount):
"""
在高流动性所(Binance)执行对冲
"""
# 计算对冲价格:同样给滑点,确保必定成交
is_buy = (self.pending_high_liq_side == "Buy")
price = self.high_liq_ask * (1.0001) if is_buy else self.high_liq_bid * (0.9999)
order = {
"symbol": self.symbol,
"side": self.pending_high_liq_side,
"order_type": "Limit",
"amount": amount, # 对冲数量严格等于成交数量
"price": price,
"time_in_force": "IOC",
"cid": trader.create_cid(self.high_liq_exchange)
}
# 发送对冲单
self.trader.place_order(..., order=order, sync=False)
self.pending_high_liq_order = False
5. 现实的复杂性
实盘就是不断解决这些问题的过程。下面是我们的跨所套利策略在 OpenQuant 上实际运行的盈利曲线抓拍。可以看到,得益于 OpenQuant Rust 底层的低延迟,我们可以成功捕捉到两腿之间极其微小的价差,积少成多,获得 50% 的年化收益率。
OpenQuant 的初衷就是解决那些繁琐的工程问题,让策略开发者能像写上面的 Python 代码一样,专注于策略逻辑本身。最基础的代码演示附在了最后,如果你对高频套利感兴趣,可以参考它,在它的基础上不断完善,形成自己专属的跨所套利策略。
完整代码:
"""
简单跨所套利策略 - 教学示例
年化50%跨所价差套利实战教学
"""
import time
import traderv2 as trader # type: ignore
from base_strategy import BaseStrategy
class Strategy(BaseStrategy):
def __init__(self, cex_configs, dex_configs, strategy_config, trader_instance: trader.TraderV2):
self.log = trader_instance.log
self.trader = trader_instance
self.strategy_config = strategy_config
self.cex_configs = cex_configs
self.orders = {} # cid -> order_info
self.pending_high_liq_order = False
self.pending_high_liq_side = None
self.current_round_orders = []
self.total_trades = 0
self.total_pnl = 0.0
self.high_liq_ask = 0.0
self.high_liq_bid = 0.0
self.low_liq_ask = 0.0
self.low_liq_bid = 0.0
self.instruments = {}
# 存储余额信息
self.high_liq_balance = None
self.low_liq_balance = None
def name(self):
return self.strategy_config["name"]
def start(self):
"""策略启动"""
# 配置参数
self.high_liq_exchange = self.strategy_config["high_liquidity_exchange"]
self.low_liq_exchange = self.strategy_config["low_liquidity_exchange"]
self.symbol = self.strategy_config["symbol"]
self.spread_threshold = self.strategy_config.get("spread_threshold", 0.0002)
self.trade_amount = self.strategy_config.get("trade_amount", 100.0)
self.ioc_slippage = self.strategy_config.get("ioc_slippage", 0.0001)
self.min_balance_ratio = self.strategy_config.get("min_balance_ratio", 0.3)
self.leverage = self.strategy_config.get("leverage", 5)
# 获取账户ID
account_ids = {exchange['exchange']: idx for idx, exchange in enumerate(self.cex_configs)}
self.high_liq_account_id = account_ids[self.high_liq_exchange]
self.low_liq_account_id = account_ids[self.low_liq_exchange]
# 获取交易对信息
self._load_instruments()
# 获取手续费(优先从配置读取,读不到则从API查询)
self._load_fee_rates()
# 设置杠杆
self._set_leverage()
# 初始化余额
self._load_balances()
self.log(f"策略启动成功!", level="INFO", color="green")
self.log(f"高流动性所: {self.high_liq_exchange}", level="INFO")
self.log(f"低流动性所: {self.low_liq_exchange}", level="INFO")
self.log(f"交易品种: {self.symbol}", level="INFO")
self.log(f"价差阈值: {self.spread_threshold:.4%}", level="INFO")
self.log(f"交易数量: {self.trade_amount}", level="INFO")
self.log(f"杠杆倍数: {self.leverage}x", level="INFO")
def _load_instruments(self):
"""加载交易对信息"""
for account_id, exchange in [(self.high_liq_account_id, self.high_liq_exchange),
(self.low_liq_account_id, self.low_liq_exchange)]:
result = self.trader.get_instrument(account_id, self.symbol)
time.sleep(0.1)
if "Ok" in result:
self.instruments[exchange] = result["Ok"]
self.log(f"{exchange} 交易对信息加载成功", level="INFO")
else:
raise Exception(f"获取 {exchange} 交易对信息失败: {result.get('Err')}")
def _load_fee_rates(self):
"""加载手续费率"""
accounts_config = self.strategy_config.get("accounts", [])
if len(accounts_config) > max(self.high_liq_account_id, self.low_liq_account_id):
high_liq_config = accounts_config[self.high_liq_account_id]
low_liq_config = accounts_config[self.low_liq_account_id]
self.high_liq_taker_fee = high_liq_config.get("taker_fee")
self.low_liq_taker_fee = low_liq_config.get("taker_fee")
else:
self.high_liq_taker_fee = None
self.low_liq_taker_fee = None
if self.high_liq_taker_fee is None:
result = self.trader.get_fee_rate(self.high_liq_account_id, self.symbol)
time.sleep(0.1)
if "Ok" in result:
self.high_liq_taker_fee = result["Ok"].get("taker", 0.0005)
self.log(f"{self.high_liq_exchange} 手续费率(API查询): {self.high_liq_taker_fee:.6f}",
level="INFO")
else:
self.high_liq_taker_fee = 0.0005
self.log(f"{self.high_liq_exchange} 手续费率查询失败,使用默认值", level="WARN")
if self.low_liq_taker_fee is None:
result = self.trader.get_fee_rate(self.low_liq_account_id, self.symbol)
time.sleep(0.1)
if "Ok" in result:
self.low_liq_taker_fee = result["Ok"].get("taker", 0.0005)
self.log(f"{self.low_liq_exchange} 手续费率(API查询): {self.low_liq_taker_fee:.6f}",
level="INFO")
else:
self.low_liq_taker_fee = 0.0005
self.log(f"{self.low_liq_exchange} 手续费率查询失败,使用默认值", level="WARN")
def _set_leverage(self):
"""设置杠杆倍数"""
for account_id, exchange in [(self.high_liq_account_id, self.high_liq_exchange),
(self.low_liq_account_id, self.low_liq_exchange)]:
result = self.trader.set_leverage(account_id, self.symbol, self.leverage)
time.sleep(0.1)
if "Ok" in result:
self.log(f"{exchange} 设置杠杆 {self.leverage}x 成功", level="INFO", color="green")
else:
self.log(f"{exchange} 设置杠杆失败: {result.get('Err')}", level="WARN", color="yellow")
def _load_balances(self):
"""初始化余额"""
high_result = self.trader.get_usdt_balance(self.high_liq_account_id)
time.sleep(0.1)
if "Ok" in high_result:
self.high_liq_balance = high_result["Ok"]
self.log(f"{self.high_liq_exchange} 余额加载成功: 可用 {self.high_liq_balance.get('available_balance', 0):.2f} USDT",
level="INFO")
else:
self.log(f"{self.high_liq_exchange} 余额加载失败: {high_result.get('Err')}", level="WARN", color="yellow")
low_result = self.trader.get_usdt_balance(self.low_liq_account_id)
time.sleep(0.1)
if "Ok" in low_result:
self.low_liq_balance = low_result["Ok"]
self.log(f"{self.low_liq_exchange} 余额加载成功: 可用 {self.low_liq_balance.get('available_balance', 0):.2f} USDT",
level="INFO")
else:
self.log(f"{self.low_liq_exchange} 余额加载失败: {low_result.get('Err')}", level="WARN", color="yellow")
def subscribes(self):
"""订阅配置"""
account_ids = {exchange['exchange']: idx for idx, exchange in enumerate(self.cex_configs)}
subs = []
# 订阅 BBO - Latest 模式
for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
subs.append({
"account_id": account_ids[exchange],
"event_handle_mode": "Latest",
"sub": {
"SubscribeWs": [{"Bbo": [self.symbol]}]
}
})
# 订阅订单更新 - Async 模式
for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
subs.append({
"account_id": account_ids[exchange],
"event_handle_mode": "Async",
"sub": {
"SubscribeWs": [{"Order": [self.symbol]}]
}
})
# 定时查询余额
subs.append({
"account_id": 0,
"sub": {
"SubscribeTimer": {
"update_interval": {"secs": 3, "nanos": 0},
"name": "QueryBalance"
}
}
})
return subs
def on_bbo(self, exchange, bbo):
"""BBO 更新回调"""
# 更新价格
if exchange == self.high_liq_exchange:
self.high_liq_ask = bbo["ask_price"]
self.high_liq_bid = bbo["bid_price"]
elif exchange == self.low_liq_exchange:
self.low_liq_ask = bbo["ask_price"]
self.low_liq_bid = bbo["bid_price"]
# 检查价差
if self.high_liq_ask > 0 and self.low_liq_ask > 0:
self._check_and_trade()
def _check_and_trade(self):
"""检查价差并执行交易"""
if self.pending_high_liq_order or len(self.orders) > 0:
return
buy_low_spread = (self.high_liq_bid - self.low_liq_ask) / self.low_liq_ask
sell_low_spread = (self.low_liq_bid - self.high_liq_ask) / self.high_liq_ask
total_fee = self.low_liq_taker_fee + self.high_liq_taker_fee
buy_low_spread -= total_fee
sell_low_spread -= total_fee
if buy_low_spread > self.spread_threshold:
self._execute_arbitrage(is_buy_low=True, spread=buy_low_spread)
elif sell_low_spread > self.spread_threshold:
self._execute_arbitrage(is_buy_low=False, spread=sell_low_spread)
def _execute_arbitrage(self, is_buy_low: bool, spread: float):
"""执行套利交易"""
if not self._check_balance():
return
if is_buy_low:
low_liq_price = self.low_liq_ask * (1 + self.ioc_slippage)
else:
low_liq_price = self.low_liq_bid * (1 - self.ioc_slippage)
cid = trader.create_cid(self.low_liq_exchange)
order = {
"symbol": self.symbol,
"side": "Buy" if is_buy_low else "Sell",
"order_type": "Limit",
"amount": self.trade_amount,
"price": low_liq_price,
"time_in_force": "IOC",
"pos_side": "Long" if is_buy_low else "Short",
"cid": cid
}
params = {
"is_dual_side": False,
"margin_mode": "Cross"
}
order_info = {
"exchange": self.low_liq_exchange,
"side": "Buy" if is_buy_low else "Sell",
"amount": self.trade_amount,
"price": low_liq_price,
"filled": 0.0,
"avg_price": 0.0,
"status": "Pending"
}
self.orders[cid] = order_info
result = self.trader.place_order(
account_id=self.low_liq_account_id,
order=order,
params=params,
sync=False
)
if "Ok" in result:
self.log(f"<b>发现套利机会!</b> 价差: {spread:.8%}", level="INFO", color="green")
self.log(f"步骤1: {self.low_liq_exchange} {'买入' if is_buy_low else '卖出'} "
f"{self.trade_amount} @ {low_liq_price:.5f}", level="INFO", color="green")
self.pending_high_liq_order = True
self.pending_high_liq_side = "Sell" if is_buy_low else "Buy"
else:
del self.orders[cid]
self.log(f"下单失败: {result.get('Err')}", level="ERROR", color="red")
def _check_balance(self) -> bool:
"""检查余额是否充足(直接读取已存储的余额)"""
if not self.high_liq_balance or not self.low_liq_balance:
return False
high_available = self.high_liq_balance.get("available_balance", 0.0)
low_available = self.low_liq_balance.get("available_balance", 0.0)
trade_value = self.trade_amount * max(self.high_liq_ask, self.low_liq_ask)
margin_needed = trade_value / self.leverage
if high_available < margin_needed / self.min_balance_ratio:
self.log(f"高流动性所余额不足", level="WARN", color="yellow")
return False
if low_available < margin_needed / self.min_balance_ratio:
self.log(f"低流动性所余额不足", level="WARN", color="yellow")
return False
return True
def on_order_submitted(self, account_id, order_id_result, order):
"""订单提交回调"""
cid = order.get("cid")
if not cid or cid not in self.orders:
return
order_info = self.orders[cid]
if "Err" in order_id_result:
self.log(f"订单提交失败: {order_id_result['Err']}", level="ERROR", color="red")
order_info["status"] = "Rejected"
del self.orders[cid]
else:
order_id = order_id_result["Ok"]
order_info["order_id"] = order_id
order_info["status"] = "Submitted"
self.log(f"订单提交成功: {order_id}", level="INFO", color="green")
def on_order(self, account_id, order):
"""订单更新回调"""
cid = order.get("cid")
if not cid or cid not in self.orders:
return
order_info = self.orders[cid]
status = order.get("status", "")
filled = order.get("filled", 0.0)
filled_avg_price = order.get("filled_avg_price", 0.0)
order_info["status"] = status
order_info["filled"] = filled
if filled > 0:
order_info["avg_price"] = filled_avg_price
if (order_info["exchange"] == self.low_liq_exchange and
filled > 0 and
self.pending_high_liq_order and
not order_info.get("high_liq_placed")):
order_info["high_liq_placed"] = True
self._place_high_liq_order(filled)
if status in ["Filled", "Canceled"]:
self.log(f"{order_info['exchange']} 订单完成: {status}, "
f"成交 {filled} @ {filled_avg_price:.5f}",
level="INFO", color="green")
if filled > 0:
self.current_round_orders.append({
"exchange": order_info["exchange"],
"side": order_info["side"],
"filled": filled,
"avg_price": filled_avg_price
})
del self.orders[cid]
if len(self.orders) == 0 and not self.pending_high_liq_order:
self._settle_round()
def _place_high_liq_order(self, low_liq_filled_amount):
"""下高流动性所订单"""
if not self.pending_high_liq_order:
return
is_buy = (self.pending_high_liq_side == "Buy")
if is_buy:
price = self.high_liq_ask * (1 + self.ioc_slippage)
else:
price = self.high_liq_bid * (1 - self.ioc_slippage)
cid = trader.create_cid(self.high_liq_exchange)
order = {
"symbol": self.symbol,
"side": self.pending_high_liq_side,
"order_type": "Limit",
"amount": low_liq_filled_amount,
"price": price,
"time_in_force": "IOC",
"pos_side": "Long" if self.pending_high_liq_side == "Buy" else "Short",
"cid": cid
}
params = {
"is_dual_side": False,
"margin_mode": "Cross"
}
order_info = {
"exchange": self.high_liq_exchange,
"side": self.pending_high_liq_side,
"amount": low_liq_filled_amount,
"price": price,
"filled": 0.0,
"avg_price": 0.0,
"status": "Pending"
}
self.orders[cid] = order_info
result = self.trader.place_order(
account_id=self.high_liq_account_id,
order=order,
params=params,
sync=False
)
if "Ok" in result:
self.pending_high_liq_order = False
self.log(f"步骤2: {self.high_liq_exchange} {self.pending_high_liq_side} "
f"{low_liq_filled_amount} @ {price:.5f}", level="INFO", color="green")
else:
del self.orders[cid]
self.pending_high_liq_order = False
self.log(f"高流动性所下单失败: {result.get('Err')}", level="ERROR", color="red")
def _settle_round(self):
"""结算本轮交易"""
if len(self.current_round_orders) != 2:
self.current_round_orders.clear()
return
# 计算盈亏
buy_cost = 0.0
sell_revenue = 0.0
for order in self.current_round_orders:
value = order["filled"] * order["avg_price"]
fee_rate = 0.0
if order["exchange"] == self.high_liq_exchange:
fee_rate = self.high_liq_taker_fee
elif order["exchange"] == self.low_liq_exchange:
fee_rate = self.low_liq_taker_fee
if order["side"] == "Buy":
buy_cost += value * (1 + fee_rate)
else:
sell_revenue += value * (1 - fee_rate)
pnl = sell_revenue - buy_cost
pnl_pct = (pnl / buy_cost * 100) if buy_cost > 0 else 0
self.total_trades += 1
self.total_pnl += pnl
# 输出报告
report = f"<b>═══ 第 {self.total_trades} 轮套利完成 ═══</b><br>"
report += f"本轮盈亏: <b>{pnl:.4f} USDT</b> ({pnl_pct:+.4f}%)<br>"
report += f"交易明细:<br>"
for i, order in enumerate(self.current_round_orders):
report += f" {i+1}. {order['exchange']} {order['side']} "
report += f"{order['filled']} @ {order['avg_price']:.5f}<br>"
self.log(report, level="INFO", color="green")
self.current_round_orders.clear()
def on_timer_subscribe(self, timer_name):
"""定时器回调"""
if timer_name == "QueryBalance":
# 查询余额并存储(每3秒)
high_result = self.trader.get_usdt_balance(self.high_liq_account_id)
low_result = self.trader.get_usdt_balance(self.low_liq_account_id)
if "Ok" in high_result:
self.high_liq_balance = high_result["Ok"]
if "Ok" in low_result:
self.low_liq_balance = low_result["Ok"]
def on_stop(self):
"""策略停止"""
self.log(f"策略停止,累计交易: {self.total_trades} 轮,"
f"累计盈亏: {self.total_pnl:.4f} USDT",
level="INFO", color="green")
time.sleep(1)
