首页 策略 正文
  • 本文约15161字,阅读需1小时
  • 41
  • 0

年化50%跨所价差套利实战教学

温馨提示:本文最后更新于2025年12月30日 22:21,若内容或图片失效,请在下方留言或联系博主。

本人管理目前10m资金做套利
这是我一个实盘的资金曲线
所以接下来的内容绝对干货

什么是跨所价差套利?

某品种,在A交易所:Ask1报价为99,而在B交易所:Bid1报价为101。
此时,你在A交易所买入,与卖一撮合,成交价为99
在B交易所卖出,与买一撮合,成交价为101
你就锚定了这101-99的差价,在扣除A和B的手续费后还有利可图,那么这就是个套利机会

不少现货交易所的价差可以达到20-30%
每次来黑天鹅你总能听到某某搬价差赚到了多少多少钱
是不是让你很心动?

听起来很简单,其实要解决的问题,很多:

  1. 如何知道最快的行情,如何最快的执行下单
  2. 订单类型如何选择:市价?Maker?IOC?
  3. 手续费如何最低
  4. 价差不回归怎么办
  5. 价差盈利资金费率亏损怎么办
  6. 单边爆仓怎么办
  7. 怎么避免ADL
  8. 交易所拉闸变成单机币怎么办
  9. 交易所改规则Tick精度变化怎么办

等等等等,都是跨所需要考虑的事情,这些细节风控,在我的团队我已经列出来40多条需要做的风控尽调。
由于篇幅太长,本文会讲解第一条和第二条如何实现的实战教学。

关注我,我以后会更新教学如何现货不提币变相套利的两招

以及Maker三角套利等等

我做过的套利策略:站内期限、三角套利、跨所价差、跨所Maker后Taker、精度套利
当年我用20万,日化1%,做套利半年滚到了100万,那个时代已经过去了
现在只有流动性特别低的小所和黑天鹅才会有这么高的收益

不过据我所知,在现货、链上,还是有人可以做到1m以上资金年化100%


实战

工具:OpenQuant高频交易系统(openquant.cc)
(Rust写的交易系统,可以最快的实现事件行情订阅驱动策略、Ws下单、Cookie下单、绑定机构id、自动化处理交易所价格精度、步长、面值等工程问题。使用最少的代码即可写出一个可行的价差套利策略)

服务器:东京Aws服务器 500MB内存足矣
(这个不用解释,大部分交易所都在Aws东京)

1. 极速行情的获取:Tick 级驱动

高频策略的本质是数据流处理。传统轮询行情太慢,但完全依赖推送又容易处理不过来。OpenQuant 允许我们针对不同类型的数据,配置不同的处理模式。

在 subscribes 方法中,我们首先要解决这三种数据流的订阅:

  • 极速行情(BBO):使用 Latest 模式。高频场景下,100ms 前的报价是垃圾。如果策略处理不过来,系统自动丢弃积压包,永远只喂给策略最新的 Tick。

  • 订单回报(Order):使用 Async 模式。订单状态绝对不能丢,否则会导致状态机错乱。

  • 资金查询(Timer):使用 Timer 轮询。依靠 WS 推送余额往往不稳定。为了确保下单前保证金校验(Risk Check)准确,我们单独开一个定时器,每 3 秒主动查一次余额,更新本地状态。

def subscribes(self):
    """
    配置数据流订阅
    """
    subs = []
    account_ids = {exchange['exchange']: idx for idx, exchange in enumerate(self.cex_configs)}

    # 1. 行情流:BBO -> Latest 模式 (只看最新,丢弃过时包)
    for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
        subs.append({
            "account_id": account_ids[exchange],
            "event_handle_mode": "Latest", 
            "sub": {
                "SubscribeWs": [{"Bbo": [self.symbol]}]
            }
        })

    # 2. 交易流:Order -> Async 模式 (队列处理,确保不漏单)
    for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
        subs.append({
            "account_id": account_ids[exchange],
            "event_handle_mode": "Async",
            "sub": {
                "SubscribeWs": [{"Order": [self.symbol]}]
            }
        })

    # 3. 风控流:Balance -> Timer 模式 (每3秒主动轮询)
    subs.append({
        "account_id": 0,
        "sub": {
            "SubscribeTimer": {
                "update_interval": {"secs": 3, "nanos": 0},
                "name": "QueryBalance"
            }
        }
    })

    return subs

2. 核心逻辑:计算与触发

当行情推送到 on_bbo 回调时,我们不做任何复杂的预测,只做最简单的数学题。

这里我们定义了两个角色:

  • 高流动性所 :如 Binance 等,深度好,价格稳,滑点低,作为行情的锚点。
  • 低流动性所 :中小交易所流动性更差,更容易出现短暂的价差机会。

代码逻辑非常直观:

def _check_and_trade(self):
    # 计算方向:低流动性所开仓,高流动性所平仓
    # Spread = (Bid_H - Ask_L) / Ask_L
    buy_low_spread = (self.high_liq_bid - self.low_liq_ask) / self.low_liq_ask

    # 扣除双边 Taker 手续费
    total_fee = self.low_liq_taker_fee + self.high_liq_taker_fee
    net_spread = buy_low_spread - total_fee

    # 只有当净价差大于我们设定的阈值(例如万2)时,才触发交易
    if net_spread > self.spread_threshold:
        # 触发低流所开仓流程
        self._execute_arbitrage(is_buy_low=True, spread=net_spread)

3. 订单与先后顺序

如果你用普通的限价单,可能永远挂在盘口吃不到单,或者吃单之后早已错过了套利时机;如果你用市价单,滑点可能会吃掉你所有的利润。

所以对于开仓而言的最优解是使用 IOC 订单:要么立即成交,要么取消,绝不挂单等待。

此外,下单顺序至关重要。低流动性所的机会转瞬即逝,且一般是跟随、落后于高流动性所的,因此先打低流动性所的 IOC 单,在保证滑点可控的情况下吃到货,然后立刻去高流动性所对冲。

    def _execute_arbitrage(self, is_buy_low: bool, spread: float):
        # 价格计算:买单加滑点,卖单减滑点,保证成交率
        # ioc_slippage 设置为 0.0001 (万1)
        if is_buy_low:
            low_liq_price = self.low_liq_ask * (1 + self.ioc_slippage)
        else:
            low_liq_price = self.low_liq_bid * (1 - self.ioc_slippage)

        # 构造 IOC 订单
        order = {
            "symbol": self.symbol,
            "side": "Buy" if is_buy_low else "Sell",
            "order_type": "Limit",    # 限价单配合IOC模式
            "amount": self.trade_amount,
            "price": low_liq_price,
            "time_in_force": "IOC",   # 立即成交或取消,不挂单
            "cid": cid
        }

        # 异步发送,不阻塞策略线程
        self.trader.place_order(..., order=order, sync=False)

        # 打开对冲开关
        self.pending_high_liq_order = True
        self.pending_high_liq_side = "Sell" if is_buy_low else "Buy"

在这个逻辑中,我们利用 OpenQuant 自动处理了最麻烦的精度对齐和最小下单量检查,代码里只需要关注“买卖”本身。

4. 闭环:极速回调对冲

这才是 OpenQuant 相比轮询框架最大的优势。我们不在发单后死等,而是通过 on_order 回调监听。
一旦收到低流动性所的 Filled 状态,毫秒级内自动触发高流动性所(Binance)的对冲单。

    def on_order(self, account_id, order):
        """
        订单状态推送回调
        """
        # ... 解析 order_info ...

        # 核心逻辑:如果是低流所成交,且处于等待对冲状态
        if (order_info["exchange"] == self.low_liq_exchange and 
            filled > 0 and 
            self.pending_high_liq_order):

            # 立即执行对冲逻辑
            self._place_high_liq_order(filled)

    def _place_high_liq_order(self, amount):
        """
        在高流动性所(Binance)执行对冲
        """
        # 计算对冲价格:同样给滑点,确保必定成交
        is_buy = (self.pending_high_liq_side == "Buy")
        price = self.high_liq_ask * (1.0001) if is_buy else self.high_liq_bid * (0.9999)

        order = {
            "symbol": self.symbol,
            "side": self.pending_high_liq_side,
            "order_type": "Limit",
            "amount": amount, # 对冲数量严格等于成交数量
            "price": price,
            "time_in_force": "IOC", 
            "cid": trader.create_cid(self.high_liq_exchange)
        }

        # 发送对冲单
        self.trader.place_order(..., order=order, sync=False)
        self.pending_high_liq_order = False

5. 现实的复杂性

实盘就是不断解决这些问题的过程。下面是我们的跨所套利策略在 OpenQuant 上实际运行的盈利曲线抓拍。可以看到,得益于 OpenQuant Rust 底层的低延迟,我们可以成功捕捉到两腿之间极其微小的价差,积少成多,获得 50% 的年化收益率。

OpenQuant 的初衷就是解决那些繁琐的工程问题,让策略开发者能像写上面的 Python 代码一样,专注于策略逻辑本身。最基础的代码演示附在了最后,如果你对高频套利感兴趣,可以参考它,在它的基础上不断完善,形成自己专属的跨所套利策略。

完整代码:

"""
简单跨所套利策略 - 教学示例
年化50%跨所价差套利实战教学
"""

import time
import traderv2 as trader  # type: ignore
from base_strategy import BaseStrategy

class Strategy(BaseStrategy):
    def __init__(self, cex_configs, dex_configs, strategy_config, trader_instance: trader.TraderV2):
        self.log = trader_instance.log
        self.trader = trader_instance
        self.strategy_config = strategy_config
        self.cex_configs = cex_configs

        self.orders = {}  # cid -> order_info
        self.pending_high_liq_order = False
        self.pending_high_liq_side = None
        self.current_round_orders = []

        self.total_trades = 0
        self.total_pnl = 0.0

        self.high_liq_ask = 0.0
        self.high_liq_bid = 0.0
        self.low_liq_ask = 0.0
        self.low_liq_bid = 0.0

        self.instruments = {}

        # 存储余额信息
        self.high_liq_balance = None
        self.low_liq_balance = None

    def name(self):
        return self.strategy_config["name"]

    def start(self):
        """策略启动"""
        # 配置参数
        self.high_liq_exchange = self.strategy_config["high_liquidity_exchange"]
        self.low_liq_exchange = self.strategy_config["low_liquidity_exchange"]
        self.symbol = self.strategy_config["symbol"]
        self.spread_threshold = self.strategy_config.get("spread_threshold", 0.0002)
        self.trade_amount = self.strategy_config.get("trade_amount", 100.0)
        self.ioc_slippage = self.strategy_config.get("ioc_slippage", 0.0001)
        self.min_balance_ratio = self.strategy_config.get("min_balance_ratio", 0.3)
        self.leverage = self.strategy_config.get("leverage", 5)

        # 获取账户ID
        account_ids = {exchange['exchange']: idx for idx, exchange in enumerate(self.cex_configs)}
        self.high_liq_account_id = account_ids[self.high_liq_exchange]
        self.low_liq_account_id = account_ids[self.low_liq_exchange]

        # 获取交易对信息
        self._load_instruments()

        # 获取手续费(优先从配置读取,读不到则从API查询)
        self._load_fee_rates()

        # 设置杠杆
        self._set_leverage()

        # 初始化余额
        self._load_balances()

        self.log(f"策略启动成功!", level="INFO", color="green")
        self.log(f"高流动性所: {self.high_liq_exchange}", level="INFO")
        self.log(f"低流动性所: {self.low_liq_exchange}", level="INFO")
        self.log(f"交易品种: {self.symbol}", level="INFO")
        self.log(f"价差阈值: {self.spread_threshold:.4%}", level="INFO")
        self.log(f"交易数量: {self.trade_amount}", level="INFO")
        self.log(f"杠杆倍数: {self.leverage}x", level="INFO")

    def _load_instruments(self):
        """加载交易对信息"""
        for account_id, exchange in [(self.high_liq_account_id, self.high_liq_exchange),
                                      (self.low_liq_account_id, self.low_liq_exchange)]:
            result = self.trader.get_instrument(account_id, self.symbol)
            time.sleep(0.1)

            if "Ok" in result:
                self.instruments[exchange] = result["Ok"]
                self.log(f"{exchange} 交易对信息加载成功", level="INFO")
            else:
                raise Exception(f"获取 {exchange} 交易对信息失败: {result.get('Err')}")

    def _load_fee_rates(self):
        """加载手续费率"""
        accounts_config = self.strategy_config.get("accounts", [])

        if len(accounts_config) > max(self.high_liq_account_id, self.low_liq_account_id):
            high_liq_config = accounts_config[self.high_liq_account_id]
            low_liq_config = accounts_config[self.low_liq_account_id]

            self.high_liq_taker_fee = high_liq_config.get("taker_fee")
            self.low_liq_taker_fee = low_liq_config.get("taker_fee")
        else:
            self.high_liq_taker_fee = None
            self.low_liq_taker_fee = None

        if self.high_liq_taker_fee is None:
            result = self.trader.get_fee_rate(self.high_liq_account_id, self.symbol)
            time.sleep(0.1)
            if "Ok" in result:
                self.high_liq_taker_fee = result["Ok"].get("taker", 0.0005)
                self.log(f"{self.high_liq_exchange} 手续费率(API查询): {self.high_liq_taker_fee:.6f}", 
                        level="INFO")
            else:
                self.high_liq_taker_fee = 0.0005
                self.log(f"{self.high_liq_exchange} 手续费率查询失败,使用默认值", level="WARN")

        if self.low_liq_taker_fee is None:
            result = self.trader.get_fee_rate(self.low_liq_account_id, self.symbol)
            time.sleep(0.1)
            if "Ok" in result:
                self.low_liq_taker_fee = result["Ok"].get("taker", 0.0005)
                self.log(f"{self.low_liq_exchange} 手续费率(API查询): {self.low_liq_taker_fee:.6f}", 
                        level="INFO")
            else:
                self.low_liq_taker_fee = 0.0005
                self.log(f"{self.low_liq_exchange} 手续费率查询失败,使用默认值", level="WARN")

    def _set_leverage(self):
        """设置杠杆倍数"""
        for account_id, exchange in [(self.high_liq_account_id, self.high_liq_exchange),
                                      (self.low_liq_account_id, self.low_liq_exchange)]:
            result = self.trader.set_leverage(account_id, self.symbol, self.leverage)
            time.sleep(0.1)

            if "Ok" in result:
                self.log(f"{exchange} 设置杠杆 {self.leverage}x 成功", level="INFO", color="green")
            else:
                self.log(f"{exchange} 设置杠杆失败: {result.get('Err')}", level="WARN", color="yellow")

    def _load_balances(self):
        """初始化余额"""
        high_result = self.trader.get_usdt_balance(self.high_liq_account_id)
        time.sleep(0.1)

        if "Ok" in high_result:
            self.high_liq_balance = high_result["Ok"]
            self.log(f"{self.high_liq_exchange} 余额加载成功: 可用 {self.high_liq_balance.get('available_balance', 0):.2f} USDT", 
                    level="INFO")
        else:
            self.log(f"{self.high_liq_exchange} 余额加载失败: {high_result.get('Err')}", level="WARN", color="yellow")

        low_result = self.trader.get_usdt_balance(self.low_liq_account_id)
        time.sleep(0.1)

        if "Ok" in low_result:
            self.low_liq_balance = low_result["Ok"]
            self.log(f"{self.low_liq_exchange} 余额加载成功: 可用 {self.low_liq_balance.get('available_balance', 0):.2f} USDT", 
                    level="INFO")
        else:
            self.log(f"{self.low_liq_exchange} 余额加载失败: {low_result.get('Err')}", level="WARN", color="yellow")

    def subscribes(self):
        """订阅配置"""
        account_ids = {exchange['exchange']: idx for idx, exchange in enumerate(self.cex_configs)}

        subs = []

        # 订阅 BBO - Latest 模式
        for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
            subs.append({
                "account_id": account_ids[exchange],
                "event_handle_mode": "Latest",
                "sub": {
                    "SubscribeWs": [{"Bbo": [self.symbol]}]
                }
            })

        # 订阅订单更新 - Async 模式
        for exchange in [self.high_liq_exchange, self.low_liq_exchange]:
            subs.append({
                "account_id": account_ids[exchange],
                "event_handle_mode": "Async",
                "sub": {
                    "SubscribeWs": [{"Order": [self.symbol]}]
                }
            })

        # 定时查询余额
        subs.append({
            "account_id": 0,
            "sub": {
                "SubscribeTimer": {
                    "update_interval": {"secs": 3, "nanos": 0},
                    "name": "QueryBalance"
                }
            }
        })

        return subs

    def on_bbo(self, exchange, bbo):
        """BBO 更新回调"""
        # 更新价格
        if exchange == self.high_liq_exchange:
            self.high_liq_ask = bbo["ask_price"]
            self.high_liq_bid = bbo["bid_price"]
        elif exchange == self.low_liq_exchange:
            self.low_liq_ask = bbo["ask_price"]
            self.low_liq_bid = bbo["bid_price"]

        # 检查价差
        if self.high_liq_ask > 0 and self.low_liq_ask > 0:
            self._check_and_trade()

    def _check_and_trade(self):
        """检查价差并执行交易"""
        if self.pending_high_liq_order or len(self.orders) > 0:
            return

        buy_low_spread = (self.high_liq_bid - self.low_liq_ask) / self.low_liq_ask
        sell_low_spread = (self.low_liq_bid - self.high_liq_ask) / self.high_liq_ask

        total_fee = self.low_liq_taker_fee + self.high_liq_taker_fee
        buy_low_spread -= total_fee
        sell_low_spread -= total_fee

        if buy_low_spread > self.spread_threshold:
            self._execute_arbitrage(is_buy_low=True, spread=buy_low_spread)
        elif sell_low_spread > self.spread_threshold:
            self._execute_arbitrage(is_buy_low=False, spread=sell_low_spread)

    def _execute_arbitrage(self, is_buy_low: bool, spread: float):
        """执行套利交易"""
        if not self._check_balance():
            return

        if is_buy_low:
            low_liq_price = self.low_liq_ask * (1 + self.ioc_slippage)
        else:
            low_liq_price = self.low_liq_bid * (1 - self.ioc_slippage)

        cid = trader.create_cid(self.low_liq_exchange)

        order = {
            "symbol": self.symbol,
            "side": "Buy" if is_buy_low else "Sell",
            "order_type": "Limit",
            "amount": self.trade_amount,
            "price": low_liq_price,
            "time_in_force": "IOC",
            "pos_side": "Long" if is_buy_low else "Short",
            "cid": cid
        }

        params = {
            "is_dual_side": False,
            "margin_mode": "Cross"
        }

        order_info = {
            "exchange": self.low_liq_exchange,
            "side": "Buy" if is_buy_low else "Sell",
            "amount": self.trade_amount,
            "price": low_liq_price,
            "filled": 0.0,
            "avg_price": 0.0,
            "status": "Pending"
        }
        self.orders[cid] = order_info

        result = self.trader.place_order(
            account_id=self.low_liq_account_id,
            order=order,
            params=params,
            sync=False
        )

        if "Ok" in result:
            self.log(f"<b>发现套利机会!</b> 价差: {spread:.8%}", level="INFO", color="green")
            self.log(f"步骤1: {self.low_liq_exchange} {'买入' if is_buy_low else '卖出'} "
                    f"{self.trade_amount} @ {low_liq_price:.5f}", level="INFO", color="green")

            self.pending_high_liq_order = True
            self.pending_high_liq_side = "Sell" if is_buy_low else "Buy"
        else:
            del self.orders[cid]
            self.log(f"下单失败: {result.get('Err')}", level="ERROR", color="red")

    def _check_balance(self) -> bool:
        """检查余额是否充足(直接读取已存储的余额)"""
        if not self.high_liq_balance or not self.low_liq_balance:
            return False

        high_available = self.high_liq_balance.get("available_balance", 0.0)
        low_available = self.low_liq_balance.get("available_balance", 0.0)

        trade_value = self.trade_amount * max(self.high_liq_ask, self.low_liq_ask)
        margin_needed = trade_value / self.leverage

        if high_available < margin_needed / self.min_balance_ratio:
            self.log(f"高流动性所余额不足", level="WARN", color="yellow")
            return False
        if low_available < margin_needed / self.min_balance_ratio:
            self.log(f"低流动性所余额不足", level="WARN", color="yellow")
            return False

        return True

    def on_order_submitted(self, account_id, order_id_result, order):
        """订单提交回调"""
        cid = order.get("cid")
        if not cid or cid not in self.orders:
            return

        order_info = self.orders[cid]

        if "Err" in order_id_result:
            self.log(f"订单提交失败: {order_id_result['Err']}", level="ERROR", color="red")
            order_info["status"] = "Rejected"
            del self.orders[cid]
        else:
            order_id = order_id_result["Ok"]
            order_info["order_id"] = order_id
            order_info["status"] = "Submitted"
            self.log(f"订单提交成功: {order_id}", level="INFO", color="green")

    def on_order(self, account_id, order):
        """订单更新回调"""
        cid = order.get("cid")
        if not cid or cid not in self.orders:
            return

        order_info = self.orders[cid]
        status = order.get("status", "")
        filled = order.get("filled", 0.0)
        filled_avg_price = order.get("filled_avg_price", 0.0)

        order_info["status"] = status
        order_info["filled"] = filled
        if filled > 0:
            order_info["avg_price"] = filled_avg_price

        if (order_info["exchange"] == self.low_liq_exchange and 
            filled > 0 and 
            self.pending_high_liq_order and
            not order_info.get("high_liq_placed")):

            order_info["high_liq_placed"] = True
            self._place_high_liq_order(filled)

        if status in ["Filled", "Canceled"]:
            self.log(f"{order_info['exchange']} 订单完成: {status}, "
                    f"成交 {filled} @ {filled_avg_price:.5f}",
                    level="INFO", color="green")

            if filled > 0:
                self.current_round_orders.append({
                    "exchange": order_info["exchange"],
                    "side": order_info["side"],
                    "filled": filled,
                    "avg_price": filled_avg_price
                })

            del self.orders[cid]

            if len(self.orders) == 0 and not self.pending_high_liq_order:
                self._settle_round()

    def _place_high_liq_order(self, low_liq_filled_amount):
        """下高流动性所订单"""
        if not self.pending_high_liq_order:
            return

        is_buy = (self.pending_high_liq_side == "Buy")

        if is_buy:
            price = self.high_liq_ask * (1 + self.ioc_slippage)
        else:
            price = self.high_liq_bid * (1 - self.ioc_slippage)

        cid = trader.create_cid(self.high_liq_exchange)

        order = {
            "symbol": self.symbol,
            "side": self.pending_high_liq_side,
            "order_type": "Limit",
            "amount": low_liq_filled_amount,
            "price": price,
            "time_in_force": "IOC",
            "pos_side": "Long" if self.pending_high_liq_side == "Buy" else "Short",
            "cid": cid
        }

        params = {
            "is_dual_side": False,
            "margin_mode": "Cross"
        }

        order_info = {
            "exchange": self.high_liq_exchange,
            "side": self.pending_high_liq_side,
            "amount": low_liq_filled_amount,
            "price": price,
            "filled": 0.0,
            "avg_price": 0.0,
            "status": "Pending"
        }
        self.orders[cid] = order_info

        result = self.trader.place_order(
            account_id=self.high_liq_account_id,
            order=order,
            params=params,
            sync=False
        )

        if "Ok" in result:
            self.pending_high_liq_order = False

            self.log(f"步骤2: {self.high_liq_exchange} {self.pending_high_liq_side} "
                    f"{low_liq_filled_amount} @ {price:.5f}", level="INFO", color="green")
        else:
            del self.orders[cid]
            self.pending_high_liq_order = False
            self.log(f"高流动性所下单失败: {result.get('Err')}", level="ERROR", color="red")

    def _settle_round(self):
        """结算本轮交易"""
        if len(self.current_round_orders) != 2:
            self.current_round_orders.clear()
            return

        # 计算盈亏
        buy_cost = 0.0
        sell_revenue = 0.0

        for order in self.current_round_orders:
            value = order["filled"] * order["avg_price"]

            fee_rate = 0.0
            if order["exchange"] == self.high_liq_exchange:
                fee_rate = self.high_liq_taker_fee
            elif order["exchange"] == self.low_liq_exchange:
                fee_rate = self.low_liq_taker_fee

            if order["side"] == "Buy":
                buy_cost += value * (1 + fee_rate)
            else:
                sell_revenue += value * (1 - fee_rate)

        pnl = sell_revenue - buy_cost
        pnl_pct = (pnl / buy_cost * 100) if buy_cost > 0 else 0

        self.total_trades += 1
        self.total_pnl += pnl

        # 输出报告
        report = f"<b>═══ 第 {self.total_trades} 轮套利完成 ═══</b><br>"
        report += f"本轮盈亏: <b>{pnl:.4f} USDT</b> ({pnl_pct:+.4f}%)<br>"
        report += f"交易明细:<br>"

        for i, order in enumerate(self.current_round_orders):
            report += f"  {i+1}. {order['exchange']} {order['side']} "
            report += f"{order['filled']} @ {order['avg_price']:.5f}<br>"

        self.log(report, level="INFO", color="green")

        self.current_round_orders.clear()

    def on_timer_subscribe(self, timer_name):
        """定时器回调"""
        if timer_name == "QueryBalance":
            # 查询余额并存储(每3秒)
            high_result = self.trader.get_usdt_balance(self.high_liq_account_id)
            low_result = self.trader.get_usdt_balance(self.low_liq_account_id)

            if "Ok" in high_result:
                self.high_liq_balance = high_result["Ok"]

            if "Ok" in low_result:
                self.low_liq_balance = low_result["Ok"]

    def on_stop(self):
        """策略停止"""
        self.log(f"策略停止,累计交易: {self.total_trades} 轮,"
                f"累计盈亏: {self.total_pnl:.4f} USDT",
                level="INFO", color="green")

        time.sleep(1)
评论