python:以灵活的方式处理深度嵌套数据的有效技术是什么?

标签 python dictionary nested

我的问题不是关于特定的代码片段,而是更笼统的问题,所以请耐心等待:

我应该如何组织我正在分析的数据,我应该使用哪些工具来管理它?

我正在使用 python 和 numpy 来分析数据。因为python的文档说字典在python中是非常优化的,也因为数据本身是非常结构化的,所以我把它存储在一个深度嵌套的字典中。

这是字典的骨架:在层次结构中的位置定义了元素的性质,每一个新行定义了前一级中一个键的内容:

[AS091209M02] [AS091209M01] [AS090901M06] ... 
[100113] [100211] [100128] [100121] 
[R16] [R17] [R03] [R15] [R05] [R04] [R07] ... 
[1263399103] ... 
[ImageSize] [FilePath] [Trials] [Depth] [Frames] [Responses] ... 
[N01] [N04] ... 
[Sequential] [Randomized] 
[Ch1] [Ch2]

编辑:为了更好地解释我的数据集:

[individual] ex: [AS091209M02]
[imaging session (date string)] ex: [100113]
[Region imaged] ex: [R16]
[timestamp of file] ex [1263399103]  
[properties of file] ex: [Responses]
[regions of interest in image ] ex [N01]
[format of data] ex [Sequential]
[channel of acquisition: this key indexes an array of values] ex [Ch1]

例如,我执行的操作类型是计算数组的属性(列在 Ch1、Ch2 下)、拾取数组以创建新集合,例如分析来自给定区域 16 (R16) 的 N01 的响应个体在不同时间点等

这个结构对我来说效果很好,而且速度非常快,正如我所 promise 的那样。我可以非常快速地分析完整的数据集(而且字典太小,无法填满我的计算机内存:半个演出)。

我的问题来自于我需要对字典操作进行编程的繁琐方式。我经常有这样一段代码:

for mk in dic.keys():
    for rgk in dic[mk].keys():
        for nk in dic[mk][rgk].keys():
            for ik in dic[mk][rgk][nk].keys():
                for ek in dic[mk][rgk][nk][ik].keys():
                    #do something

它丑陋、笨重、不可重用且脆弱(需要为字典的任何变体重新编码)。

我尝试使用递归函数,但除了最简单的应用程序之外,我还遇到了一些非常讨厌的错误和奇怪的行为,导致大量时间浪费(我没有设法在 ipython 中使用 pdb 进行调试也无济于事当我处理深度嵌套的递归函数时)。最后,我经常使用的唯一递归函数如下:

def dicExplorer(dic, depth = -1, stp = 0):
    '''prints the hierarchy of a dictionary.
    if depth not specified, will explore all the dictionary
    '''
    if depth - stp == 0: return
    try : list_keys = dic.keys()
    except AttributeError: return
    stp += 1
    for key in list_keys:
        else: print '+%s> [\'%s\']' %(stp * '---', key)
        dicExplorer(dic[key], depth, stp)

我知道我这样做是错误的,因为我的代码很长、单调且不可重用。我需要使用更好的技术来灵活地操作字典,或者将数据放入某种数据库格式(sqlite?)。我的问题是,由于我在编程方面(严重)自学,我缺乏实践经验和背景知识来欣赏可用的选项。我已准备好学习新工具(SQL、面向对象编程),不惜一切代价完成工作,但我不愿意将我的时间和精力投入到对我的需求来说毫无意义的事情上。

那么您有什么建议来解决这个问题,并能够以更简洁、灵活和可重用的方式编写我的工具?

附录:除了对数据字典的特定子字典进行操作外,这里还有一些我为数据集 dic 或其子字典实现的操作示例:

实际上我有一些运行良好的递归函数:

def normalizeSeqDic(dic, norm_dic = {}, legend = ()):
    '''returns a normalized dictionary from a seq_amp_dic. Normalization is performed using the first time point as reference
    '''
    try : 
        list_keys = dic.keys()
        for key in list_keys:
            next_legend = legend + (key,) 
            normalizeSeqDic(dic[key], norm_dic, next_legend)
    except AttributeError:
        # normalization
        # unpack list
        mk, ek, nk, tpk = legend
        #assign values to amplitude dict
        if mk not in norm_dic: norm_dic[mk] = {}
        if ek not in norm_dic[mk]: norm_dic[mk][ek] = {}
        if nk not in norm_dic[mk][ek]: norm_dic[mk][ek][nk] = {}
        if tpk not in norm_dic[mk][ek][nk]: norm_dic[mk][ek][nk][tpk] = {}
        new_array = []
        for x in range(dic.shape[0]):
            new_array.append(dic[x][1:]/dic[x][0])
        new_array = asarray(new_array)
        norm_dic[mk][ek][nk][tpk] = new_array
    return norm_dic

def poolDic(dic):
    '''returns a dic in which all the values are pooled, and root (mk) keys are fused
    these pooled dics can later be combined into another dic
    '''
    pooled_dic = {}
    for mk in dic.keys():
        for ek in dic[mk].keys():
            for nk in dic[mk][ek].keys():
                for tpk in dic[mk][ek][nk].keys():
                    #assign values to amplitude dict
                    if ek not in pooled_dic: pooled_dic[ek] = {}
                    if nk not in pooled_dic[ek]: pooled_dic[ek][nk] = {}
                    if tpk not in pooled_dic[ek][nk]:
                        pooled_dic[ek][nk][tpk] = dic[mk][ek][nk][tpk]
                    else: pooled_dic[ek][nk][tpk]= vstack((pooled_dic[ek][nk][tpk], dic[mk][ek][nk][tpk]))
    return pooled_dic

def timePointsDic(dic):
    '''Determines the timepoints for each individual key at root
    '''
    tp_dic = {}
    for mk in dic.keys():
        tp_list = []
        for rgk in dic[mk].keys():
            tp_list.extend(dic[mk][rgk]['Neuropil'].keys())
        tp_dic[mk]=tuple(sorted(list(set(tp_list))))
    return tp_dic

对于某些操作,除了压平字典之外别无他法:

def flattenDic(dic, label):
    '''flattens a dic to produce a list of of tuples containing keys and 'label' values
    '''
    flat_list = []
    for mk in dic.keys():
        for rgk in dic[mk].keys():
            for nk in dic[mk][rgk].keys():
                for ik in dic[mk][rgk][nk].keys():
                    for ek in dic[mk][rgk][nk][ik].keys():
                        flat_list.append((mk, rgk, nk, ik, ek, dic[mk][rgk][nk][ik][ek][label])
    return flat_list

def extractDataSequencePoints(flat_list, mk, nk, tp_list):
        '''produces a list containing arrays of time point values
        time_points is a list of the time points wished (can have 2 or 3 elements)
        '''
        nb_tp = len(tp_list)
        # build tp_seq list
        tp_seq = []
        tp1, tp2, tp3 = [], [], []
        if nk == 'Neuropil':
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil' and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and  x[3] == tp_list[1])
        else:
            tp1.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[0])
            tp2.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[1])
        if nb_tp == 3:
            if nk == 'Neuropil':
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] == 'Neuropil'and x[3] == tp_list[2])
            else:
                tp3.extend(x for x in flat_list if x[0]==mk and x[2] != 'Neuropil'and x[3] == tp_list[2])
        for x in tp1:
            for y in tp2:
                if x[0:3] == y[0:3] :
                    if nb_tp == 3:
                        for z in tp3:
                            if x[0:3] == z[0:3] :
                                tp_seq.append(asarray([x[4],y[4],z[4]]))
                    else:
                        tp_seq.append(asarray([x[4],y[4]]))
        return tp_seq

最佳答案

"I stored it in a deeply nested dictionary"

而且,如您所见,效果并不理想。

还有什么选择?

  1. 复合键和浅字典。您有一个由 8 部分组成的 key : (个人、成像 session 、成像区域、文件时间戳、文件属性、图像感兴趣区域、数据格式、采集 channel )映射 到一组值。

    { ('AS091209M02', '100113', 'R16', '1263399103', 'Responses', 'N01', 'Sequential', 'Ch1' ): array, 
    ...
    

    问题在于搜索。

  2. 正确的类(class)结构。实际上,完整的类定义可能有点矫枉过正。

"The type of operations I perform is for instance to compute properties of the arrays (listed under Ch1, Ch2), pick up arrays to make a new collection, for instance analyze responses of N01 from region 16 (R16) of a given individual at different time points, etc."

推荐

首先,为您的最终对象使用一个namedtuple

Array = namedtuple( 'Array', 'individual, session, region, timestamp, properties, roi, format, channel, data' )

或者类似的东西。构建这些命名元组对象的简单列表。然后您可以简单地迭代它们。

其次,对这个数组对象的主列表使用许多简单的 map-reduce 操作。

过滤:

for a in theMasterArrrayList:
    if a.region = 'R16' and interest = 'N01':
        # do something on these items only.

通过公共(public)键减少:

individual_dict = defaultdict(list)
for a in theMasterArrayList:
    individual_dict[ a.individual ].append( a )

这将在 map 中创建一个完全包含您想要的项目的子集。

然后您可以执行 indiidual_dict['AS091209M02'] 并获得他们的所有数据。您可以对任何(或所有)可用键执行此操作。

region_dict = defaultdict(list)
for a in theMasterArrayList:
    region_dict[ a.region ].append( a )

这不会复制任何数据。它速度快,内存相对紧凑。

映射(或转换)数组:

for a in theMasterArrayList:
    someTransformationFunction( a.data )

如果数组本身是一个列表,您可以更新该列表而不用破坏整个元组。如果您需要从现有数组创建一个新数组,您正在创建一个新元组。这没什么问题,但它是一个新的元组。您最终会得到这样的程序。

def region_filter( array_list, region_set ):
    for a in array_list:
        if a.region in region_set:
            yield a

def array_map( array_list, someConstant ):
    for a in array_list:
        yield Array( *(a[:8] + (someTranformation( a.data, someConstant ),) )

def some_result( array_list, region, someConstant ):
    for a in array_map( region_filter( array_list, region ), someConstant ):
        yield a

您可以将转换、归约、映射构建成更精细的东西。

最重要的是只从主列表中创建您需要的词典,这样您就不会进行任何超出最低限度必要的过滤。

顺便说一句。这可以简单地映射到关系数据库。它会更慢,但你可以有多个并发更新操作。除了多个并发更新,关系数据库不提供任何高于此的功能。

关于python:以灵活的方式处理深度嵌套数据的有效技术是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2544055/

相关文章:

python - 获取数组内的字典键值

javascript - 如何在 OpenLayers 3 中使用我自己的类 myMap(使用继承)初始化 ol.map?

Python:编辑字典键 - 使用 Strip 方法

python - Pandas groupby : treat two columns as one

python - 按字典值对字典列表进行排序

LET block 内的嵌套 FLET block (反之亦然)

c - CUDA 中的图像序列处理

java - 为什么我收到此错误 "Nested in org.springframework.beans.factory.BeanCreationException: Error creating bean with name ' workService' ”?

python - 使用 Python 解析大型文本文件

python - 如何创建一个组合嵌套在元组中的列表值的索引?