Эволюция экосистемы Hadoop, взявшей курс на роль фактического стандарта в мире распределенной обработки, во многом повторяет путь, пройденный в 1950–1970-е годы программной индустрией, когда от пакетного режима она двинулась к интерактивным системам и далее к многозадачным системам с разделением времени. В ядре Hadoop 2.0 функции по управлению ресурсами и планированию заданий вынесены из модуля MapReduce в модуль YARN, который на высоком уровне абстракции и с подобающей гибкостью управляет кластером как мультиарендной системой, несущей одновременно разнопрофильные и разнокалиберные нагрузки. YARN часто называют кластерной операционной системой: как и ОС компьютера, этот модуль отвечает за взаимодействие с вычислительными ресурсами и предоставляет их по запросу другим программам с учетом текущих возможностей, приоритетов, расписаний и ограничений. Проекты Spark и Storm на полную мощность используют возможности YARN, привнося в мир Hadoop динамизм, ранее труднодостижимый в простой пакетной парадигме, а проект Tez призван стать мостом между YARN и разработчиками интерактивных и пакетных решений, которые получат доступ к распределенной обработке средствами как новых, так и уже созданных в рамках «старого» Hadoop приложений [1].

Проект Spark был инициирован в Калифорнийском университете в Беркли как замена MapReduce в распределенной обработке данных для кластера Hadoop. MapReduce обладает двумя серьезными недостатками. Первый состоит в том, что он работает только с данными постоянного хранения, на жестких дисках, даже если это промежуточные данные. Второй недостаток актуален для итерационных вычислений и интерактивного анализа данных — задания в MapReduce работают в один проход, и для повторной обработки надо запускать новое задание со всеми вытекающими отсюда накладными расходами. Spark устраняет эти недостатки и для соответствующего класса задач значительно повышает скорость обработки — до двух порядков по сравнению с MapReduce [2], если данные находятся в оперативной памяти, и до 10 раз, если они хранятся на жестком диске, благодаря сокращению операций чтения/записи. Кроме того, ускорение достигается за счет хранения информации о каждом операторе в оперативной памяти, что дает и еще одно важное преимущество — все процессы, связанные с данными (в том числе обработка потоков и пакетов информации и машинное обучение), происходят на одном и том же кластере, в одном и том же приложении. Все это, естественно, обеспечивается без потери способности фреймворка к горизонтальной масштабируемости и с сохранением «бесшовной» архитектуры приложения.

Spark состоит из пяти модулей (рис. 1), работающих в пределах одного кластера данных и одновременно распределенных горизонтально по всему кластеру Hadoop. Модуль SparkSQL обеспечивает интеграцию SQL-запросов со всеми элементами фреймворка. Благодаря представлению информации в виде так называемых упругих распределенных наборов данных (Resilient Distributed Dataset, RDD) удается обеспечить быстрый и простой доступ к данным через прикладные интерфейсы языков Python, Scala и Java. Кроме того, RDD позволяет запрашивать данные и одновременно запускать сложные алгоритмы их анализа. Модуль SparkStreaming предоставляет пользователю возможность писать и запускать приложения на Scala и Java в потоковом режиме. При этом приложение без существенных изменений в коде может работать и с потоками данных, и осуществлять пакетную обработку. Имеется также функция автоматического восстановления данных после сбоев.

Рис. 1. Модули Spark
Рис. 1. Модули Spark

 

Следующие два модуля предназначены для решения конкретных задач. Библиотека машинного обучения MLLib обладает отличной интегрируемостью — ее можно подключать к прикладным интерфейсам, написанным на Java, Python и Scala. Еще два ключевых момента заключаются в скорости тестирования и обучения (в 100 раз быстрее по сравнению с MapReduce) и простом развертывании — библиотека инсталлируется на уже имеющемся кластере Hadoop и работает с уже имеющимися данными.

Последний модуль — прикладной интерфейс GraphX для работы с графами и графо-параллельных вычислений. Модуль обладает такими важными преимуществами, как гибкость («бесшовность» работы как с графами, так и с коллекциями данных) и высокая скорость работы.

Итак, основными преимуществами Saprk являются: высокая скорость работы на всех уровнях, начиная от SQL-запросов и кончая вычислениями на графах и машинным обучением; многофункциональность; сохранение сильных сторон MapReduce (горизонтальное масштабирование, отказоустойчивость) и полная совместимость с экосистемой Hadoop.

Проект Storm инициирован с целью разработки технологии для агрегатора BackType социальных медиа, созданием которого занимается одноименный стартап, финансируемый фондом Y Combinator. Руководитель команды разработки Storm Натан Марц достаточно хорошо известен в мире технологий Больших Данных и функционального программирования в качестве создателя Cascalog — реализации логического языка запросов Datalog к данным в Hadoop, написанной на Clojure (вариант Лиспа для виртуальных машин Java). В 2010–2011-е годы проект развивался по модели с закрытыми исходными кодами, но уже тогда интерес к нему был разогрет предложенной Марцем концепцией «λ-архитектуры», согласно которой большая система обработки входящих потоков данных должна быть устроена трехслойно: данные попадают одновременно в слои пакетной и быстрой обработки, комбинируясь в отдельном слое раздачи (рис. 2).

 

Рис. 2. ?-архитектура в экосистеме Hadoop: поток новых данных принимается как в пакетный слой для обстоятельной обработки, так и в «быстрый» слой для оперативных задач, а в слое раздачи формируется комбинированный агрегат, решающий задачи аналитического уровня
Рис. 2. λ-архитектура в экосистеме Hadoop: поток новых данных принимается как в пакетный слой для обстоятельной обработки, так и в «быстрый» слой для оперативных задач, а в слое раздачи формируется комбинированный агрегат, решающий задачи аналитического уровня

 

Трудно сказать, где в этой концепции греческая буква «λ», — явных пояснений автор на этот счет не дал. Скорее всего, будучи до мозга костей функциональным программистом, таким образом Марц прославил λ-исчисление, являющееся его теоретической основой, а потом уже обязательно как-нибудь свяжет λ-абстракцию с каким-либо важным свойством архитектуры. Возможно, это будет мысль о том, что данные в этой архитектуре рассматриваются как универсум, а запрос над ними должен олицетворять собой чистую (свободную от побочных эффектов) функцию над всем универсумом.

Идеологическую поддержку Storm как одной из ключевых технологий для реализации λ-архитектуры Марц усилил книгой с громким названием «Большие Данные. Принципы и лучшие практики масштабируемых систем работы с данными в реальном времени» (Big Data Principles and best practices of scalable realtime data systems), которая стала знаменитой еще задолго до подписания в печать, — по состоянию на конец 2014 года она еще не дописана и читателям доступны только ее отдельные главы в электронном виде. Вероятно, именно Storm, а не агрегатор социальных медиа стал решающим активом стартапа BackType, когда компания Twitter принимала решение о его поглощении в 2011 году. Для Storm итогом этого поглощения стали открытие исходных кодов и передача проекта в инкубатор фонда Apache, где за год с небольшим он стал проектом верхнего уровня, заняв видное и устойчивое место в экосистеме Hadoop.

Рис. 3. Пример топологии в Storm с метафорой «труба — задвижка»
Рис. 3. Пример топологии в Storm с метафорой «труба — задвижка»

Что же такого особенного в Storm? Прежде всего это специфичный для мира Больших Данных подход к обработке — если верхним уровнем при обработке данных в кластере MapReduce является пакетное задание (job), то в Storm его роль выполняет «топология», некоторая конструкция обработки входящего потока. Задания запускаются и по мере выполнения завершаются, а «топология», будучи единожды созданной, функционирует постоянно, пока ее не остановит системный администратор. Иначе говоря, топология в Storm — это программно реализованная схема преобразования потока, непрерывного и потенциально бесконечного набора кортежей. По аналогии с потоком в системе водоснабжения разработчики Storm назвали два вида элементов топологии «труб» (spout) и «задвижек» (bolt) (рис. 3). Для создания топологии разработчик должен реализовать методы, заданные в интерфейсах «труб» и «задвижек», отражающие специфические для конкретной задачи атомарные преобразования, которые формируют целостный непрерывный процесс преобразования входящего потока сырых данных.

Кластер в Storm, как и в MapReduce, состоит из главного узла, отслеживающего топологию, и рабочих узлов, непосредственно занимающихся обработкой потока. Демон главного узла — Nimbus — распределяет код обработки по рабочим узлам кластера и занимается отслеживанием их работы. На рабочих узлах функционирует демон Supervisor, запускающий или приостанавливающий задания по командам от Nimbus. Координация между главным узлом и рабочими налажена через Apache Zookeeper, и, вообще, все в Storm изначально было направлено на обеспечение максимальной отказоустойчивости и стабильности.

Storm часто сравнивают со Spark, что в каком-то смысле правомерно — оба проекта являются альтернативами пакетному MapReduce, вносящими в экосистему Hadoop эффективные средства оперативной обработки. Заметна и культурная общность проектов: оба реализованы на модных языках программирования (Storm — на Clojure, а Spark — на функционально-объектном языке Scala, претендующем ни много ни мало на вытеснение Java), оба проекта достаточно быстро прошли «инкубацию» в системе проектов фонда Apache и за год вышли на верхний ее уровень, снискав значительную популярность. Иногда доходит даже до «религиозных войн» между апологетами того или иного проекта, в ходе которых сравнивают производительность (каждая сторона, естественно, в свою пользу) и утверждают единственную правильность выбранного языка и архитектуры. Однако предназначение у проектов разное — Storm представляет собой классическую распределенную систему обработки сложных событий (Complex Event Processing, CEP), выполненную в экосистеме Hadoop, а Spark — это интерактивно-аналитическая система, предназначенная для задач, использующих многопроходные обработки (машинное обучение, граф-анализ, глубинный анализ данных). Обработка входящих потоков событий в Spark Streaming — это, скорее, побочный эффект, равно как и побочным эффектом можно считать возможность в Spark решать «на лету» некоторые аналитические задачи.

Рис. 4. Место Tez в экосистеме Hadoop 2.0
Рис. 4. Место Tez в экосистеме Hadoop 2.0

Так же как и для создания программ, эффективно использующих возможности ОС, в случае с YARN, воспринимаемой как кластерная операционная система, нужны средства разработки и отладки, библиотеки и фреймворки с достаточно высоким уровнем абстракции. Иначе говоря, нужна среда, позволяющая реализовывать новые парадигмы распределенной обработки, заботясь в первую очередь об алгоритмической части проблематики и не отвлекаясь на низкоуровневые вопросы управления ресурсами. Проект Apache Tez создавался с целью разработки такой среды (рис. 4) и был инициирован в недрах компании Hortonworks. Tez предоставляет разработчикам основные примитивы для создания готовых распределенных приложений и, что особенно важно, механизмы распределенной обработки, предназначенные для проблемно-ориентированных приложений. Идеологическое ядро Tez — это представление стратегий вычислений в виде направленных ациклических графов. Получив задание, сформулированное на языке таких стратегий, Tez может осуществить манипуляции над графами и, обладая доступом к сведениям о ресурсах, передать в YARN задачи таким образом, чтобы решение было максимально эффективным, заранее рассчитывая нужные последовательности выполнения и определяя необходимые уровни параллелизма. Таким образом, миссия создателя нового «движка» распределенной обработки сводится к переводу проблемно-специфичного языка для своего класса задач на более универсальный и стандартный язык примитивов и стратегий Tez. При этом у разработчика, использующего Tez, сохраняется пространство для маневра — узлы направленных ациклических графов, являющиеся входами, выходами и обработчиками, представляют собой Java-классы, лишь реализующие соответствующие интерфейсы, заданные фреймворком. Такой подход роднит Tez со Storm, в котором разработчик создает топологию, реализуя на Java или Clojure интерфейсы «труб» и «задвижек». Но есть и существенное различие — топология Storm после развертывания работает сразу и непрерывно, с бесконечным входящим потоком, а стратегии выполнения Tez отправляются на выполнение по требованию и действуют над определенными наборами данных.

Одно из ключевых инженерных решений, используемых для повышения скорости выполнения заданий в Tez, — повторное использование контейнеров во время выполнения. Для каждого узла графа, представляющего вычислительную стратегию, Tez заранее определяет необходимые параметры контейнера (процессорные ресурсы, ресурсы памяти, окружение, опции командной строки) и, когда контейнеров в кластере еще нет, запрашивает у YARN их создание. В боевом кластере нередко уже существующие контейнеры пригодны для исполнения нового задания, и тогда Tez помечает их для повторного использования, чтобы они не высвобождали ресурсы и не отвлекались на повторную инициализацию, а, дожидаясь новой работы, занимались освобождением ресурсов и повторной инициализацией. Кроме того, часто бывает так, что большинство узлов кластера уже закончили выполнение очередного задания, и, чтобы они не простаивали в ожидании нескольких отстающих, Tez может перепоручить им часть оставшегося задания, если это возможно, либо загрузить их новым.

Еще одна ценная возможность Tez — мощные средства отладки, позволяющие локально «прогонять» стратегии вычислений в одном экземпляре виртуальной машины Java, без запуска на кластере. В этой же связи полезны и встроенные в Tez средства диагностики, позволяющие постфактум изучить статистику выполнения заданий и найти «бутылочные горла» с точки зрения производительности.

Tez уже прочно занял роль среды создания «движков» распределенной обработки над кластерной операционной системой YARN, и лучшее подтверждение этому — переход на использование Tez исторически первых интерактивных Hadoop-средств Hive и Pig. Но самое важное то, что появилось средство, признанное широкими массами разработчиков компонентов экосистемы Hadoop в качестве стандартного фреймворка для взаимодействия с YARN, избавляющего их от поиска в каждом случае уникальных низкоуровневых решений. В конечном счете все это должно внести изрядную долю стабильности и определенности во все усложняющуюся экосистему Hadoop.

***

Зародившись как набор библиотек вокруг пакетной среды MapReduce, Hadoop превратился в развитую экосистему Hadoop 2.0 для работы уже не только с пакетными нагрузками, что стало возможно благодаря проектам Spark, Storm и Tez, которые принесли в мир Hadoop динамизм, ранее труднодостижимый в простой пакетной парадигме.

Литература

  1. Vijay Agneeswaran. Big Data Analytics Beyond Hadoop: Real-Time Applications with Storm, Spark and More Hadoop Alternatives. — N. J.: Pearson FT Press, 2014, ISBN 9780133837940
  2. Леонид Черняк. Альтернативы MapReduce для реального времени // Открытые системы. СУБД. — 2014. — № 5. — С. 12–13. URL: http://www.osp.ru/os/2014/05/13041818 (дата обращения: 15.12.2014).

Дмитрий Волков (vlk@keldysh.ru) — сотрудник ИПМ им. М. В. Келдыша РАН; Андрей Николаенко (anikolaenko@acm.org) — системный архитектор, компания IBS. Статья подготовлена на основе материалов серии семинаров «Hadoop на практике. Новые инструменты и проекты».