На околоземную орбиту Земли с целью дистанционного зондирования ее поверхности (ДЗЗ) сегодня запускаются различные спутники (Terra, Aqua, Suomi NPP и др.) с сенсорами, передающими на станции слежения огромные потоки данных, что, помимо проблем с их хранением, делает весьма актуальной задачу их обработки в реальном времени. Однако скорость последовательного выполнения всех алгоритмов меньше скорости поступления сырых данных со спутника, поэтому требуется распараллеливание процесса обработки потока получаемых данных.

В общем случае процесс обработки данных со спутника можно рассматривать в виде конвейера — технология подразумевает разбиение передаваемых данных по времени, по типу сенсора и диапазону. На вход системы обработки подается последовательность (очередь) файлов в заданном формате, упорядоченная по времени и типу полученных данных, а задача обработки состоит в выборке из этой последовательности отдельных файлов, запуске задач их обработки на отдельных вычислительных узлах, интеграции получаемых промежуточных результатов в общий продукт (например, карту) и их «сервировке» для дальнейшего использования. В дополнение к этому требуется контролировать всю цепочку выполнения процесса обработки для своевременного обнаружения ошибок, повторного запуска задач, а также их равномерного распределения по вычислительным ресурсам.

Десятки разнообразных алгоритмов, их версий, множество стадий обработки данных, а также необходимость обслуживания на одной вычислительной системе нескольких научных групп с разными требованиями к результатам обработки приводят к необходимости виртуализации вычислительных узлов, с тем чтобы один и тот же узел мог перенастраиваться на новые задачи без внесения изменений в конфигурацию его физического оборудования. Кроме того, при обработке больших объемов данных требуется обеспечить их промежуточное хранение и разбиение по узлам.

Для обработки данных дистанционного зондирования Земли необходима система, позволяющая производить обработку потенциально бесконечных потоков данных по мере их поступления в режиме реального времени с возможностью добавления новых задач и коррекции существующих настроек без изменения конфигурации оборудования. Такая система может быть использована для анализа данных, получаемых с мультиспектрального сенсора VIIRS, установленного на спутнике Suomi NPP. В этой задаче поток данных со спутника имеет объем в несколькj терабайтов в сутки, данные поступают в виде «гранул» — файлов, содержащих результаты съемки за пять минут и имеющих размер в несколько десятков гигабайтов. При этом решение только одной подзадачи коррекции данных по рельефу занимает для одной гранулы около десяти минут, что вдвое больше времени, необходимого для получения и регистрации файла гранулы. Поскольку в цепи обработки имеются и другие подзадачи, происходит переполнение очереди задач, что приводит к невозможности их выполнения в режиме реального времени.

Технологии распределенного хранения и обработки больших объемов данных

Для работы с данным классом задач существуют программные системы, реализующие подход MapReduce (например, Apache Hadoop), конвейерную обработку очередей заданий (например, HTCondor [1–2], TORQUE), использование виртуальных вычислительных узлов (например, OpenStack) и кластерной файловой системы (например, Ceph). Существуют решения, объединяющие вместе эти технологии и предназначенные для обработки конечного объема данных за конечное время, что ограничивает возможность их применения для обработки потенциально бесконечных потоков данных, поступающих в режиме реального времени [3–4]. Вместе с тем известны примеры специального применения перечисленных технологий для анализа данных ДЗЗ. В проекте MODISAzure [3] данные, получаемые с сенсора MODIS, установленного на спутниках Terra и Aqua, обрабатываются на облачной платформе Microsoft Windows Azure для перепроекции и изменения разрешения изображений, получения производных индексов и их визуализации в виде графиков, таблиц и карт. Данные загружаются из каталогов ЦОД и обрабатываются по фиксированной схеме (рис. 1, 2).

Рис.1. Схема обработки данных MODIS
Рис.1. Схема обработки данных MODIS

 

Рис. 2. Архитектура системы MODISAzure
Рис. 2. Архитектура системы MODISAzure

 

Вместе с тем проект MODISAzure не универсален: его нельзя применять даже в ряде областей обработки данных ДЗЗ — например, можно решать только ограниченное число конкретных задач. Кроме того, MODISAzure предназначен для действий с заранее известным объемом данных, поэтому заведомо не накладывает требований на продолжительность и последовательность обработки данных.

В проекте Google Earth Engine используется облачная платформа Google Cloud, позволившая обработать 20 Тбайт данных MODIS, накопленных за последние 30 лет, для формирования результата в виде временной последовательности спутниковых изображений высокого разрешения, покрывающих всю Землю. Обработка всех данных потребовала 260 тыс. часов процессорного времени на Google Cloud. Для визуализации в проекте используется программное обеспечение GigaPan Time Machine, позволяющее строить и визуализировать многомасштабные видеопотоки, разбитые на видеотайлы, аналогично пирамидам изображений, используемым для работы с изображениями в высоком разрешении.

В отличие от MODISAzure в Google Earth Engine можно задать конфигурацию для различных задач обработки, однако и здесь нет функционала и механизмов поддержки процесса обработки бесконечного потока в режиме реального времени. Еще одним недостатком Google Earth Engine является жесткая привязка к облачной платформе Google Cloud и интерфейсу Google Earth.

В НАСА создана и используется для различных научных задач (в том числе и для обработки данных ДЗЗ) своя облачная платформа Nebula, реализованная на стеке OpenStack, представляющем собой набор сервисов IaaS, но без средств конвейерной обработки очереди задач. В то же время в реальной практике имеется масса задач обработки данных ДЗЗ, которые хорошо решаются именно конвейерно-параллельным методом. Среди них задачи построения глобальной карты путем «склейки» отдельных гранул, полученных со спутника за день, или усреднения полученных данных за месяц для приложений анализа растительности, облачности, ночных огней, аэрозолей и т. п.

Параллельно-конвейерная обработка

Для решения задачи обработки данных ДЗЗ предложена параллельно-конвейерная система в облаке, включающая модель MapReduce, очереди заданий, виртуальные вычислительные узлы и кластерную файловую систему.

Система ориентирована на обработку файлов — ключевую роль в рабочем потоке играют первичные данные ДЗЗ и результаты следующих этапов обработки, причем режим обработчиков (активность или ожидание) определяется наличием или отсутствием всех нужных файлов, а также состоянием таймеров или наличием запросов пользователя. Файлам приписываются атрибуты (например, имя сенсора, тип данных и т.п.), которые могут быть получены из имени файла или из метаданных его формата данных — например, заголовков NetCDF или HDF5 (форматы хранения климатических данных). Обработчики файлов задаются в командной строке исполнения либо определяются требуемыми ресурсами или приоритетом, но главное — зависят от описания входных и выходных файлов, условий их соответствия заданному конвейеру и взаимосвязей на уровнях файлов и других обработчиков. Каждое имя, условие или ограничение может быть представлено в виде шаблона и формализовано. Шаблоны позволяют формально представить группу задач, объединенных входными и выходными данными, различными ограничениями на данные и процесс исполнения, и служат для реализации обработки потока входных файлов. Конвейеры регистрируются в системе на основе шаблонов их описания, создают необходимое число обработчиков и начинают работу. При удовлетворении условий на входные файлы и с учетом взаимосвязей подзадач, обработчиками конвейеров создаются конкретные экземпляры задач, которые затем планировщиком распределяются по вычислительному кластеру.

В системе имеются средства динамической адаптации к нагрузкам и оптимизации путем запуска или останова виртуальных машин из библиотеки образов с предустановленным ПО, что позволяет обрабатывать данные в пределах квот пользователя или проекта в облачной инфраструктуре. Обеспечение отказоустойчивости системы гарантируется за счет фиксации состояний — наличие точек восстановления, очередей обмена сообщений и задач, а также распределенной файловой системы позволяет повысить надежность системы.

Система построена на базе языка Python 2.7, распределенной файловой системы CEPH, системы низкоуровневой пакетной обработки задач на распределенных кластерах SLURM, СУБД MySQL для регистрации объектов и субъектов очереди (шаблонов, файлов, управляющих структур и т. п.) (с некоторыми исправлениями возможна поддержка PostgreSQL), системы обмена сообщениями RabbitMQ и ПО из OpenStack для реализации инфраструктуры класса IaaS. На рис. 3 приведена общая структура управления рабочим потоком.

Рис. 3. Схема управления рабочим потоком системы параллельно-конвейерной обработки данных ДЗЗ в облаке
Рис. 3. Схема управления рабочим потоком системы параллельно-конвейерной обработки данных ДЗЗ в облаке

 

Файлы из удаленного хранилища спутниковых данных предварительно загружаются в буферную систему хранения, которая затем может использоваться как архив входных и обработанных данных. Модуль загрузки может включать подмодули, работающие с различными протоколами передачи данных (HTTP, FTP, scp, gridftp ) и удаленными системами хранения (NFS, Lustre, CEPH, OpenStack Swift, каталоги грид). Обработка происходит либо по мере поступления данных, либо по таймеру или пользовательскому запросу. Поступающие от модуля данные регистрируются в СУБД, где хранится информация о выходных файлах работы заданий, структурах, соответствующих шаблонам конвейеров, квотах, ресурсах, состояниях выполнения задач, загрузках очередей и т. п. Затем модуль, реализующий обработку конвейеров, на основе шаблонов создает объекты конвейеров, которые, выделяя необходимые файлы из входного потока, по заданным условиям инициализации формируют элементы заданий и подзаданий. Эти элементы  передаются  средствами масштабируемой системы планирования заданий и управления ресурсами кластеру SLURM, осуществляющему их распределение и выполнение при определенных условиях (наличие конкретного ПО, доступной оперативной памяти, ядер процессора и свободного места на дисках).

Возможно формирование нескольких очередей с разными приоритетами выполнения, разными пользователями и ресурсами. Очереди SLURM и других систем управления ресурсами для реализации управляющих элементов системы не применяются — система должна функционировать и при полном отсутствии ресурсов на вычислительном кластере. Например, для системы Condor модуль PVM осуществляет запуск виртуальных машин посредством специализированной очереди, при этом не предоставляется интерфейсов надстройки для учета и динамического масштабирования ресурсов, а функциональность запуска элементарна и не допускает расширения. Система SLURM, напротив, позволяет создавать и использовать резервный контролирующий сервер, однако одного только устойчивого к падениям сохранения информации о задачах и их состоянии недостаточно для функционирования надстроенной системы управления в том случае, когда становится невозможным исполнение этих задач. Поэтому информационный модуль хранит данные о загруженности очереди заданий, количестве свободной оперативной памяти и процессорных ядер, сведения о вычислительных узлах, распределении квот по различным конвейерам, запущенным и используемым виртуальным машинам, их суммарным ресурсам и ограничениям квот облачного провайдера. Информационный модуль может расширяться подмодулями, реализующими паттерны поведения или дополнительную функциональность — например, обеспечение приоритетов для поддержки справедливого использования ресурсов задачами различных конвейеров или подключение экономических моделей учета стоимости расширения ресурсов при привлечении платных облачных провайдеров.

Модуль динамических ресурсов и виртуальных машин на основе паттернов поведения и анализа состояния системы принимает решения об изменении состава кластера — путем запуска или остановки виртуальных машин с задаваемыми характеристиками. В базовой модели поведения имеется библиотека образов виртуальных машин с предустановленным ПО (например, Matlab Compiler Runtime, JVM). Кроме того, машины сконфигурированы таким образом, чтобы при запуске они автоматически регистрировались в инфраструктуре вычислительного кластера, управляемого модулем SLURM, и с этого момента были бы готовы принимать и обрабатывать задания. Простейшим способом изменения доступных ресурсов является увеличение пула виртуальных машин при превышении некоторого порога загрузки очереди с возможным учетом требований задач и агрегированием запуска нескольких задач на одной более мощной машине либо выделением нескольких небольших виртуальных машин по схеме «одна задача — одна машина». Соответственно, уменьшение пула производится либо путем останова машин, не выполняющих в данный момент задач, при снижении загруженности очереди вплоть до достижения минимального количества машин, заданного в квотах, либо же отключением всех машин.

Для реализации системы параллельно-конвейерной обработки данных ДЗЗ в облаке был создан конвейер, позволяющий выбирать данные из центрального каталога спутниковых данных NOAA (National Oceanic and Atmospheric Administration, США) и помещать их в буферную зону, из которой они загружаются для дальнейшей обработки. Поддерживаются такие типы данных, как наблюдение Земли в разных спектрах и данные геопривязки. Данные из промежуточного буфера центрального источника загружаются по нескольким диапазонам в облачную систему и распределяются между узлами подзадач (шаг «map» в MapReduce). При этом происходит независимая идентификация в каждом диапазоне инфракрасных источников свечения. Шаг reduce состоит в обработке промежуточных результатов после соотнесения информации о найденных источниках в разных диапазонах, а также после оценки по соотношению энергии таких характеристик, как температура, площадь и т. п. Объединение результатов происходит для нескольких гранул с построением общей карты ИК-источников, например, за день. Многодневные наблюдения повторяющихся событий интегрируются в базе данных (создается аналог звездного каталога в астрономии) с уточнением координат и дополнительных параметров, таких как температура горения, площадь источника и др.

***

Сегодня весьма актуальна задача организации автоматического процесса многоэтапной распределенной обработки потенциально бесконечного потока данных большого объема, в котором каждый этап обработки производится уже существующим программным пакетом, реализованным на своем языке программирования. Имеющиеся менеджеры ресурсов не решают проблемы отказоустойчивости и гарантированности обработки всех входных данных, что особенно проявляется в случае нерегулярного поступления данных по ненадежному каналу и потенциальной бесконечности этого потока. В качестве примера можно привести поток данных со спутника, поступающих после предварительной обработки из центра данных по подписке через FTP-протокол. Реализация процесса обработки такого потока с использованием существующих менеджеров ресурсов требует ручной настройки схемы обработки, выделения и конфигурации хранилища данных достаточного объема для промежуточного хранения обрабатываемых данных. Необходимо также написание собственных скриптов и программных модулей для мониторинга и разбиения пространства входных данных между обработчиками, мониторинга ошибок, восстановления системы и перезапуска этапов обработки при возникновении ошибок и т. п.

На платформе OpenStack, развернутой на вычислительном кластере национального исследовательского центра «Курчатовский институт», реализована среда для детектирования инфракрасных источников свечения на ночных снимках поверхности Земли. Результатом применения разработанного подхода стали следующие конечные продукты: общие панорамные снимки, полученные путем совмещения отдельных гранул по координатам с последующей калибровкой и переводом в различные проекции; карты производных данных — например, толщины облачного покрова, плотности аэрозолей, типов растительности и т. п.

Литература

  1. Douglas Thain, Todd Tannenbaum, Miron Livny. Distributed Computing in Practice: The Condor Experience // Concurrency and Computation: Practice and Experience. — 2005, — Vol. 17, No. 2-4, — P. 323-356.
  2. Parag Mhashilkar, Zachary Miller, Rajkumar Kettimuthu, Gabriele Garzoglio, Burt Holzman, Cathrin Weiss, Xi Duan, Lukasz Lacinski, End-To-End Solution for Integrated Workload and Data Management using GlideinWMS and Globus Online. Journal of Physics: Conference Series, Vol. 396, Issue 3, 2012.
  3. Jie Li, Marty Humphrey, Deborah A. Agarwal, Keith R. Jackson, Catharine van Ingen, Youngryel Ryu: eScience in the cloud: A MODIS satellite data reprojection and reduction pipeline in the Windows Azure platform. IPDPS 2010: 1-10
  4. Qi Xing and Estela Blaisten-Barojas. A cloud computing system in windows azure platform for data analysis of crystalline materials. Concurrency and computation: practice and experience. 2012. Published online in Wiley Online Library. DOI: 10.1002/cpe.2912.

Алексей Пойда (Poyda_AA@nrcki.ru), Андрей Поляков (andrew@kiae.ru), Александр Новиков (novikov@wdcb.ru) — сотрудники НИЦ «Курчатовский институт» (Москва).