我正在设计一个并发 Java 应用程序,该应用程序从医院 Intranet 上提供的各种医疗设备读取数据。
我已经阅读了“Java 并发实践 - Brian Goetz...”来了解如何做事,但我认为我仍然缺少一些东西。
这是一个quick simple diagram我正在尝试做的事情,下面有一些代码片段..
工作线程(MedicalDeviceData 实例)不断从医疗设备读取数据,并将其提供给 MedicalDeviceWorkManager,而 MedicalDeviceWorkManager 又将其提供给最终用户。
工作线程继续无限地读取数据(理想情况下),并且在我的场景中不存在“工作已完成”的情况。
此外,用户可以根据需要选择启动所有设备或启动特定设备或停止设备。
下面是我如何实现它的代码片段(已编译但未测试)。
MedicalDeviceWorkManager - 生成工作线程并管理它们。
MedicalDeviceData - 工作线程无限地从医疗设备获取数据并更新此类的实例。
主要看startDevice、stopDevice和run方法。
您显然会注意到,我没有使用 ThreadPoolExecutor 和 Future,并且我只是在这里推出了自己的实现。
由于 future.get 会阻塞直到工作完成,这对我的情况没有意义,因为我的工作线程永远不会“完成”任务......它只是一个无限持续的任务......
问题:如何将下面所示的实现更改为更标准化的实现,以便我可以更好地使用 java.util.concurrent 包(ThreadPoolExecutor/Future)。
我应该考虑其他更好的设计模式吗?
public class MedicalDeviceWorkManager {
private ThreadGroup rootThreadGroup = null;
Hashtable<String, MedicalDeviceData> deviceObjs = new Hashtable<String, MedicalDeviceData>();
public void manageMedicalDevices() throws InterruptedException {
String[] allDevices={"Device1","Device2","Device3","Device4"};
//-- Start all threads to collect data
for(String deviceToStart:allDevices){
this.startDevice(deviceToStart);
}
//-- Stop all threads
for(String deviceToStop:allDevices){
this.stopDevice(deviceToStop);
}
//-- Start on request from user
String deviceToStart="Device1";
this.startDevice(deviceToStart);
//-- Stop on request from user.
String deviceToStop="Device1";
this.stopDevice(deviceToStop);
/*
* Get Data and give it to client
* This is happening via a separate TCP port
* */
while(true){
for(String deviceName:allDevices){
if(deviceObjs.get(deviceName)!=null){
ConcurrentHashMap<String,BigDecimal> devData=deviceObjs.get(deviceName).getCollectedData();
//--Loop and send data to client on TCP stream
;
}
}//-- loop the devices
}//-- infinite
}
//-- Start the device to start acquiring data using a worker thread
private void startDevice(String deviceName){
//-- Get Device instance
MedicalDeviceData thisDevice=deviceObjs.get(deviceName);
if(thisDevice==null){
thisDevice=new MedicalDeviceData(deviceName);
deviceObjs.put(deviceName, thisDevice);
}
//-- Create thread to start data acquisition
//-- Start if not being processed already (Handle what if thread hung scenario later)
if(this.getThread(deviceName)==null){
Thread t=new Thread(thisDevice);
t.setName(deviceName);
t.start();
}
}
//-- Stop the worker thread thats collecting the data.
private void stopDevice(String deviceName) throws InterruptedException {
deviceObjs.get(deviceName).setShutdownRequested(true);
Thread t=this.getThread(deviceName);
t.interrupt();
t.join(1000);
}
private Thread getThread( final String name ) {
if ( name == null )
throw new NullPointerException( "Null name" );
final Thread[] threads = getAllThreads( );
for ( Thread thread : threads )
if ( thread.getName( ).equals( name ) )
return thread;
return null;
}
private ThreadGroup getRootThreadGroup( ) {
if ( rootThreadGroup != null )
return rootThreadGroup;
ThreadGroup tg = Thread.currentThread( ).getThreadGroup( );
ThreadGroup ptg;
while ( (ptg = tg.getParent( )) != null )
tg = ptg;
return tg;
}
private Thread[] getAllThreads( ) {
final ThreadGroup root = getRootThreadGroup( );
final ThreadMXBean thbean = ManagementFactory.getThreadMXBean( );
int nAlloc = thbean.getThreadCount( );
int n = 0;
Thread[] threads;
do {
nAlloc *= 2;
threads = new Thread[ nAlloc ];
n = root.enumerate( threads, true );
} while ( n == nAlloc );
return java.util.Arrays.copyOf( threads, n );
}
}//-- MedicalDeviceWorkManager
public class MedicalDeviceData implements Runnable{
//-- Data Collected from medical device
private final ConcurrentHashMap<String,BigDecimal> collectedData=new ConcurrentHashMap<String,BigDecimal>();
//-- Set by Thread Manager to request a shutdown..after which it should interrupt the thread
private AtomicBoolean shutdownRequested;
//-- Simple data Counter
private AtomicInteger dataCounter=new AtomicInteger(0);
//-- Device Name
private String thisDeviceName;
public void run() {
//-- Initialize I/O for the device
;
while(!this.getShutdownRequested()){
try{
//-- just to compile the code
Thread.sleep(0);
//-- perform I/O operation to get data from medical device
;
//-- Add data into the ConcurrentHashMap...Both key and value are immutable.
collectedData.put("DataKey", new BigDecimal("9999"));
//-- data counter
dataCounter.getAndIncrement();
}
catch(InterruptedException ie){
if(this.getShutdownRequested()){
return;
}
//throw new InterruptedException();
}
}
}//-- run
public MedicalDeviceData(String thisDeviceName){
this.thisDeviceName=thisDeviceName;
}
/**
* @return the shutdownRequested
*/
public boolean getShutdownRequested() {
return this.shutdownRequested.get();
}
/**
* @param shutdownRequested the shutdownRequested to set
*/
public void setShutdownRequested(boolean shutdownRequested) {
this.shutdownRequested.set(shutdownRequested);
}
/**
* Both key and value are immutable, so ok to publish reference.
*
* @return the collectedData
*/
public ConcurrentHashMap<String, BigDecimal> getCollectedData() {
return collectedData;
}
/**
* @return the dataCounter
*/
public AtomicInteger getDataCounter() {
return dataCounter;
}
}
最佳答案
因此,线程池可能是也可能不是最好的用法,因为没有真正的工作抽象。然而,这可能是扩展线程的一个有趣的用例。我将使用简单的 j.u.c.Lock 和 j.u.c.Condition 来处理通信。最后,我会将停止和启动委托(delegate)给接受 Runnable 类型作为单独的常量工作单元的特定委托(delegate)类。
我想要的唯一区别是使用 stop 来关闭并使用暂停来暂停,启动将继续工作。例如,它可能看起来像这样。
public class MedicalDeviceWorkManager {
private ConcurrentHashMap<String, DelegatingThread> devices = new ConcurrentHashMap<...>();
public synchronized void registerDevice(String device, Runnable singleUnitOfWork){
DelegatingThread worker = new DelegatingThread(singleUnitOfWork);
devices.put(device,worker);
worker.start();
}
public void startDevice(String device){
devices.get(device).startDevice();
}
public void stopDevice(String device){
devices.get(device).stopDevice();
}
public void pauseDevice(String device){
devices.get(device).pauseDevice();
}
private static class DelegatingThread extends Thread{
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
final Runnable r;
boolean paused = true;
boolean stopped =false;
private DelegatingThread (Runnable r){ this.r = r; }
@Override
public void run(){
while(true){
lock.lock();
while(paused)
condition.await();
if(stopped) return;
lock.unlock();
r.run();
}
}
private void startDevice(){
lock.lock();
paused = false;
condition.signal();
lock.unlock();
}
private void pauseDevice(){
lock.lock();
if(!stopped)
paused = true;
lock.unlock();
}
private void stopDevice(){
lock.lock();
stopped= true;
paused=false;
condition.signal();
lock.unlock();
}
}
}
我知道要演示的代码很多,但线程池背后的原理相同。您将始终为每个设备创建一个线程,并在必要时重用。可运行程序将是您在 while 循环之间运行的单个函数。
此外,为了简洁起见,我没有包含 try/finally 语义。
关于Java 并发 - 管理线程生命周期(启动/停止)的更好设计方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6286702/