我正在尝试编写一个写入 Mongodb 的自定义流分组。我现在正在运行本地集群。我有一个自定义流类和一个 mongo 对象。我在prepare()和chooseTask()中都写入mongodb。它写入 mongodb 但主管无法启动。我在主管日志中看到此错误:
b.s.d.worker [ERROR] Error on initialization of server mk-worker
java.lang.NoClassDefFoundError: com/mongodb/MongoClient
at storm.starter.MongoMonitorObject.<init>(MongoMonitorObject.java:23) ~[stormjar.jar:0.10.0]
at storm.starter.ModStreamGrouping.prepare(ModStreamGrouping.java:94)
~[stormjar.jar:0.10.0]
我现在正在 Storm 启动器项目中进行更改。
public class ModStreamGrouping implements CustomStreamGrouping, Serializable{
java.util.List<java.lang.Integer> targetTasks = new ArrayList();
@Override
public List<Integer> chooseTasks(int taskId,List<Object> values) {
System.out.println("taskiD = " + taskId);
System.out.println("values = " + values);
return numTasks[0];
}
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, java.util.List<java.lang.Integer> targetTasks) {
MongoMonitorObject mmo = new
System.out.println(" in prep() ");
System.out.println("targetTasks = " + targetTasks);
numTasks = targetTasks.size();
}
}
public class MongoMonitorObject {
private static final Logger LOG = LoggerFactory.getLogger(MongoMonitorObject.class);
public MongoMonitorObject(java.util.List<java.lang.Integer> targetTasks){
try{
MongoClient mongoClient = new MongoClient("localhost", 27017);
DB db = mongoClient.getDB( "loadDB" );
DBCollection collection = db.getCollection("testCollection");
for (Integer task : targetTasks) {
BasicDBObject document = new BasicDBObject();
document.put("tid", task);
collection.insert(document);
}
}
catch (UnknownHostException e) {
System.out.println(" in UnknownHostException ");
LOG.info(" in UnknownHostException ");
}
catch (Exception e) {
System.out.println(" in Exception ");
LOG.info(" in Exception ");
}
}
}
流分组在 ModStreamGrouping.java 中定义,mongo 连接在 MongoMonitorObject.java 中定义。两者都属于storm.starter包。
我可以上传拓扑,但主管无法生成工作人员。我在某处缺少一个小链接,但我不知 Prop 体在哪里。我在 Storm starter 的 pom.xml 中添加了以下内容以包含 mongodb 连接:
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>2.13.3</version>
</dependency>
最佳答案
编辑: 我在这里读到:https://github.com/mongodb/mongo-java-driver
mongodb-java-driver是一个一体化的jar,它包含bson和core
因此依赖 mongodb-java-driver
就足够了。
如果使用依赖mongodb-driver
,则需要依赖bson
和core
。
原帖:
尝试添加mongodb-driver-core
并使用较新版本的依赖项
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>3.2.2</version>
</dependency>
MongoDB Java 驱动程序 uber-artifact,包含 mongodb-driver、mongodb-driver-core 和 bson
检查一下here
关于java - 从 Apache Storm 写入 Mongodb,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36189610/