Операции управления выполнением

79

Использовать PLINQ можно, просто вызывая AsParallel или одну из описанных ранее операций создания. Но если нужен более тонкий контроль над формированием запроса PLINQ, то для этого понадобится работать с одной или более операциями, описанными в этой статье.

AsOrdered

Операция AsOrdered предохраняет порядок результатов в соответствии с порядком исходной последовательности (как известно, параллельная обработка по умолчанию не предохраняет порядка результатов).

Операция AsOrdered имеет два прототипа:

Первый прототип AsOrdered
public static ParallelQuery<T> AsOrdered<T>( 
      this ParallelQuery<T> source)

Первый прототип AsOrdered принудительно устанавливает порядок результатов для ParallelQuery<T>. Этот прототип будет применяться наиболее часто. Результатом операции также является объект ParallelQuery<T>, который можно использовать как входную последовательность для запроса PLINQ.

Второй прототип AsOrdered

Второй прототип AsOrdered работает на слабо типизированном ParallelQuery:

public static ParallelQuery AsOrdered( 
          this ParallelQuery source)

Это разновидность ParallelQuery, которая получается при вызове AsOrdered на унаследованной коллекции. Прежде чем можно будет использовать последовательность данных в запросе PLINQ, необходимо применить операцию OfType или Cast.

Ниже демонстрируется использование первого прототипа AsOrdered. В отношении результатов операции AsParallel, примененной к последовательности названий машин, выполняется операция AsOrdered:

string[] cars = { "Nissan", "Aston Martin", "Chevrolet", "Alfa Romeo", "Chrysler", "Dodge", "BMW",
                              "Ferrari", "Audi", "Bentley", "Ford", "Lexus", "Mercedes", "Toyota", "Volvo", "Subaru", "Жигули :)"};

            IEnumerable<string> auto = cars
                .AsParallel()
                .AsOrdered()
                .Where(p => p.Contains('a'))
                .Select(p => p);

            foreach (string s in auto)
                Console.WriteLine("Совпадение: " + s);

Запуск кода дает показанные ниже результаты. Здесь видно, что порядок исходной последовательности предохранен:

Использование первого прототипа AsOrdered

AsUnordered

Операция AsUnordered отменяет действие операции AsOrdered. Это может пригодиться в запросах, состоящих из множества частей, когда нужно упорядочить одну часть, но избежать накладных расходов по упорядочиванию результатов другой части. Более подробно вопросы упорядочивания результатов рассматривались ранее.

Операция AsUnordered имеет один прототип. Операция применяется к ParallelQuery, который можно использовать в качестве основы для запроса PLINQ:

public static ParallelQuery<T AsUnordered<T> ( 
       this ParallelQuery<T> source)

Ниже демонстрируется использование операции AsUnordered в запросе PLINQ, состоящем из двух стадий:

string[] cars = { "Nissan", "Aston Martin", "Chevrolet", "Alfa Romeo", "Chrysler", "Dodge", "BMW",
                              "Ferrari", "Audi", "Bentley", "Ford", "Lexus", "Mercedes", "Toyota", "Volvo", "Subaru", "Жигули :)"};

            IEnumerable<string> auto = cars
                .AsParallel()
                .AsOrdered()
                .Where(p => p.Contains('a'))
                .Take(5)
                .AsUnordered()
                .Where(p => p.Contains('s'))
                .Select(p => p);

            foreach (string s in auto)
                Console.WriteLine("Совпадение: " + s);

В коде сначала находятся все названия машин, содержащие букву "а" с предохранением порядка результатов с помощью операции AsOrdered. Результаты упорядочены, потому что необходимо получить первые пять соответствий, которые затем использовать для нахождения всех названий, содержащих букву "s". Упорядочивать эту часть не нужно, поэтому вызывается операция AsUnordered для исключения накладных расходов PLINQ, связанных с сортировкой результатов. Скомпилировав и запустив код, получаем следующие результаты:

Смешанное упорядочивание в запросе PLINQ

AsSequential

Операция AsSequential — противоположность операции AsParallel. Она навязывает последовательное выполнение, преобразуя ParallelQuery<T> в IEnumerable<T>.

Операция AsSequential имеет один прототип, который оперирует с ParallelQuery<T> и возвращает IEnumerable<T>. Запросы, выполняемые на результате этой операции, будут последовательными:

public static IEnumerable<T> AsSeguential<T> ( 
       this ParallelQuery<T> source)

Операция AsSequential больше всего нужна, когда требуется включать или отключать параллельное выполнение запроса с множеством частей. Пример приведен ниже:

string[] cars = { "Nissan", "Aston Martin", "Chevrolet", "Alfa Romeo", "Chrysler", "Dodge", "BMW",
                              "Ferrari", "Audi", "Bentley", "Ford", "Lexus", "Mercedes", "Toyota", "Volvo", "Subaru", "Жигули :)"};

            IEnumerable<string> auto = cars
                .AsParallel()
                .AsOrdered()
                .Where(p => p.Contains('a'))
                .Take(5)
                .AsSequential()
                .Where(p => p.Contains('o'))
                .Select(p => p);

            foreach (string s in auto)
                Console.WriteLine("Совпадение: " + s);

Для второй части запроса решено, что накладные расходы, связанные с параллельным выполнением не оправданы, поскольку заранее известно, что требуется обработать всего пять элементов. По этой причине с помощью операции AsSequential было произведено переключение с PLINQ на LINQ, когда осуществляется выбор имен, содержащих букву "о". Переключаться с параллельного на последовательное выполнение с использованием операций AsParallel и AsSequential можно столько раз, сколько необходимо. Ниже показаны результаты запуска кода. Поскольку последняя часть запроса выполнялась последовательно, результаты получаются в том же порядке, в котором расположены данные в исходной последовательности:

Переход от параллельного к последовательному выполнению в запросе с множеством частей

AsEnumerable

Операция AsEnumerable дает тот же эффект, что и AsSequential. Она преобразует ParallelQuery<T> в IEnumerable<T> и тем самым навязывает последовательное выполнение запроса.

Эта операция имеет один прототип:

public static IEnumerable<T> AsEnumerable<T> ( 
   this ParallelQuery<T> source)

WithMergeOptions

Операция WithMergeOptions позволяет управлять буферизацией результатов по мере их производства запросом. По умолчанию PLINQ создает буфер, хранящий несколько результирующих элементов, и поставляет их потребителю результата только при наполнении буфера. Это поведение можно изменить так, чтобы отправлять потребителю результаты только после их накопления или же передавать каждый результат немедленно.

Операция WithMergeOptions имеет один прототип. Операция применяется к экземплярам ParallelQuery<T> и принимает один аргумент — значение из перечисления ParallelMergeOptions:

public static ParallelQuery<T> WithMergeOptions<T> ( 
        this ParallelQuery<T> source, 
        ParallelMergeOptions mergeOptions)

Перечисление ParallelMergeOptions включает четыре значения. Значение NotBuffered заставляет каждый результирующий элемент передаваться по мере получения. Значение FullBuffered обеспечивает ожидание, пока все результаты не будут получены, прежде чем передавать их потребителю. Значение AutoBuffered позволяет системе выбирать размер буфера и передавать элементы по мере наполнения буфера. Последнее из значений перечисления — Default — это то же самое, что и AutoBuffered.

Ниже содержится пример использования операции WithMergeOptions со значением FullBuffered из перечисления ParallelMergeOptions:

using System.Diagnostics;
...                  
IEnumerable<int> results = ParallelEnumerable.Range(0, 10)
                .WithMergeOptions(ParallelMergeOptions.FullyBuffered)
                .Select(i =>
                {
                    System.Threading.Thread.Sleep(1000);
                    return i;
                });

            Stopwatch sw = Stopwatch.StartNew();

            foreach (int i in results)
            {
                Console.WriteLine("Значение: {0}, Время: {1}", i, sw.ElapsedMilliseconds);
            }

В этом примере к конструкции select запроса добавлено ожидание, чтобы перед обработкой каждого элемента созданной последовательности происходила задержка в 1 секунду. Затем результаты запроса перечисляются (что инициирует отложенное выполнение) и каждый элемент выводится на консоль. Для каждого вызова Console.Writeline выводится временная метка с использованием класса Stopwatch, чтобы приблизительно видеть, сколько времени проходит между получением результирующих элементов.

Ниже показаны результаты запуска кода:

Полная буферизация результатов PLINQ

Взглянув на значение времени рядом с каждым результирующим значением, можно заметить, что проходит около пяти секунд перед получением каких-либо элементов, но затем они все приходят одним блоком. Поскольку использовалась опция FullyBuffered, это именно то, чего следовало ожидать. Все результаты будут переданы, только когда все они будут обработаны.

Ниже показан пример запроса без буферизации:

IEnumerable results = ParallelEnumerable.Range(0, 10)
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(i =>
                {
                    System.Threading.Thread.Sleep(1000);
                    return i;
                });

            Stopwatch sw = Stopwatch.StartNew();

            foreach (int i in results)
            {
                Console.WriteLine("Значение: {0}, Время: {1}", i, sw.ElapsedMilliseconds);
            }

Скомпилировав и запустив код, получаются следующие результаты:

Запрос PLINQ без буферизации результатов

Это ожидаемый результат, хотя поначалу это может быть не очевидно.

Стоит напомнить, что код этого примера был запущен на двухядерной машине, и потому PLINQ смог обработать два раздела одновременно и одновременно произвести результаты. Поэтому то, что видно в выводе — это два результата, полученных примерно через секунду (вспомните о секундной задержке в конструкции select), секундой позже — еще два результата и так далее. Именно этого и следовало ожидать при выполнении запроса на двухядерной машине с отключенной буферизацией.

Пройди тесты
Лучший чат для C# программистов