hadoop - 在本地模式下运行一个简单的层叠程序

标签 hadoop cascading

我正在努力运行此简单的级联程序。由于某种原因,它什么也不做。至少我希望它能打印记录。任何帮助,将不胜感激。

package com.myLearning.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Debug;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.local.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class operations_example 
{
    public static void main(String[] args) 
    {
    Scheme sourceScheme = new TextDelimited(new Fields("username", "age"), true, ",");
    String sourcePath = "C:/Users/Desktop/cascading/data/names.txt";
    Tap sourceTap = new FileTap(sourceScheme, sourcePath);

    Scheme targetScheme = new TextDelimited(new Fields("username", "age"), true, ",");
    String targetPath = "C:/Users/Desktop/cascading/data/output2.txt";
    Tap targetTap = new FileTap(targetScheme, targetPath, SinkMode.REPLACE);

    Pipe dataPipe = new Pipe("data");
    dataPipe = new Each(dataPipe, new Debug());
    ExpressionFilter filter = new ExpressionFilter("age >= 30", Integer.TYPE);

    dataPipe = new Each( dataPipe,new Fields("username","age"), filter);

    FlowDef flowdef = FlowDef.flowDef().
            addSource(dataPipe, sourceTap).
            addTailSink(dataPipe, targetTap);

    Flow flow = new LocalFlowConnector().connect(flowdef);
    flow.stop();    
    }
}

最佳答案

您没有执行该流程。

创建流程后,调用complete()(阻止)或start()来执行它。调用stop(),而不是来执行该流程。

http://docs.cascading.org/impatient/impatient1.html
http://docs.cascading.org/cascading/1.2/javadoc/cascading/flow/Flow.html#complete()

关于hadoop - 在本地模式下运行一个简单的层叠程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45534925/

相关文章:

hadoop - 添加对 scalding 的 parquet-avro 支持

mapreduce - 在哪里可以找到 hbase-0.89.20100924+28 的 HBase 级联模块?

hadoop - Hadoop级联框架以​​更新特定的列数据

java - Hadoop:使用 Cascading 2.5.1 和 Hadoop 2.2.0 进行文件复制

hadoop - Spark Streaming 和 Spark 应用程序可以在同一个 YARN 集群中运行吗?

hadoop - HBase 在 Hadoop 2.2.0 中不工作

java - 无法在 Hadoop 中使用 Mapreduce 获得预期的减少输出

java - 无法在Hadoop jar上加载application.properties(NullPointerException)

hadoop - FSImage 读取效率高,但不适合进行小的增量更新

regex - 级联-正则表达式解析器-错​​误的字段数