В этой статье рассмотрим настройку кластера из трех узлов ZooKeeper (служба координации распределенной системы), два из которых — брокеры сообщений Kafka, третий — управляющий.
В результате будет реализована следующая схема компонентов:
Все компоненты на одной машине, для простоты воспользуемся встроенными консольными издателем (producer) и подписчиком (consumer) сообщений.
Модуль ZooKeeper встроен в пакет Kafka, используем его.
Установка и настройка
Установка пакета Kafka
В данном случае операционная система — Ubuntu 16.04 LTS.
Актуальные версии и подробная инструкция — на официальном сайте.
wget http://mirror.linux-ia64.org/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz
tar -xvzf kafka_2.12-2.4.1.tgz
Создадим рабочие директории для трех узлов server_1, server_2 и server_3.
mv kafka_2.12-2.4.1 kafka_server_1
cp -r kafka_server_1/ kafka_server_2/
cp -r kafka_server_1/ kafka_server_3/
Настройка узлов
Конфигурация Kafka-брокера
(из директорий kafka_server_1, kafka_server_2 и kafka_server_3 соответственно)
vim config/server.properties
- id брокера (0, 1, 2 соответственно)
- клиентский порт (9092, 9093, 9094)
- порт ZooKeeper (2181, 2182, 2183)
- директория логов (/tmp/kafka-logs-1, /tmp/kafka-logs-2, /tmp/kafka-logs-3)
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181
log.dirs=/tmp/kafka-logs-1
Конфигурация znode:
vim config/zookeeper.properties
- директория для данных (/tmp/zookeeper_1, /tmp/zookeeper_2, /tmp/zookeeper_3)
- клиентский порт (2181, 2182, 2183)
- максимальное количество клиентских соединений и лимиты соединения
- порты для обмена данными между узлами (т. е. сообщаем каждому узлу о существовании других)
dataDir=/tmp/zookeeper_1
clientPort=2181
maxClientCnxns=60
initLimit=10
syncLimit=5
tickTime=2000
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
Создание директорий для узлов ZooKeeper, запись id узлов в служебные файлы:
mkdir -p /tmp/zookeeper_[1..3]
echo "1" >> /tmp/zookeeper_1/myid
echo "2" >> /tmp/zookeeper_2/myid
echo "3" >> /tmp/zookeeper_3/myid
В случае возникновения проблем с правами доступа изменить их у директорий и файлов:
sudo chmod 777 /tmp/zookeeper_1
Запуск z-узлов и брокеров
Для первых двух узлов (kafka_server_1/, kafka_server_2/) запускаем скрипты ZooKeeper и Kafka-серверов, аргументы — соответствующие конфиг файлы:
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
sudo bin/kafka-server-start.sh config/server.properties
Для третьего узла (kafka_server_3/) — только ZooKeeper.
Скрипты для остановки серверов:
sudo bin/kafka-server-stop.sh
sudo bin/zookeeper-server-stop.sh
Создание темы, консольного producer-а и consumer-а
Тема (topic) — поток сообщений определенного типа, делится на партиции (количество задается ключом —partition). Каждая партиция дублируется на двух серверах (—replication-factor). После ключа —bootstrap-server указываем через запятую порты Kafka-брокеров. Ключ —topic задает имя темы.
sudo bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093 --replication-factor 2 --partitions 2 --topic TestTopic
Узнать список топиков на порту, информацию о каждом топике можно командой:
sudo bin/kafka-topics.sh --list --zookeeper localhost:2181
sudo bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic TestTopic
Leader — сервер с основным экземпляром партиции, replica — сервер, на котором информация дублируется, ISR — сервера, берущие на себя роль лидеров в случае отказа leader.
Console producer, console consumer создаем с помощью соответствующих скриптов:
sudo bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic TestTopic
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093 --from-beginning --topic TestTopic
Таким образом обеспечивается непрерывная передача сообщений получателю, в случае отказа одного брокера сообщения обрабатывает второй.
Сервисы для zookeeper и kafka в systemctl
Для удобства запуска кластера можно создать сервисы в systemctl: zookeeper_1.service, zookeeper_2.service, zookeeper_3.service, kafka_1.service, kafka_2.service.
Редактируем файл /etc/systemd/system/zookeeper_1.service (директорию /home/user и пользователя user изменить на нужные).
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=user
ExecStart=home/user/kafka_server_1/bin/zookeeper-server-start.sh /home/user/kafka_server_1/config/zookeeper.properties
ExecStop=/home/user/kafka_server_1/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Для сервисов zookeeper_2.service и zookeeper_3.service аналогично.
Файл /etc/systemd/system/kafka_1.service:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=user
ExecStart=/bin/sh -c '/home/user/kafka_server_1/bin/kafka-server-start.sh /home/user/kafka_server_1/config/server.properties > /tmp/kafka-logs-1 2>&1'
ExecStop=/home/user/kafka_server_1/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
kafka_2.service аналогично.
Активация и проверка работы сервисов
systemctl daemon-reload
systemctl enable zookeeper_1.service # создание симлинка
systemctl start zookeeper_1.service
sudo journalctl -u zookeeper_1.service # информация о сервисе
systemctl stop zookeeper_1.service
То же самое для zookeeper_2.service, zookeeper_3.service, kafka_1.service, kafka_1.service.
Спасибо за внимание!
Специально для сайта ITWORLD.UZ. Новость взята с сайта Хабр