java - 对 BlockingQueue java 和 Storm(分布式计算)的实现有问题吗?

标签 java apache-storm blockingqueue

这是我的输入喷口的代码片段,用于将元组发送到节点以通过集群进行流处理。问题是 BlockingQueue 抛出 InterruptedException 。

private SpoutOutputCollector collector;
public BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();

public boolean isDistributed() {
    return true;
    }


public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {

    this.collector=collector;

}

@Override
public void nextTuple() {


    try {
        //Utils.sleep(100);
        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }



}

public void readInputfile() throws IOException, InterruptedException{
    FileInputStream file = new FileInputStream("/home/529076/Desktop/Temperature");
    DataInputStream readDate=new DataInputStream(file);
    BufferedReader readText=new BufferedReader(new InputStreamReader(readDate));

    String line;
    String singleReading = null;
    while((line=readText.readLine())!=null){
         singleReading=line;
         blockingQueue.add(singleReading);

    }

}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("Single Temperature Reading"));
}

异常描述如下:---

java.lang.InterruptedException10930 [Thread-20] INFO backtype.storm.util - 异步循环中断!

at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at com.tcs.storm.test.InputStreamSpout.nextTuple(InputStreamSpout.java:65)
at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:413)

而 nextTuple(InputStreamSpout.java:65 是 ------>

        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));

谢谢

最佳答案

该错误是由于以下事实造成的:阻塞队列未在输出收集器中初始化;

关于java - 对 BlockingQueue java 和 Storm(分布式计算)的实现有问题吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11343299/

相关文章:

java - 在 XL 中创建新工作表

java - 从命令行运行 jar 时覆盖 pom.xml 中指定的主类

java - 从 Spout 发出自定义 Java 对象作为元组

c++ - 如何在调用C++析构函数时优雅地通过阻塞调用来停止/销毁线程?

c++ - 是否使用 unique_ptr 成员变量?

haskell - Haskell 中的阻塞队列实现

java - 使用 Spring 的对象创建和初始化顺序

java - 我做错了什么?安卓JUnit

java - 键盘输入java

java - Storm 部署 JAR 拓扑时缺少资源文件夹