python-3.x - Dask - dataframe.read_csv does not recognize the correct data types

Tags: python-3.x dask

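The following code reads a simple .csv file that contains four columns of string values and a header row. It then adds another column to the frame that takes each row of the "posted" column (a date string) and gives the corresponding day of the week for that row. However, the code throws a "not implemented" error and does not seem to recognize the data types, even though they are defined in the function arguments (see the error message below).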

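I have tried dataframe.read_csv both with and without specifying the column dtypes, but I get the same error. The line where the error occurs is the list comprehension, but I get the same error with a loop. When I print the dataframe it looks correct, but the dtypes are all "object", which is not correct.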
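For what it's worth, all-object dtypes are exactly what the dtype= mapping in the code asks for: dtype=str keeps every listed column as text (object, or a dedicated string dtype in newer pandas), and "posted" only becomes a datetime column after an explicit conversion. Dask's read_csv uses pandas to parse each block, so a pandas sketch shows the same behaviour. It assumes a hypothetical comma-separated, in-memory sample (CSV) built from the data rows shown later in the question:

```python
import io

import pandas as pd

# Hypothetical two-row sample in the format shown in the question
# (comma-separated here for simplicity).
CSV = """id,data_import_id,posted,amount
00000000,3c221ff,2014-01-02T19:00:00.000-05:00,3656506
00000013,3c221ff,2014-01-03T19:00:00.000-05:00,3656506
"""

# dtype=str keeps every listed column as text, so non-datetime dtypes
# are the expected result here, not a parsing failure.
df = pd.read_csv(
    io.StringIO(CSV),
    dtype={"id": str, "data_import_id": str, "posted": str, "amount": str},
)
print(df.dtypes)

# "posted" only becomes a datetime column after an explicit conversion.
posted = pd.to_datetime(df["posted"])
print(posted.dtype)
```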

"NotImplemented" seems to imply that the Dataframe is being changed, but since all operations are on a single frame, where does Dask see a non-Series object?

from dask import delayed, compute, visualize, dataframe

...

def treat(frame):
    frame["day"] = [pd.Timestamp(value) for value in frame.posted]
    print(frame.columns)
    return frame

def find_files():
    ...

def construct_frames(files):
    dataframes = []
    # choose 3 of all the files
    selection = [files[random.randrange(len(files) - 1)] for i in range(1,4)]
    for pair in selection:
        key = pair[0]
        file = pair[1]
        path = os.path.join(TOP_DIR + "/engagement_id=" + key + "/" + file)
        data = dataframe.read_csv(path,
                                  dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
        print(data.columns, data.head())
        treat(data)
        dataframes.append(data)
    return dataframes

files = find_files()
dataframes = construct_frames(files)
visualize(dataframes)

Output (in Jupyter):

Dask DataFrame Structure:
                   id data_import_id  posted  amount
npartitions=1                                       
               object         object  object  object
                  ...            ...     ...     ...
Dask Name: from-delayed, 3 tasks
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-8-e30d04e9aed0> in <module>
     47 
     48 files = find_files()
---> 49 dataframes = construct_frames(files)
     50 
     51 

<ipython-input-8-e30d04e9aed0> in construct_frames(files)
     42                                   dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
     43         print(data)
---> 44         treat(data)
     45         dataframes.append(data)
     46     return dataframes

<ipython-input-8-e30d04e9aed0> in treat(frame)
     15 
     16 def treat(frame):
---> 17     frame["day"] = [pd.Timestamp(value) for value in frame.posted]
     18     print(frame.columns)
     19     return frame

<ipython-input-8-e30d04e9aed0> in <listcomp>(.0)
     15 
     16 def treat(frame):
---> 17     frame["day"] = [pd.Timestamp(value) for value in frame.posted]
     18     print(frame.columns)
     19     return frame

/anaconda3/envs/dask-tutorial/lib/python3.6/site-packages/dask/dataframe/core.py in __getitem__(self, key)
   2059             return Series(graph, name, self._meta, self.divisions)
   2060         raise NotImplementedError(
-> 2061             "Series getitem in only supported for other series objects "
   2062             "with matching partition structure"
   2063         )

NotImplementedError: Series getitem in only supported for other series objects with matching partition structure

The data looks something like this: alphanumeric strings, plus date strings that are converted to a "day" in the new column:

id  data_import_id  posted  amount
00000000  3c221ff  2014-01-02T19:00:00.000-05:00  3656506
00000013  3c221ff  2014-01-03T19:00:00.000-05:00  3656506
00000015  3c221ff  2014-01-04T19:00:00.000-05:00  3656506
0000000a  3c221ff  2014-01-05T19:00:00.000-05:00  3656506
00000001  3c221ff  2014-01-06T19:00:00.000-05:00  3656506
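For comparison, the same list comprehension does work on an ordinary, eager pandas DataFrame, because a pandas column is already in memory and can be iterated value by value. A minimal sketch, assuming the sample rows above are comma-separated and held in a hypothetical in-memory string CSV rather than read from disk:

```python
import io

import pandas as pd

# Hypothetical stand-in for one of the CSV files: the sample rows above,
# comma-separated and read from memory instead of from disk.
CSV = """id,data_import_id,posted,amount
00000000,3c221ff,2014-01-02T19:00:00.000-05:00,3656506
00000013,3c221ff,2014-01-03T19:00:00.000-05:00,3656506
00000015,3c221ff,2014-01-04T19:00:00.000-05:00,3656506
0000000a,3c221ff,2014-01-05T19:00:00.000-05:00,3656506
00000001,3c221ff,2014-01-06T19:00:00.000-05:00,3656506
"""

df = pd.read_csv(io.StringIO(CSV), dtype=str)

# Eager pandas: iterating the column and assigning a plain list works,
# because the whole column has already been loaded.
df["day"] = [pd.Timestamp(value).day_name() for value in df["posted"]]
print(df[["posted", "day"]])
```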

Best answer

I get the error on this line

frame["day"] = [pd.Timestamp(value) for value in frame.posted]

It turns out there are several ways to append a column to a dask DataFrame:

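  • These approaches assume that timezone information is not important
  • If the timezone is important, see @MikeB2019x's comment here for how to account for it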
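The timezone caveat matters for this particular data because the timestamps are at 19:00 local time (UTC-05:00): normalising them to UTC moves every row to the next calendar day, so the weekday changes. A small pandas sketch using two of the sample timestamps:

```python
import pandas as pd

# Two of the sample timestamps from the question (local time, UTC-05:00).
s = pd.Series(["2014-01-02T19:00:00.000-05:00", "2014-01-05T19:00:00.000-05:00"])

# Keep the parsed -05:00 offset: weekdays in the original local time.
local = pd.to_datetime(s)
local_days = local.dt.day_name().tolist()

# Normalise to UTC: 19:00 at UTC-05:00 is midnight of the next day in UTC.
utc = pd.to_datetime(s, utc=True)
utc_days = utc.dt.day_name().tolist()

print(local_days)  # ['Thursday', 'Sunday']
print(utc_days)    # ['Friday', 'Monday']
```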

Using map_partitions (per this SO post)

import pandas as pd
from dask import dataframe

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        dtype={"id":str,"data_import_id": str, "amount": str})
meta = ('posted', 'datetime64[ns]')
ddf['posted'] = ddf.posted.map_partitions(pd.to_datetime, meta=meta)
ddf = treat(ddf)

print(ddf.head())

         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                        object
data_import_id            object
posted            datetime64[ns]
amount                    object
day_of_week                int64
weekday                   object
dtype: object

Using .to_datetime (per this SO post)

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        dtype={"id":str,"data_import_id": str, "amount": str})
ddf['posted']=dataframe.to_datetime(ddf.posted, format="%Y%m%d %H:%M:%S") # option 1
# ddf['posted']=dataframe.to_datetime(ddf.posted, unit='ns') # option 2
ddf = treat(ddf)
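The format string "%Y%m%d %H:%M:%S" in option 1 does not literally match strings such as "2014-01-02T19:00:00.000-05:00". Older pandas versions appear to have treated it as an ISO-style hint and parsed the strings anyway, but newer versions may apply the format strictly and reject them. Since dask.dataframe.to_datetime applies pandas.to_datetime to each partition, a format that matches the data exactly can be checked with pandas alone:

```python
import pandas as pd

s = pd.Series(["2014-01-02T19:00:00.000-05:00", "2014-01-03T19:00:00.000-05:00"])

# A format that literally matches the sample strings: date, "T", time,
# fractional seconds, then a UTC offset such as -05:00.
parsed = pd.to_datetime(s, format="%Y-%m-%dT%H:%M:%S.%f%z")
print(parsed)
```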

print(ddf.head())
         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                        object
data_import_id            object
posted            datetime64[ns]
amount                    object
day_of_week                int64
weekday                   object
dtype: object

Alternatively, just pass the parse_dates argument to .read_csv

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        parse_dates=['posted'],
                        dtype={"id":str,"data_import_id": str, "amount": str})
ddf = treat(ddf)
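Dask's read_csv passes keyword arguments such as parse_dates and dtype through to pandas.read_csv, so the effect of parse_dates can be checked on pandas directly. A sketch with a hypothetical comma-separated in-memory sample; as in the dtypes output below, the parsed column keeps its fixed -05:00 offset, so weekdays stay in local time:

```python
import io

import pandas as pd

# Hypothetical comma-separated sample in the question's format.
CSV = """id,data_import_id,posted,amount
00000000,3c221ff,2014-01-02T19:00:00.000-05:00,3656506
00000013,3c221ff,2014-01-03T19:00:00.000-05:00,3656506
"""

# parse_dates converts "posted" while reading; the other columns stay text.
df = pd.read_csv(
    io.StringIO(CSV),
    parse_dates=["posted"],
    dtype={"id": str, "data_import_id": str, "amount": str},
)
print(df.dtypes)
```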

print(ddf.head())
         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                                                object
data_import_id                                    object
posted            datetime64[ns, pytz.FixedOffset(-300)]
amount                                            object
day_of_week                                        int64
weekday                                           object
dtype: object

As an aside, the datetime properties (the .dt datetime namespace) can be used on a dask Series just as in pandas; see here

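def treat(frame):
    # NB: .dt.day is the day of the month (hence 2..6 in the outputs above);
    # .dt.dayofweek gives 0=Monday ... 6=Sunday instead.
    frame['day_of_week'] = frame['posted'].dt.day
    # .dt.weekday_name was removed in pandas 1.0; .dt.day_name() replaces it.
    frame['weekday'] = frame['posted'].dt.day_name()
    return frame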
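The day_of_week values in the outputs above (2 to 6) are days of the month, which is what .dt.day returns. .dt.dayofweek numbers the weekdays from Monday=0 to Sunday=6, and .dt.day_name() returns the name (pandas 1.0 removed .dt.weekday_name). A pandas sketch comparing the three on three of the sample timestamps:

```python
import pandas as pd

# Three of the sample timestamps, parsed with their -05:00 offset kept.
posted = pd.to_datetime(pd.Series([
    "2014-01-02T19:00:00.000-05:00",
    "2014-01-03T19:00:00.000-05:00",
    "2014-01-06T19:00:00.000-05:00",
]))

day_of_month = posted.dt.day.tolist()         # day of the month
day_of_week = posted.dt.dayofweek.tolist()    # Monday=0 ... Sunday=6
weekday_name = posted.dt.day_name().tolist()  # replacement for .dt.weekday_name

print(day_of_month)  # [2, 3, 6]
print(day_of_week)   # [3, 4, 0]
print(weekday_name)  # ['Thursday', 'Friday', 'Monday']
```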

Regarding python-3.x - Dask - dataframe.read_csv does not recognize the correct data types, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55910503/
