Параллелизм данных

197

Парадигма параллелизма задач, рассмотренная ранее, в первую очередь относится к задачам. Основной целью парадигмы параллелизма данных является полное устранение задач из поля зрения и их замена высокоуровневой абстракцией - параллельными циклами. Иными словами, распараллеливается не реализация алгоритма, а данные, которыми она оперирует. Библиотека Task Parallel Library предлагает несколько вариантов поддержки параллелизма данных.

Методы Parallel.For и Parallel.ForEach

Циклы for и foreach часто являются отличными кандидатами для распараллеливания. В действительности, еще на заре развития параллельных вычислений предпринимались попытки автоматического распараллеливания таких циклов. Некоторые попытки воплотились в языковые конструкции или расширения, такие как стандарт OpenMP (описывающий такие директивы, как #pragma omp parallel for, обеспечивающую распараллеливание циклов for). Библиотека Task Parallel Library предоставляет поддержку распараллеливания циклов посредством явных методов, очень близких своим языковым эквивалентам. Речь идет о методах Parallel.For() и Parallel.ForEach(), максимально близко имитирующих поведение циклов for и foreach.

В примере поиска простых чисел у нас имелся цикл, который выполнял итерации по большому диапазону чисел, проверял каждое из них на принадлежность к категории простых чисел и добавлял их в коллекцию, как показано ниже:

for (uint number = start; number < end; ++number)
{
    if (IsPrime(number))
    {
         primes.Add(number);
    }
}

Преобразование этого кода, с целью задействовать в нем метод Parallel.For() - по большей степени механическая задача, если не считать необходимость синхронизации доступа к коллекции простых чисел (существуют более удачные решения, такие как агрегирование, которые мы рассмотрим ниже):

Parallel.For(start, end, number => {
    if (IsPrime(number)) {
        lock (primes)
        {
            primes.Add(number);
        }
    }
});

Заменив языковую инструкцию цикла вызовом метода, мы автоматически обеспечили параллельное выполнение итераций цикла. Кроме того, метод Parallel.For - это не простой цикл, генерирующий задачи в каждой итерации или для каждого фрагмента данных определенного размера. Вместо этого Parallel.For не спеша приспосабливается к темпу выполнения отдельных итераций, учитывая количество задач, выполняющихся в каждый момент, и исключает вероятность дробления диапазона на слишком мелкие фрагменты, производя его деление динамически.

Реализовать подобное поведение вручную весьма непросто, но вам доступны некоторые настройки (такие как управление максимальным количеством выполняемых задач), которые можно выполнить с помощью перегруженной версии метода Parallel.For, принимающей объект ParallelOptions, или используя собственный метод, выполняющий деление диапазона между задачами.

Существует похожий метод и для цикла foreach, который с успехом можно использовать, когда объем источника данных заранее неизвестен и может даже оказаться бесконечным. Представьте, что нам потребовалось загрузить из Интернета множество полос RSS, объявленных как IEnumerable<string>. В этом случае общая структура цикла могла бы иметь следующий вид:

IEnumerable<string> rssFeeds = // ...;
WebClient webClient = new WebClient();

foreach (string url in rssFeeds) {
    Process(webClient.DownloadString(url));
}

Данный цикл легко можно распараллелить механической заменой инструкции foreach вызовом метода Parallel.ForEach. Обратите внимание, что источник данных (коллекция rssFeeds) необязательно должен быть потокобезопасным, потому что Parallel.ForEach автоматически синхронизирует операции обращения к нему из разных потоков:

IEnumerable<string> rssFeeds = // ... не должен быть потокобезопасным
WebClient webClient = new WebClient();

Parallel.ForEach(rssFeeds, url => {
    Process(webClient.DownloadString(url));
});

Однако, распараллелить цикл совсем не просто, как могло бы показаться из предыдущего обсуждения. Существует ряд «недостающих» особенностей, которые необходимо рассмотреть, прежде чем подвесить этот инструмент на пояс. Для начинающих отметим, что в языке C# существует ключевое слово break, которое может вызывать преждевременное завершение циклов. Но как завершить цикл, параллельно выполняемый несколькими потоками, когда мы даже не знаем, какая итерация в данный момент выполняется в других потоках?

Класс ParallelLoopState представляет состояние параллельного цикла и позволяет прервать его. Например:

int invitedToParty = 0;
Parallel.ForEach(customers, (customer, loopState) => 
{
    if (customer.Orders.Count > 10 && customer.City == "Москва") 
    {
        if (Interlocked.Increment(ref invitedToParty) >= 25) 
        {
            // прервать выполнение итераций 
            loopState.Stop(); 
        }
    }
});

Обратите внимание: метод Stop() не гарантирует, что итерация, вызвавшая его, будет последней - итерации, уже запущенные к этому моменту, будут выполнены до конца (если они не проверяют свойство ParallelLoopState.ShouldExitCurrentIteration). Но гарантирует, что никакие другие итерации не будут запланированы на выполнение.

Одним из недостатков метода ParallelLoopState.Stop() является отсутствие гарантий, что все итерации, предшествующие данной, будут выполнены. Например, при обработке списка из 1000 заказчиков таким способом может получиться так, что заказчики с 1 по 100 будут обработаны полностью, заказчики с 101 по 110 вообще не будут обработаны, и заказчик 111 окажется последним обработанным перед вызовом Stop(). Если необходимо обеспечить выполнение всех итераций, предшествующих данной (даже если они еще не были запущены!), следует использовать метод ParallelLoopState.Break().

Parallel LINQ (PLINQ)

Пожалуй, самой высокоуровневой абстракцией параллельных вычислений является возможность объявить: «Я хочу, чтобы этот фрагмент кода выполнялся параллельно», - и переложить все хлопоты на используемый фреймворк. Именно это позволяет фреймворк Parallel LINQ. Но сначала вспомним, что такое LINQ.

LINQ (Language Integrated Query - язык интегрированных запросов) - это фреймворк и набор расширений языка, появившийся в версии C# 3.0 и .NET 3.5, стирающий грань между императивным и декларативным программированием там, где требуется выполнять итерации через данные. Например, следующий запрос LINQ извлекает из источника customers - который может быть обычной коллекцией в памяти, таблицей в базе данных или иметь более экзотическое происхождение - имена и возраст клиентов, проживающих в Москве, сделавших не менее трех покупок на сумму более 100 руб. за последние десять месяцев, и выводит эти данные в консоль:

public class Customer
{
    public int Id { get; set; }
    public string City { get; set; }
    public string Name { get; set; }
    public byte Age { get; set; }
}

public class Order
{
    public int Id { get; set; }
    public DateTime Date { get; set; }
    public int Amount { get; set; }
}

public class Program
{
    static void Main(string[] args)
    {
        // Здесь должен находиться код доступа к данным
        List<Customer> customers = new List<Customer>();
        List<Order> orders = new List<Order>();

        // LINQ-запрос
        var results = from customer in customers
                      where customer.City == "Москва"
                      let custOrders = (from order in orders
                                        where customer.Id == order.Id
                                        select new { order.Date, order.Amount })
                      where custOrders.Count(
                          co => co.Amount >= 100 &&
                                co.Date >= DateTime.Now.AddMonths(-10)) >= 3
                      select new { customer.Name, customer.Age };

        foreach (var result in results)
        {
            Console.WriteLine("{0} {1}", result.Name, result.Age);
        }
    }
}

Первое, на что следует обратить внимание, - большая часть запроса определена декларативно, подобно запросу на языке SQL. Здесь не используются циклы для фильтрации объектов или группировки объектов из разных источников. Часто вам не придется даже волноваться о синхронизации итераций, выполняемых запросом, потому что запросы LINQ являются исключительно функциональными и не имеют побочных эффектов - они преобразуют одну коллекцию (IEnumerable<T>) в другую, не изменяя никакие объекты в процессе работы.

Чтобы распараллелить запрос, представленный выше, достаточно лишь изменить тип коллекции источника с обобщенного IEnumerable<T> на ParallelQuery<T>. Для этого можно воспользоваться методом AsParallel() расширения и получить в результате следующий элегантный код:

// ...
var results = from customer in customers.AsParallel()
              where customer.City == "Москва"
              let custOrders = (from order in orders
                                where customer.Id == order.Id
                                select new { order.Date, order.Amount })
              where custOrders.Count(
                   co => co.Amount >= 10 &&
                         co.Date >= DateTime.Now.AddMonths(-10)) >= 3
              select new { customer.Name, customer.Age };
// ...

Параллельная обработка запросов выполняется фреймворком PLINQ в три этапа, как показано на рисунке ниже. Сначала PLINQ решает, сколько потоков следует использовать для выполнения запроса. Затем рабочие потоки извлекают свои фрагменты их исходной коллекции, под защитой блокировок. Все потоки выполняют свои задания независимо и помещают результаты в свои локальные очереди. В заключение, локальные результаты объединяются в единую коллекцию, которая подается в цикл foreach, в примере выше.

Параллельная обработка запроса в PLINQ

Серые прямоугольники представляют результаты завершенных заданий, помещенные в локальные очереди потоков, откуда они последовательно перемещаются в общий выходной буфер, доступный вызывающей программе. Заштрихованные прямоугольники представляют задания, выполняющиеся в данный момент.

Главное преимущество PLINQ перед методом Parallel.ForEach() заключается в автоматическом объединении результатов, полученных разными потоками. В примере поиска простых чисел с использованием Parallel.ForEach)_ мы были вынуждены вручную добавлять результаты работы каждого потока в глобальную коллекцию. При этом необходимо было использовать механизм синхронизации и тем самым увеличивать накладные расходы. Тот же результат легко можно получить с помощью PLINQ:

List<int> primes = (from n in Enumerable.Range(3, 100000).AsParallel()
                    where IsPrime(n)
                    select n).ToList();
					
// Вместо Enumerable.Range(...).AsParallel() можно использовать ParallelEnumerable.Range

Настройка параллельных циклов и PLINQ

Параллельные циклы (Parallel.For и Parallel.ForEach) и PLINQ поддерживают несколько методов для выполнения настройки, которые делают эти инструменты чрезвычайно гибкими и близкими в богатстве и выразительности к механизму параллельных задач, обсуждавшемуся выше. Методы параллельных циклов принимают объект ParallelOptions с различными свойствами, определяющими дополнительные параметры, а фреймворк PLINQ - дополнительные методы объектов ParallelQuery<T>. В число настраиваемых параметров входят:

При использовании параллельных циклов чаще всего ограничивают максимально возможное количество задач, выполняемых одновременно, тогда как при использовании PLINQ обычно настраивают режим слияния результатов и порядок их вывода.

Асинхронные методы в C# 5

До сих пор мы рассматривали приемы распараллеливания, которые могут быть выражены с использованием классов и методов из библиотеки Task Parallel Library. Однако существует еще одна среда параллельных вычислений, основанная на расширениях языка и позволяющая получить еще большую выразительность там, где методы выглядят несколько неуклюже или недостаточно выразительно. В этом разделе мы познакомимся с нововведениями в языке C# 5, предназначенными для решения задач параллельного программирования и упрощающими реализацию продолжений (continuations). Но сначала познакомимся с продолжениями с точки зрения асинхронного выполнения.

Достаточно часто возникает необходимость связать с некоторой задачей продолжение (continuation), или обратный вызов (callback), то есть некоторый код, который должен быть выполнен по завершении задачи. Имея контроль над задачей - то есть, когда вы явно управляете ее запуском - вы можете встроить обратный вызов в саму задачу, но когда вы получаете задачу от какого-то другого метода, необходимо использовать явный API продолжения.

Библиотека TPL предлагает метод экземпляра ContinueWith и статические методы ContinueWhenAll() и ContinueWhenAny() (их имена говорят сами за себя) для управления продолжениями в некоторых ситуациях. Используя класс TaskScheduler можно запланировать выполнение продолжения только при определенных обстоятельствах (например, только когда задача завершилась успешно или только когда в задаче возникло исключение) и в определенном потоке (группе потоков).

Продолжения - удобный способ программирования асинхронных приложений и очень ценный, при выполнении асинхронных операций ввода/вывода в приложениях с графическим интерфейсом. Например, чтобы обеспечить высокую отзывчивость приложений для Windows 8 с интерфейсом в стиле Метро (Metro), WinRT (Windows Runtime) API в Windows 8 поддерживает только асинхронные версии всех операций, длительность которых может составить больше 50 миллисекунд. При наличии множества асинхронных вызовов, выполняемых друг за другом, вложенные продолжения могут стать неудобными в использовании.

Архитекторы C# 5 решили устранить эти проблемы, введением в синтаксис языка двух новых ключевых слов, async и await. Асинхронный метод должен быть отмечен ключевым словом async и может возвращать значение типа void, Task или Task<T>. Внутри асинхронного метода можно использовать оператор await, чтобы реализовать продолжение без использования метода ContinueWith(). Взгляните на следующий пример:

// Данный код не компилируется (создан в целях демонстрации)
private async void updateButton_Clicked(/*...*/)
{
    using (LocationService location = new LocationService())
    {
        Task<Location> locTask = location.GetCurrentLocationAsync();
        Location loc = await locTask;
        cityTextBox.Text = loc.City.Name;
    }
}

Здесь выражение "await locTask" реализует продолжение для задачи, возвращаемой вызовом GetCurrentLocationAsync(). Собственно продолжением является оставшаяся часть тела метода (начиная с инструкции присваивания переменной loc), а значением выражения await является результат выполнения задачи, в данном случае - объект Location. Кроме того, продолжение неявно планируется для выполнения в потоке управления пользовательским интерфейсом, о чем, при использовании TaskScheduler, необходимо было позаботиться явно.

Заботу обо всех синтаксических особенностях тела метода берет на себя компилятор C#. Например, в только что написанном методе имеется блок try...finally, спрятанный под покровом инструкции using. Компилятор переделает продолжение так, что метод Dispose() переменной location будет вызван независимо от успешности завершения задачи. Эта особенность делает замену вызовов синхронных методов их асинхронными аналогами почти тривиальным делом. Компилятор поддерживает обработку исключений, сложные циклы, рекурсивные вызовы методов - языковые конструкции, которые плохо сочетаются с явным механизмом продолжений.

Всего лишь две языковые особенности (не очень сложные в реализации!) существенно снизили порог вхождения в асинхронное программирование и упростили работу с методами, возвращающими задачи и управляющими ими. Кроме того, реализация оператора await несовместима с библиотекой Task Parallel Library; низкоуровневый WinRT API В Windows 8 возвращает экземпляры типа IAsyncOperation<T>, а не задачи Task (которые являются управляемой концепцией), которые, тем не менее, с успехом можно передавать оператору await.

Дополнительные шаблоны в TPL

До сих пор мы рассматривали довольно простые примеры алгоритмов, легко поддающихся распараллеливанию. В этом разделе мы коротко исследуем несколько дополнительных приемов, которые могут пригодиться вам при решении настоящих проблем; в некоторых случаях мы можем обеспечить прирост производительности в самых неожиданных местах.

Первым приемом оптимизации, который может использоваться при распараллеливании циклов с общим состоянием, является агрегирование (иногда называется сверткой (reduction)). Когда в параллельном цикле используется общее состояние, масштабируемость часто утрачивается из-за необходимости синхронизировать доступ к общим данным; чем больше ядер в процессоре оказывается доступно, тем меньше выигрыш из-за синхронизации (этот эффект является прямым следствием закона Амдала (Amdahl Law), который часто называют законом убывающей отдачи (The Law of Diminishing Returns)). Значительный прирост производительности часто достигается за счет создания локальных состояний потоков или задач, выполняющих параллельные итерации цикла, и их объединения в конце. Методы из библиотеки TPL, используемые для организации циклов, имеют перегруженные версии, обслуживающие такого рода локальные агрегаты.

Вернемся к примеру поиска простых чисел, реализованному ранее в одной из статей. Одной из основных помех масштабированию в нем является необходимость добавления новых обнаруженных простых чисел в совместно используемый список, для чего требуется использовать механизм синхронизации. Вместо этого мы можем использовать в каждом потоке свой, локальный список и объединить их по завершении цикла:

public class Program
{
    static void Main(string[] args)
    {
        IEnumerable<int> counters = new List<int>();

        Stopwatch timer = Stopwatch.StartNew();
        counters = PrimesInRange(100, 200000);
        timer.Stop();

        Console.WriteLine("Прошло: {0} мс., найдено {1} простых чисел",
            timer.ElapsedMilliseconds,
            counters.Count());
    }

    public static IEnumerable<int> PrimesInRange(uint start, uint end)
    {
        List<int> primes = new List<int>();

        Parallel.For(start, end,
            () => new List<int>(),     // инициализировать локальную копию
            (i, pls, localPrimes) =>   // каждый шаг вычислений
            {
                // возвращает новое локальное состояние
                if (IsPrime(i))
                {
                    // синхронизация не требуется
                    localPrimes.Add((int)i);
                }
                return localPrimes;
            },
            localPrimes =>
            { 
                // объединить локальные списки
                lock (primes)
                { 
                    // синхронизация необходима
                    primes.AddRange(localPrimes);
                }
            }
        );

        return primes;
    }

    private static bool IsPrime(long number)
    {
        // Крайне неэффективный алгоритм O(n), но достаточный для демонстрационных целей
        if (number == 2) return true;
        if (number % 2 == 0) return false;

        for (uint divisor = 3; divisor < number; divisor += 2)
        {
            if (number % divisor == 0) 
                return false;
        }

        return true;
    }
}

В примере выше количество попыток приобрести блокировку значительно меньше, чем в предыдущих примерах - блокировку требуется приобрести лишь один раз для каждого потока, а не для каждого найденного простого числа. Мы добавили накладные расходы на объединение списков, но эта цена незначительна, в сравнении с увеличившейся масштабируемостью.

Еще одно место, где можно применить оптимизацию, - итерации цикла, слишком короткие, чтобы их эффективно можно было распараллелить. Даже при том, что механизм параллелизма данных объединяет несколько итераций иногда тело цикла может выполняться настолько быстро, по скорости превосходят вызов делегата, необходимый для вызова тела цикла в каждой итерации. В этом случае можно использовать класс Partitioner и с его помощью вручную группировать итерации, уменьшая количество вызовов делегатов:

Parallel.For(
    Partitioner.Create(start, end),
    range => {      // range - это тип Tuple<int, int>
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            // тело цикла без вызова делегата
        }
});

За дополнительной информацией о разбиении циклов на фрагменты, являющемся важным способом оптимизации, обращайтесь к статье «Пользовательские разделители для PLINQ и TPL» на сайте MSDN.

Наконец, существуют приложения, в которых могут пригодиться собственные планировщики задач. В качестве примеров можно привести планирование заданий в потоке управления пользовательским интерфейсом, назначение приоритетов задачам, планируя их с помощью высокоприоритетного планировщика, и связывание задач с определенным процессором, планируя их с помощью планировщика, использующего потоки, привязанные к определенному процессору. Реализовать собственные планировщики можно путем наследования класса TaskScheduler. Пример такой реализации можно найти в статье «Практическое руководство. Создание планировщика заданий, ограничивающего степень параллелизма» на сайте MSDN.

Пройди тесты
Лучший чат для C# программистов