python - Dask Apply of a Python Function

Tags: python python-3.x pandas dataframe dask

I have a df:

    id  log
0   24  2*C316*first_field_name17*second_field_name16*third_field_name2*N311*field value1*Y5*hello2*O30*0*0*
1   25  2*C316*first_field_name17*second_field_name16*third_field_name2*N311*field value1*Y5*hello2*O30*0*0*

I have a function that parses the log string:

import pandas as pd

dfs = []

def parse_audit_log(id, log):
    split = log.split('*')
    # number of fields is encoded in the second token (e.g. 'C316' -> 3 fields)
    number_of_fields = int(split[1][1:int(split[0][0])])

    # extract the field names; each token ends with the length of the next one
    i = 2
    string_length = int(split[1][int(split[0][0]):])
    field_names_list = []
    while i < number_of_fields + 2:
        field_name = split[i][0:string_length]
        field_names_list.append(field_name)
        string_length = int(split[i][string_length:])
        i += 1

    # extract the new values
    i = 3 + number_of_fields
    string_length = int(split[2 + number_of_fields][string_length:])
    new_values_list = []
    while i < 3 + number_of_fields * 2:
        field_name = split[i][0:string_length]
        new_values_list.append(field_name)
        string_length = int(split[i][string_length:])
        i += 1

    # extract the old values
    i = 4 + number_of_fields * 2
    string_length = int(split[3 + number_of_fields * 2][string_length:])
    old_values_list = []
    while i <= 3 + number_of_fields * 3:
        old_value = split[i][0:string_length]
        old_values_list.append(old_value)
        if i == 3 + number_of_fields * 3:
            string_length = 0
        else:
            string_length = int(split[i][string_length:])
        i += 1

    # build a per-id frame and stash it on the global list (nothing is returned)
    df = pd.DataFrame(
        {'id': id,
         'field': field_names_list,
         'old_value': old_values_list,
         'new_value': new_values_list
        })

    dfs.append(df)

This function works with a regular pandas apply:

data.apply(lambda x: parse_audit_log(x['id'], x['log']), axis=1) 
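Since parse_audit_log appends each per-id frame to the global dfs list rather than returning anything, the per-row results presumably get combined afterwards, along the lines of (full_df is just an illustrative name):

full_df = pd.concat(dfs, ignore_index=True)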

I then tried to use dask instead (data in this case is a dask dataframe read from SQL):

out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.id,x.log),axis=1), meta=('result', int)).compute(get=get)

This results in the error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-2468010e0124> in <module>()
----> 1 out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.ROW_ID,x.AUDIT_LOG),axis=1), meta=('result', int)).compute(get=get)

~\_installed\anaconda\lib\site-packages\dask\base.py in compute(self, **kwargs)
    153         dask.base.compute
    154         """
--> 155         (result,) = compute(self, traverse=False, **kwargs)
    156         return result
    157 

~\_installed\anaconda\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    402     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    403                     else (None, a) for a in args]
--> 404     results = get(dsk, keys, **kwargs)
    405     results_iter = iter(results)
    406     return tuple(a if f is None else f(next(results_iter), *a)

~\_installed\anaconda\lib\site-packages\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
    175                            get_id=_process_get_id, dumps=dumps, loads=loads,
    176                            pack_exception=pack_exception,
--> 177                            raise_exception=reraise, **kwargs)
    178     finally:
    179         if cleanup:

~\_installed\anaconda\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

~\_installed\anaconda\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
     64     def reraise(exc, tb=None):
     65         if exc.__traceback__ is not tb:
---> 66             raise exc.with_traceback(tb)
     67         raise exc
     68 

~\_installed\anaconda\lib\site-packages\dask\local.py in execute_task()
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

~\_installed\anaconda\lib\site-packages\dask\local.py in _execute_task()
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

~\_installed\anaconda\lib\site-packages\dask\dataframe\core.py in apply_and_enforce()
   3402 
   3403     Ensures the output has the same columns, even if empty."""
-> 3404     df = func(*args, **kwargs)
   3405     if isinstance(df, (pd.DataFrame, pd.Series, pd.Index)):
   3406         if len(df) == 0:

<ipython-input-12-2468010e0124> in <lambda>()
----> 1 out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.ROW_ID,x.AUDIT_LOG),axis=1), meta=('result', int)).compute(get=get)

~\_installed\anaconda\lib\site-packages\pandas\core\frame.py in apply()
   4875                         f, axis,
   4876                         reduce=reduce,
-> 4877                         ignore_failures=ignore_failures)
   4878             else:
   4879                 return self._apply_broadcast(f, axis)

~\_installed\anaconda\lib\site-packages\pandas\core\frame.py in _apply_standard()
   4971             try:
   4972                 for i, v in enumerate(series_gen):
-> 4973                     results[i] = func(v)
   4974                     keys.append(v.name)
   4975             except Exception as e:

<ipython-input-12-2468010e0124> in <lambda>()
----> 1 out = data.map_partitions(lambda df : df.apply(lambda x : parse_audit_log(x.ROW_ID,x.AUDIT_LOG),axis=1), meta=('result', int)).compute(get=get)

<ipython-input-11-08a2f8f06a76> in parse_audit_log()
      1 def parse_audit_log(row_id, audit_log):
----> 2     split = audit_log.split('*')
      3     number_of_fields = int(split[1][1:int(split[0][0])])
      4 
      5     i=2

AttributeError: ("'NoneType' object has no attribute 'split'", 'occurred at index 1')

That is one error, but I have hit several others while adjusting the function to satisfy dask's requirements. What is my dask apply missing? My meta argument is almost certainly wrong. There are plenty of examples of applying functions with dask, but I have not seen any that involve a function with no explicit return.

Update - desired output:

Ideally, I would end up with a df like:

   row_id  field_name          new_value      old_value
0   24      first_field_name    field value 
1   24      second_field_name   Y   
2   24      third_field_name    hello
3   25      first_field_name    field value 
4   25      second_field_name   Y   
5   25      third_field_name    hello

The problem (and the reason I am trying dask) is that the dataset has 55 million records. Since this parsing step expands each record into one or more rows, I need something efficient enough to work within my 32 GB of RAM.

Best Answer

The error points to split being called on None where a str is expected. A simple solution is to make sure the relevant column contains empty strings rather than None, e.g.:

df['log'] = df['log'].fillna("")
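Beyond that fix, here is a minimal sketch of a dask-friendly version of the apply. It assumes the simplified id/log column names from the sample frame, and that parse_audit_log is changed to end with return df instead of appending to the global dfs list; the parse_partition helper, the dtypes in meta and the to_parquet path are illustrative, not from the original post. The idea is to parse each partition into one pandas frame and pass an empty DataFrame as meta so dask knows the real output columns and dtypes:

import pandas as pd

# Empty frame describing the columns/dtypes each parsed partition will have;
# passing this as meta replaces the incorrect meta=('result', int).
meta = pd.DataFrame({
    'id': pd.Series(dtype='int64'),
    'field': pd.Series(dtype='object'),
    'old_value': pd.Series(dtype='object'),
    'new_value': pd.Series(dtype='object'),
})

def parse_partition(pdf):
    # Drop rows whose log is None (or fillna("") and guard inside the parser).
    pdf = pdf.dropna(subset=['log'])
    # parse_audit_log is assumed to return its per-id frame here.
    frames = [parse_audit_log(row.id, row.log) for row in pdf.itertuples(index=False)]
    return pd.concat(frames, ignore_index=True) if frames else meta.copy()

out = data.map_partitions(parse_partition, meta=meta)

# out.compute() would pull everything into memory at once; with ~55 million
# input rows it is probably safer to write the result partition by partition:
out.to_parquet('parsed_logs/')

Note that recent dask releases replaced the get= keyword with scheduler=, so the multiprocessing scheduler would be requested with out.compute(scheduler='processes') rather than compute(get=get).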

Regarding python - Dask Apply of a Python Function, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51049897/
