Работа с топиками Apache Kafka® с помощью PySpark-заданий в Yandex Data Processing
Кластеры Yandex Data Processing поддерживают интеграцию с кластерами Managed Service for Apache Kafka®. Вы можете записывать сообщения в топики Apache Kafka® и читать сообщения из топиков с помощью PySpark-заданий. При чтении поддерживается пакетная обработка (batch processing) и потоковая обработка (stream processing).
Чтобы настроить интеграцию между кластерами Managed Service for Apache Kafka® и Yandex Data Processing:
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
- Плата за кластер Yandex Data Processing (см. тарифы Yandex Data Processing).
- Плата за NAT-шлюз (см. тарифы Virtual Private Cloud).
- Плата за бакет Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
Подготовьте инфраструктуру
-
Создайте облачную сеть
dataproc-networkбез подсетей. -
Создайте подсеть
dataproc-subnet-bв зоне доступностиru-central1-b. -
Настройте NAT-шлюз для подсети
dataproc-subnet-b. -
Создайте группу безопасности
dataproc-security-groupв сетиdataproc-network. -
Создайте сервисный аккаунт
dataproc-saс ролями:storage.viewer;storage.uploader;dataproc.agent;dataproc.user.
-
Создайте бакет
dataproc-bucket. -
Предоставьте сервисному аккаунту
dataproc-saразрешениеFULL_CONTROLна бакетdataproc-bucket. -
Создайте кластер Yandex Data Processing с параметрами:
-
Имя кластера —
dataproc-cluster. -
Окружение —
PRODUCTION. -
Версия —
2.1. -
Сервисы:
HDFSLIVYSPARKTEZYARN
-
Сервисный аккаунт —
dataproc-sa. -
Зона доступности —
ru-central1-b. -
Имя бакета —
dataproc-bucket. -
Сеть —
dataproc-network. -
Группы безопасности —
dataproc-security-group. -
Подкластеры — мастер, один подкластер
Dataи один подкластерCompute.
-
-
Создайте кластер Managed Service for Apache Kafka® с параметрами:
- Имя кластера —
dataproc-kafka. - Окружение —
PRODUCTION. - Версия —
3.5. - Зона доступности —
ru-central1-b. - Сеть —
dataproc-network. - Группы безопасности —
dataproc-security-group. - Подсеть —
dataproc-subnet-b.
- Имя кластера —
-
Создайте топик Apache Kafka® с параметрами:
- Имя —
dataproc-kafka-topic. - Количество разделов —
1. - Фактор репликации —
1.
- Имя —
-
Создайте пользователя Apache Kafka® с параметрами:
- Имя —
user1. - Пароль —
password1. - Топики, на которые выдаются разрешения пользователю —
*(все топики). - Разрешения на топики —
ACCESS_ROLE_CONSUMER,ACCESS_ROLE_PRODUCERиACCESS_ROLE_ADMIN.
- Имя —
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации kafka-and-data-proc.tf
.В этом файле описаны:
- сеть;
- NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
- подсеть;
- группа безопасности, необходимая для кластеров Yandex Data Processing и Managed Service for Apache Kafka®;
- сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
- сервисный аккаунт для управления бакетом Yandex Object Storage;
- бакет Yandex Object Storage;
- статический ключ доступа, необходимый для выдачи сервисному аккаунту нужных разрешений на бакет;
- кластер Yandex Data Processing;
- кластер Managed Service for Apache Kafka®;
- пользователь Apache Kafka®;
- топик Apache Kafka®.
-
Укажите в файле
kafka-and-data-proc.tf:folder_id— идентификатор облачного каталога, такой же, как в настройках провайдера.dp_ssh_key— абсолютный путь к публичному ключу для кластера Yandex Data Processing. Подробнее о подключении к хосту Yandex Data Processing по SSH.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validateЕсли в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform planЕсли конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply -
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Создайте задания PySpark
-
На локальном компьютере сохраните следующие скрипты:
kafka-write.pyСкрипт для записи сообщений в топик Apache Kafka®:
#!/usr/bin/env python3 from pyspark.sql import SparkSession, Row from pyspark.sql.functions import to_json, col, struct def main(): spark = SparkSession.builder.appName("dataproc-kafka-write-app").getOrCreate() df = spark.createDataFrame([ Row(msg="Test message #1 from dataproc-cluster"), Row(msg="Test message #2 from dataproc-cluster") ]) df = df.select(to_json(struct([col(c).alias(c) for c in df.columns])).alias('value')) df.write.format("kafka") \ .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \ .option("topic", "dataproc-kafka-topic") \ .option("kafka.security.protocol", "SASL_SSL") \ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " "username=user1 " "password=password1 " ";") \ .save() if __name__ == "__main__": main()kafka-read-batch.pyСкрипт для чтения из топика и для пакетной обработки:
#!/usr/bin/env python3 from pyspark.sql import SparkSession, Row from pyspark.sql.functions import to_json, col, struct def main(): spark = SparkSession.builder.appName("dataproc-kafka-read-batch-app").getOrCreate() df = spark.read.format("kafka") \ .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \ .option("subscribe", "dataproc-kafka-topic") \ .option("kafka.security.protocol", "SASL_SSL") \ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " "username=user1 " "password=password1 " ";") \ .option("startingOffsets", "earliest") \ .load() \ .selectExpr("CAST(value AS STRING)") \ .where(col("value").isNotNull()) df.write.format("text").save("s3a://dataproc-bucket/kafka-read-batch-output") if __name__ == "__main__": main()kafka-read-stream.pyСкрипт для чтения из топика и для потоковой обработки:
#!/usr/bin/env python3 from pyspark.sql import SparkSession, Row from pyspark.sql.functions import to_json, col, struct def main(): spark = SparkSession.builder.appName("dataproc-kafka-read-stream-app").getOrCreate() query = spark.readStream.format("kafka")\ .option("kafka.bootstrap.servers", "<FQDN_хоста>:9091") \ .option("subscribe", "dataproc-kafka-topic") \ .option("kafka.security.protocol", "SASL_SSL") \ .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " "username=user1 " "password=password1 " ";") \ .option("startingOffsets", "earliest")\ .load()\ .selectExpr("CAST(value AS STRING)")\ .where(col("value").isNotNull())\ .writeStream\ .trigger(once=True)\ .queryName("received_messages")\ .format("memory")\ .start() query.awaitTermination() df = spark.sql("select value from received_messages") df.write.format("text").save("s3a://dataproc-bucket/kafka-read-stream-output") if __name__ == "__main__": main() -
Получите FQDN хоста Apache Kafka® и укажите FQDN в каждом скрипте.
-
Загрузите в корень бакета подготовленные скрипты.
-
Создайте задание PySpark для записи сообщения в топик Apache Kafka®. В поле Main python файл укажите путь до скрипта
s3a://dataproc-bucket/kafka-write.py. -
Дождитесь, когда статус задания изменится на
Done. -
Убедитесь, что данные в топик были успешно записаны. Для этого создайте новое задание PySpark для чтения из топика и для пакетной обработки данных. В поле Main python файл укажите путь до скрипта
s3a://dataproc-bucket/kafka-read-batch.py. -
Дождитесь, когда статус нового задания изменится на
Done. -
Скачайте из бакета файл с прочитанными данными:
part-00000
{"msg":"Test message #1 from dataproc-cluster"} {"msg":"Test message #2 from dataproc-cluster"}Файл хранится в новой папке
kafka-read-batch-outputв бакете. -
Прочитайте сообщения из топика при потоковой обработке. Для этого создайте еще одно задание PySpark. В поле Main python файл укажите путь до скрипта
s3a://dataproc-bucket/kafka-read-stream.py. -
Дождитесь, когда статус нового задания изменится на
Done. -
Скачайте из бакета файлы с прочитанными данными:
part-00000
{"msg":"Test message #1 from dataproc-cluster"}part-00001
{"msg":"Test message #2 from dataproc-cluster"}Файлы хранятся в новой папке
kafka-read-stream-outputв бакете.
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
-
Удалите объекты из бакета.
-
Остальные ресурсы удалите в зависимости от способа их создания:
ВручнуюTerraform-
В терминале перейдите в директорию с планом инфраструктуры.
Важно
Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.
-
Удалите ресурсы:
-
Выполните команду:
terraform destroy -
Подтвердите удаление ресурсов и дождитесь завершения операции.
Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.
-
-