в чем особенность scheduledexecutorservice java
Фреймворк Executor в Java
Вплоть до Java 5 создавать потоки и управлять ими было возможно только на уровне приложения. Объекты Thread требуют значительного объема памяти. Таким образом, если постоянно создавать много таких объектов в крупномасштабном приложении, это приведет к существенным затратам памяти. Поэтому создание потоков и управление ими лучше отделить от остальной части приложения.
В качестве решения Java предоставляет фреймворк Executor. Он содержит множество функций для эффективного управления несколькими потоками. Не нужно каждый раз создавать новые потоки с помощью Executor — он позволяет использовать уже созданные потоки, когда вам это необходимо. В результате экономится как память вашего Java-приложения, так и ваше драгоценное время.
Из этой статьи вы узнаете о фреймворке Executor, пуле потоков и различных методах их создания, а также о том, как с их помощью управлять потоками. Давайте начнем.
Интерфейсы и классы Executor
Пулы потоков
newSingleThreadExecutor()
newFixedThreadPool()
newCachedThreadPool()
Когда мы создаем пул потоков с помощью этого метода, максимальный размер пула потоков устанавливается на максимальное целочисленное значение в Java. Этот метод создает новые потоки по запросу и разрушает потоки, которые простаивают больше минуты, если запрос отсутствует.
В данном примере метод newCachedThreadPool() изначально создаст пять новых потоков и обработает пять задач. Никакой очереди ожидания здесь не будет. Если поток остается в бездействии более минуты, метод устраняет его. Таким образом, этот метод — хороший выбор, если вам хочется добиться большей производительности очереди, чем это возможно с методом newFixedThreadPool() . Но если вы хотите ограничить количество параллельно выполняемых задач во имя управления ресурсами, лучше использовать newFixedThreadPool() .
newScheduledThreadPool()
Как видно из примера:
Каждый из этих методов полезен в своём определённом сценариев.
Вот и всё о пулах потоков. Теперь — некоторые важные замечания насчет того, как использовать фреймворк Executor.
Важные замечания
На этом всё. Надеюсь, пулы потоков и Java ExecutorService стали для вас понятнее. Спасибо за чтение и счастливого кодирования!
Руководство по Java ExecutorService
Введение и руководство по фреймворку ExecutorService, предоставляемому JDK, что упрощает выполнение задач в асинхронном режиме.
1. Обзор
ExecutorService – это API JDK, который упрощает выполнение задач в асинхронном режиме. Вообще говоря, ExecutorService автоматически предоставляет пул потоков и API для назначения ему задач.
Дальнейшее чтение:
Руководство по фреймворку Fork/Join в Java
Обзор java.util.concurrent
Руководство по java.util.concurrent.Блокировки
2. Создание экземпляра ExecutorService
2.1. Заводские методы класса Исполнителей
Например, в следующей строке кода будет создан пул потоков с 10 потоками:
2.2. Непосредственно создайте службу ExecutorService
Поскольку ExecutorService является интерфейсом, можно использовать экземпляр любой его реализации. В пакете java.util.concurrent есть несколько реализаций на выбор, или вы можете создать свою собственную.
Например, класс ThreadPoolExecutor имеет несколько конструкторов, которые мы можем использовать для настройки executorservice и его внутреннего пула:
Вы можете заметить, что приведенный выше код очень похож на исходный код заводского метода newSingleThreadExecutor(). В большинстве случаев подробная ручная настройка не требуется.
3. Назначение задач исполнителю
ExecutorService может выполнять Выполняемые и вызываемые задачи. Чтобы все было просто в этой статье, будут использоваться две примитивные задачи. Обратите внимание, что мы используем здесь лямбда-выражения вместо анонимных внутренних классов:
Метод execute() является void и не дает никакой возможности получить результат выполнения задачи или проверить состояние задачи (выполняется ли она):
submit() отправляет Вызываемую или выполняемую задачу в ExecutorService и возвращает результат типа Future :
Прежде чем идти дальше, нам нужно обсудить еще два вопроса: завершение работы ExecutorService и работа с типами Future return.
4. Завершение работы службы ExecutorService
В общем случае ExecutorService не будет автоматически уничтожен, если нет задачи для обработки. Он останется в живых и будет ждать новой работы.
В некоторых случаях это очень полезно, например, когда приложению необходимо обрабатывать задачи, которые появляются нерегулярно, или количество задач неизвестно во время компиляции.
С другой стороны, приложение может дойти до конца, но не будет остановлено, потому что ожидание ExecutorService приведет к продолжению работы JVM.
Этот метод возвращает список задач, ожидающих обработки. Разработчик должен решить, что делать с этими задачами.
Один хороший способ закрыть ExecutorService (который также рекомендуется Oracle ) – использовать оба этих метода в сочетании с методом awaitTermination() :
При таком подходе ExecutorService сначала прекратит принимать новые задачи, а затем будет ждать до указанного периода времени, пока все задачи будут завершены. Если это время истекает, выполнение немедленно прекращается.
5. Будущий Интерфейс
Вызов метода get() во время выполнения задачи приведет к блокировке выполнения до тех пор, пока задача не будет выполнена должным образом и результат не будет доступен.
Если период выполнения больше указанного (в данном случае 200 миллисекунд), a TimeoutException будет выброшено.
Интерфейс Future также предусматривает отмену выполнения задачи с помощью метода cancel() и проверку отмены с помощью метода isCancelled() :
6. Интерфейс ScheduledExecutorService
ScheduledExecutorService выполняет задачи с некоторой предопределенной задержкой и/или периодически.
Для этого раздела мы используем ScheduledExecutorService с одним потоком:
Два метода scheduled() позволяют выполнять Выполняемые или Вызываемые задачи:
Следующий блок кода запустит задачу после начальной задержки в 100 миллисекунд. И после этого он будет выполнять одну и ту же задачу каждые 450 миллисекунд:
Например, следующий код гарантирует 150-миллисекундную паузу между окончанием текущего выполнения и началом другого:
7. ExecutorService vs Fork/Join
После выпуска Java 7 многие разработчики решили заменить фреймворк ExecutorService фреймворком fork/join.
Однако это не всегда правильное решение. Несмотря на простоту и частое повышение производительности, связанное с fork/join, это снижает контроль разработчика над параллельным выполнением.
ExecutorService дает разработчику возможность контролировать количество генерируемых потоков и детализацию задач, которые должны выполняться отдельными потоками. Лучшим вариантом использования ExecutorService является обработка независимых задач, таких как транзакции или запросы по схеме “один поток для одной задачи.”
8. Заключение
Давайте подведем их итоги:
Неправильная емкость пула потоков при использовании пула потоков фиксированной длины : Очень важно определить, сколько потоков потребуется приложению для эффективного выполнения задач. Слишком большой пул потоков приведет к ненужным накладным расходам только для создания потоков, которые в основном будут находиться в режиме ожидания. Слишком немногие могут заставить приложение казаться невосприимчивым из-за длительных периодов ожидания задач в очереди.
Неожиданно длительная блокировка с помощью Future ‘s get() method : Мы должны использовать тайм-ауты, чтобы избежать неожиданных ожиданий.
Русские Блоги
Принцип анализа пула потоков синхронизации Java _ScheduledExecutorService
В предыдущей главе мы подробно объяснили роль пула потоков, который может выполнять задачи в новых потоках, а также является эффективным и быстрым, поскольку потоки не нужно часто создавать и уничтожать.
Но мы часто сталкиваемся с таким требованием, что задача не выполняется сразу, а будет выполнена через определенный период времени. Или задача может выполняться периодически. Итак, как добиться такого спроса?
предоставляет интерфейс ScheduledExecutorService на java, после чего выполняются указанные выше требования.
1. Интерфейс ScheduledExecutorService
Интерфейс ScheduledExecutorService определяет четыре метода:
Проиллюстрируем на примерах.
2. Задержка в выполнении задач.
Видно, что запуск задачи действительно выполняется с задержкой в 1 секунду.
3. Периодически выполняйте задания.
3.1 метод scheduleAtFixedRate
Вызовите метод scheduleAtFixedRate, установите время цикла на 2000 миллисекунд и обнаружите, что запуск задачи действительно выполняется каждые 2000 миллисекунд. Это потому, что время, затрачиваемое на нашу задачу, составляет 1000 миллисекунд, что меньше времени цикла. Что произойдет, если мы изменим время цикла на 500 миллисекунд?
Результат выполнения:
Мы обнаружили, что запуск задачи не выполняется каждые 500 миллисекунд времени цикла, а выполняется каждые 1000 миллисекунд времени завершения задачи.
Итак, для метода scheduleAtFixedRate:
3.2 метод scheduleWithFixedDelay
Смысл метода scheduleWithFixedDelay заключается в том, чтобы отложить время задержки после завершения задачи, снова выполнить задачу и продолжить цикл.
подводить итоги
Методы в интерфейсе ScheduledExecutorService были подробно проанализированы.Если вы просто хотите использовать пул синхронизированных потоков, вы уже знаете, как его использовать. Но если вы хотите узнать, как реализован пул потоков синхронизации, см. Мою следующую статью.
Что такое ExecutorService?
Честно говоря, вопрос этот не слишком новый. Со времени выхода Java 5 и пакета java.util.concurrent.* прошло более 13 лет, но мне, за всю мою десятилетнюю практику, так ни разу и не пришлось столкнуться с этим зверем. Тем не менее, мне этот вопрос несколько раз задавали на собеседованиях и пришлось знакомиться.
Первое естественно с чего я начал это — Хабр. Но, к сожалению, нашёл здесь только две статьи:
Первая, очевидно, для тех, кто понимает и имеет опыт работы с ExecutorService. Вторая в меня, к сожалению, не вошла. Вроде небольшая и «по делу», но перечитав несколько раз я так и не понял, что же такое ExecutorService и с чем его едят. Поэтому пришлось садиться за Eclipse, курить читать javadoc и разбираться.
Итак, давайте рассмотрим простой пример:
В данном примере мы создали сам обьект ExecutorService и вызвали на нём метод execute. Передав в него самую обычную имплементацию потока. Всё это можно было бы соорудить и старым дедовским способом, но так, согласитесь, гораздо проще и изящнее. Фактически, мы быстро ответвили от текущего потока другой, асинхронный, который может что-то там выполнить в фоне.
Сам объект ExecutorService мы создали с помощью фабрики. Её методы вполне очевидны, поэтому не будем особо мусолить. Вот некоторые из них:
Помимо метода execute, который вызывается по принципу «выстрелил и забыл», наш сервис ещё имеет метод submit. Отличие от первого лишь в том, что последний возвращает объект интерфейса Future. Это просто замечательная возможность контролировать состояние потока, который мы запустили в фоне. Делается примерно так:
Обратите внимание, что метод get насмерть блокирует текущий поток и будет ждать пока фоновый не завершится. Теперь не нужно всех этих неочевидных join-ов! Если же мы боимся что наш фоновый поток не завершится никогда, можем использовать get(long,TimeUnit).
Иногда приходится из фонового потока возвратить данные в текущий. В этом тоже нам поможет метод submit, но теперь нам нужно передать в него не Runnable, а Callable обьект. По сути это два одинаковых интерфейса, отличаются только тем, что последний может возвращать что-то:
Ну вот вкратце и всё. Остались методы создания ExecutorService-а в фабрике (их там много), остались методы самого ExecutorService, вопросы об обработке ошибок в фоновых потоках, но это уже совсем другая история…
На завершение не забывайте делать:
Или не делать, в случае, что все фоновые потоки у вас будут демонами.
10 советов по использованию ExecutorService
Предлагаю читателям «Хабрахабра» перевод публикации «ExecutorService — 10 tips and tricks».
Абстракция ExecutorService была представлена еще в Java 5. На дворе шел 2004 год… На секунду – сейчас Java 5 и 6 больше не поддерживаются и Java 7 готовится пополнить список. А многие Java-программисты по-прежнему не в полной мере понимают как работает ExecutorService. В вашем распоряжении множество источников, но сейчас я хотел бы рассказать о малоизвестных тонкостях и практиках по работе с ней.
1. Именуйте пулы потоков
Не могу не упомянуть об этом. При дампинге или во время дебаггинга можно заметить, что стандартная схема именования потока следующая: pool-N-thread-M, где N обозначает последовательный номер пула (каждый раз, когда вы создаете новый пул, глобальный счетчик N инкрементится), а M – порядковый номер потока в пуле. Например, pool-2-thread-3 означает третий поток во втором пуле жизненного цикла JVM. См.: Executors.defaultThreadFactory(). Не очень информативно, не правда ли? JDK немного затрудняет правильное именование потоков, т.к. стратегия именования скрыта внутри ThreadFactory. К счастью, Google Guava имеет встроенный класс для этого:
По умолчанию создаются non-daemon пулы потоков, решайте сами где какие уместнее.
2. Изменяйте имена в зависимости от контекста
Про этот трюк я узнал из статьи «Supercharged jstack: How to Debug Your Servers at 100mph». Раз мы знаем про имена потоков, мы можем менять их в рантайме, когда захотим! Это имеет смысл, поскольку дамп потока содержит имена классов и методов без параметров и локальных переменных. Включая некоторую важную информацию в имя потока, мы можем легко проследить какие сообщения/записи/запросы и т.п. тормозят систему или вызывают взаимную блокировку.
Внутри блока try-finally текущий поток называется Обработка-ID-текущего-сообщения, что может пригодиться при отслеживании потока сообщений в системе.
3. Явное и безопасное завершение
Между клиентскими потоками и пулом потоков лежит очередь заданий. Когда приложение завершает работу, вы должны побеспокоиться о двух вещах: что произойдет с заданиями, ожидающими в очереди, и как поведут себя уже выполняющиеся (об этом позже). Удивительно, но многие разработчики не закрывают пул потоков должным образом. Есть два способа: либо разрешите отработать всем задачам в очереди (shutdown()), либо удалите их (shutdownNow()) – в зависимости от конкретного случая. Например, если мы поставили в очередь набор задач и хотим вернуть управление как только все они выполнятся, используем shutdown():
В этом примере мы отправляем пачку писем, каждое в виде отдельного задания для пула потоков. После постановки этих заданий в очередь мы закрываем пул, чтобы он больше не мог принять новых задач. Далее мы ждем максимум одну минуту пока все задания не будут выполнены. Однако, если какие-то задания еще не выполнены, awaitTermination() просто вернет false. Кроме того, оставшиеся задания продолжат выполняться. Знаю, хипстеры готовы пойти на:
Зовите меня старомодным, но мне нравится контролировать количество параллельных потоков. А альтернатива постепенному завершению shutdown() – это shutdownNow():
На этот раз все стоящие в очереди задачи отбрасываются и возвращаются. Уже запущенным задачам разрешено продолжить работу.
4. Обрабатывайте прерывание потока с осторожностью
Менее известная особенность интерфейса Future – возможность отмены. Далее приводится одна из моих предыдущих статей: InterruptedException and interrupting threads explained.
Поскольку исключение InterruptedException явно пробрасываемое (checked), никто, скорее всего, даже не задумывался о том, сколько ошибок оно подавило за все эти годы. И так как оно должно быть обработано, многие делают это неправильно или необдуманно. Давайте рассмотрим простой пример потока, который периодически делает некую очистку, а в промежутках большую часть времени спит.
Обратите внимание, что блок try-catch в данном примере окружает цикл while. Таким образом, если sleep() выбросит InterruptedException, мы прервем этот цикл. Вы можете возразить, что мы должны логировать стек исключения InterruptedException. Это зависит от ситуации. В данном случае прерывание потока является ожидаемым поведением, а не падением. В общем, на ваше усмотрение. В большинстве случаев поток прервется во время sleep() и мы быстренько завершим метод run() в это же время. Если вы очень осторожны, то наверняка спросите – а что будет, если поток прервется во время выполнения чистки cleanUp()? Зачастую вы столкнетесь с решением вручную выставить флаг, наподобие этого:
Помните, что стоп-флаг (он должен быть волатильным!) не будет прерывать блокирующие операции, мы должны дождаться пока отработает метод sleep(). С другой стороны, этот явный флаг дает нам лучший контроль, т.к. мы можем мониторить его в любое время. Оказывается, прерывание потоков работает точно так же. Если кто-то прервал поток, пока он выполнял неблокирующие вычисления (например, cleanUp()), такие вычисления не будут прерваны незамедлительно. Однако поток уже отмечен как прерванный, поэтому любая следующая блокирующая операция, такая как sleep() немедленно прервется и выбросит InterruptedException, поэтому мы не потеряем этот сигнал.
Мы также можем воспользоваться этим фактом, если реализуем неблокирующий поток, который по-прежнему хочет использовать преимущества механизма прерывания потоков. Вместо того чтобы полагаться на InterruptedException, мы должны просто периодически проверять Thread.isInterrupted():
Как видите, если кто-то прервет наш поток, мы отменим вычисления так скоро, насколько позволят предыдущая итерация someHeavyComputations(). Если она выполняется очень долго или бесконечно, мы никогда не достигнем флага прерывания. Примечательно, что этот флаг не одноразовый. Мы можем вызвать Thread.interrupted() вместо isInterrupted(), что сбросит значение флага и мы сможем продолжить. Иногда вы можете захотеть проигнорировать флаг прерывания и продолжить выполнение. В этом случае interrupted() может пригодиться.
Если вы олдскульный программист, вы наверняка помните метод Thread.stop(), который устарел 10 лет назад. В Java 8 были планы по его «деимплементации», но в 1.8u5 он по-прежнему с нами. Тем не менее, не используйте его и рефакторите любой код, в котором он встречается, используя Thread.interrupt().
Возможно, иногда вы захотите полностью проигнорировать InterruptedException. В этом случае обратите внимание на класс Uninterruptibles из Guava. Он содержит много методов таких как sleepUninterruptibly() или awaitUninterruptibly(CountDownLatch). Просто будьте осторожны с ними. Они не декларируют InterruptedException, но также полностью избавляют поток от прерывания, что довольно необычно.
5. Следите за длиной очереди и определяйте границу
Пулы потоков неправильного размера могут привести к падению производительности, нестабильности и утечкам памяти. Если вы укажете слишком мало потоков, очередь будет расти, потребляя много памяти. С другой стороны, слишком много потоков будут замедлять всю систему из-за частых переключений контекста, что приведет к тем же симптомам. Важно сохранять глубину очереди и определять ее границы. А перегруженный пул может просто временно отказываться от новых задач.
Вышеприведенный код эквивалентен Executors.newFixedThreadPool(n), однако вместо того, чтобы использовать по умолчанию неограниченный LinkedBlockingQueue, мы используем ArrayBlockingQueue с фиксированной емкостью в 100. Это означает, что если 100 задач уже набраны, следующая задача будет отклонена с исключением RejectedExecutionException. Кроме того, поскольку очередь теперь доступна извне, мы можем периодически справляться о ее размере, чтобы записать в лог, отправить в JMX и т.д.
6. Помните об обработке исключений
Каков результат выполнения следующего кода?
Я был озадачен тем, как много раз он ничего не печатал. Никаких признаков java.lang.ArithmeticException: / by zero, ничего. Пул потоков просто проглатывал исключение, как будто оно никогда не выбрасывалось. Если бы это был поток, созданный «с нуля», без обертки в виде пула, мог бы сработать UncaughtExceptionHandler. Но с пулом потоков вы должны быть более осторожны. Если вы отправили на выполнение Runnable (без какого-либо результата, как выше), вы обязаны поместить все тело метода внутрь try-catch. Если вы помещаете в очередь Callable, удостоверьтесь, что вы всегда достаете его результат с помощью блокирующего get(), чтобы заново бросить исключение:
Примечательно, что даже в Spring framework допустили эту ошибку в @Async, см.: SPR-8995 и SPR-12090.
7. Следите за временем ожидания в очереди
Мониторинг глубины рабочей очереди односторонний. При решении проблем с одиночной транзакцией/задачей, имеет смысл посмотреть сколько времени прошло между постановкой задачи и началом ее выполнения. Это время в идеале должно стремиться к нулю (когда в пуле имеется простаивающий поток), однако оно будет увеличиваться по мере постановки задач в очередь. Кроме того, если пул не имеет фиксированного числа потоков, запуск новой задачи может потребовать рождения нового потока, что тоже займет какое-то время. Чтобы четко измерять этот показатель, оберните оригинальный ExecutorService во что-то похожее:
Это не полная реализация, но суть понятна. В момент, когда мы поставили задание в пул потоков, мы незамедлительно засекли время. Затем остановили секундомер, как только задача была извлечена и отправлена на выполнение. Не обманывайтесь близостью startTime и queueDuration в исходном коде. На самом деле эти две строки исполняются в разных потоках, в миллисекундах или даже в секундах друг от друга.
8. Сохраняйте трассировку стека клиента
Реактивному программированию в наши дни уделяется повышенное внимание: Reactive manifesto, reactive streams, RxJava (уже 1.0!), Clojure agents, scala.rx… Все это выглядит здорово, но стектрейс – больше не ваш друг, он по большому счету бесполезен. Рассмотрим, к примеру, следующее исключение, возникающее во время выполнения задания в пуле потоков:
Мы можем легко заметить, что MyTask выбросило NPE в строке 76. Но мы не имеем никакого представления, кто утвердил эту задачу, поскольку стек относится только к Thread и ThreadPoolExecutor. Технически, мы можем просто перемещаться по коду в надежде найти только один участок, где выполняется постановка MyTask в очередь. Но без отдельных потоков (не говоря уже о событийно-ориентированном, реактивном и т.п. программировании), мы всегда видим сразу всю картину целиком. Что если мы могли бы сохранить стектрейс клиентского кода (того, что инициирует задание) и показать его, допустим, при возникновении ошибки? Идея не нова, например, Hazelcast распространяет исключения из узла-владельца в клиентский код. Ниже приведен незамысловатый пример как сделать подобное:
В этот раз в случае неудачи, мы извлекаем полный стектрейс и название потока, где задание было поставлено в очередь. Гораздо более ценная информация по сравнению со стандартным исключением, рассмотренным ранее:
Исключение java.lang.NullPointerException в задании из потока main:
9. Предпочитайте CompletableFuture
В Java 8 был представлен более мощный класс CompletableFuture. Пожалуйста, используйте его там, где это возможно. ExecutorService не был расширен, чтобы поддерживать эту абстракцию, так что вы должны заботиться об этом самостоятельно. Вместо:
CompletableFuture расширяет Future, так что все работает как раньше. Но более продвинутые пользователи вашего API по-настоящему оценят расширенную функциональность, предоставляемую с помощью CompletableFuture.
10. Синхронные очереди
Каждая добавляемая операция должна ожидать соответствующей операции удаления в другом потоке, и наоборот. Синхронная очередь не имеет никакой внутренней емкости, даже единичной. Вы не можете заглянуть в синхронную очередь, потому что элемент представлен только при попытке его удаления; вы не можете вставить элемент (используя любой метод), пока другой поток не удалит его: вы не можете обойти очередь потому что обходить нечего.
Синхронные очереди похожи на «rendezvous channels», используемые в CSP и Ada.
Как все это относится к пулам потоков? Попробуем использовать SynchronousQueue вместе с ThreadPoolExecutor:
Мы создали пул потоков с двумя потоками и SynchronousQueue перед этим. По сути SynchronousQueue — очереди с емкостью 0, поэтому такие ExecutorService будут только принимать новые задачи, если доступен простаивающий поток. Если все потоки заняты, новая задача будет немедленно отклонена и никогда не будет ждать очереди. Такой режим может быть полезен для незамедлительной обработки в фоновом режиме, если это возможно.
Вот и все, надеюсь, вы открыли для себя как минимум одну интересную фичу!