我们能够成功地将 drools 与 spark 集成,当我们尝试应用来自 Drools 的规则时,我们能够对存在于 HDFS 中的批处理文件执行操作,但是我们尝试将 drools 用于流文件,以便我们可以立即做出决定,但我们不知道该怎么做。下面是我们试图实现的代码片段。
案例 1:。
SparkConf conf = new SparkConf().setAppName("sample");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat");
List<String> store = new ArrayList<String>();
store = javaRDD.collect();
情况 2:当我们使用流上下文时
SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
JavaStreamingContext ssc =
new JavaStreamingContext(sparkconf, new Duration(1));
JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);
在第一种情况下,我们能够在变量存储上应用我们的规则,但在第二种情况下,我们无法在 dstream
行上应用规则。
如果有人有一些想法,如何完成,将是一个很大的帮助。
最佳答案
这是完成它的一种方法。
首先使用业务规则创建您的知识 session 。
//Create knowledge and session here KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(); KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); kbuilder.add( ResourceFactory.newFileResource( "rulefile.drl"), ResourceType.DRL ); Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages(); kbase.addKnowledgePackages( pkgs ); final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();
使用 StreamingContext 创建 JavaDStream。
SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming"); JavaStreamingContext ssc = new JavaStreamingContext(sparkconf, new Duration(1)); JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);
调用 DStream 的 foreachRDD 来创建事实并触发您的规则。
lines.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> rdd) throws Exception { List<String> facts = rdd.collect(); //Apply rules on facts here ksession.execute(facts); return null; } });
关于java - Drools In Spark 流文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28407742/