java - Drools In Spark 流文件

标签 java apache-spark hadoop spark-streaming drools

我们能够成功地将 drools 与 spark 集成,当我们尝试应用来自 Drools 的规则时,我们能够对存在于 HDFS 中的批处理文件执行操作,但是我们尝试将 drools 用于流文件,以便我们可以立即做出决定,但我们不知道该怎么做。下面是我们试图实现的代码片段。
案例 1:

    SparkConf conf = new SparkConf().setAppName("sample");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat");
    List<String> store = new ArrayList<String>();
    store = javaRDD.collect();

情况 2:当我们使用流上下文时

SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc = 
              new JavaStreamingContext(sparkconf, new Duration(1));

    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);

在第一种情况下,我们能够在变量存储上应用我们的规则,但在第二种情况下,我们无法在 dstream 行上应用规则。

如果有人有一些想法,如何完成,将是一个很大的帮助。

最佳答案

这是完成它的一种方法。

  1. 首先使用业务规则创建您的知识 session 。

    //Create knowledge and session here
    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
    kbuilder.add( ResourceFactory.newFileResource( "rulefile.drl"),
            ResourceType.DRL );
    Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages();
    kbase.addKnowledgePackages( pkgs );
    final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();
    
  2. 使用 StreamingContext 创建 JavaDStream。

    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc = 
              new JavaStreamingContext(sparkconf, new Duration(1));
    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);  
    
  3. 调用 DStream 的 foreachRDD 来创建事实并触发您的规则。

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
         List<String> facts = rdd.collect();
         //Apply rules on facts here
         ksession.execute(facts);
         return null;
      }
    });
    

关于java - Drools In Spark 流文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28407742/

相关文章:

java - 实现树可写类

java - 为什么私有(private) getter 方法会违反封装性,而缺少 getter 方法则不会?

java - 有没有一种干净的方法可以为 Java 对象提供破坏范围?

java - Apache Spark : How can i access nested array of integers within Tuple2 object in Java?

apache-spark - 如何创建一个空的DataFrame?为什么是 “ValueError: RDD is empty”?

hadoop - 你如何导入Oracle表到 hive 表

java - 组织 firebase 数据

java - 使用 String.format 格式化 SSN

apache-spark - 为 Spark 集群和 Cassandra 设置和配置 Titan

hadoop - distcp hdfs 到 s3 失败