使用 pandas 处理实时传入数据的最推荐/pythonic 方式是什么?
每隔几秒我就会收到一个格式如下的数据点:
{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
我想将它附加到现有的 DataFrame 中,然后对其进行一些分析。
问题是,仅使用 DataFrame.append 追加行可能会导致所有复制的性能问题。
我尝试过的事情:
一些人建议预先分配一个大 DataFrame 并在数据进入时对其进行更新:
In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)
In [2]: columns = ['high', 'low', 'open', 'close']
In [3]: df = pd.DataFrame(index=t, columns=columns)
In [4]: df
Out[4]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 NaN NaN NaN NaN
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
In [6]: data_ = pd.Series(data)
In [7]: df.loc[data['time']] = data_
In [8]: df
Out[8]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 4 3 2 1
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
另一种选择是建立一个字典列表。只需将传入的数据附加到列表中,然后将其分割成更小的 DataFrame 即可完成工作。
In [9]: ls = []
In [10]: for n in range(5):
.....: # Naive stuff ahead =)
.....: time = '2013-01-01 00:00:0' + str(n)
.....: d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
.....: ls.append(d)
In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')
In [12]: df
Out[12]:
close high low open stock
time
2013-01-01 00:00:01 3.270078 1.008289 7.486118 2.180683 BLAH
2013-01-01 00:00:02 3.883586 2.215645 0.051799 2.310823 BLAH
或类似的东西,可能会更多地处理输入。
最佳答案
我会按如下方式使用 HDF5/pytables:
- “尽可能长”地将数据保存为 Python 列表。
- 将您的结果附加到该列表中。
- 当它变得“大”时:
- 使用 pandas io(和一个可附加的表)推送到 HDF5 存储。
- 清除列表。
- 重复。
事实上,我定义的函数对每个“key”使用了一个列表,这样你就可以在同一个进程中将多个 DataFrames 存储到 HDF5 Store 中。
我们为每一行定义一个函数d
:
CACHE = {}
STORE = 'store.h5' # Note: another option is to keep the actual file open
def process_row(d, key, max_len=5000, _cache=CACHE):
"""
Append row d to the store 'key'.
When the number of items in the key's cache reaches max_len,
append the list of rows to the HDF5 store and clear the list.
"""
# keep the rows for each key separate.
lst = _cache.setdefault(key, [])
if len(lst) >= max_len:
store_and_clear(lst, key)
lst.append(d)
def store_and_clear(lst, key):
"""
Convert key's cache list to a DataFrame and append that to HDF5.
"""
df = pd.DataFrame(lst)
with pd.HDFStore(STORE) as store:
store.append(key, df)
lst.clear()
注意:我们使用 with 语句在每次写入后自动关闭存储。 可能保持打开状态会更快,但如果是这样的话it's recommended that you flush regularly (closing flushes) .另请注意,使用 collections deque 可能更具可读性。而不是列表,但是这里列表的性能会稍微好一点。
要使用它,你称之为:
process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
key="df")
注意:“df”是存储的 key在 pytables 商店中使用。
一旦作业完成,请确保您 store_and_clear
剩余的缓存:
for k, lst in CACHE.items(): # you can instead use .iteritems() in python 2
store_and_clear(lst, k)
现在您可以通过以下方式获得完整的 DataFrame:
with pd.HDFStore(STORE) as store:
df = store["df"] # other keys will be store[key]
一些评论:
- 5000 可以调整,尝试使用一些更小/更大的数字以满足您的需求。
- List append is O(1) , DataFrame append 为 O(
len(df)
). - 在您进行统计或数据处理之前,您不需要 pandas,请使用最快的。
- 此代码适用于传入的多个键(数据点)。
- 这是非常少的代码,我们停留在 vanilla python list 和 pandas dataframe...
此外,为了获得最新的读数,您可以定义一个 get 方法来存储和清除 before 读数。通过这种方式,您将获得最新的数据:
def get_latest(key, _cache=CACHE):
store_and_clear(_cache[key], key)
with pd.HDFStore(STORE) as store:
return store[key]
现在当您访问时:
df = get_latest("df")
您将获得可用的最新“df”。
另一个选项稍微涉及更多:在 vanilla pytables 中定义一个自定义表,参见 tutorial .
注意:您需要知道字段名称才能创建 column descriptor .
关于python - 如何使用 python pandas 处理传入的实时数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16740887/