跳转到内容

Python 实现

Python 版 TWS API 的连接代码通常写得比 Java、C#、C++ 短,但它仍然有同一个核心要求:连接以后必须让消息循环持续运行。

在 Python 中,这个消息循环就是:

app.run()

如果 app.run() 没有运行,TWS / IB Gateway 返回的消息不会被持续解码,nextValidId()currentTime()tickPrice()historicalData() 等回调都不会按预期触发。

一个清晰的 Python 客户端通常包含四部分:

部分作用
EWrapper 回调接收 TWS 返回的数据
EClient 请求向 TWS 发送请求
threading.Thread(target=app.run)后台处理消息队列
threading.Event 或队列让主线程等待关键回调

新手可以先用 threading.Event 等待 nextValidId()currentTime(),等连接稳定后,再扩展到行情、账户和订单。

下面是推荐的最小结构:连接后等待 nextValidId(),然后请求服务器时间,收到 currentTime() 后断开。

import threading
import time
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
class App(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.connected_event = threading.Event()
self.time_event = threading.Event()
self.server_time = None
self.error_messages = []
def nextValidId(self, orderId: int):
# 收到 nextValidId,说明 API 初始握手完成。
self.connected_event.set()
# 用一个简单请求验证回调链路。
self.reqCurrentTime()
def currentTime(self, time_: int):
# time_ 是 IBKR 服务器返回的 Unix 时间戳。
self.server_time = time_
self.time_event.set()
self.disconnect()
def error(self, reqId, errorCode, errorString, advancedOrderRejectJson=""):
# TWS API 的通知、警告和错误都会进入 error 回调。
self.error_messages.append(
f"reqId={reqId}, code={errorCode}, message={errorString}"
)
app = App()
app.connect("127.0.0.1", 7497, clientId=1)
# 必须启动 app.run(),否则不会处理 TWS 返回消息。
thread = threading.Thread(target=app.run, daemon=True)
thread.start()
if not app.connected_event.wait(8):
app.disconnect()
raise TimeoutError("没有收到 nextValidId 回调")
if not app.time_event.wait(8):
app.disconnect()
raise TimeoutError("reqCurrentTime 没有返回")
print(f"SERVER_TIME={app.server_time}")
time.sleep(0.2)
thread.join(timeout=1)

这段代码的关键不是 reqCurrentTime() 本身,而是它证明了消息循环、解码器和 EWrapper 回调都能正常工作。

使用 TWS 模拟账户、127.0.0.1:7497 检查连接时,常见输出类似:

TWS API OK: server_time=1781368619, host=127.0.0.1, port=7497

看到这类结果,说明 Python 端至少具备继续测试其他接口的基础条件。

学习脚本里,最常见写法是单独开一个后台线程:

thread = threading.Thread(target=app.run, daemon=True)
thread.start()

这样主线程可以继续等待事件、发起请求或在结束时断开连接。

不推荐这样写:

app.connect("127.0.0.1", 7497, clientId=1)
app.run()
app.reqCurrentTime() # 这里通常执行不到,因为 app.run() 会持续阻塞

app.run() 是消息循环,会一直运行到连接断开。如果把它直接放在主线程里,后面的代码通常不会按你想的顺序执行。

Python 回调里尤其要避免耗时操作。比如:

def historicalData(self, reqId, bar):
# 不建议在这里做大量数据库写入或复杂计算。
self.bars_queue.put((reqId, bar))

更稳的设计是:回调里把数据放入队列,另一个线程或任务慢慢处理。这样 app.run() 可以继续快速处理 TWS 返回消息。

现象原因修正
没有 nextValidId()没有启动 app.run(),或 TWS 未接受连接启动消息线程,检查 TWS 弹窗
connect() 后脚本立刻结束主线程退出,daemon 消息线程也退出Event 等待关键回调
回调只触发一次就停止回调里抛异常或主动 disconnect()给回调加日志,确认断开位置
行情回调很慢回调里做了耗时处理回调只入队,业务线程处理
多个脚本互相影响clientId 重复每个脚本使用不同 clientId

每个请求都记录自己的 reqId、接口名、合约和发起时间。比如:

self.pending_requests[req_id] = {
"type": "historicalData",
"symbol": "AAPL",
"created_at": time.time(),
}

historicalData()historicalDataEnd()error() 回来时,就能知道是哪一个请求触发了回调。后面写行情、历史数据和订单章节时,这个习惯会非常有用。

IBKR Campus: TWS API Documentation