java - 在作家阅读器中使用信号灯

标签 java multithreading semaphore

因此,我正在参加多线程开发类(class),并且目前正在学习信号量。在我们的最新任务中,我们应该使用三个线程和两个队列。编写器线程将把字符写入第一个队列,然后“加密器”线程将从该队列中读取字符,对字符进行加密,然后将其添加到第二个队列中。然后,我们有了一个读取器线程,该线程从第二个队列进行读取。为了处理同步,我们应该使用信号量和互斥量,但是我没有任何管理:

public class Buffer {
private Queue<Character> qPlain = new LinkedList<Character>();
private Queue<Character> qEncrypt = new LinkedList<Character>();
private final int CAPACITY = 3;

public Buffer() {
    System.out.println("New Buffer!");
}

public synchronized void addPlain(char c) {
    while (qPlain.size() == CAPACITY) {
        try {
            wait();
            System.out.println("addPlain is waiting to add Data");
        } catch (InterruptedException e) {
        }
    }
    qPlain.add(c);
    notifyAll();
    System.out.println("addPlain Adding Data-" + c);
}

public synchronized char removePlain() {
    while (qPlain.size() == 0) {
        try {
            wait();
            System.out.println("----------removePlain is waiting to return Data.");
        } catch (InterruptedException e) {
        }
    }
    notifyAll();
    char c = qPlain.remove();
    System.out.println("---------------removePlain Returning Data-" + c);
    return c;
}

public synchronized void addEncrypt(char c) {
    while (qEncrypt.size() == CAPACITY) {
        try {
            wait();
            System.out.println("addEncrypt is waiting to add Data");
        } catch (InterruptedException e) {
        }
    }

    qEncrypt.add(c);
    notifyAll();
    System.out.println("addEncrypt Adding Data-" + c);
}

public synchronized char removeEncrypt() {
    while (qEncrypt.size() == 0) {
        try {
            wait();
            System.out.println("----------------removeEncrypt is waiting to return Data.");
        } catch (InterruptedException e) {
        }
    }
    notifyAll();
    char c = qEncrypt.remove();
    System.out.println("--------------removeEncrypt Returning Data-" + c);
    return c;
}

}

所以这很好用,但是由于我没有使用任何信号量,所以我不会通过。我确实理解这个概念,但是我看不出在这种情况下使用任何东西的意义。我有2个队列,每个队列只有一个读写器。

编辑:改为使用信号量。几乎可以正常工作,当队列为空时调用removePlain()方法会出现问题。我很确定我应该阻止它,但是我在这里迷路了。我可以不只是在这里使用互斥锁吗?
public class Buffer {
private Semaphore encryptedSem = new Semaphore(0);
private Semaphore decryptedSem = new Semaphore(0);
private final Queue<Character> qPlain = new LinkedList<Character>();
private final Queue<Character> qEncrypt = new LinkedList<Character>();
private final int CAPACITY = 3;
private boolean startedWrite = false;
private boolean startedRead = false;

/**
 * Adds a character to the queue containing non encrypted chars.
 * 
 * @param c
 */
public void addPlain(char c) {

    // Makes sure that this writer executes first.
    if (!startedWrite) {
        startedWrite = true;
        encryptedSem = new Semaphore(1);
    }

    if (qPlain.size() < CAPACITY) {
        aquireLock(encryptedSem);
        System.out.println("addPlain has lock");
        qPlain.add(c);
        realeseLock(encryptedSem);
    }
}

/**
 * Removes and returns the next char in the non encrypted queue.
 * 
 * @return
 */
public char removePlain() {
    // TODO Need to fix what happens when the queue is 0. Right now it just
    // returns a char that is 0. This needs to be blocked somehow.

    char c = 0;

    if (qPlain.size() > 0) {
        aquireLock(encryptedSem);
        System.out.println("removePlain has lock");
        c = qPlain.remove();
        realeseLock(encryptedSem);
    } else {
        System.out.println("REMOVEPLAIN CALLED WHEN qPlain IS EMPTY");
    }
    return c;
}

/**
 * Adds a character to the queue containing the encrypted chars.
 * 
 * @param c
 */
public void addEncrypt(char c) {
    if (!startedRead) {
        startedRead = true;
        decryptedSem = new Semaphore(1);
    }

    if (qEncrypt.size() < CAPACITY) {
        aquireLock(decryptedSem);
        System.out.println("addEncrypt has lock");
        qEncrypt.add(c);
        realeseLock(decryptedSem);
    }

}

/**
 * Removes and returns the next char in the encrypted queue.
 * 
 * @return
 */
public char removeEncrypt() {
    char c = 0;
    if (qEncrypt.size() > 0) {
        aquireLock(decryptedSem);
        System.out.println("removeEncrypt has lock");
        c = qEncrypt.remove();
        realeseLock(decryptedSem);

    }
    return c;
}

/**
 * Aquries lock on the given semaphore.
 * 
 * @param sem
 */
private void aquireLock(Semaphore sem) {
    try {
        sem.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

/**
 * Realeses lock on the given semaphore.
 * 
 * @param sem
 */
private void realeseLock(Semaphore sem) {
    sem.release();
}

}

最佳答案

好吧,所以尝试解决您的问题,而不做功课:-)

关于您的第一个样本

乍一看,这是一个有效的示例。您正在通过synchronized关键字使用互斥形式,这使您可以正确使用this.wait/notify。这也提供了确保每个线程在同一监视器上同步的保护措施,从而提供了足够的happen-before安全性。

换句话说,借助此单个监视器,您可以确保synchronized方法下的所有内容都独家执行,并且这些方法的副作用在其他方法内部可见。

唯一的麻烦是您的队列不是final,根据safe object publication指南,并且取决于整个系统/线程的引导方式,这可能会导致可见性问题。多线程代码(甚至可能是一般代码)的经验法则:可以使final成为可能。

您的代码的真正问题是它不能满足您的要求:请使用信号量。

关于您的第二个样本

不安全的 boolean 值突变

这是一个真正的问题。首先,您的startedWrite/startedRead boolean 值:您可以在任何同步(锁定,信号量,同步,……一无所有)之外对它们进行突变(更改其true/false值)。这是不安全的,在Java内存模型下,未执行变异的线程看不到变异值是合法的。换句话说,第一次写入可以将startedWrite设置为true,并且可能是所有其他线程都从未看到该true值。

关于此的一些讨论:
-https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html
-Java's happens-before and synchronization

因此,依赖于这些 boolean 值的任何内容在您的样本中都存在固有的缺陷。也就是说,您的信号量分配是一回事。

纠正此问题的几种方法:

  • 始终在某种同步化工具下更改共享状态(在您的第一个示例中,它是synchronized关键字,在这里它可能是您的信号灯),并确保所有线程更改或访问该变量都使用相同的工具
  • 或使用并发安全类型,例如AtomicBoolean,这种情况具有并发性,可以确保任何其他线程都可以看到任何突变

  • 比赛条件

    第二个代码示例的另一个问题是,在获取锁并对其进行修改之前,您需要检查队列的大小,即:
    if (qPlain.size() > 0) {
        aquireLock(encryptedSem);
        ...
        c = qPlain.remove();
        realeseLock(encryptedSem);
    } else {
        System.out.println("REMOVEPLAIN CALLED WHEN qPlain IS EMPTY");
    }
    

    两个并发线程可能同时在第一行执行检查,并且行为错误。典型的场景是:
  • qplain的大小为1
  • 线程1到达if,检查qplain不为空,检查成功,然后OS调度程序将线程1暂停在此处,现在
  • 线程2到达相同的if,并且由于相同的原因而成功执行了相同的检查
  • 线程1和线程2从那里恢复,都认为它们被允许从qplain中取出1个元素,这是错误的,因为qplain的大小实际上为1。

  • 它会失败。您应该进行某种互斥。您不能(再次根据经验法则)执行检查,并且不能在锁定下执行更改。从广义上讲,检查和突变都应在同一锁中进行。 (或者您是一个非常高级的多线程专家,并且您非常了解乐观锁定和stuf)。

    可能的死锁

    另一个经验法则:每当您在同一调用站点上获取和释放锁和/或资源时,都应该具有try/finally模式。
    也就是说,无论如何完成,您的代码应始终看起来像
    acuquireSemaphore();
    try { 
        // do something
    } finally {
        releaseSemaphore();
    }
    

    锁,输入或输出流,套接字等也一样。如果不这样做,可能会导致获取信号量但从不释放信号量,尤其是在发生未捕获的异常的情况下。因此,请在资源周围使用tryfinally

    结论

    由于存在如此严重的缺陷,我没有真正阅读您的代码来查看其“精神”是否有效。也许可以,但是在这一点上,进一步检查是不值得的。

    继续您的作业

    系统会要求您使用两种工具:Semaphore和互斥(例如,我猜是synchonizedLock)。两者不完全一样!

    如您的第一个样本所示,您可能会相互排斥。可能还没有信号量。信号量的要点是,它们(安全地)管理许多“许可证”。一个人(一个线程)可以请求一个许可,如果该信号量有一个可用的并且可以授予它,那么一个人就可以继续进行自己的工作。否则,将其置于“等待模式”(等待),直到获得许可证为止。在某个时候,期望*将许可还给信号量,以供其他人使用。

    (*请注意:信号量并非强制要求执行许可获取的线程是执行许可证释放的线程。这是使锁和信号量如此不同的一部分,这是一件好事。)

    让我们开始简单:只有一个许可的信号量可以用作互斥。优点:它可以由获取它的线程之外的另一个线程释放。这使得它非常适合在线程之间传递消息:它们可以交换许可。

    还记得我们什么?当然是wait/notify!

    解决方案的可能途径

    因此,我们有一个信号灯,并且有许多许可证。这个数字的含义是什么?一个自然的选择是:让信号量保存队列中元素的数量。起初,它可能为零。
  • 每当有人将一个元素放入队列中时,它就会将允许的数量增加一。
  • 每当有人将某个元素从队列中移出时,它都会减少许可数量。
  • 然后:尝试将元素从空队列中移出意味着尝试从空信号量中获取许可,它将自动阻止调用者。这似乎是您想要的。

  • 但!
  • 我们还没有“将元素放在完整队列的顶部”的定义。这是因为信号量不受许可限制。一个人可以从一个空的信号灯开始,然后叫“释放”一千次,最后得到一千个许可。我们将无限发挥自己的最大能力。
  • 假设我们有一个解决方法,但仍未完成:此时我们尚未确保读者和作家不会同时修改队列。这对于改正需求至关重要!

  • 因此,我们需要其他想法。

    好了,问题2很简单:我们被允许使用排他锁,因此我们将使用它们。只要使用相同的 监视器确保对列表本身的任何操作都在同步块(synchronized block)下。

    问题一...好吧,我们有一个表示“非空”状态的信号量。这是您在第一个样本中获得的两对等待/通知之一。好酷,让我们制作另一个表示“未满”状态的信号量,另一个代码示例的wait/notifyPair!

    因此,回顾一下:在原始样本中,每等待/通知两次就使用一个信号量。保持互斥,以实际修改队列对象的内容。并十分注意与信号灯相互排斥的部分,这是问题的症结所在。

    我会停在那里,让您根据需要走这条路。

    加分点

    您不必在这里编写两次相同的代码。在您的示例中,您对相同的逻辑进行了两次编码(一次用于“明文”,一次用于“加密”),基本上,等待(至少)一个点,然后再堆叠一个char,并等待(at至少一个字符,然后将其弹出。

    这应该是一个相同的代码/方法。只需执行一次,您就会始终正确(或错误)。写两次,您犯错误的机会就会加倍。

    future 的想法
  • 这一切仍然非常复杂,可以使用`BlockingQueuè'来完成,但是同样,家庭作业的确还有另一个目的:-)。
  • 稍微复杂一点,但是这种消息传递模式的信令具有等待“notEmpty”信号的线程,而另一个则等待“notFull”信号,这是JDK Condition对象的确切用例,它模仿了等待/通知。
  • 关于java - 在作家阅读器中使用信号灯,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40865669/

    相关文章:

    c - 为什么我的程序在调用 sem_wait 时不等待?

    与条件变量相比,队列应用程序中的 C++20 信号量似乎很慢

    c - Dining Philosophers in C 内存泄漏

    java - 在客户端处理 session 超时

    java - 应用关闭时的Android调用功能

    java - Web 模块之间是否共享线程?

    python - 如何使用线程池做死循环功能?

    java - java服务器信息中的postgres

    java - Android 两个网络同时操作

    c# - C# 中的 Task<T> 和 TaskContinuationOptions 说明?