Параллелизм задач

194

Параллелизм задач - это парадигма и набор API для разделения больших задач на более мелкие, и выполнение их с применением нескольких потоков. Библиотека Task Parallel Library (TPL) обладает превосходным API для управления миллионами задач, выполняющимися одновременно (посредством пула потоков CLR). Основу TPL составляет класс Task из пространства имен System.Threading.Tasks, являющийся представлением задачи. Класс Task поддерживает следующие возможности:

Так как задачи можно рассматривать как высокоуровневую абстракцию, построенную поверх потоков выполнения, мы могли бы переписать пример нахождения простых чисел из предыдущей статьи с использованием задач вместо потоков. Это позволило бы даже сократить код - по крайней мере, при таком подходе мы можем отказаться от счетчика завершившихся заданий и от объекта ManualResetEvent, следящего за выполнением заданий. Но не смотря на то, что прикладной интерфейс библиотеки TPL еще лучше подходит для распараллеливания цикла поиска простых чисел в заданном диапазоне, мы обратимся к другой проблеме.

Существует хорошо известный рекурсивный алгоритм быстрой сортировки Quicksort, легко поддающийся распараллеливанию (и имеющий среднюю производительность O(n*log(n)) и достаточно оптимальный - хотя можно сосчитать по пальцам все современные крупные фреймворки, использующие алгоритм Quicksort для сортировки чего бы то ни было). Алгоритм Quicksort имеет следующую реализацию:

public static void QuickSort<T>(T[] items) where T : IComparable<T>
{
    QuickSort(items, 0, items.Length);
}

private static void QuickSort<T>(T[] items, int left, int right) 
    where T : IComparable<T>
{
    if (left == right) return;
    int pivot = Partition(items, left, right);
    QuickSort(items, left, pivot);
    QuickSort(items, pivot + 1, right);
}

private static int Partition<T>(T[] items, int left, int right)
    where T : IComparable<T>
{
    int pivotPos = 1; // часто используется случайный индекс
    T pivotValue = items[pivotPos];

    Swap(ref items[right - 1], ref items[pivotPos]);
    int store = left;

    for (int i = left; i < right - 1; ++i)
    {
        if (items[i].CompareTo(pivotValue) < 0)
        {
            Swap(ref items[i], ref items[store]);
            ++store;
        }
    }

    Swap(ref items[right - 1], ref items[store]);
    return store;
}

private static void Swap<T>(ref T a, ref T b)
{
    T temp = a;
    a = b;
    b = temp;
}

Схема на рисунке ниже иллюстрирует единственный шаг, выполняемый методом Partition(). В качестве точки опоры выбирается четвертый элемент (со значением 5). Сначала он перемещается в правый конец массива. Затем все элементы, больше точки опоры, сдвигаются вправо. Наконец, точка опоры позиционируется так, чтобы справа от нее оказались элементы, которые больше ее, а слева - которые меньше или равны:

Алгоритм быстрой сортировки Quicksort

Рекурсивные вызовы Quicksort, выполняемые на каждом шаге, могут служить точками распараллеливания задачи. Сортировка левой и правой частей массива выполняются независимо и не требуют синхронизации, и класс Task идеально подходит для этой цели. Ниже представлена первая попытка реализовать параллельную версию Quicksort с применением класса Task:

public static void QuickSort<T>(T[] items) 
            where T : IComparable<T>
{
    QuickSort(items, 0, items.Length);
}

private static void QuickSort<T>(T[] items, int left, int right) 
    where T : IComparable<T>
{
    if (right - left < 2) return;
    int pivot = Partition(items, left, right);

    Task leftTask = Task.Run(() => QuickSort(items, left, pivot));
    Task rightTask = Task.Run(() => QuickSort(items, pivot + 1, right));

    Task.WaitAll(leftTask, rightTask);
}

private static int Partition<T>(T[] items, int left, int right)
    where T : IComparable<T>
{
    // ...
}

private static void Swap<T>(ref T a, ref T b)
{
    // ...
}

Метод Task.Run() создает новую задачу (действует подобно вызову new Task()) и планирует ее на выполнение (подобно методу start вновь созданной задачи). Статический метод Task.WaitAll() ждет завершения обеих задач и затем возвращает управление. Обратите внимание, что нам не пришлось определять, как ждать завершения задач, когда создавать потоки и когда уничтожать их.

Существует один очень удобный вспомогательный метод с именем Parallel.Invoke(), который выполняет указанный набор задач и возвращает управление, когда все задачи будут выполнены. Его применение позволило бы переписать ядро метода Quicksort, как показано ниже:

Parallel.Invoke(
    () => QuickSort(items, left, pivot),
    () => QuickSort(items, pivot + 1, right)
);

Неважно, какую версию выбрать, использующую Parallel.Invoke() или создающую задачи вручную, если сравнить ее с последовательной версией, обнаружится, что она работает существенно медленнее, даже при том, что в ее распоряжении все доступные процессоры. И действительно, последовательная версия выполняет сортировку массива с 1 000 000 случайных целых чисел (на нашей тестовой системе) примерно за 250 миллисекунд, а параллельная версия - примерно за 650 миллисекунд!

Проблема в том, что параллельно выполняемые задачи должны быть достаточно объемными; бессмысленно пытаться распараллелить сортировку массива из трех элементов, потому что накладные расходы на создание объектов Task, планирование отдельных заданий и ожидание их выполнения оказываются значительно больше расходов на несколько операций сравнения.

Ограничение параллелизма в рекурсивных алгоритмах

Можно ли как-то ограничить применение параллелизма, чтобы предотвратить подобные потери? Существует несколько решений этой проблемы:

И действительно, применение первого варианта ограничения параллелизма для массивов с числом элементов меньше 500 дает превосходные результаты. В моей системе на процессоре Intel Core i7 было получено 4-кратное увеличение производительности, в сравнении с последовательной версией. Чтобы добиться таких результатов потребовалось внести достаточно простые изменения в код, но имейте в виду, что в окончательной версии лучше не использовать жестко заданное пороговое значение:

private static void QuickSort<T>(T[] items, int left, int right)
    where T : IComparable<T>
{
    if (right - left < 2) return;
    int pivot = Partition(items, left, right);
    if (right - left > 500)
    {
        Parallel.Invoke(
            () => QuickSort(items, left, pivot),
            () => QuickSort(items, pivot + 1, right)
        );
    }
    else
    {
        QuickSort(items, left, pivot);
        QuickSort(items, pivot + 1, right);
    }
}

Примеры рекурсивной декомпозиции

Существует множество других способов распараллеливания подобных рекурсивных алгоритмов. В действительности, почти все рекурсивные алгоритмы, разбивающие исходные данные на части, могут обрабатывать эти части независимо и объединять результаты. Позже рассмотрим примеры, которые сложнее поддаются распараллеливанию, но сначала познакомимся с самими алгоритмами:

Алгоритм умножения матриц Штрассена (Strassen)

Этот алгоритм умножения матриц обеспечивает лучшую производительность, чем обычный кубический алгоритм. Алгоритм Штрассена рекурсивно разлагает матрицу размера 2n x 2n на четыре одинаковых матрицы размером 2n-1 x 2n-1 и использует хитрый трюк с применением семи умножений вместо восьми, чтобы получить асимптотическое время выполнения ~O(n2.807). Как и в примере быстрой сортировки, практические реализации алгоритма Штрассена часто возвращаются к стандартному кубическому алгоритму для достаточно маленьких матриц; при распараллеливании алгоритма Штрассена с использованием приема рекурсивной декомпозиции, определение порогового значения приобретает особую важность.

Быстрое преобразование Фурье (алгоритм Кули-Тьюки (Cooley-Tukey))

Этот алгоритм вычисляет ДПФ (дискретное преобразование Фурье) вектора с длиной 2n с использованием рекурсивной декомпозиции вектора на два подвектора размером 2n-1. Организовать параллельное выполнение этих вычислений очень просто, но здесь снова очень важно определить порогового значения, чтобы исключить параллельную обработку слишком маленьких векторов.

Обход графа (поиск в глубину или поиск в ширину)

При обсуждении сборщика мусора в предыдущих статьях было сказано, что сборщик мусора CLR выполняет обход графа, в котором объекты являются вершинами, а ссылки между ними - ребрами. Обход графа в глубину или в ширину можно значительно ускорить за счет распараллеливания, как и многие другие рекурсивные алгоритмы; однако, в отличие от быстрой сортировки или быстрого преобразования Фурье, при распараллеливании обхода ветвей графа заранее сложно предсказать объем работы, который будет выполнен рекурсивным вызовом.

Эта особенность требует применения некоторых эвристик при разделении пространства поиска между несколькими потоками: мы видели, что серверная разновидность сборщика мусора использует довольно грубое разбиение, опираясь на различные области динамической памяти, используемые разными процессорами для размещения объектов.

Желающие попрактиковаться в параллельном программировании могут также обратить внимание на алгоритм умножения Карацубы, опирающийся на рекурсивную декомпозицию при умножении n-значных чисел, и имеющий сложность ~O(n1.585); на алгоритм сортировки слиянием, опирающийся на рекурсивную декомпозицию при сортировке, подобно алгоритму быстрой сортировки; и многочисленные алгоритмы динамического программирования, которые часто требуют дополнительных ухищрений, таких как мемоизация, в разных ветвях параллельных вычислений.

Исключения и отмена

Мы пока познакомились не со всеми возможностями класса Task. Представьте, что нам потребовалось реализовать обработку исключений, которые могут возникать в рекурсивных вызовах Quicksort, и добавить поддержку отмены всей операции сортировки, если она еще не завершилась.

Среда выполнения задачи обеспечивает все необходимое для передачи исключений, возникающих в задаче, обратно любому потоку выполнения, желающему принять их. Представьте, что в одном из рекурсивных вызовов Quicksort в задаче возникло исключение, возможно потому, что мы были невнимательны при назначении границ массива и спровоцировали ошибку выхода за границы массива. В этом случае исключение возникнет в потоке из пула, над которым у нас нет явного контроля, и мы не можем переопределить порядок обработки исключений в нем. К счастью TPL автоматически перехватит исключение и сохранит его внутри объекта Task для последующей передачи дальше.

Исключение, возникшее в задаче, будет возбуждено повторно (завернутое в объект AggregateException), когда программа попытается перейти в режим ожидания завершения задачи (вызвав метод Task.Wait() экземпляра) или получить ее результаты (обратившись к свойству Task.Result). Это дает возможность организовать автоматическую и централизованную обработку исключений в коде, создавшем задачу, и не требует передачи ошибок вручную или использования инструментов синхронизации.

Следующий небольшой пример демонстрирует парадигму обработки исключений, реализованную в TPL:

int i = 0;
Task<int> divideTask = Task.Run(() => { return 5 / i; });

try
{
    // Обращение к свойству Result возбудит исключение
    Console.WriteLine(divideTask.Result);
}
catch (AggregateException ex)
{
    foreach (Exception inner in ex.InnerExceptions)
    {
        Console.WriteLine(inner.Message);
    }
}

При создании одной задачи внутри другой, отношение между ними устанавливается с помощью значения TaskCreationOptions.AttachedToParent. В продолжение темы обработки исключений, следует заметить, что под ожиданием завершения родительской задачи подразумевается ожидание завершения всех ее дочерних задач, в ходе которого все исключения, возникшие в дочерних задачах, будут переданы родительской задаче. Именно поэтому TPL возбуждает экземпляр исключения AggregateException, содержащий иерархию исключений, которая может быть порождена иерархией задач.

Отмена выполняющихся заданий - еще одна важная тема. Представьте, что имеется иерархия задач, как, например, иерархия, которая могла бы быть создана методом Quicksort с помощью значения TaskCreationOptions.AttachedToParent. Даже при том, что одновременно могут выполняться сотни задач, у нас может появиться необходимость дать пользователю возможность отмены, например, если надобность в отсортированных данных отпала. В других случаях возможность отмены задания может быть неотъемлемой частью процесса выполнения задачи. Например, представьте параллельный алгоритм, выполняющий поиск узлов в графе в глубину или в ширину. Когда желаемый узел обнаруживается, всю иерархию выполняющихся задач поиска необходимо остановить.

Отмена задач тесно связана с использованием типов CancellationTokenSource и CancellationToken, и требует содействия со стороны отменяемой задачи. Иными словами, если задача уже выполняется, ее нельзя просто грубо оборвать с использованием механизмов отмены в библиотеке TPL. Для отмены уже выполняющегося задания требуется содействие со стороны кода, выполняющего это задание. Однако еще не запущенную задачу можно отменить немедленно, без каких-либо отрицательных последствий.

Следующий код реализует поиск в двоичном дереве, каждый узел которого содержит потенциально длинный массив, элементы которого нужно обойти; вся процедура поиска может быть отменена вызывающей программой с использованием механизмов отмены TPL. С одной стороны, незапущенные задачи могут отменяться автоматически, самой библиотекой TPL; с другой стороны, уже запущенные задачи будут периодически проверять наличие признака отмены с инструкциями и содействовать прекращению работы, когда это необходимо.

public class TreeNode<T>
{
    public TreeNode<T> Left, Right;
    public T[] Data;
}

public class Program
{
    public static void TreeLookup<T>(
        TreeNode<T> root, Predicate<T> condition, CancellationTokenSource cts)
    {
        if (root == null)
        {
            return;
        }

        // Запустить рекурсивные задачи, передать им признак отмены, чтобы они 
        // могли останавливаться автоматически, пока не запущены, и реагировать 
        // на запрос отмены в противном случае
        Task.Run(() => TreeLookup(root.Left, condition, cts), cts.Token);
        Task.Run(() => TreeLookup(root.Right, condition, cts), cts.Token);

        foreach (T element in root.Data)
        {
            if (cts.IsCancellationRequested) break; // удовлетворить запрос
            if (condition(element))
            {
                cts.Cancel(); // отменить все запущенные задания

                // ... Выполнить требуемые операции с элементом element
            }
        }
    }

    static void Main(string[] args)
    {
        TreeNode<int> treeRoot = new TreeNode<int>();

        // Пример вызывающего кода
        CancellationTokenSource cts = new CancellationTokenSource();
        Task.Run(() => TreeLookup(treeRoot, i => i % 77 == 0, cts));

        // Спустя некоторое время, например, когда пользователя перестали 
        // интересовать результаты операции:
        cts.Cancel();
    }
}

В вашей практике вам неизбежно будут встречаться алгоритмы, для реализации которых желательно иметь более простой способ их распараллеливания. Вспомните пример поиска простых чисел, с которого началось обсуждение этой темы. Мы могли бы вручную разбить диапазон на поддиапазоны, создать для каждого из них отдельную задачу и ждать, когда они завершатся. В действительности существует целое семейство алгоритмов обработки массивов данных, применяющих к ним определенные операции. Такие алгоритмы требуют еще более высокоуровневой абстракции, чем параллельные задачи. Давайте познакомимся с этой абстракцией в следующей статье.

Пройди тесты
Лучший чат для C# программистов