java - 使用动态负载调度的并行图像卷积滤波器: artifacts

标签 java multithreading image-processing convolution

我遇到以下问题:我创建了一个基于 this paper 的并行 prewitt 过滤器采用动态负载调度方法。不幸的是,我遇到了序列化过滤器未显示的工件,它们出现在看似随机的位置,这对我来说意味着线程中存在同步问题。但我不知道它在哪里。我有一个非常相似的灰度滤镜......到目前为止没有显示出类似的问题,但现在出现了。

desired result result from parallel filtering
左图是通过顺序算法实现的期望结果,右图显示了底部的上述工件。
通过进一步的测试,我现在相当确定我的线程会跳过图像的某些部分而不对其进行过滤。我会继续调查。

我的代码结构如下:ParallelPrewittFilter继承自ParallelSobelianFilter,并且仅实现创建正确类型的worker的工厂方法,即以下一个(实现可运行接口(interface)的类)。 PrewittFilterWorker 继承自 SobelianFilterWorker(SobelianFilterWorker 又继承自 ParallelFilterWorker),并且仅实现一个返回卷积核的方法。因此,我现在将发布 ParallelSobelianFilter 和 SobelianFilter 工作人员的相关代码。最后一个代码块是负载调度代码。

并行Sobelian过滤器:

public BufferedImage applyFilter(BufferedImage image) {
  //taking the red and alpha channels from image and placing them
  //in the arrays red[width*height] and alpha[width*height]

  ParallelFilterWorker.resetDynamicLoadCounter();
  for (SobelianFilterWorker worker : workers) {
    worker.setSourceArrays(width, height, alpha, red, green, blue, hasAlpha);
    worker.setDestImage(result);
  }

  for (Thread thread : threads) {
    System.out.println("starting thread ");
    thread.start();
  }

  for (Thread thread : threads) {
    try {
      thread.join();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  return result;
}

SobelianFilterWorker:

protected void filterPixel(int index) {
  //[..]
  if (x < 1 || y < 1 || x > width - 2 || y > height - 2) {
    //do nothing
    color = red[index];
  } else {
    firstPass = red[index - 1] * kernel[0][1] + red[index + 1] * kernel[2][1]
        + red[index - width - 1] * kernel[0][0] + red[index - width] * kernel[1][0] 
        + red[index - width + 1] * kernel[2][0] + red[index + width - 1] * kernel[0][2]
        + red[index + width] * kernel[1][2] + red[index + width + 1] * kernel[2][2];

    //transposed kernel
    secondPass = red[index - 1] * kernel[1][0] + red[index + 1] * kernel[1][2]
        + red[index - width - 1] * kernel[0][0] + red[index - width] * kernel[0][1] 
        + red[index - width + 1] * kernel[0][2] + red[index + width - 1] * kernel[2][0]
        + red[index + width] * kernel[2][1] + red[index + width + 1] * kernel[2][2];
    color = (int) Math.floor(Math.sqrt(firstPass * firstPass + secondPass * secondPass));
  }
  if (color > 255) {
    color = 255;
  }
  // ... color turned into an ARGB integer argb
    destImage.setRGB(x, y, argb);      
  }

}

我怀疑错误出在上面两个 block 中,因为当filterPixel是一个简单的灰度滤镜时,以下代码可以正常工作:

并行过滤器工作器:

private static final int loadPerInterval = 500;

private static volatile int dynamicLoadCounter = 0;

public static synchronized void resetDynamicLoadCounter() {
  dynamicLoadCounter = -loadPerInterval;
}

public void run() {
  if (checkNull()) {
    return;
  }

  int localCounter = loadPerInterval - 1;
  int start = 0;
  while (dynamicLoadCounter < width * height) {
    localCounter++;
    if (localCounter == loadPerInterval) {
      //fetch a package of pixels to work on and mark them as being worked on
      start = syncCounterUp();
      System.out.println("#" + threadID + " starting at " + start);
      localCounter = 0;
    }
    if (start + localCounter < width * height) {
      filterPixel(start + localCounter);
    } else {
      return;
    }
  }
}

private static synchronized int syncCounterUp() {
  dynamicLoadCounter += loadPerInterval;
  return dynamicLoadCounter;
}

出了什么问题,我错过了同步吗?我对解释我的线程到底在做什么以及为什么出现这些工件非常感兴趣。感谢您的浏览!

最佳答案

这可以通过使用 ThreadPool 并向其提供 Callable 来解决。由于锁的原因,这种接缝设计有点过度,但它可以使线程分开,即使一个图像没有通过第一个过滤器 channel 完成并在第二个 channel 中使用。 这是一个关于如何工作的示例:

import java.awt.image.BufferedImage;
import java.util.*;
import java.util.concurrent.*;

public class Filter {
    int lockcount = 0;
    Worker worker = new Worker();
    List<Worker> fpThreads = new ArrayList<Worker>();
    List<Worker> spThreads = new ArrayList<Worker>();
    ExecutorService executor = Executors.newCachedThreadPool();
    Map<Integer, Object> lockMap = Collections.synchronizedMap(new Hashtable<Integer, Object>());


    public static void main(String[] args) {
        Filter filter = new Filter();

        for (int i = 0; i < 1000; i++) {
            Worker w1 = new Worker();
            filter.fpThreads.add(w1);
        }
        for (int i = 0; i < 1000; i++) {
            Worker w1 = new Worker();
            filter.spThreads.add(w1);
        }

        filter.filer();
    }


    public void filer() {
        runPass(lockMap, fpThreads);
        runPass(lockMap, spThreads);
    }

    private BufferedImage runPass(Map<Integer, Object> lockMap, List<Worker> threads) {
        Future<BufferedImage> future = null;
        Object lock = null;
        for (Worker thread : threads) {
            lock = lockMap.get(worker.hashCode());
            if (lock == null) {
                lock = thread;
                lockMap.put(thread.hashCode(), lock);
                future = executor.submit(thread);
            } else { //we have a lock
                waitOnLock(thread, lock);
            }
        }
        try {
            //get() waits until it gets an result
            return future.get();

        } catch (InterruptedException | ExecutionException ex) {
            // TODO Auto-generated catch block
            ex.printStackTrace();
        }
        synchronized (lock) {
            lockMap.remove(lock.hashCode());
            lock.notifyAll();
            System.out.println("Notify: " + lock.hashCode() + " with " + lockcount + " locks in use.");
        }
        return null;
    }

    private void waitOnLock(Worker thread, Object lock) {
        synchronized (lock) {
            try {
                lockcount++;
                System.out.println("Wait: " + thread.hashCode() + " with " + lockcount + " locks in use.");
                lock.wait();
                lockcount--;
                System.out.println("Continuing: " + thread.hashCode() + " with " + lockcount + " locks in use.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Worker implements Callable<BufferedImage> {

    public BufferedImage call() throws Exception {
        //Here you do you filtering
        System.out.println("Thread called: " + this.hashCode());
        Thread.sleep(1000);
        return null;
    }
}

为了您的目的,您可以实现工作人员来进行过滤。

关于java - 使用动态负载调度的并行图像卷积滤波器: artifacts,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38248281/

相关文章:

java - 创建对象和保持对象静态之间的区别

java - 使用具有当前时间的随机数生成器与不使用

c++ - 如何在旧版 C++ 上创建自己的线程池?

image-processing - 每像素阈值

android - 了解 ImageProcessing 代码

java - 如何在jsp中制作漂亮的表格

java - 某些语言环境在 TTS 中不可用——包括西类牙语

matlab - 在 MATLAB 中仅将图形内容转换为图像

c# - 如何在线程中运行方法并在线程退出时调用方法

python - 对于我的应用程序,最佳线程数是多少?