hadoop - 构建不适合内存的流

标签 hadoop mapreduce apache-spark java-8

这是关于这个问题的后续问题:Spark FlatMap function for huge lists

总结:我想在 Java8 中编写一个 Spark FlatMap 函数,它生成与一组 dna 序列匹配的所有可能的正则表达式。对于巨大的字符串,这是有问题的,因为正则表达式集合不适合内存(一个映射器很容易生成千兆字节的数据)。我知道我必须求助于惰性序列之类的东西,我想我必须使用 Stream<String>为了这。 我现在的问题是如何构建这个流。 我在这里看:Java Streams - Stream.Builder .

如果我的算法开始生成模式,则可以使用 accept(String) 将它们“推送”到流中方法,但是当我尝试链接中的代码(用字符串生成器函数替换它)和中间的一些日志语句时,我注意到随机字符串生成函数在 build() 之前执行。叫做。我不明白如果所有随机字符串无法放入内存,将如何存储它们。

我必须以不同的方式构建流吗?基本上我想要相当于我的 context.write(substring)我有我的 MapReduce Mapper.map功能。

UPDATE1: cannot use the range function, in fact I am using a structure which iterates over a suffix tree.

UPDATE2: Upon request a more complete implementation, I didn't replace the interface with the actual implementation because the implementations are very big and not essential to grasp the idea.

更完整的问题草图:

我的算法试图发现 DNA 序列的模式。该算法采用与同一基因相对应的不同生物体的序列。假设我在 Jade 米中有一个基因 A,在水稻和其他一些物种中也有相同的基因 A,然后我比较它们 上游序列。我正在寻找的模式类似于正则表达式,例如 TGA..GA..GA。探索 所有可能的模式我从序列构建一个广义后缀树。此树提供有关不同序列的信息 一个模式出现在。为了将树与搜索算法分离,我实现了某种迭代器结构:TreeNavigator。 它具有以下界面:

interface TreeNavigator {
        public void jumpTo(char c); //go from pattern p to p+c (c can be a dot from a regex or [AC] for example)
        public void backtrack(); //pop the last character
        public List<Position> getMatches();
        public Pattern trail(); //current pattern p
    }

interface SearchSpace {
        //degrees of freedom in regex, min and maxlength,...
    public boolean inSearchSpace(Pattern p); 
    public Alphabet getPatternAlphabet();
}

interface ScoreCalculator {
    //calculate a score, approximately equal to the number of occurrences of the pattern
    public Score calcConservationScore(TreeNavigator t);
}

//Motif algorithm code which is run in the MapReduce Mapper function:
public class DiscoveryAlgorithm {
    private Context context; //MapReduce context object to write to disk
    private Score minScore;

    public void runDiscovery(){
    //depth first traveral of pattern space A, AA, AAA,... AAC, ACA, ACC and so fort
        exploreSubTree(new TreeNavigator());
    }

    //branch and bound for pattern space, if pattern occurs too little, stop searching
    public boolean survivesBnB(Score s){
        return s.compareTo(minScore)>=0;
    }

    public void exploreSubTree(Navigator nav){
        Pattern current = nav.trail();
        Score currentScore = ScoreCalculator.calc(nav);

        if (!survivesBnB(currentScore)}{
           return;
        }


        if (motif in searchspace)
            context.write(pattern);

        //iterate over all possible extensions: A,C,G,T, [AC], [AG],... [ACGT]
        for (Character c in SearchSpace.getPatternAlphabet()){
             nav.jumpTo(c);
             exploreSubTree(nav);
             nav.backtrack();
        }
    }
}

完整的 MapReduce 源代码 @ https://github.com/drdwitte/CloudSpeller/ 相关研究论文:http://www.ncbi.nlm.nih.gov/pubmed/26254488

UPDATE3: I have continued reading about ways to create a Stream. From what I have read so far I think I have to rewrite my runDiscovery() into a Supplier. This Supplier can then be transformed into a Stream via the StreamSupport class.

最佳答案

这是对您的要求的简单、惰性评估:

public static void main(String[] args) {
    String string = "test";

    IntStream.range(0, string.length())
             .boxed()
             .flatMap(start -> IntStream
                 .rangeClosed(start + 1, string.length())
                 .mapToObj(stop -> new AbstractMap.SimpleEntry<>(start, stop))
             )
             .map(e -> string.substring(e.getKey(), e.getValue()))
             .forEach(System.out::println);
}

它产生:

t
te
tes
test
e
es
est
s
st
t

说明:

// Generate "start" values between 0 and the length of your string
IntStream.range(0, string.length())
         .boxed()

// Combine each "start" value with a "stop" value that is between start + 1 and the length
// of your string
         .flatMap(start -> IntStream
             .rangeClosed(start + 1, string.length())
             .mapToObj(stop -> new AbstractMap.SimpleEntry<>(start, stop))
         )

// Convert the "start" / "stop" value tuple to a corresponding substring
         .map(e -> string.substring(e.getKey(), e.getValue()))
         .forEach(System.out::println);

关于hadoop - 构建不适合内存的流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32669615/

相关文章:

hadoop - 配置单元中带有参数的两个查询

hadoop - 并行运行多个 map task

java - hadoop eclipse插件未将代码部署到hadoop集群

python - 使用 appengine-mapreduce 达到内存限制

python - 了解 Spark 中的 LDA

hadoop - Avro 与 Parquet

Hadoop 自定义可写与第二遍

hadoop - 维基百科页面使用 Hadoop 共同编辑图形提取

python - Pyspark - UnicodeEncodeError : 'ascii' codec can't encode character

python - pyspark Dataframe API cast ('timestamp' ) 不适用于时间戳字符串