Поставка данных из очереди Apache Kafka® в OpenSearch
В кластер Managed Service for OpenSearch можно в реальном времени поставлять данные из топиков Apache Kafka®.
Чтобы запустить поставку данных:
- Подготовьте тестовые данные.
- Настройте кластер-приемник.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
-
Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
-
Плата за каждый трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).
-
Плата за кластер Managed Service for OpenSearch: использование вычислительных ресурсов, выделенных хостам (в том числе хостам с ролью
MANAGER), и дискового пространства (см. тарифы Managed Service for OpenSearch). -
Плата за использование публичных IP-адресов:
- для хостов кластера Managed Service for OpenSearch;
- для хостов кластера Managed Service for Apache Kafka®, если для них включен публичный доступ.
Перед началом работы
-
Подготовьте инфраструктуру поставки данных:
ВручнуюTerraform-
Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.
-
Создайте в кластере-источнике топик с именем
sensors. -
Создайте в кластере-источнике пользователя с именем
mkf-userи правами доступаACCESS_ROLE_PRODUCERиACCESS_ROLE_CONSUMERк созданному топику. -
Создайте кластер-приемник Managed Service for OpenSearch любой подходящей конфигурации со следующими настройками:
- В той же зоне доступности, что и кластер-источник.
- С публичным доступом к хостам с ролью
DATA.
-
Для подключения к кластерам с локальной машины пользователя, настройте группы безопасности:
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-mos.tf
.В этом файле описаны:
- сеть;
- подсеть;
- группа безопасности и правила, необходимые для подключения к кластерам Managed Service for Apache Kafka® и Managed Service for OpenSearch;
- кластер-источник Managed Service for Apache Kafka®;
- топик Apache Kafka® с именем
sensors; - пользователь Apache Kafka®
mkf-userс правами доступаACCESS_ROLE_PRODUCER,ACCESS_ROLE_CONSUMERк топикуsensors; - кластер-приемник Managed Service for OpenSearch;
- трансфер.
-
Укажите в файле
data-transfer-mkf-mos.tfпеременные:kf_version— версия Apache Kafka® в кластере-источнике;kf_user_password— пароль пользователяmkf-user;os_version— версия OpenSearch в кластере-приемнике;os_user_password— пароль пользователяadmin;transfer_enabled— значение0, чтобы не создавать трансфер до создания эндпоинтов вручную.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validateЕсли в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform planЕсли конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply -
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
-
-
Установите утилиты:
-
kafkacat
— для чтения и записи данных в топики Apache Kafka®.sudo apt update && sudo apt install --yes kafkacatУбедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.
-
jq
— для потоковой обработки JSON-файлов.sudo apt update && sudo apt-get install --yes jq
-
Подготовьте тестовые данные
Пусть в качестве сообщения в топик Apache Kafka® sensors кластера-источника поступают данные от сенсоров автомобиля в формате JSON.
Создайте локально файл sample.json с тестовыми данными:
sample.json
{
"device_id": "iv9a94th6rzt********",
"datetime": "2020-06-05 17:27:00",
"latitude": 55.70329032,
"longitude": 37.65472196,
"altitude": 427.5,
"speed": 0,
"battery_voltage": 23.5,
"cabin_temperature": 17,
"fuel_level": null
}
{
"device_id": "rhibbh3y08qm********",
"datetime": "2020-06-06 09:49:54",
"latitude": 55.71294467,
"longitude": 37.66542005,
"altitude": 429.13,
"speed": 55.5,
"battery_voltage": null,
"cabin_temperature": 18,
"fuel_level": 32
}
{
"device_id": "iv9a94th6rzt********",
"datetime": "2020-06-07 15:00:10",
"latitude": 55.70985913,
"longitude": 37.62141918,
"altitude": 417.0,
"speed": 15.7,
"battery_voltage": 10.3,
"cabin_temperature": 17,
"fuel_level": null
}
Настройте кластер-приемник
Совет
Вы можете поставлять данные в кластер Managed Service for OpenSearch от имени пользователя admin, имеющего роль superuser, но безопаснее для каждой задачи создавать отдельных пользователей с ограниченными привилегиями. Подробнее см. в разделе Управление пользователями OpenSearch.
-
Создайте роль
с привилегиямиcreate_indexиwriteдля всех индексов (*). -
Создайте пользователя и назначьте ему эту роль.
Подготовьте и активируйте трансфер
-
Создайте эндпоинт для источника
Apache Kafka®:Параметры эндпоинта:
-
Настройки подключения:
-
Тип подключения —
Кластер Managed Service for Apache Kafka.-
Кластер Managed Service for Apache Kafka — выберите кластер-источник из списка.
-
Аутентификация — SASL.
- Имя пользователя —
mkf-user. - Пароль — укажите пароль пользователя.
- Имя пользователя —
-
-
Полное имя топика —
sensors.
-
-
Расширенные настройки → Правила конвертации:
- Правила конвертации —
json.-
Схема данных —
JSON-спецификация.Вставьте схему данных в формате JSON:
json
[ { "name": "device_id", "type": "utf8", "key": true }, { "name": "datetime", "type": "utf8" }, { "name": "latitude", "type": "double" }, { "name": "longitude", "type": "double" }, { "name": "altitude", "type": "double" }, { "name": "speed", "type": "double" }, { "name": "battery_voltage", "type": "double" }, { "name": "cabin_temperature", "type": "uint16" }, { "name": "fuel_level", "type": "uint16" } ]
-
- Правила конвертации —
-
-
Создайте эндпоинт для приемника
OpenSearch:Параметры эндпоинта → Настройки подключения:
-
Тип подключения —
Кластер Managed Service for OpenSearch.- Кластер Managed Service for OpenSearch — выберите кластер-приемник из списка.
-
Пользователь — укажите имя пользователя.
-
Пароль — укажите пароль пользователя.
-
-
Создайте трансфер:
ВручнуюTerraform- Создайте трансфер типа Репликация, использующий созданные эндпоинты.
- Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
-
Укажите в файле
data-transfer-mkf-mos.tfпеременные:source_endpoint_id— идентификатор эндпоинта для источника;target_endpoint_id— идентификатор эндпоинта для приемника;transfer_enabled— значение1для создания трансфера.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validateЕсли в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform planЕсли конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply -
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
-
Трансфер активируется автоматически. Дождитесь его перехода в статус Реплицируется.
Проверьте работоспособность трансфера
Убедитесь, что в кластер Managed Service for OpenSearch переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:
-
Отправьте данные из файла
sample.jsonв топикsensorsManaged Service for Apache Kafka® с помощью утилитjqиkafkacat:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t sensors \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="mkf-user" \ -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -ZПодробнее о настройке SSL-сертификата и работе с
kafkacatсм. в разделе Подключение к кластеру Apache Kafka® из приложений. -
Проверьте, что индекс
sensorsкластера Managed Service for OpenSearch содержит отправленные данные:BashOpenSearch DashboardsВыполните команду:
curl \ --user <имя_пользователя_в_кластере-приемнике>:<пароль_пользователя_в_кластере-приемнике> \ --cacert ~/.opensearch/root.crt \ --header 'Content-Type: application/json' \ --request GET 'https://<идентификатор_хоста_OpenSearch_с_ролью_DATA>.rw.mdb.yandexcloud.net:9200/sensors/_search?pretty'- Подключитесь к кластеру-приемнику с помощью OpenSearch Dashboards.
- Выберите общий тенант
Global. - Откройте панель управления, нажав на значок
. - В разделе OpenSearch Dashboards выберите Discover.
- В поле CHANGE INDEX PATTERN выберите индекс
sensors.
Удалите созданные ресурсы
Примечание
Перед тем как удалить созданные ресурсы, деактивируйте трансфер.
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите трансфер.
- Удалите эндпоинты для источника и приемника.
Остальные ресурсы удалите в зависимости от способа их создания:
-
В терминале перейдите в директорию с планом инфраструктуры.
Важно
Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.
-
Удалите ресурсы:
-
Выполните команду:
terraform destroy -
Подтвердите удаление ресурсов и дождитесь завершения операции.
Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.
-