java - 如何将这些新消息传递给另一个类

标签 java multithreading design-patterns concurrency threadpool

现在基本上我已经创建了三个类。

public void run() {  
int seqId = 0;  
while(true) {  
    List<KamMessage> list = null;  
    try {  
        list = fullPoll(seqId);  
    } catch (Exception e1) {  
        e1.printStackTrace();  
    }  
    if (!list.isEmpty()) {  
        seqId = list.get(0).getSequence();  
        incomingMessages.addAll(list);  
        System.out.println("waiting 3 seconds");  
        System.out.println("new incoming message");  
    }  
    try {  
        Thread.sleep(3000);  
        System.out.println("new incoming message");  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    }   
   }  
 }  
 public List<KamMessage> fullPoll(int lastSeq) throws Exception {  
 Statement st = dbConnection.createStatement();  
 ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION =  804 and SEQ >" +  
 lastSeq + "order by SEQ DESC");        
 List<KamMessage> pojoCol = new ArrayList<KamMessage>();  
  while (rs.next()) {  
    KamMessage filedClass = convertRecordsetToPojo(rs);  
    pojoCol.add(filedClass);  
  }  
for (KamMessage pojoClass : pojoCol) {  
    System.out.print(" " + pojoClass.getSequence());  
    System.out.print(" " + pojoClass.getTableName());  
    System.out.print(" " + pojoClass.getAction());  
    System.out.print(" " + pojoClass.getKeyInfo1());  
    System.out.print(" " + pojoClass.getKeyInfo2());  
    System.out.println(" " + pojoClass.getEntryTime());  
   }             
return pojoCol;  
  }   

以下是类(class): 1.Poller-进行轮询并将新数据从数据库传递到 Controller

2.Controller-该类有一个线程池,它同时调用Poller并有向处理器请求的新数据

3.Processor - 这个类必须寻找新数据,处理它并将其返回到 Controller 。

所以现在我的问题是如何实现第三阶段......

这是我的 Controller 类:

public class RunnableController {  

/** Here This Queue initializes the DB and have the collection of incoming message
 *                    
 */  
  private static Collection<KpiMessage> incomingQueue = new ArrayList<KpiMessage>();  
  private Connection dbConncetion;  
  public ExecutorService threadExecutor;  
  private void initializeDb()  
  {  
    //catching exception must be adapted - generic type Exception prohibited  
    DBhandler conn = new DBhandler();  
    try {  
        dbConncetion = conn.initializeDB();  
    } catch (Exception e) {  
        // TODO Auto-generated catch block  
        e.printStackTrace();  
     }  
  }  


private void initialiseThreads()  
{         
    try {  

        threadExecutor = Executors.newFixedThreadPool(10);  
            PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConncetion);  
        threadExecutor.submit(read);  

    }catch (Exception e){  
    e.printStackTrace();  
    }  

}  

@SuppressWarnings("unused")  
private void shutDownThreads()  
{         
    try {  
        threadExecutor.shutdown();  
        //DB handling should be moved to separate DB class  
        dbConncetion.close();  

    }catch (Exception e){  
    e.printStackTrace();  
    }  

}  

/** Here This Queue passes the messages and have the collection of outgoing message 
 *  
 */  

//private Collection<KpiMessage> outgingQueue = new ArrayList<KpiMessage>();  
//have to implement something here for future  

     public static void main(String[] args) throws InterruptedException {  
     RunnableController controller = new RunnableController();  

    System.out.println(incomingQueue.size());  

    controller.initializeDb();  
    controller.initialiseThreads();  

    Thread.sleep(3000);  
    System.out.println("Polling");  

  }  

} 

最佳答案

我建议使用 BlockingQueue 来执行此操作,而不是简单的 ArrayList。只需更改 incomingQueue 变量的类型即可。然后你可以让另一个线程(或线程池)做类似的事情

//pseudocode
while (true) {
   // it polls data from the incomingQueue that shares with the producers
    KpiMessage message = this.incomingQueue.take()

   //Then process the message and produces an output... you can put that output in a different queue as well for other part of the code to pick it up
}

关于 BlockingQueues 的一个很好的例子可以在这里找到 http://www.javamex.com/tutorials/blockingqueue_example.shtml

关于java - 如何将这些新消息传递给另一个类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14261506/

相关文章:

java - 在Java中表示可变长度矩阵(二维数组)的最佳方式?

java - Servlet 看不到属性

java - 在进行查询的同一页面中显示结果的简单方法 - Spring App

c++ - 在C++11中如何表达普通的存储(导出)和加载(导入)屏障(fence)?

java - 为什么我们在这里特别说 ArrayList 不是线程安全的?

design-patterns - 使用空白接口(interface)是一个糟糕的设计吗?

java - @Produces 方法中 @Mock 的 Junit 4 范围

c++ - 如何理解 C++ Concurrency in Action 中同一类的实例导致死锁问题?

java - 使用 RxJava2 正确处理背压和并发

java - 如何在JAVA中使用带有重载的Double Dispatch?