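The following code reads a simple .csv file that has four columns of string values and a header row. It then adds another column to the frame: for each row of the "posted" column (a date string), it gives the corresponding day of the week. However, the code throws a "NotImplemented" error and doesn't seem to recognize the data types, even though they are defined in the function arguments (see the error message below).
I have tried dataframe.read_csv both with and without specifying the column dtypes, but I get the same error. The line where the error occurs is the list comprehension, but I get the same error with a loop. When I print the dataframe it looks correct, but the dtypes are all "object", which is not correct.
"NotImplemented" seems to imply that the DataFrame is being changed, but since all operations are on a single frame, where is Dask seeing a non-Series object?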
from dask import delayed, compute, visualize, dataframe
...

def treat(frame):
    frame["day"] = [pd.Timestamp(value) for value in frame.posted]
    print(frame.columns)
    return frame

def find_files():
    ...

def construct_frames(files):
    dataframes = []
    # choose 3 of all the files
    selection = [files[random.randrange(len(files) - 1)] for i in range(1, 4)]
    for pair in selection:
        key = pair[0]
        file = pair[1]
        path = os.path.join(TOP_DIR + "/engagement_id=" + key + "/" + file)
        data = dataframe.read_csv(path,
                                  dtype={"id": str, "data_import_id": str, "posted": str, "amount": str})
        print(data.columns, data.head())
        treat(data)
        dataframes.append(data)
    return dataframes

files = find_files()
dataframes = construct_frames(files)
visualize(dataframes)
Output (in Jupyter):
Dask DataFrame Structure:
                   id data_import_id  posted  amount
npartitions=1
               object         object  object  object
                  ...            ...     ...     ...
Dask Name: from-delayed, 3 tasks
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<ipython-input-8-e30d04e9aed0> in <module>
47
48 files = find_files()
---> 49 dataframes = construct_frames(files)
50
51
<ipython-input-8-e30d04e9aed0> in construct_frames(files)
42 dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
43 print(data)
---> 44 treat(data)
45 dataframes.append(data)
46 return dataframes
<ipython-input-8-e30d04e9aed0> in treat(frame)
15
16 def treat(frame):
---> 17 frame["day"] = [pd.Timestamp(value) for value in frame.posted]
18 print(frame.columns)
19 return frame
<ipython-input-8-e30d04e9aed0> in <listcomp>(.0)
15
16 def treat(frame):
---> 17 frame["day"] = [pd.Timestamp(value) for value in frame.posted]
18 print(frame.columns)
19 return frame
/anaconda3/envs/dask-tutorial/lib/python3.6/site-packages/dask/dataframe/core.py in __getitem__(self, key)
2059 return Series(graph, name, self._meta, self.divisions)
2060 raise NotImplementedError(
-> 2061 "Series getitem in only supported for other series objects "
2062 "with matching partition structure"
2063 )
NotImplementedError: Series getitem in only supported for other series objects with matching partition structure
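The traceback goes straight from the list comprehension into dask's `Series.__getitem__`, with no `__iter__` frame in between. That pattern is consistent with Python's legacy sequence-iteration fallback: when a class defines `__getitem__` but not `__iter__`, `for value in obj` calls `obj[0]`, `obj[1]`, ... until an `IndexError`. The sketch below reproduces this with a hypothetical `LegacySeries` class (not dask code) whose `__getitem__` rejects non-series keys, so the very first integer index raises. If that reading is right, the "non-Series object" Dask complains about is the integer `0`, not a modified DataFrame.

```python
class LegacySeries:
    """Hypothetical stand-in: defines __getitem__ but not __iter__."""

    def __init__(self):
        self.requested = []  # keys that Python passed to __getitem__

    def __getitem__(self, key):
        self.requested.append(key)
        # Mimic the dask message above: only series-like keys are accepted
        if not isinstance(key, LegacySeries):
            raise NotImplementedError(
                "Series getitem is only supported for other series objects")
        return self


def iterate(obj):
    """Run the same kind of list comprehension as treat() in the question."""
    try:
        return [value for value in obj]
    except NotImplementedError as exc:
        return "NotImplementedError: " + str(exc)


s = LegacySeries()
print(iterate(s))   # NotImplementedError: Series getitem is only supported for other series objects
print(s.requested)  # [0]
```

Only `NotImplementedError` is raised here; an `IndexError` would have ended the loop normally, which is how the fallback protocol is meant to terminate.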
The data looks something like this: alphanumeric strings, plus date strings that are converted to a "day" in the new column:
id data_import_id posted amount
00000000 3c221ff 2014-01-02T19:00:00.000-05:00 3656506
00000013 3c221ff 2014-01-03T19:00:00.000-05:00 3656506
00000015 3c221ff 2014-01-04T19:00:00.000-05:00 3656506
0000000a 3c221ff 2014-01-05T19:00:00.000-05:00 3656506
00000001 3c221ff 2014-01-06T19:00:00.000-05:00 3656506
Best answer
I got the error on this line:
frame["day"] = [pd.Timestamp(value) for value in frame.posted]
It turns out there are several ways to append a column to a dask DataFrame:
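- these approaches assume that the time zone information is not important
- if the time zone matters, see @MikeB2019x's comment here for how to account for it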
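Why the time zone can matter for this particular column: the sample timestamps are 19:00 at UTC-05:00, which is already the next calendar day in UTC. This small pandas sketch, using two sample values from the question, shows the weekday changing depending on whether the offset is kept or everything is converted to UTC first.

```python
import pandas as pd

posted = pd.Series([
    "2014-01-02T19:00:00.000-05:00",
    "2014-01-06T19:00:00.000-05:00",
])

# All strings share the -05:00 offset, so pandas keeps it as a fixed-offset tz
local = pd.to_datetime(posted)
# utc=True converts every timestamp to UTC (19:00-05:00 -> 00:00 the next day)
as_utc = pd.to_datetime(posted, utc=True)

print(local.dt.day_name().tolist())   # ['Thursday', 'Monday']
print(as_utc.dt.day_name().tolist())  # ['Friday', 'Tuesday']
```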
Using map_partitions (per this SO post)
ddf = dataframe.read_csv('test.csv',
                         delimiter=" ",
                         engine='python',
                         dtype={"id": str, "data_import_id": str, "amount": str})
meta = ('posted', 'datetime64[ns]')
ddf['posted'] = ddf.posted.map_partitions(pd.to_datetime, meta=meta)
ddf = treat(ddf)
print(ddf.head())
id data_import_id posted amount day_of_week weekday
0 00000000 3c221ff 2014-01-02 19:00:00-05:00 3656506 2 Thursday
1 00000013 3c221ff 2014-01-03 19:00:00-05:00 3656506 3 Friday
2 00000015 3c221ff 2014-01-04 19:00:00-05:00 3656506 4 Saturday
3 0000000a 3c221ff 2014-01-05 19:00:00-05:00 3656506 5 Sunday
4 00000001 3c221ff 2014-01-06 19:00:00-05:00 3656506 6 Monday
print(ddf.dtypes)
id object
data_import_id object
posted datetime64[ns]
amount object
day_of_week int64
weekday object
dtype: object
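`map_partitions` hands each partition to the function as an ordinary pandas object, which is why pandas-only code works inside it while the same code applied to the dask collection itself does not. The sketch below imitates that with plain pandas: a hypothetical `map_chunks` helper splits the frame into row chunks, applies a hypothetical `add_weekday` function to each chunk, and concatenates the results. With dask, the same function would be passed as `ddf.map_partitions(add_weekday)`, possibly with a `meta=` describing the output; this is a conceptual analogue, not dask's implementation.

```python
import pandas as pd


def add_weekday(pdf):
    """Hypothetical per-partition function: receives a plain pandas DataFrame."""
    pdf = pdf.copy()  # avoid modifying a slice of the caller's frame
    # pdf is an in-memory pandas DataFrame, so any pandas operation works here
    pdf["posted"] = pd.to_datetime(pdf["posted"])
    pdf["weekday"] = pdf["posted"].dt.day_name()
    return pdf


def map_chunks(pdf, func, nchunks):
    """Rough pandas-only analogue of map_partitions: apply func to each chunk."""
    size = -(-len(pdf) // nchunks)  # ceiling division so every row lands in a chunk
    chunks = [pdf.iloc[i:i + size] for i in range(0, len(pdf), size)]
    return pd.concat([func(chunk) for chunk in chunks])


sample = pd.DataFrame({
    "id": ["00000000", "00000013", "00000015", "0000000a", "00000001"],
    "posted": [f"2014-01-0{d}T19:00:00.000-05:00" for d in range(2, 7)],
})
result = map_chunks(sample, add_weekday, nchunks=2)
print(result[["id", "weekday"]])
```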
Using .to_datetime (per this SO post)
ddf = dataframe.read_csv('test.csv',
                         delimiter=" ",
                         engine='python',
                         dtype={"id": str, "data_import_id": str, "amount": str})
ddf['posted'] = dataframe.to_datetime(ddf.posted, format="%Y%m%d %H:%M:%S")  # option 1
# ddf['posted'] = dataframe.to_datetime(ddf.posted, unit='ns')  # option 2
ddf = treat(ddf)
print(ddf.head())
id data_import_id posted amount day_of_week weekday
0 00000000 3c221ff 2014-01-02 19:00:00-05:00 3656506 2 Thursday
1 00000013 3c221ff 2014-01-03 19:00:00-05:00 3656506 3 Friday
2 00000015 3c221ff 2014-01-04 19:00:00-05:00 3656506 4 Saturday
3 0000000a 3c221ff 2014-01-05 19:00:00-05:00 3656506 5 Sunday
4 00000001 3c221ff 2014-01-06 19:00:00-05:00 3656506 6 Monday
print(ddf.dtypes)
id object
data_import_id object
posted datetime64[ns]
amount object
day_of_week int64
weekday object
dtype: object
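A note on the `format` argument: "%Y%m%d %H:%M:%S" does not literally match the ISO 8601 strings in the sample data (which use dashes, a `T`, milliseconds and a `-05:00` offset), so whether it is accepted may depend on the pandas version. A format that spells out the sample layout avoids relying on that; it is shown here on plain pandas with values from the question. `dataframe.to_datetime` forwards keyword arguments such as `format` to pandas, so the same string should apply there, though that is worth checking against your dask version.

```python
import pandas as pd

posted = pd.Series([
    "2014-01-02T19:00:00.000-05:00",
    "2014-01-03T19:00:00.000-05:00",
])

# Spell out every part of the sample layout: date, 'T', time with
# fractional seconds (%f), and the UTC offset with a colon (%z)
parsed = pd.to_datetime(posted, format="%Y-%m-%dT%H:%M:%S.%f%z")

print(parsed.dt.day_name().tolist())  # ['Thursday', 'Friday']
print(parsed.dt.tz is not None)       # True: the -05:00 offset is kept
```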
Alternatively, simply pass the parse_dates argument to .read_csv:
ddf = dataframe.read_csv('test.csv',
                         delimiter=" ",
                         engine='python',
                         parse_dates=['posted'],
                         dtype={"id": str, "data_import_id": str, "amount": str})
ddf = treat(ddf)
print(ddf.head())
id data_import_id posted amount day_of_week weekday
0 00000000 3c221ff 2014-01-02 19:00:00-05:00 3656506 2 Thursday
1 00000013 3c221ff 2014-01-03 19:00:00-05:00 3656506 3 Friday
2 00000015 3c221ff 2014-01-04 19:00:00-05:00 3656506 4 Saturday
3 0000000a 3c221ff 2014-01-05 19:00:00-05:00 3656506 5 Sunday
4 00000001 3c221ff 2014-01-06 19:00:00-05:00 3656506 6 Monday
print(ddf.dtypes)
id object
data_import_id object
posted datetime64[ns, pytz.FixedOffset(-300)]
amount object
day_of_week int64
weekday object
dtype: object
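Dask's `read_csv` passes keyword arguments such as `parse_dates` and `dtype` through to pandas, so the resulting dtypes can be previewed on a small in-memory sample with pandas alone. Because every timestamp carries the same `-05:00` offset, the parsed column should come out time-zone aware, as in the `datetime64[ns, pytz.FixedOffset(-300)]` line above (newer pandas versions may print the offset differently). The two rows below are copied from the question's sample data.

```python
import io

import pandas as pd

# Two rows in the space-delimited layout shown in the question
csv_text = (
    "id data_import_id posted amount\n"
    "00000000 3c221ff 2014-01-02T19:00:00.000-05:00 3656506\n"
    "00000013 3c221ff 2014-01-03T19:00:00.000-05:00 3656506\n"
)

df = pd.read_csv(
    io.StringIO(csv_text),
    sep=" ",
    parse_dates=["posted"],
    # read as strings so leading zeros in "id" survive
    dtype={"id": str, "data_import_id": str, "amount": str},
)

print(df.dtypes)
print(df["id"].tolist())  # ['00000000', '00000013']
```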
By the way, the datetime attributes (the .dt datetime namespace) can be used on dask Series just as in pandas; see here:
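def treat(frame):
    # .dt.day is the day of the month (hence 2, 3, 4, ... in the output above);
    # .dt.dayofweek gives the weekday number (Monday=0 ... Sunday=6)
    frame['day_of_week'] = frame['posted'].dt.day
    # .dt.weekday_name was removed in pandas 1.0; newer versions use .dt.day_name()
    frame['weekday'] = frame['posted'].dt.weekday_name
    return frame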
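In the outputs above, `day_of_week` holds 2, 3, 4, 5, 6, which are days of the month rather than weekday numbers. This pandas sketch puts the relevant `.dt` accessors side by side for the five sample timestamps. The note above says the same `.dt` namespace is available on dask Series; whether `day_name()` specifically is supported there is an assumption that depends on the dask version.

```python
import pandas as pd

posted = pd.to_datetime(pd.Series(
    [f"2014-01-0{d}T19:00:00.000-05:00" for d in range(2, 7)]
))

out = pd.DataFrame({
    "day_of_month": posted.dt.day,       # what treat() stores as day_of_week
    "day_of_week": posted.dt.dayofweek,  # Monday=0 ... Sunday=6
    "weekday": posted.dt.day_name(),     # replacement for .dt.weekday_name
})
print(out)
```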
Regarding "python-3.x - Dask - dataframe.read_csv not recognizing correct data types", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/55910503/