python - 有条件地组合/减少 key 对

标签 python apache-spark pyspark

我已经遇到这个问题有一段时间了,我认为这与我对如何使用combineByKey和reduceByKey缺乏了解有关,所以希望有人能够解决这个问题。

我正在处理 DNA 序列,因此我有一个程序来生成它的一堆不同版本(向前、向后和赞美)。我有几个阅读框,这意味着对于字符串 ABCABC,我需要以下一系列键:ABC ABCA BCA BCAB CAB C

现在我正在使用以下函数来分解(我在 flatMap 过程中运行它):

# Modified from http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
def chunkCodons((seq, strand, reading_frame)):
    """
     Yield successive codons from seq
     """
    # Get the first characters
    if reading_frame > 0:
        yield (0, seq[0:reading_frame], strand, reading_frame)
    for i in xrange(reading_frame, len(seq), 3):
        if i % 1000000 == 0:
            print "Base # = {:,}".format(i)
        yield (i, seq[i:i + 3], strand, reading_frame)

我像这样运行:reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkCodons)

但是,这在一长串 DNA 上需要很长时间,所以我知道这一定是错误的。

因此,我想让 Spark 以更直接的方式做到这一点,即按字符(即基数)将其分解,然后一次重新组合 3 个。问题是我必须组合不同但相邻的键。这意味着如果我有 (1, 'A'), (2, 'B'), (3, 'C'),....,我希望能够生成 (1 ,“ABC”)。

我不知道该怎么做。我怀疑我需要使用combineByKey 并让它仅有条件地产生输出。如果满足我的条件,我是否只是让它只产生可由combineByKey 使用的输出?我应该这样做吗?

编辑:

这是我的输入:[(0, 'A'), (1, 'A'), (2, 'B'), (3, 'A'), (4, 'C' ), ....]

我想要这样的输出:[(0, 0, 'AAB'), (0, 1, 'ABX'), ...][(1, 0, 'A'), (1, 1, 'ABA'), (1, 2, 'CXX')...]

格式为[(阅读框,第一个碱基#,序列)]

最佳答案

你可以尝试这样的事情:

seq = sc.parallelize(zip(xrange(16), "ATCGATGCATGCATGC"))

(seq
 .flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3)))
 .groupByKey()
 .mapValues(lambda x: ''.join(v for (pos, v)  in sorted(x)))
 .filter(lambda (pos, codon): len(codon) == 3)
 .map(lambda (pos, codon): (pos % 3, pos, codon))
 .collect())

结果:

[(0, 0, 'ATC'),
 (1, 1, 'TCG'),
 (2, 2, 'CGA'),
 (0, 3, 'GAT'),
 (1, 4, 'ATG'),
 (2, 5, 'TGC'),
 (0, 6, 'GCA'),
 (1, 7, 'CAT'),
 (2, 8, 'ATG'),
 (0, 9, 'TGC'),
 (1, 10, 'GCA'),
 (2, 11, 'CAT'),
 (0, 12, 'ATG'),
 (1, 13, 'TGC')]

实际上我会尝试其他方法:

from toolz.itertoolz import sliding_window, iterate, map, zip
from itertools import product
from numpy import uint8

def inc(x):
    return x + uint8(1)

# Create dictionary mapping from codon to integer
mapping = dict(zip(product('ATCG', repeat=3), iterate(inc, uint8(0))))

seq = sc.parallelize(["ATCGATGCATGCATGC"])

(seq
  # Generate pairs (start-position, 3-gram)
  .flatMap(lambda s: zip(iterate(inc, 0), sliding_window(3, s)))
  # Map 3-grams to respective integers
  .map(lambda (pos, seq): (pos, mapping.get(seq)))
  .collect())

阅读框显然是多余的,并且可以随时从起始位置获取,所以我在这里省略了它。

密码子和小整数之间的简单映射可以节省大量内存和流量。

关于python - 有条件地组合/减少 key 对,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30926627/

相关文章:

python - Pandas 在加载 CSV 时保留 'null' 和 ' '

mongodb - 在 Spark 中创建分层 JSON

python - Spark中通过reduceByKey()或其他函数来减少作业?

sqlite - 将 sqlite 文件加载到 DataFrame 中

hadoop - Hadoop或Spark读取tar.bzip2读取

python-3.x - 使用 Pyspark 检查 hive Metastore 中是否存在表

python - 在 python Flask 应用程序中找不到 google-cloud-sdk

python - 为什么 pow(x,y,z) 在 python 中有用?

python - 比较两个 pandas 系列的 float 接近相等?

python - 如何将 spark sql 数据帧转换为 numpy 数组?