c# - 一种使我的 Parallel.ForEach 线程安全的更好方法?

标签 c# multithreading dictionary parallel.foreach concurrentdictionary

我想让下面的代码线程安全。不幸的是,我曾尝试在此代码中的各个级别进行锁定,但均未成功。我似乎可以实现线程安全的唯一实例是在整个循环周围放置一个锁,这有效地使 Parallel.ForEach 不比仅使用 foreach 更快(可能甚至更慢)。该代码相对/几乎安全,没有锁定。它似乎只显示 geneTokens.Value[-1] 键和 gtCandidates.Value[-1] 键的总和略有变化,大约每 20 次左右执行一次。

我意识到 Dictionary 不是线程安全的。但是,如果不对下游性能造成重大影响,我无法将此特定对象更改为 ConcurrentDictionary。我宁愿使用常规 foreach 运行这部分代码,也不愿更改该特定对象。但是,我使用 ConcurrentDictionary 来保存各个 Dictionary 对象。我也尝试过进行此更改,但它并没有解决我的种族问题。

这是我的类级别变量:

//Holds all tokens derived from each sequence chunk
public static ConcurrentBag<sequenceItem> tokenBag = 
  new ConcurrentBag<sequenceItem>();
public BlockingCollection<sequenceItem> sequenceTokens = new 
  BlockingCollection<sequenceItem>(tokenBag);
public ConcurrentDictionary<string, int> categories = new 
  ConcurrentDictionary<string, int>();
public ConcurrentDictionary<int, Dictionary<int, int>> gtStartingFrequencies = new 
  ConcurrentDictionary<int, Dictionary<int, int>>();
public ConcurrentDictionary<string, Dictionary<int, int>> gtCandidates = new 
  ConcurrentDictionary<string, Dictionary<int, int>>();
public ConcurrentDictionary<string, Dictionary<int, int>> geneTokens = new 
  ConcurrentDictionary<string, Dictionary<int, int>>();

这是 Parallel.ForEach:

Parallel.ForEach(sequenceTokens.GetConsumingEnumerable(), seqToken =>
{
  lock (locker)
  {
    //Check to see if the Sequence Token is a Gene Token
    Dictionary<int, int> geneTokenFreqs;
    if (geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs))
    { //The Sequence Token is a Gene Token 


      *****************Race Issue Seems To Occur Here**************************** 
      //Increment or create category frequencies for each category provided
      int frequency;
      foreach (int category in seqToken.categories)
      {
        if (geneTokenFreqs.TryGetValue(category, out frequency))
        {   //increment the category frequency, if it already exists
            frequency++;
            geneTokenFreqs[category] = frequency;
        }
        else
        {   //Create the category frequency, if it does not exist
            geneTokenFreqs.Add(category, 1);
        }
      }

      //Update the frequencies total [-1] by the total # of categories incremented.
      geneTokenFreqs[-1] += seqToken.categories.Length;
      ******************************************************************************
    }
    else
    { //The Sequence Token is NOT yet a Gene Token
      //Check to see if the Sequence Token is a Gene Token Candidate yet
      Dictionary<int, int> candidateTokenFreqs;
      if (gtCandidates.TryGetValue(seqToken.text, out candidateTokenFreqs))
      {
        *****************Race Issue Seems To Occur Here****************************
        //Increment or create category frequencies for each category provided
        int frequency;
        foreach (int category in seqToken.categories)
        {
          if (candidateTokenFreqs.TryGetValue(category, out frequency))
          { //increment the category frequency, if it already exists
            frequency++;
            candidateTokenFreqs[category] = frequency;
          }
          else
          { //Create the category frequency, if it does not exist
            candidateTokenFreqs.Add(category, 1);
          }
        }

        //Update the frequencies total [-1] by the total # of categories incremented.
        candidateTokenFreqs[-1] += seqToken.categories.Length;
        *****************************************************************************

        //Only update the candidate sequence count once per sequence
        if (candidateTokenFreqs[-3] != seqToken.sequenceId)
        {
          candidateTokenFreqs[-3] = seqToken.sequenceId;
          candidateTokenFreqs[-2]++;

          //Promote the Token Candidate to a Gene Token, if it has been found >=
          //the user defined candidateThreshold
          if (candidateTokenFreqs[-2] >= candidateThreshold)
          {
            Dictionary<int, int> deletedCandidate;
            gtCandidates.TryRemove(seqToken.text, out deletedCandidate);
            geneTokens.TryAdd(seqToken.text, candidateTokenFreqs);
          }
        }
      }
      else
      {
        //create a new token candidate frequencies dictionary by making 
        //a copy of the default dictionary from
        gtCandidates.TryAdd(seqToken.text, new 
          Dictionary<int, int>(gtStartingFrequencies[seqToken.sequenceId]));
      }
    }
  }
});

最佳答案

很明显,一个数据竞争来自于一些线程将在此处添加项目这一事实:

geneTokens.TryAdd(seqToken.text, candidateTokenFreqs);

其他人将在这里阅读:

if (geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs))

关于c# - 一种使我的 Parallel.ForEach 线程安全的更好方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12766011/

相关文章:

c++ - 使用指向 map vector 的指针 (C++)

c# - ICollection<Person> 的返回类型是什么意思?

android - 在 onResume/onPause 中重新启动/暂停线程

java - Spring 集成 : Message released twice after delay

javascript - 奇怪的 HTML5 Worker JavaScript 代码(消息)

c# - Fluent Nhibernate 映射字典创建额外表

c# - 如何使用 Linq to XML 将 XML 转换为嵌套字典?

javascript - Asp Gridview 在打开模式对话框时更改布局

c# - 我的 InvokeRequied #2 有什么问题?

c# - 将 SQL 查询转换为 LINQ 查询