java - 为什么SynchronousQueue允许其他线程随机取?

标签 java multithreading sorting

我正在尝试实现一个小程序,该程序将使用 SynchronousQueue 按升序对整数数组/vector 进行排序。

我接手的方法是创建一个线程池,其中每个线程将通过迭代元素并比较位置 i< 上的元素按升序对元素进行一次排序/em> 和 i+1 并交换它们。

所以我创建了一个执行此交换的类,并将 iteration 数字、两个 SynchronousQueues 和数组本身作为参数,如下所示:

class OrderAsc extends Thread {
int length;
int iteration;
SynchronousQueue<ArrayList> st, dr; 
ArrayList<Integer> vector;
boolean swapMade;

    OrderAsc( int iteration, SynchronousQueue<ArrayList> st, SynchronousQueue<ArrayList> dr, ArrayList<Integer> vector) throws InterruptedException{

        this.length = vector.size();
        this.st = st;
        this.dr = dr;
        this.vector = vector;
        this.iteration = iteration;
        this.swapMade = false;
    }

    // Rularea thread-ului
    @Override
    public void run(){

        if(iteration > 0){

            try {
                vector = st.take();
                System.out.println(iteration + " get " + vector);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
        }

        // System.out.println(iteration + " >>> " + vector);


        for(int i = 0; i < length-1; i++){
            if(vector.get(i) > vector.get(i+1)){

                int tmp = vector.get(i);
                vector.set(i, vector.get(i+1));
                vector.set(i+1, tmp);

                swapMade = true;
            }
        }

        if(swapMade){
            try {

                dr.put(vector);
                System.out.println(iteration + " put " + vector);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
        } else { 
            // Display result
            System.out.println(vector);
        }
        System.exit(0);

    }

}

在主类中,我创建了一个包含此类对象的 ArrayList,如下所示:

ArrayList<OrderAsc> orderFilter = new ArrayList<OrderAsc>(n+2);
            for(int i = 0; i < n + 2; i++){
                orderFilter.add(i, new OrderAsc(i, st, dr, vector));
                st = dr;
                dr = new SynchronousQueue<ArrayList>();                    

            }

            for(int i=0; i < n + 2; i++){
                orderFilter.get(i).setDaemon(true);
                orderFilter.get(i).start();
            }

考虑到 OrderAsc 对象的实现,第一个线程将对数据进行排序迭代,然后将结果放入 dr 队列。然后,被阻塞的下一个线程将等待第一个线程TAKE这个结果,再次过滤,然后将它发送给下一个,直到整个数组被排序。

出乎意料的是,似乎线程并没有一直等待那些 SynchronousQueues,并且出于某些原因,它们有时会随机获取数据,如下所示:

1 get [5, 4, 3, 2, 1, 6]
0 put [4, 3, 2, 1, 5, 6]
1 put [4, 3, 2, 1, 5, 6]
2 get [4, 3, 2, 1, 5, 6]
BUILD SUCCESSFUL (total time: 5 seconds)

SynchronousQueue 是否有不同的工作方式而我实现错误?

最佳答案

我试过用一个同步队列重写你的程序... 简而言之,线程 0 放入初始的未排序 vector ,并且 其余线程开始以迭代方式进行排序。

import java.util.ArrayList;
import java.util.concurrent.SynchronousQueue;


class OrderAsc extends Thread {
    int length;
    int iteration;
    SynchronousQueue<ArrayList> st, dr; 
    ArrayList<Integer> vector;
    boolean swapMade;

    OrderAsc( int iteration, SynchronousQueue<ArrayList> st, SynchronousQueue<ArrayList> dr, ArrayList<Integer> vector) throws InterruptedException{
        this.length = vector.size();
        this.st = st;
        this.dr = dr;
        this.vector = vector;
        this.iteration = iteration;
        this.swapMade = false;
    }

    @Override
    public void run(){

        if ( iteration == 0 ){
            try {

                System.out.println("\n  " + iteration  + " put " + vector);

                st.put(vector);

                Thread.currentThread().interrupt();
                return;
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        else{

            try {
                vector = st.take();
                // System.out.println("\n Take in Thread : " + Thread.currentThread().getName());
                System.out.println(iteration + " take " + vector);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }

        } 


        for(int i = 0; i < vector.size() - 1; i++){

            if(vector.get(i) > vector.get(i+1)){
                int tmp = vector.get(i);
                vector.set(i, vector.get(i+1));
                vector.set(i+1, tmp);

                swapMade = true;
            }

        }


        if(swapMade){
                //System.out.println("\n Swap Done in  Thread : " + Thread.currentThread().getName());

            try {
                st.put(vector);

                System.out.println(iteration + " sorted " + vector);

            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
        }  

        //System.out.println("\n Exiting Thread : " + Thread.currentThread().getName());
        // System.exit(0);

    }


    public static void main(String args[]){
        ArrayList<OrderAsc> orderFilter = new ArrayList<OrderAsc>(5);

        ArrayList<Integer> vector = new ArrayList<Integer>(6);
        vector.add(4);
        vector.add(1);
        vector.add(3);
        vector.add(5);
        vector.add(6);
        vector.add(2);


        System.out.println("\n Unsorted Array " + vector + "\n\n");

        int n = vector.size();

        //vector.add(1);
        //vector.add(5);

        SynchronousQueue<ArrayList> st1 = new SynchronousQueue<ArrayList>();
        SynchronousQueue<ArrayList> dr1 = new SynchronousQueue<ArrayList>();

        for(int i = 0; i < n ; i++){
            try {

                orderFilter.add(i, new OrderAsc(i, st1, dr1, vector));

            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            //st1 = dr1;
            //dr1 = new SynchronousQueue<ArrayList>();                    
        }

        for(int i=0; i < n - 1; i++){
            //orderFilter.get(i).setDaemon(true);
             orderFilter.get(i).start();
        }


        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("\n Voila Thanks Threads and the  Sorted Array Is ");

        for(int i = 0; i < vector.size(); i++){
            System.out.print(" " + vector.get(i) +  " ");
        }   

    }

}

关于java - 为什么SynchronousQueue允许其他线程随机取?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30286541/

相关文章:

c++ - 从另一个线程中断 c++ 中的 lua 脚本?

c++ - 如何使用 STL 容器实现库排序算法?

java - 找不到变量 e.getkeycode

Java,JCheckbox - 消耗/阻止所有事件,但仍然启用

java - 为 grails 应用程序指定 java -D 选项

c# - 等待从弹出表单中获取信息

java - 可以根据条件重新启动的线程

java - Eclipse Oxygen 项目链接到类文件而不是源文件

python - 如何将多列中的时间戳重新排序为单列python

python 2 : Sorting algorithm does not work