我正在运行 Python ib-api 以接收来自盈透证券(Interactive Brokers)的实时市场数据。它可以返回我期望的数据,但程序结束时会报出“unhandled exception in EReader thread”(EReader 线程中未处理的异常)错误。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum
# Request id used by default for market-data subscriptions.
DEFAULT_PRICE_DATA_ID = 1001

# Unique sentinel objects; they are only ever compared by identity (`is`).
# FINISHED doubles as an in-queue end marker and as a status value, while
# STARTED and TIME_OUT are used as status values only.
FINISHED = object()
STARTED = object()
TIME_OUT = object()
class finishableQueue(object):
    """Drain a queue until a FINISHED marker arrives or a read times out."""

    def __init__(self, queue_to_finish):
        """Wrap *queue_to_finish* and record the initial STARTED status."""
        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):
        """Collect items from the wrapped queue and return them as a list.

        Each individual read waits at most *timeout* seconds. Reading stops
        when the FINISHED marker is dequeued (status becomes FINISHED) or
        when a read times out (status becomes TIME_OUT). The marker itself
        is not included in the returned list.
        """
        collected = []
        while True:
            try:
                item = self._queue.get(timeout=timeout)
            except queue.Empty:
                # Nothing arrived within the timeout: stop and flag it.
                self.status = TIME_OUT
                break
            if item is FINISHED:
                self.status = FINISHED
                break
            collected.append(item)
        return collected

    def timed_out(self):
        """Return True if the last get() ended because a read timed out."""
        return self.status is TIME_OUT
class TestWrapper(EWrapper):
    """EWrapper that buffers error messages and price ticks in queues.

    Error messages go to a single queue; price ticks go to one queue per
    request id, stored in ``_my_price_data_dict``.
    """

    def __init__(self):
        # One queue of (tick type name, price) tuples per request id.
        self._my_price_data_dict = {}
        # Create the error queue up front so that error() can be invoked
        # safely even if init_error() has not been called yet.
        self._my_errors = queue.Queue()

    def get_error(self, timeout=5):
        """Return the next queued error message, or None if none is available.

        Args:
            timeout: seconds to wait on the queue once it is seen non-empty.
        """
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None
        return None

    def is_error(self):
        """Return True if at least one error message is waiting."""
        return not self._my_errors.empty()

    def init_error(self):
        """Replace the error queue with a fresh, empty one."""
        self._my_errors = queue.Queue()

    def error(self, id, errorCode, errorString, advancedOrderRejectJson=""):
        """Format an error callback as a string and queue it.

        ``advancedOrderRejectJson`` is accepted (and ignored) so the override
        also works with ibapi releases that pass this extra argument; callers
        using the original three-argument form are unaffected.
        """
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    def init_ibprices(self, tickerid):
        """Create (or replace) the price queue for *tickerid* and return it."""
        ibprice_data_queue = self._my_price_data_dict[tickerid] = queue.Queue()
        return ibprice_data_queue

    def tickPrice(self, reqId, tickType, price, attrib):
        """Queue a (tick type name, price) tuple under *reqId*.

        A queue is created on demand if none exists for *reqId* yet.
        ``attrib`` is not used.
        """
        tickdata = (TickTypeEnum.to_str(tickType), price)
        if reqId not in self._my_price_data_dict:
            self.init_ibprices(reqId)
        self._my_price_data_dict[reqId].put(tickdata)
class TestClient(EClient):
    """EClient with a helper that collects market-data ticks for a contract."""

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def error(self, reqId, errorCode, errorString):
        """Print an error to stdout.

        NOTE(review): TestApp lists TestWrapper before TestClient among its
        bases, so on a TestApp instance TestWrapper.error is resolved
        instead of this method.
        """
        print("Error: ", reqId, " ", errorCode, " ", errorString)

    def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID,
                          max_wait_seconds=5):
        """Request market data and return the ticks collected for it.

        Args:
            ibcontract: contract to request market data for.
            tickerid: request id for the subscription.
            max_wait_seconds: longest wait, in seconds, for each read from
                the price queue before collection stops.

        Returns:
            A list of the items the wrapper queued for *tickerid*.
        """
        # Go through self.wrapper explicitly: the queue helpers are defined
        # on the wrapper, not on the client, so this does not rely on both
        # being the same object.
        ib_data_queue = finishableQueue(self.wrapper.init_ibprices(tickerid))
        self.reqMktData(
            tickerid,
            ibcontract,
            "",
            False,
            False,
            [],
        )
        print("Getting data from the server... could take %d seconds to complete " % max_wait_seconds)
        price_data = ib_data_queue.get(timeout=max_wait_seconds)
        # Drain and print any errors queued while waiting.
        while self.wrapper.is_error():
            print(self.wrapper.get_error())
        if ib_data_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")
        self.cancelMktData(tickerid)
        return price_data
class TestApp(TestWrapper, TestClient):
    """Combined wrapper and client that connects and starts the message loop."""

    def __init__(self, ipaddress, portid, clientid):
        """Connect to *ipaddress*:*portid* as *clientid* and start run() in a thread."""
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)
        # The error queue must exist before connecting and before the
        # message thread starts: either may deliver an error callback, and
        # resetting the queue afterwards would discard errors already queued.
        self.init_error()
        self.connect(ipaddress, portid, clientid)
        thread = Thread(target=self.run)
        thread.start()
        self._thread = thread
def main(slist):
    """Print the first collected ticks for each symbol in *slist*.

    Each entry is requested as a stock ("STK") on the "SEHK" exchange
    through a TestApp connected to 127.0.0.1:7497 with client id 1.

    Args:
        slist: iterable of symbols; each one is converted to ``str``.
    """
    app = TestApp("127.0.0.1", 7497, 1)
    try:
        for symbol in slist:
            ibcontract = IBcontract()
            ibcontract.secType = "STK"
            # Symbols may be given as ints (e.g. 700); store them as text.
            ibcontract.symbol = str(symbol)
            ibcontract.exchange = "SEHK"
            last_price = app.getIBrealtimedata(ibcontract)
            df = pd.DataFrame(last_price)
            print(ibcontract.symbol, df.head())
    finally:
        # Always disconnect, even if a request above raised, so the
        # connection is not left open.
        app.disconnect()
if __name__ == "__main__":
    # Symbols handed to main(), which requests each one on SEHK.
    seclist = [700, 2318, 5, 12]
    main(seclist)
以下是错误消息:
unhandled exception in EReader thread
Traceback (most recent call last):
  File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py", line 34, in run
    data = self.conn.recvMsg()
  File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line 99, in recvMsg
    buf = self._recvAllMsg()
  File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line 119, in _recvAllMsg
    buf = self.socket.recv(4096)
OSError: [WinError 10038] An operation was attempted on something that is not a socket
最佳答案
一个单独的线程开始从套接字读取传入的消息:
thread = Thread(target = self.run)
thread.start()
但是这个线程从未被停止,在您调用 disconnect() 时它仍在运行。结果,它尝试访问此时已为 None 的套接字对象,从而触发错误。请尝试在断开连接之前,通过设置
done=True
来停止 EReader 线程。附带说明一下,由于此错误发生在程序最后断开连接时,因此不会影响接收预期数据。
关于python-3.x - 运行 ibapi 的 EReader 线程中的异常错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57555202/