swift - 如何在 Swift 中实现线程安全哈希表(电话簿)数据结构?

标签 swift multithreading grand-central-dispatch data-synchronization barrier

我正在尝试实现一个线程安全的电话簿对象。电话簿应该可以添加一个人,并根据姓名和电话号码查找一个人。从实现的角度来看,这只涉及两个哈希表,一个关联名称 -> Person,另一个关联 phone# -> Person。

需要注意的是我希望这个对象是线程安全的。这意味着我希望能够支持电话簿中的并发查找,同时确保一次只有一个线程可以将一个人添加到电话簿。这是基本的读写器问题,我正在尝试使用 GrandCentralDispatch 和调度障碍来解决这个问题。尽管我遇到了问题,但我正在努力解决这个问题。下面是我的 Swift Playground 代码:

//: Playground - noun: a place where people can play

import UIKit
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

public class Person: CustomStringConvertible {
    public var description: String {
        get {
            return "Person: \(name), \(phoneNumber)"
        }
    }

    public var name: String
    public var phoneNumber: String
    private var readLock = ReaderWriterLock()

    public init(name: String, phoneNumber: String) {
        self.name = name
        self.phoneNumber = phoneNumber
    }


    public func uniquePerson() -> Person {
        let randomID = UUID().uuidString
        return Person(name: randomID, phoneNumber: randomID)
    }
}

public enum Qos {
    case threadSafe, none
}

public class PhoneBook {

    private var qualityOfService: Qos = .none
    public var nameToPersonMap = [String: Person]()
    public var phoneNumberToPersonMap = [String: Person]()
    private var readWriteLock = ReaderWriterLock()


    public init(_ qos: Qos) {
        self.qualityOfService = qos
    }

    public func personByName(_ name: String) -> Person? {
        var person: Person? = nil
        if qualityOfService == .threadSafe {
            readWriteLock.concurrentlyRead { [weak self] in
                guard let strongSelf = self else { return }
                person = strongSelf.nameToPersonMap[name]
            }
        } else {
            person = nameToPersonMap[name]
        }

        return person
    }

    public func personByPhoneNumber( _ phoneNumber: String) -> Person? {
        var person: Person? = nil
        if qualityOfService == .threadSafe {
            readWriteLock.concurrentlyRead { [weak self] in
                guard let strongSelf = self else { return }
                person = strongSelf.phoneNumberToPersonMap[phoneNumber]
            }
        } else {
            person = phoneNumberToPersonMap[phoneNumber]
        }

        return person
    }

    public func addPerson(_ person: Person) {
        if qualityOfService == .threadSafe {
            readWriteLock.exclusivelyWrite { [weak self] in
                guard let strongSelf = self else { return }
                strongSelf.nameToPersonMap[person.name] = person
                strongSelf.phoneNumberToPersonMap[person.phoneNumber] = person
            }
        } else {
            nameToPersonMap[person.name] = person
            phoneNumberToPersonMap[person.phoneNumber] = person
        }
    }

}


// A ReaderWriterLock implemented using GCD and OS Barriers.
public class ReaderWriterLock {

    private let concurrentQueue = DispatchQueue(label: "com.ReaderWriterLock.Queue", attributes: DispatchQueue.Attributes.concurrent)
    private var writeClosure: (() -> Void)!

    public func concurrentlyRead(_ readClosure: (() -> Void)) {
        concurrentQueue.sync {
            readClosure()
        }
    }

    public func exclusivelyWrite(_ writeClosure: @escaping (() -> Void)) {
        self.writeClosure = writeClosure
        concurrentQueue.async(flags: .barrier) { [weak self] in
            guard let strongSelf = self else { return }
            strongSelf.writeClosure()
        }
    }

}

// MARK: Testing the synchronization and thread-safety

for _ in 0..<5 {
    let iterations = 1000
    let phoneBook = PhoneBook(.none)

    let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: DispatchQueue.Attributes.concurrent)
    for _ in 0..<iterations {
        let person = Person(name: "", phoneNumber: "").uniquePerson()
        concurrentTestQueue.async {
            phoneBook.addPerson(person)
        }
    }

    sleep(10)
    print(phoneBook.nameToPersonMap.count)
}

为了测试我的代码,我运行了 1000 个并发线程,这些线程只是将一个新的 Person 添加到 PhoneBook。每个人都是独一无二的,所以在 1000 个线程完成后,我希望 PhoneBook 包含 1000 个计数。每次执行写入时,我都会执行 dispatch_barrier 调用,更新哈希表,然后返回。据我所知,这就是我们需要做的;然而,在重复运行 1000 个线程后,我发现电话簿中的条目数量不一致并且到处都是:

Phone Book Entries: 856
Phone Book Entries: 901
Phone Book Entries: 876
Phone Book Entries: 902
Phone Book Entries: 912

谁能帮我弄清楚这是怎么回事?我的锁定代码有问题,或者更糟的是我的测试构建方式有问题吗?我对这个多线程问题空间很陌生,谢谢!

最佳答案

问题出在您的 ReaderWriterLock 上。您将 writeClosure 保存为一个属性,然后异步调度一个调用该保存属性的闭包。但是,如果在此期间另一个 exclusiveWrite 出现,您的 writeClosure 属性将被替换为新的闭包。

在这种情况下,这意味着您可以多次添加相同的 Person。因为您使用的是字典,所以这些重复项具有相同的键,因此不会导致您看到所有 1000 个条目。

您实际上可以简化 ReaderWriterLock,完全消除该属性。我还会将 concurrentRead 设为泛型,返回值(就像 sync 一样),并重新抛出任何错误(如果有的话)。

public class ReaderWriterLock {
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
    
    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
        return try queue.sync {
            try block()
        }
    }
    
    public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
        queue.async(flags: .barrier) {
            block()
        }
    }
}

其他一些不相关的观察结果:

  1. 顺便说一句,这个简化的ReaderWriterLock正好解决了另一个问题。 writeClosure 属性(我们现在已删除)可能很容易引入强引用循环。

    是的,您在使用 [weak self] 时很谨慎,所以没有任何强引用循环,但这是可能的。我建议无论您在何处使用闭包属性,都应在使用完后将该闭包属性设置为 nil,这样闭包可能意外包含的任何强引用都将得到解决。这样一来,持久的强引用循环就永远不可能了。 (另外,闭包本身和它拥有的任何局部变量或其他外部引用都将被解析。)

  2. 你睡了 10 秒。这应该绰绰有余,但我建议不要只添加随机 sleep 调用(因为您永远无法 100% 确定)。幸运的是,您有一个并发队列,因此您可以使用它:

    concurrentTestQueue.async(flags: .barrier) { 
        print(phoneBook.count) 
    }
    

    由于这个障碍,它会一直等到你放入该队列的所有其他内容都完成。

  3. 请注意,我不只是打印 nameToPersonMap.count。此数组已在 PhoneBook 中仔细同步,因此您不能让随机的外部类在不同步的情况下直接访问它。

    每当你有一些你在内部同步的属性时,它应该是 private 然后创建一个线程安全的函数/变量来检索你需要的任何东西:

    public class PhoneBook {
    
        private var nameToPersonMap = [String: Person]()
        private var phoneNumberToPersonMap = [String: Person]()
    
        ...
    
        var count: Int {
            return readWriteLock.concurrentlyRead {
                nameToPersonMap.count
            }
        }
    }
    
  4. 你说你正在测试线程安全,但随后创建了带有 .none 选项的 PhoneBook(没有实现线程安全)。在那种情况下,我预计会出现问题。您必须使用 .threadSafe 选项创建您的 PhoneBook

  5. 您有许多 strongSelf 模式。那是相当不灵巧。在 Swift 中通常不需要它,因为您可以使用 [weak self] 然后只进行可选链接。

综合所有这些,这是我最后的 Playground :

PlaygroundPage.current.needsIndefiniteExecution = true

public class Person {
    public let name: String
    public let phoneNumber: String
    
    public init(name: String, phoneNumber: String) {
        self.name = name
        self.phoneNumber = phoneNumber
    }
    
    public static func uniquePerson() -> Person {
        let randomID = UUID().uuidString
        return Person(name: randomID, phoneNumber: randomID)
    }
}

extension Person: CustomStringConvertible {
    public var description: String {
        return "Person: \(name), \(phoneNumber)"
    }
}

public enum ThreadSafety { // Changed the name from Qos, because this has nothing to do with quality of service, but is just a question of thread safety
    case threadSafe, none
}

public class PhoneBook {
    
    private var threadSafety: ThreadSafety
    private var nameToPersonMap = [String: Person]()        // if you're synchronizing these, you really shouldn't expose them to the public
    private var phoneNumberToPersonMap = [String: Person]() // if you're synchronizing these, you really shouldn't expose them to the public
    private var readWriteLock = ReaderWriterLock()
    
    public init(_ threadSafety: ThreadSafety) {
        self.threadSafety = threadSafety
    }
    
    public func personByName(_ name: String) -> Person? {
        if threadSafety == .threadSafe {
            return readWriteLock.concurrentlyRead { [weak self] in
                self?.nameToPersonMap[name]
            }
        } else {
            return nameToPersonMap[name]
        }
    }
    
    public func personByPhoneNumber(_ phoneNumber: String) -> Person? {
        if threadSafety == .threadSafe {
            return readWriteLock.concurrentlyRead { [weak self] in
                self?.phoneNumberToPersonMap[phoneNumber]
            }
        } else {
            return phoneNumberToPersonMap[phoneNumber]
        }
    }
    
    public func addPerson(_ person: Person) {
        if threadSafety == .threadSafe {
            readWriteLock.exclusivelyWrite { [weak self] in
                self?.nameToPersonMap[person.name] = person
                self?.phoneNumberToPersonMap[person.phoneNumber] = person
            }
        } else {
            nameToPersonMap[person.name] = person
            phoneNumberToPersonMap[person.phoneNumber] = person
        }
    }
    
    var count: Int {
        return readWriteLock.concurrentlyRead {
            nameToPersonMap.count
        }
    }
}

// A ReaderWriterLock implemented using GCD concurrent queue and barriers.

public class ReaderWriterLock {
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
    
    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
        return try queue.sync {
            try block()
        }
    }
    
    public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
        queue.async(flags: .barrier) {
            block()
        }
    }
}


for _ in 0 ..< 5 {
    let iterations = 1000
    let phoneBook = PhoneBook(.threadSafe)
    
    let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: .concurrent)
    for _ in 0..<iterations {
        let person = Person.uniquePerson()
        concurrentTestQueue.async {
            phoneBook.addPerson(person)
        }
    }
    
    concurrentTestQueue.async(flags: .barrier) {
        print(phoneBook.count)
    }
}

就个人而言,我倾向于更进一步

  • 将同步移到通用类中;和
  • 将模型更改为 Person 对象的数组,以便:
    • 该模型支持多人使用相同或电话号码;和
    • 如果需要,您可以使用值类型。

例如:

public struct Person {
    public let name: String
    public let phoneNumber: String
    
    public static func uniquePerson() -> Person {
        return Person(name: UUID().uuidString, phoneNumber: UUID().uuidString)
    }
}

public struct PhoneBook {
    
    private var synchronizedPeople = Synchronized([Person]())
    
    public func people(name: String? = nil, phone: String? = nil) -> [Person]? {
        return synchronizedPeople.value.filter {
            (name == nil || $0.name == name) && (phone == nil || $0.phoneNumber == phone)
        }
    }
    
    public func append(_ person: Person) {
        synchronizedPeople.writer { people in
            people.append(person)
        }
    }
    
    public var count: Int {
        return synchronizedPeople.reader { $0.count }
    }
}

/// A structure to provide thread-safe access to some underlying object using reader-writer pattern.

public class Synchronized<T> {
    /// Private value. Use `public` `value` computed property (or `reader` and `writer` methods)
    /// for safe, thread-safe access to this underlying value.
    
    private var _value: T
    
    /// Private reader-write synchronization queue
    
    private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".synchronized", qos: .default, attributes: .concurrent)
    
    /// Create `Synchronized` object
    ///
    /// - Parameter value: The initial value to be synchronized.
    
    public init(_ value: T) {
        _value = value
    }
    
    /// A threadsafe variable to set and get the underlying object, as a convenience when higher level synchronization is not needed        
    
    public var value: T {
        get { reader { $0 } }
        set { writer { $0 = newValue } }
    }
    
    /// A "reader" method to allow thread-safe, read-only concurrent access to the underlying object.
    ///
    /// - Warning: If the underlying object is a reference type, you are responsible for making sure you
    ///            do not mutating anything. If you stick with value types (`struct` or primitive types),
    ///            this will be enforced for you.
    
    public func reader<U>(_ block: (T) throws -> U) rethrows -> U {
        return try queue.sync { try block(_value) }
    }
    
    /// A "writer" method to allow thread-safe write with barrier to the underlying object
    
    func writer(_ block: @escaping (inout T) -> Void) {
        queue.async(flags: .barrier) {
            block(&self._value)
        }
    }
}

关于swift - 如何在 Swift 中实现线程安全哈希表(电话簿)数据结构?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49101953/

相关文章:

java - 多线程适合这种情况吗?

java - 使用 Thread.sleep 线程 hibernate 时的 CPU 消耗

ios4 - GCD和回调-并发问题

ios - func setupSession() { session = AVCaptureSession() session.sessionPreset = AVCaptureSessionPresetPhoto

java - java中的并发读/写缓冲区字节[]

ios - 为什么 Dispatch_group_notify 在不同的环境下工作不同?

ios - 在后台执行网络和其他耗时的任务

ios - 重新加载 TableView 不显示所有对象

swift - 在macOS的Swift 5中声明ExtAudioFileRef的正确方法

ios - 图像中带有复选框的 UICollectionView Cell