Нашли ошибку или опечатку? Выделите текст и нажмите

Поменять цветовую

гамму сайта?

Поменять
Обновления сайта
и новые разделы

Рекомендовать в Google +1

Параллелизм потоков

65

Вначале были потоки. Потоки - это наиболее элементарные средства распараллеливания и асинхронного выполнения заданий; они являются самой низкоуровневой абстракцией, доступной программам, выполняющимся с привилегиями обычного пользователя. Потоки предлагают не так много в смысле структуры и управления, а программирование потоков близко напоминает давно прошедшие времена неструктурированного программирования, до того как обрели популярность подпрограммы, объекты и агенты.

Рассмотрим простую задачу: дан большой диапазон натуральных чисел, в нем требуется найти все простые числа и сохранить их в коллекции. Это - исключительно вычислительная задача и ее легко можно разбить на несколько подзадач, решаемых параллельно. Но, для начала реализуем самое простое решение, использующее единственный поток выполнения:

public class Program
{
    static void Main(string[] args)
    {
        IEnumerable<uint> counters = new List<uint>();
        
        Stopwatch timer = Stopwatch.StartNew();
        counters = PrimesInRange(100, 200000);
        timer.Stop();

        Console.WriteLine("Прошло: {0} мс., найдено {1} простых чисел", 
            timer.ElapsedMilliseconds,
            counters.Count());
    }

    // Возвращает все простые числа в диапазоне [start, end]
    public static IEnumerable<uint> PrimesInRange(uint start, uint end)
    {
        List<uint> primes = new List<uint>();

        for (uint number = start; number < end; ++number)
        {
            if (IsPrime(number))
            {
                primes.Add(number);
            }
        }

        return primes;
    }

    private static bool IsPrime(uint number)
    {
        // Крайне неэффективный алгоритм O(n), но достаточный для демонстрационных целей
        if (number == 2) return true;
        if (number % 2 == 0) return false;

        for (uint divisor = 3; divisor < number; divisor += 2)
        {
            if (number % divisor == 0) 
                return false;
        }

        return true;
    }
}

Можно ли здесь что-то улучшить, если допустить, что используется самый оптимальный алгоритм и в нем нечего больше улучшать? Для достаточно широкого диапазона чисел, такого как [100, 200000], реализация выше работает в течение нескольких секунд на современном процессоре, оставляя простор для оптимизации.

У кого-то могут возникнуть серьезные сомнения в эффективности алгоритма (например, весьма тривиальная оптимизация может повысить его производительность до O(√n), вместо O(n)), но, независимо от оптимальности алгоритма, одного взгляда достаточно, чтобы заметить, что его легко можно распараллелить. В конце концов, проверка числа 4911 на принадлежность к категории простых чисел никак не зависит от проверки числа 2321, поэтому самый простой способ распараллелить решение задачи состоит в том, чтобы разделить диапазон чисел на фрагменты и запустить дополнительные потоки выполнения для параллельной обработки фрагментов, как показано на рисунке ниже:

Деление диапазона чисел для обработки несколькими потоками выполнения

Разумеется, нам придется синхронизировать доступ к коллекции простых чисел, чтобы предотвратить повреждение данных. Ниже приводится простейшая реализация такого решения:

// ...

public static IEnumerable<uint> PrimesInRange(uint start, uint end)
{
    List<uint> primes = new List<uint>();
    uint range = end - start;

    uint numThreads = (uint)Environment.ProcessorCount; // это удачная идея?
    uint chunk = range / numThreads; // надеемся, деление будет без остатка

    Thread[] threads = new Thread[numThreads];

    for (uint i = 0; i < numThreads; ++i)
    {
        uint chunkStart = start + i * chunk;
        uint chunkEnd = chunkStart + chunk;

        threads[i] = new Thread(() =>
        {
            for (uint number = chunkStart; number < chunkEnd; ++number)
            {
        if (IsPrime(number))
        {
            lock (primes)
            {
                primes.Add(number);
            }
        }
            }
        });
        threads[i].Start();
    }

    foreach (Thread thread in threads)
    {
        thread.Join();
    }

    return primes;
}

// ...

В системе на процессоре Intel Core i7 последовательная версия обрабатывала диапазон чисел [100, 200000] в среднем за 4419 миллисекунд, а параллельная версия - за 1251 миллисекунд. От системы с 8-ядерным процессором можно было бы ожидать большего. Но дело в том, что данная модель процессоров i7 использует технологию HyperThreading, а это означает, что в процессоре имеется лишь 4 физических ядра (каждое физическое ядро делится на два логических). Учитывая это вполне можно было бы ожидать 4-кратного прироста производительности, но мы получили лишь 3-кратный, что кажется недостаточным.

Однако, как показывают результаты применения профилировщика параллелизма (Concurrency Profiler), изображенные на рисунках ниже, некоторые потоки завершили работу раньше других, в результате чего общее использование процессора оказалось много ниже 100% (порядок запуска профилировщика параллелизма описывался ранее в статье "Профилировщики баз данных и ввода/вывода", напомню, что в Visual Studio он запускается командой Analyze --> Concurency Visualizer --> Start with Current Project):

Общее использование процессора при выполнении параллельной задачи
Работа потоков при выполнении параллельной задачи

Как видно на первом рисунке, общее использование процессора сначала повысилось почти до 8 логических ядер (100%), а затем постепенно снижалось. На втором рисунке видно, что некоторые потоки справились со своей долей работы значительно раньше других. Поток 7780 выполнялся менее 200 мсек, а поток 4492 более 800 мсек.

В действительности эта программа может работать намного быстрее последовательной версии (хотя масштабируемость и не будет линейной), особенно если задействовать большое количество ядер. Однако при этом возникает несколько вопросов:

  • Какое количество потоков будет оптимальным? Следует ли в системе на процессоре с восемью ядрами создавать восемь потоков?

  • Как убедиться, что программа не монополизирует системные ресурсы и не превышает ее возможности? Например, что произойдет, если другой поток выполнения в нашем процессе попытается найти простые числа с использованием того же самого параллельного алгоритма?

  • Как синхронизировать доступ потоков выполнения к коллекции с результатами? Одновременный доступ к List<uint> из нескольких потоков выполнения небезопасен и может повлечь повреждение данных. Однако, приобретение блокировки перед записью каждого найденного простого числа в коллекцию (что делает решение, представленное выше) слишком дорогое удовольствие и не позволит масштабировать алгоритм с увеличением количества ядер в процессоре.

  • Стоит ли для узкого диапазона чисел порождать несколько потоков выполнения или лучше выполнить всю операцию поиска синхронно, в единственном потоке? (Создание и уничтожение потоков выполнения - достаточно недорогая операция в Windows, но не настолько дешевая, чтобы с ее применением пытаться найти 20 первых простых чисел.)

  • Как обеспечить равную загруженность всех потоков выполнения? Некоторые потоки выполнения могут завершаться раньше других, особенно те, которые обрабатывают начало диапазона. Для диапазона [100, 100 000], разделенного на четыре равные части, поток, обрабатывающий поддиапазон [100, 25 075], справится со своей работой более чем в два раза быстрее, чем поток, обрабатывающий поддиапазон [75 025, 100 000], потому что скорость нашего алгоритма проверки простых чисел падает с увеличением значений чисел.

  • Как обрабатывать исключения, которые могут возникнуть в других потоках? В данном конкретном случае может показаться, что появление ошибок в методе IsPrime() невозможно, но в реальном мире параллельной обработки данных можно найти массу примеров потенциальных ловушек и исключений. (По умолчанию среда выполнения CLR завершает процесс целиком, когда в каком-нибудь потоке выполнения появляется необработанное исключение, что в целом не так уж и плохо, но это не дает возможности программе, вызвавшей PrimesInRange, взять на себя обработку исключений.)

Ответить на эти вопросы очень непросто. Поэтому разработка фреймворка, не порождающего излишнего количества потоков, чтобы не вызвать перегрузку системы, обеспечивающего равномерную загруженность всех потоков выполнения, сообщающего об ошибках и возвращающего достоверные результаты, а также учитывающего другие источники параллелизма в процессе, была весьма непростой задачей для создателей библиотеки Task Parallel Library, которую мы будем использовать далее.

Естественным первым шагом от ручного управления потоками стал переход к использованию пулов потоков. Пул потоков - это компонент, управляющий группой потоков, доступных для выполнения некоторой работы. Вместо создания потока для выполнения какой-то определенной задачи, вы помещаете задание в очередь, а компонент извлекает его оттуда и передает первому свободному потоку для выполнения. Пулы потоков помогают решить некоторые проблемы, обозначенные выше, - они снижают затраты на создание и уничтожение потоков для очень коротких заданий, помогают избежать монополизации ресурсов и перегрузки системы, ограничивая общее количество потоков, используемых приложением, и автоматизируют принятие решения о выборе оптимального количества потоков для решения данной задачи.

В нашем конкретном случае можно попробовать разбить диапазон на большее число фрагментов (например, в каждой итерации цикла выделять один фрагмент) и передать их пулу потоков. Ниже приводится пример реализации этого подхода с размером фрагмента, равным 100:

public static IEnumerable<uint> PrimesInRange(uint start, uint end)
{
    List<uint> primes = new List<uint>();
    const uint ChunkSize = 100;
    int completed = 0;

    ManualResetEvent allDone = new ManualResetEvent(initialState: false);

    // Разделить диапазон на равные фрагменты
    uint chunks = (end - start) / ChunkSize;

    for (uint i = 0; i < chunks; ++i)
    {
        uint chunkStart = start + i * ChunkSize;
        uint chunkEnd = chunkStart + ChunkSize;

        ThreadPool.QueueUserWorkItem(_ =>
        {
            for (uint number = chunkStart; number < chunkEnd; ++number)
            {
                if (IsPrime(number))
                {
                    lock (primes)
                    {
                        primes.Add(number);
                    }
                }
            }
            if (Interlocked.Increment(ref completed) == chunks)
            {
                allDone.Set();
            }
        });
    }

    allDone.WaitOne();
    return primes;
}

Эта версия имеет значительно больший запас масштабируемости и выполняется быстрее предыдущих версий. Она улучшила производительность с 1251 миллисекунд (для диапазона [100, 200000]) простейшей многопоточной версии до 1040 миллисекунд (в 4 раза быстрее последовательной версии). Более того, использование процессора остается постоянно на высоком уровне, близком к 100%,, как видно из отчета профилировщика Concurrency Profiler, изображенного на рисунке ниже:

Работа потоков при выполнении параллельной задачи с разбиением на блоки

Пул потоков в CLR содержит 8 потоков (по одному на каждое логическое ядро). Каждый поток оказался задействован практически на всем протяжении работы приложения.

В версии CLR 4.0, в пул потоков было добавлено несколько компонентов поддержки кооперативной работы. Когда какой-то поток выполнения, не принадлежащий пулу (например, главный поток приложения), передает задания пулу потоков, они ставятся в глобальную очередь FIFO (First-In-First-Out - первым пришел, первым ушел). Каждый поток в составе пула имеет свою, локальную очередь LIFO (Last-In-First-Out - последним пришел, первым ушел), куда пул помещает задания для потоков:

Работа пула потоков CLR

Как видно на рисунке, поток №2 в настоящий момент обрабатывает задание №5; завершив обработку, он извлечет следующее задание из глобальной очереди FIFO. Поток №1 сначала опустошит свою очередь, прежде чем приняться за другую работу.

Когда поток из пула освобождается, он обращается к своей очереди LIFO и извлекает из нее очередное задание, пока очередь не опустеет. Когда поток опустошит свою очередь LIFO, он попытается «захватить» задание из локальной очереди другого потока в порядке FIFO. Наконец, после опустошения всех локальных очередей потоки будут обращаться к глобальной очереди (FIFO) и выполнять задания оттуда.

Семантика FIFO и LIFO пула потоков

Причина странного использования очередей FIFO и LIFO заключается в следующем: когда задание добавляется в глобальную очередь, ни один из потоков не пользуется каким-либо преимущественным правом на это задание. Справедливость - единственный критерий выполнения этого задания. Именно поэтому для глобальной очереди избрана семантика FIFO. Однако, когда пул потоков перемещает задание в локальную очередь потока, весьма вероятно, что при этом будут использоваться те же данные и те же машинные инструкции, что и в текущем задании; именно поэтому для локальной очереди потока избрана семантика LIFO - очередное задание из очереди будет выполнено сразу же после текущего, что позволят максимально использовать кеши данных и инструкций.

Кроме того, доступ к заданиям в локальной очереди потока требует меньше синхронизации и вероятность конфликтов с другими потоками ниже, чем при доступе к глобальной очереди. Аналогично, когда поток захватывает задания из локальной очереди другого потока, он использует ее как очередь FIFO, чтобы поддержать оптимальное использование кешей процессора этим потоком, которое дает семантика UFO. Такая организация пула потоков оказывается очень дружественной к заданиям с иерархической организацией, когда единственное задание, добавленное в глобальную очередь, порождает десятки дополнительных заданий, обеспечивая работой несколько потоков из пула.

Как и любая другая абстракция, пулы потоков берут на себя некоторые хлопоты по управлению жизненным циклом потоков и распределению заданий между ними, снимая это бремя с разработчика приложения.

Реализация пулов потоков в CLR имеет некоторые методы, такие как ThreadPool.SetMinThreads() и SetMaxThreads(), позволяющие управлять количеством потоков, но в ней отсутствуют встроенные средства управления приоритетами потоков или заданий. Однако зачастую эта нехватка средств управления с лихвой компенсируется возможностью приложения автоматически масштабироваться, в зависимости от вычислительной мощности системы, и дополнительным приростом производительности из-за отсутствия необходимости создавать и уничтожать потоки для выполнения коротких заданий.

Однако механизм пула потоков с его очередью заданий не всегда удобен; задания в очереди не имеют состояния, не несут информацию об исключениях, не имеют поддержки асинхронных продолжений (continuations) и отмены (cancellation), и не предоставляют никаких механизмов для получения результатов после выполнения задания. Библиотека Task Parallel Library в .NET 4.0 вводит понятие задачи - мощной абстракции поверх заданий для потоков в составе пула. Задачи являются структурированной альтернативой заданиям, почти так же, как объекты и подпрограммы стали структурированной альтернативой программированию на языке ассемблера, основанному на переходах. В следующей статье мы более подробно рассмотрим параллельное программирование с помощью задач.

Пройди тесты