Python 实现
Python 版 TWS API 的连接代码通常写得比 Java、C#、C++ 短,但它仍然有同一个核心要求:连接以后必须让消息循环持续运行。
在 Python 中,这个消息循环就是:
app.run()如果 app.run() 没有运行,TWS / IB Gateway 返回的消息不会被持续解码,nextValidId()、currentTime()、tickPrice()、historicalData() 等回调都不会按预期触发。
一个清晰的 Python 客户端通常包含四部分:
| 部分 | 作用 |
|---|---|
EWrapper 回调 | 接收 TWS 返回的数据 |
EClient 请求 | 向 TWS 发送请求 |
threading.Thread(target=app.run) | 后台处理消息队列 |
threading.Event 或队列 | 让主线程等待关键回调 |
新手可以先用 threading.Event 等待 nextValidId() 和 currentTime(),等连接稳定后,再扩展到行情、账户和订单。
最小可用结构
Section titled “最小可用结构”下面是推荐的最小结构:连接后等待 nextValidId(),然后请求服务器时间,收到 currentTime() 后断开。
import threadingimport time
from ibapi.client import EClientfrom ibapi.wrapper import EWrapper
class App(EWrapper, EClient): def __init__(self): EClient.__init__(self, self) self.connected_event = threading.Event() self.time_event = threading.Event() self.server_time = None self.error_messages = []
def nextValidId(self, orderId: int): # 收到 nextValidId,说明 API 初始握手完成。 self.connected_event.set()
# 用一个简单请求验证回调链路。 self.reqCurrentTime()
def currentTime(self, time_: int): # time_ 是 IBKR 服务器返回的 Unix 时间戳。 self.server_time = time_ self.time_event.set() self.disconnect()
def error(self, reqId, errorCode, errorString, advancedOrderRejectJson=""): # TWS API 的通知、警告和错误都会进入 error 回调。 self.error_messages.append( f"reqId={reqId}, code={errorCode}, message={errorString}" )
app = App()app.connect("127.0.0.1", 7497, clientId=1)
# 必须启动 app.run(),否则不会处理 TWS 返回消息。thread = threading.Thread(target=app.run, daemon=True)thread.start()
if not app.connected_event.wait(8): app.disconnect() raise TimeoutError("没有收到 nextValidId 回调")
if not app.time_event.wait(8): app.disconnect() raise TimeoutError("reqCurrentTime 没有返回")
print(f"SERVER_TIME={app.server_time}")
time.sleep(0.2)thread.join(timeout=1)这段代码的关键不是 reqCurrentTime() 本身,而是它证明了消息循环、解码器和 EWrapper 回调都能正常工作。
使用 TWS 模拟账户、127.0.0.1:7497 检查连接时,常见输出类似:
TWS API OK: server_time=1781368619, host=127.0.0.1, port=7497看到这类结果,说明 Python 端至少具备继续测试其他接口的基础条件。
app.run() 应该放在哪里
Section titled “app.run() 应该放在哪里”学习脚本里,最常见写法是单独开一个后台线程:
thread = threading.Thread(target=app.run, daemon=True)thread.start()这样主线程可以继续等待事件、发起请求或在结束时断开连接。
不推荐这样写:
app.connect("127.0.0.1", 7497, clientId=1)app.run()app.reqCurrentTime() # 这里通常执行不到,因为 app.run() 会持续阻塞app.run() 是消息循环,会一直运行到连接断开。如果把它直接放在主线程里,后面的代码通常不会按你想的顺序执行。
回调里不要做重活
Section titled “回调里不要做重活”Python 回调里尤其要避免耗时操作。比如:
def historicalData(self, reqId, bar): # 不建议在这里做大量数据库写入或复杂计算。 self.bars_queue.put((reqId, bar))更稳的设计是:回调里把数据放入队列,另一个线程或任务慢慢处理。这样 app.run() 可以继续快速处理 TWS 返回消息。
| 现象 | 原因 | 修正 |
|---|---|---|
没有 nextValidId() | 没有启动 app.run(),或 TWS 未接受连接 | 启动消息线程,检查 TWS 弹窗 |
connect() 后脚本立刻结束 | 主线程退出,daemon 消息线程也退出 | 用 Event 等待关键回调 |
| 回调只触发一次就停止 | 回调里抛异常或主动 disconnect() | 给回调加日志,确认断开位置 |
| 行情回调很慢 | 回调里做了耗时处理 | 回调只入队,业务线程处理 |
| 多个脚本互相影响 | clientId 重复 | 每个脚本使用不同 clientId |
一个实用习惯
Section titled “一个实用习惯”每个请求都记录自己的 reqId、接口名、合约和发起时间。比如:
self.pending_requests[req_id] = { "type": "historicalData", "symbol": "AAPL", "created_at": time.time(),}当 historicalData()、historicalDataEnd() 或 error() 回来时,就能知道是哪一个请求触发了回调。后面写行情、历史数据和订单章节时,这个习惯会非常有用。