我正在尝试从阻塞队列中消费数据。消费方法需要在run方法中实现。 我有以下代码需要在 run 方法中实现
@Override
public String consume(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) throws StageException {
long nextSourceOffset = 0;
if (lastSourceOffset != null) {
nextSourceOffset = Long.parseLong(lastSourceOffset);
}
if (queue.size() != 0) {
Record record = getContext().createRecord("some-id::" + nextSourceOffset);
Map<String, Field> map = new HashMap<>();
try {
map.put("fieldName", Field.create(queue.take()));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
record.set(Field.create(map));
batchMaker.addRecord(record);
++nextSourceOffset;
}
return String.valueOf(nextSourceOffset);
}
我正在尝试使上述方法在“运行方法”下面运行
@Override
public void run() {
// TODO Auto-generated method stub
}
有什么办法可以调用它吗?
最佳答案
您可以创建一个接受参数的类构造函数,如下所示:
public class ConsumeRunner implements Runnable{
String lastSourceOffset;
int maxBatchSize;
BatchMaker batchMaker;
public ConsumeRunner(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker)
{
this.lastSourceOffset=lastSourceOffset;
this.maxBatchSize=maxBatchSize;
this.batchMaker=batchMaker;
}
@Override
public void run() {
consume(lastSourceOffset, maxBatchSize, batchMaker);
}
}
您可以正常构造线程,只是使用新的自定义构造函数而不是无参数构造函数。否则运行它是一样的。
关于java - java中多线程用in run方法调用方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45845029/