Параллельные коллекции

90

В версию 4.0 среды .NET Framework добавлено новое пространство имен System.Collections.Concurrent. Оно содержит коллекции, которые являются потокобезопасными и специально предназначены для параллельного программирования. Это означает, что они могут безопасно использоваться в многопоточной программе, где возможен одновременный доступ к коллекции со стороны двух или больше параллельно исполняемых потоков.

Для безопасного в отношении потоков доступа к коллекциям определен интерфейс IProducerConsumerCollection<T>. Наиболее важными методами этого интерфейса являются TryAdd() и TryTake(). Метод TryAdd() пытается добавить элемент в коллекцию, но это может не получиться, если коллекция заблокирована от добавления элементов. Метод возвращает булевское значение, сообщающее об успехе или неудаче операции.

TryTake() работает аналогичным образом, информируя вызывающий код об успехе или неудаче, и в случае успеха возвращает элемент из коллекции. Ниже перечислены классы из пространства имен System.Collections.Concurrent с кратким описанием их функциональности:

ConcurrentQueue<T>

Этот класс коллекции реализован со свободным от блокировок алгоритмом и использует 32 массива, которые внутренне скомбинированы в связный список. Для доступа к элементам очереди применяются методы Enqueue(), TryDequeue() и TryPeek(). Имена этих методов очень похожи на уже известные методы Queue<T>, но с добавлением префикса Try к тем из них, которые могут дать сбой. Поскольку этот класс реализует интерфейс IProducerConsumerCollection<T>, методы TryAdd() и TryTake() просто вызывают Enqueue() и TryDequeue().

ConcurrentStack<T>

Очень похож на ConcurrentQueue<T>, но с другими методами доступа к элементам. Класс ConcurrentStack<T> определяет методы Push(), PushRange(), TryPeek(), TryPop() и TryPopRange(). Внутри этот класс использует связный список для хранения элементов.

ConcurrentBag<T>

Этот класс не определяет никакого порядка для добавления или извлечения элементов. Он реализует концепцию отображения потоков на используемые внутренне массивы, и старается избежать блокировок. Для доступа к элементам применяются методы Add(), TryPeek() и TryTake().

ConcurrentDictionary<TKey, TValue>

Безопасная в отношении потоков коллекция ключей и значений. Для доступа к членам в неблокирующем режиме служат методы TryAdd(), TryGetValue(), TryRemove() и TryUpdate(). Поскольку элементы основаны на ключах и значениях, ConcurrentDictionary<TKey, TValue> не реализует интерфейс IProducerConsumerCollection<T>.

ConcurrentXXX

Эти коллекции безопасны к потокам в том смысле, что возвращают false, если какое-то действие над ними невозможно при текущем состоянии потоков. Прежде чем предпринимать какие-то дальнейшие действия, всегда следует проверять успешность добавления или извлечения элементов. Полностью доверять коллекции решение задачи нельзя.

BlockingCollection<T>

Коллекция, которая осуществляет блокировку и ожидает, пока не появится возможность выполнить действие по добавлению или извлечению элемента. BlockingCollection<T> предлагает интерфейс для добавления и извлечения элементов методами Add() и Take(). Эти методы блокируют поток и затем ожидают, пока не появится возможность выполнить задачу.

Метод Add() имеет перегрузку, которой можно также передать CancellationToken. Эта лексема всегда отменяет блокирующий вызов.

Если не нужно, чтобы поток ожидал бесконечное время, и не хотите отменять вызов извне, доступны также методы TryAdd() и TryTake(). В них можно указать значение таймаута — максимального периода времени, в течение которого вы готовы блокировать поток и ждать, пока вызов не даст сбой.

Давайте рассмотрим пример применения параллельных коллекций:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static BlockingCollection<int> bc;

        static void producer()
        {
            for (int i = 0; i < 100; i++)
            {
                bc.Add(i * i);
                Console.WriteLine("Производится число " + i*i);
            }
            bc.CompleteAdding();
        }

        static void consumer()
        {
            int i;
            while (!bc.IsCompleted)
            {
                if (bc.TryTake(out i))
                    Console.WriteLine("Потребляется число: " + i);
            }
        }

        static void Main()
        {
            bc = new BlockingCollection<int>(4);

            // Создадим задачи поставщика и потребителя
            Task Pr = new Task(producer);
            Task Cn = new Task(consumer);

            // Запустим задачи
            Pr.Start();
            Cn.Start();

            try
            {
                Task.WaitAll(Cn, Pr);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            finally
            {
                Cn.Dispose();
                Pr.Dispose();
                bc.Dispose();
            }

            Console.ReadLine();
        }
    }
}

Параллельные коллекции зачастую применяются в комбинации с библиотекой распараллеливания задач (TPL) или языком PLINQ.

Пройди тесты
Лучший чат для C# программистов