文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

C#集合之并发集合的用法

2024-04-02 19:55

关注

.NET 4 开始,在System.Collection.Concurrent中提供了几个线程安全的集合类。线程安全的集合可防止多个线程以相互冲突的方式访问集合。
为了对集合进行线程安全的访问,定义了IProducerConsumerCollection<T>接口。这个接口中最重要的方法是TryAdd()和TryTake()。TryAdd()方法尝试给集合添加一项,但如果集合禁止添加项,这个操作就可能失败。TryAdd()方法返回一个布尔值,以说明操作成功还是失败。TryTake()同样,在成功时返回集合中的项。

BlockingCollection<T>是对实现了 IProducerConsumerCollection<T>接口的任意类的修饰器,它默认使用ConcurrentQueue<T>类。还可以给构造函数传递任何实现了 IProducerConsumerCollection<T>接口的类。
下面使用一个例子演示BlockingCollection<T>的使用,一个任务向一个集合写入,同时另一个任务从这个集合读取。

    static void Main(string[] args)
        {
          StartPipeline();
          Console.ReadLine();
        }

        private static async void StartPipeline()
        {
            //存储文件名
          var fileNames = new BlockingCollection<string>();
          //存储文件的每一行内容
          var lines = new BlockingCollection<string>();
          //存储每一行的每个单词,单词为键,单词个数为值
          var words = new ConcurrentDictionary<string, int>();
          //存储words信息
          var items = new BlockingCollection<Info>();
          var coloredItems = new BlockingCollection<Info>();
            
          Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames);
          ConsoleHelper.WriteLine("started stage 1");
          Task t2 = PipelineStages.LoadContentAsync(fileNames, lines);
          ConsoleHelper.WriteLine("started stage 2");
          Task t3 = PipelineStages.ProcessContentAsync(lines, words);
          await Task.WhenAll(t1, t2, t3);
          ConsoleHelper.WriteLine("stages 1, 2, 3 completed");
        
          //当上面三个任务完成时,才执行下面的任务
          Task t4 = PipelineStages.TransferContentAsync(words, items);
          Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
          Task t6 = PipelineStages.ShowContentAsync(coloredItems);
          ConsoleHelper.WriteLine("stages 4, 5, 6 started");

          await Task.WhenAll(t4, t5, t6);

          ConsoleHelper.WriteLine("all stages finished");
        }
        
        
          public static class PipelineStages
          {
            public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
            {
              return Task.Run(() =>
                {
                  foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
                  {
                    output.Add(filename);
                    ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
                  }
                  //调用CompleteAdding,通知所有读取器不应再等待集合中的任何额外项
                    //如果不调用该方法,读取器会在foreach循环中等待更多的项被添加
                  output.CompleteAdding();
                });
            }

            public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
            {
                //使用读取器读取集合时,需要使用GetConsumingEnumerable获取阻塞集合的枚举器,
            //如果直接使用input迭代集合,这只会迭代当前状态的集合,不会迭代以后添加的项
              foreach (var filename in input.GetConsumingEnumerable())
              {
                using (FileStream stream = File.OpenRead(filename))
                {
                  var reader = new StreamReader(stream);
                  string line = null;
                  while ((line = await reader.ReadLineAsync()) != null)
                  {
                    output.Add(line);
                    ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
                  }
                }
              }
              output.CompleteAdding();
            }

            public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
            {
              return Task.Run(() =>
                {
                  foreach (var line in input.GetConsumingEnumerable())
                  {
                    string[] words = line.Split(' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"');
                    foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
                    {
                    //这里使用了字典的一个扩展方法
                      output.AddOrIncrementValue(word);
                      ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
                    }
                  }
                });
            }

            public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
            {
              return Task.Run(() =>
                {
                  foreach (var word in input.Keys)
                  {
                    int value;
                    if (input.TryGetValue(word, out value))
                    {
                      var info = new Info { Word = word, Count = value };
                      output.Add(info);
                      ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info));
                    }
                  }
                  output.CompleteAdding();
                });
            }

            public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
            {
              return Task.Run(() =>
                {
                  foreach (var item in input.GetConsumingEnumerable())
                  {
                    if (item.Count > 40)
                    {
                      item.Color = "Red";
                    }
                    else if (item.Count > 20)
                    {
                      item.Color = "Yellow";
                    }
                    else
                    {
                      item.Color = "Green";
                    }
                    output.Add(item);
                    ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color));
                  }
                  output.CompleteAdding();
                });
            }

            public static Task ShowContentAsync(BlockingCollection<Info> input)
            {
              return Task.Run(() =>
                {
                  foreach (var item in input.GetConsumingEnumerable())
                  {
                    ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color);
                  }
                });
            }
          }
          
          
          //创建一个字典的扩展方法
         public static class ConcurrentDictionaryExtension
          {
            public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key)
            {
              bool success = false;
              while (!success)
              {
                int value;
                if (dict.TryGetValue(key, out value))
                {
                  if (dict.TryUpdate(key, value + 1, value))
                  {
                    success = true;
                  }
                }
                else
                {
                  if (dict.TryAdd(key, 1))
                  {
                    success = true;
                  }
                }
              }
            }
          }

这里使用了一个管道模型的编程模式,上面的添加内容,下面处理内容

到此这篇关于C#集合之并发集合的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持编程网。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯