go - 原因不明的 WaitGroup is reused before previous Wait has returned 错误(WaitGroup 在之前的 Wait 返回之前被重用)

标签 go wait race-condition

我使用以下代码,但不知道为什么它会在下面这一行因错误 (WaitGroup is reused before previous Wait) 而出错:

for _, proxy := range proxies {
                    wgGroup.Wait()

我想确保:当 localSources 正在调用 proxySource.GetProxies() 以及执行 proxyProvider.receivingProxyBC.In() <- proxy 时,不允许 remoteSources 调用 proxyProvider.receivingProxyBC.In() <- proxy。

详细代码在这里:

    // Question's original code, which reproduces the reported panic.
    // wgGroup is meant to stop remote sources from pushing proxies while a
    // local source is fetching or pushing. Its counter starts at
    // len(localSources), but each local goroutine calls Done as soon as it
    // starts, so the counter drops to zero almost immediately.
    wgGroup := sync.WaitGroup{}
    wgGroup.Add(len(localSources))
    for _, proxySource := range localSources {
        go func(proxySource *ProxySource) {
            lastTimeGet := time.Now()
            firstTimeLoad := true
            wgGroup.Done()
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                // Skip this round when the pool already holds more than 200
                // proxies, the last fetch is younger than DURATION_FORCE_UPDATE,
                // and this is not the first pass.
                if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                // BUG: this Add(1) can run while the counter is zero and a
                // remote goroutine is inside wgGroup.Wait() below. A reused
                // WaitGroup requires new Add calls to happen after all previous
                // Wait calls have returned; violating that is the
                // "WaitGroup is reused before previous Wait" error in the question.
                // Even when it does not panic, a remote goroutine that has
                // already returned from Wait() is not stopped by this Add, so
                // the race the WaitGroup was meant to prevent remains.
                wgGroup.Add(1)
                proxies, err := proxySource.GetProxies()
                wgGroup.Done()
                LogInfo("Get proxy from source ", proxySource.Id)
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                // Same Add-after-zero problem as above, around the push loop.
                wgGroup.Add(1)
                for _, proxy := range proxies {
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                wgGroup.Done()
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }
    for _, proxySource := range remoteSources {
        go func(proxySource *ProxySource) {
            // Delays the first remote fetch by 2 seconds — presumably to give
            // local sources a head start (intent not stated; confirm).
            time.Sleep(2 * time.Second)
            lastTimeGet := time.Now()
            firstTimeLoad := true
            for {
                currentTimeGet := time.Now()
                totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
                // Remote sources use a lower threshold (100) than local ones (200).
                if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                    time.Sleep(proxySource.WatchWait)
                    continue
                }
                firstTimeLoad = false
                proxies, err := proxySource.GetProxies()
                if err != nil {
                    time.Sleep(5 * time.Second)
                    continue
                }
                for _, proxy := range proxies {
                    // Wait only blocks while the counter is non-zero; nothing
                    // prevents a local goroutine from calling Add(1) right after
                    // it returns, so this is not mutual exclusion.
                    wgGroup.Wait()
                    proxyProvider.receivingProxyBC.In() <- proxy
                }
                lastTimeGet = time.Now()
                time.Sleep(20 * time.Second)
            }
        }(proxySource)
    }

更新:改用 RWMutex(读写锁)

使用下面的代码,我可以在 localSources 工作时进行锁定,但效果似乎并不理想。我希望:当任意 localSources 正在获取代理时,锁定所有 remoteSources;当没有 localSources 在获取时,允许所有 remoteSources 同时获取。而目前同一时间只有一个 remoteSources 能够获取。

// Asker's RWMutex version.
// wgGroup now only gates remote pushes until every local goroutine has
// started (each calls Done once on entry). No Add happens after that, so the
// WaitGroup-reuse panic from the first version cannot occur here.
wgGroup := sync.WaitGroup{}
wgGroup.Add(len(localSources))
// Role inversion: local sources take the shared RLock, so several local
// sources can fetch/push at the same time. Remote sources take the exclusive
// Lock, which excludes local sources AND every other remote source. That is
// why only one remote source can fetch at a time, as described in the question.
localGroupRwLock := sync.RWMutex{}
for _, proxySource := range localSources {
  go func(proxySource *ProxySource) {
    lastTimeGet := time.Now()
    firstTimeLoad := true
    wgGroup.Done()
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      // Shared lock: other local sources may run concurrently; remote
      // sources holding the exclusive Lock are excluded.
      localGroupRwLock.RLock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.RUnlock()
      LogInfo("Get proxy from source ", proxySource.Id)
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      // NOTE(review): if the channel returned by In() is unbuffered or full,
      // the sends below block while the read lock is still held.
      localGroupRwLock.RLock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.RUnlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}
for _, proxySource := range remoteSources {
  go func(proxySource *ProxySource) {
    time.Sleep(2 * time.Second)
    lastTimeGet := time.Now()
    firstTimeLoad := true
    for {
      currentTimeGet := time.Now()
      totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
      LogInfo("Total proxies ", totalProxy)
      if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
        LogInfo("Enough proxy & proxy are not new sleep ", proxySource.Id, " for ", proxySource.WatchWait.Seconds())
        time.Sleep(proxySource.WatchWait)
        continue
      }
      firstTimeLoad = false
      LogInfo("Not enough proxy or proxies are new ", proxySource.Id)
      LogInfo("Get proxy from source ", proxySource.Id)
      // Exclusive lock around the remote fetch: this blocks all local sources
      // and all other remote sources for the whole GetProxies call.
      // NOTE(review): the stated requirement only restricts pushes, so this
      // Lock/Unlock pair around GetProxies may be unnecessary — confirm.
      localGroupRwLock.Lock()
      proxies, err := proxySource.GetProxies()
      localGroupRwLock.Unlock()
      if err != nil {
        LogError("Error when get proxies from ", proxySource.Id)
        time.Sleep(5 * time.Second)
        continue
      }
      LogInfo("Add proxy from source ", proxySource.Id)
      // Returns immediately once every local goroutine has called its
      // start-up Done.
      wgGroup.Wait()
      // Exclusive lock for the push: remote sources serialize with each other
      // here as well as with local sources.
      localGroupRwLock.Lock()
      for _, proxy := range proxies {
        proxyProvider.receivingProxyBC.In() <- proxy
      }
      localGroupRwLock.Unlock()
      LogInfo("Done add proxy from source ", proxySource.Id)
      //LogInfo("Gotten proxy source ", proxySource.Id, " done now sleep ", proxySource.Cooldown.String())
      lastTimeGet = time.Now()
      time.Sleep(20 * time.Second) // 20 seconds for loading new proxies
      LogInfo("Watch for proxy source", proxySource.Id)
    }
  }(proxySource)
}

最佳答案

来自 document :

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

Wait() :

Wait blocks until the WaitGroup counter is zero.

你也可以在文档中看到示例。问题在于,WaitGroup 的作用是阻塞,直到计数器变为零。所以在原始代码中,假设没有运行时错误,第二个 for 循环中的每个 goroutine 只会阻塞到第一个 for 循环启动的 goroutine 全部调用 Done() 为止。而在第一部分中,Add(1) 和 Done() 根本不会阻塞任何东西,数据竞争依然存在。

该错误在 Add() 方法的文档中有说明:Add 向 WaitGroup 计数器添加一个可能为负的增量。如果计数器变为零,则释放所有阻塞在 Wait 上的 goroutine。如果计数器变为负值,Add 会 panic。

Note that calls with a positive delta that occur when the counter is zero must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned. See the WaitGroup example.

但是,您的代码其实也不是在等待若干组彼此独立的事件,所以 WaitGroup 本身就不适合这个场景。

适合您代码的工具是 sync.Mutex。其文档如下:

A Mutex is a mutual exclusion lock. The zero value for a Mutex is an unlocked mutex.

A Mutex must not be copied after first use.

type Mutex struct { // contains filtered or unexported fields }

func (*Mutex) Lock

func (m *Mutex) Lock()

Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available.

func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.

正如您所描述的,您希望“在调用 proxySource.GetProxies() 和 proxyProvider.receivingProxyBC.In() <- proxy 时,暂停 for _, proxy := range proxies 循环”。“暂停”用术语“阻塞(block)”来描述更准确,这是教科书式的互斥锁问题:用同一把锁保护这三处“调用”(之所以加引号,是因为 for 循环并不是调用),问题就解决了。

如何用互斥锁保护 for 循环(包括每次对循环条件的求值)可能有点棘手,写法大致如下(注意:循环结束时锁仍被持有,需要在循环之后再解锁一次):

// Hold the lock whenever the loop header is evaluated; release it for the
// body's unprotected work.
lock.Lock()
for ... {
    lock.Unlock()
    ...
    lock.Lock()
}
// The last header evaluation leaves the lock held; release it after the loop.
lock.Unlock()

所以我更改了您的代码,希望它能按预期工作:

// Answer's version: one sync.Mutex serializes local-source fetches,
// local-source push loops, and every individual remote-source push.
lock := sync.Mutex{}
// "Protect a for loop" pattern: the lock is held whenever the range header is
// evaluated and released inside the body.
lock.Lock()
for _, proxySource := range localSources {
    lock.Unlock()
    go func(proxySource *ProxySource) {
        // Not strictly required: these are goroutine-local variables.
        lock.Lock()
        lastTimeGet := time.Now()
        firstTimeLoad := true
        lock.Unlock()
        for {
            currentTimeGet := time.Now()
            totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
            if totalProxy > 200 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                time.Sleep(proxySource.WatchWait)
                continue
            }
            firstTimeLoad = false
            // Held for the whole fetch: remote pushes wait until it is released.
            lock.Lock()
            proxies, err := proxySource.GetProxies()
            lock.Unlock()
            LogInfo("Get proxy from source ", proxySource.Id)
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            // Held for the whole push loop so remote pushes cannot interleave.
            // NOTE(review): if the channel send blocks, the lock stays held meanwhile.
            lock.Lock()
            for _, proxy := range proxies {
                proxyProvider.receivingProxyBC.In() <- proxy
            }
            lock.Unlock()
            lastTimeGet = time.Now()
            time.Sleep(20 * time.Second)
        }
    }(proxySource)
    lock.Lock()
}
// The loop pattern above exits with the lock still held (it is re-locked at
// the end of every iteration, and also when localSources is empty). Release
// it here; otherwise every goroutine blocks forever at its first lock.Lock().
lock.Unlock()
for _, proxySource := range remoteSources {
    go func(proxySource *ProxySource) {
        time.Sleep(2 * time.Second)
        lastTimeGet := time.Now()
        firstTimeLoad := true
        for {
            currentTimeGet := time.Now()
            totalProxy := proxyProvider.receivingProxyBC.Len() + proxyProvider.workingProxyBC.Len()
            if totalProxy > 100 && currentTimeGet.Sub(lastTimeGet) < DURATION_FORCE_UPDATE && !firstTimeLoad {
                time.Sleep(proxySource.WatchWait)
                continue
            }
            firstTimeLoad = false
            // Remote fetches are not locked: they may run concurrently with
            // anything; only their pushes are serialized.
            proxies, err := proxySource.GetProxies()
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            for _, proxy := range proxies {
                // Per-proxy lock: waits while a local source holds the lock.
                lock.Lock()
                proxyProvider.receivingProxyBC.In() <- proxy
                lock.Unlock()
            }
            lastTimeGet = time.Now()
            time.Sleep(20 * time.Second)
        }
    }(proxySource)
}

注 1:您可能会想使用 defer。不要这样做:defer 作用于函数(function),而不是代码块(block)——被 defer 的调用要等到整个函数返回时才执行,而不是在当前代码块结束时执行。

注2:在golang中使用mutex时,经常会引发一个设计问题。人们应该总是看看使用 channel 是否更好并重构代码,尽管在许多情况下互斥锁并不是一个坏主意。但在这里我无法阅读有关设计的任何信息,所以我将放弃它。

注 3:这段代码实际上也会让 proxySource.GetProxies() 与调用 proxyProvider.receivingProxyBC.In() <- proxy 的 for 循环互相阻塞。这是否符合需要取决于您的场景。如果不需要,您应该了解一下 sync.RWMutex,并据此修改代码。这部分就留给您了。

关于go - WaitGroup 在之前的 Wait 未知原因之前被重用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48351816/

相关文章:

go - Firebase:无法验证 MAC

c - wait() 和 exit() 与父子进程

javascript - Chrome 扩展程序错误 : "Unchecked runtime.lastError while running browserAction.setIcon: No tab with id"

java - 在Java中,条件表达式是线程安全操作吗?

arrays - 追加函数覆盖 slice 中的现有数据

c++ - Go context.Background() 的 gRPC++ 等价物是什么?

macos - go + SDL + OpenGL + MacOS示例=在DrawArrays()之后我得到了INVALID_OPERATION

java - 带等待/通知和不带它们的同步块(synchronized block)之间的区别?

Java:永远等待 Thead

scala - 完成 scala promise 竞赛