python-3.x - 运行 ibapi 的 EReader 线程中的异常错误

标签 python-3.x ib-api

我正在运行 Python ib-api 以接收来自盈透证券的实时市场数据。它可以提供我期望的数据,但它以“电子阅读器线程中未处理的异常”结束。

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum`

`DEFAULT_PRICE_DATA_ID = 1001`

`FINISHED = object()
STARTED = object()
TIME_OUT = object()`

class finishableQueue(object):

    def __init__(self, queue_to_finish):

        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):

        contents_of_queue=[]
        finished=False

        while not finished:
            try:
                current_element = self._queue.get(timeout=timeout)
                if current_element is FINISHED:
                    finished = True
                    self.status = FINISHED
                else:
                    contents_of_queue.append(current_element)

            except queue.Empty:
                finished = True
                self.status = TIME_OUT

        return contents_of_queue

    def timed_out(self):
        return self.status is TIME_OUT


class TestWrapper(EWrapper):

    def __init__(self):
        self._my_price_data_dict = {}

    def get_error(self, timeout=5):
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None

        return None

    def is_error(self):
        an_error_if=not self._my_errors.empty()
        return an_error_if

    def init_error(self):
        error_queue=queue.Queue()
        self._my_errors = error_queue

    def error(self, id, errorCode, errorString):
        ## Overriden method
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    def init_ibprices(self, tickerid):
        ibprice_data_queue = self._my_price_data_dict[tickerid] = queue.Queue()

        return ibprice_data_queue

    def tickPrice(self, reqId, tickType, price, attrib):
        tickdata = (TickTypeEnum.to_str(tickType), price)

        price_data_dict = self._my_price_data_dict

        if reqId not in price_data_dict.keys():
            self.init_ibprices(reqId)

        price_data_dict[reqId].put(tickdata)


class TestClient(EClient):

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def error(self, reqId, errorCode, errorString):
        print("Error: ", reqId, " ", errorCode, " ", errorString)

    def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID):
        ib_data_queue = finishableQueue(self.init_ibprices(tickerid))

        self.reqMktData(
            tickerid,
            ibcontract,
            "",
            False,
            False,
            []
        )

        MAX_WAIT_SECONDS = 5
        print("Getting data from the server... could take %d seconds to complete " % MAX_WAIT_SECONDS)

        price_data = ib_data_queue.get(timeout = MAX_WAIT_SECONDS)

        while self.wrapper.is_error():
            print(self.get_error())

        if ib_data_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")

        self.cancelMktData(tickerid)

        return price_data

class TestApp(TestWrapper, TestClient):
    def __init__(self, ipaddress, portid, clientid):
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

        self.connect(ipaddress, portid, clientid)

        thread = Thread(target = self.run)
        thread.start()

        setattr(self, "_thread", thread)

        self.init_error()

def main(slist):

    app = TestApp("127.0.0.1", 7497, 1)

    for i in slist:
        ibcontract = IBcontract()
        ibcontract.secType = "STK"
        ibcontract.symbol = i
        ibcontract.exchange ="SEHK"

        Lastprice = app.getIBrealtimedata(ibcontract)

        df = pd.DataFrame(Lastprice)
        print(ibcontract.symbol, df.head())

    app.disconnect()

if __name__ == "__main__":

    seclist = [700,2318,5,12]
    main(seclist)

以下是错误消息:

unhandled exception in EReader thread Traceback (most recent call last): File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py", line 34, >in run data = self.conn.recvMsg() File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line >99, in recvMsg buf = self._recvAllMsg() File "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", line >119, in _recvAllMsg buf = self.socket.recv(4096) OSError: [WinError 10038] An operation was attempted on something that is >not a socket

最佳答案

一个单独的线程开始从套接字读取传入的消息:

thread = Thread(target = self.run)
thread.start()

但是这个线程永远不会停止,并且在您调用 disconnect() 时仍在运行。结果,它尝试访问现在为 None 的套接字对象,从而触发错误。尝试通过设置 done=True 在断开连接之前停止 EReader 线程.

附带说明一下,由于此错误发生在程序最后断开连接时,因此不应干扰接收预期数据。

关于python-3.x - 运行 ibapi 的 EReader 线程中的异常错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57555202/

相关文章:

python - ModuleNotFoundError : No module named 'pymysql' in jupyter

python - 为什么我得到 "ERROR 508 322 Error processing request.-' bW' : cause - Duplicate ticker id"in IB API/TWS (Python)

python - IB API : Checking if in trade or have open order

python-3.x - 我如何找到特定文档insert_many()失败?

python - Django:如何获取模板 View 来显示图像——新手版

python - 无效语法错误 : Quine McCluskey Algorithm in python

python - 如何将数据添加到二进制文件中?

google-colaboratory - 如何在Colab中安装和使用IB API?

python - Interactive Brokers API (IBAPI) - 使用 threading.Timer 对象在数据连接断开时自动退出