[Java, IT-инфраструктура, DevOps] Kafkaрианский переезд или как расшевелить партиции
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
ВступлениеПривет, Хабр! Работаю java программистом в одной финансовой организации. Решил оставить свой след на хабре и написать первую свою статью. В силу проблем с наличием девопсеров передо мной была поставлена задача обновить кластер кафки с 2.0 до 2.6 без даунтайма и потери сообщений (сами понимаете, никто не любит, когда денежки зависают в воздухе или где-то теряются). Хочу поделиться этим опытом с Вами и получить конструктивный фидбек. Итак, хватит воды, переходим сразу к делу.Схема миграцииЗадача усложнялась тем, что надо было мигрировать со старых виртуалок на новые, поэтому вариант с тем, чтобы выключить брокера, поменять бинарники и запустить, мне не подходил.Ниже представлена схема миграции. Имеем 3 брокеров на старых ВМ, 3 брокеров на новых ВМ, также для каждого брокера имеется свой zookeeper.
План миграцииДля того чтобы нам мигрировать так, чтобы этого никто не заметил, придется "немного" попотеть. Ниже приведен общий план.
- Добавить в настройки приложения адреса новых брокеров
- Пробить доступы между всеми и вся
- Подготовить инфраструктуру на новых виртуалках
- Поднять новый кластер зукиперов и объединить его со старым
- Поднять новых брокеров кафки
- Мигрировать все партиции со старых брокеров на новые
- Отключить старые брокеры кафки и старых зукиперов
- Убрать из новых конфигов старых брокеров и зукиперов
А теперь поехали по порядку разбирать подробнее каждый пунктДобавление брокеров в приложениеСамый простой пункт. В коде клиента, там где были прописаны адреса старых брокеров, туда же дописываем новых. В свойство "bootstrap.servers" Строка настройкиold-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092Пробитие доступовОдно из самых нелюбимых занятий - это пробитие доступов. Заявки согласуются долго, а выполняются еще дольше. Доступов нам придется заказать немало, давайте разберемся какие именно.
- Как вы уже догадались по строке настройки брокеров, нам нужен порт 9092 в направлении Приложение -> новые брокеры кафки (в сумме 3 доступа)
- Этот же порт для Новые брокеры <----> Старые брокеры (+18 доступов)
- Все тот же порт 9092 между каждым новым брокером (+6 доступов)
- Аналогичные доступы из пунктов 2 и 3 для портов зукиперов: 2181, 2888, 3888 ( (18+6)*3 = 72)
Итого: 99 доступов. Получите и распишитесь! А потом еще проверьте все. обидно что до 100 не дотянули, было бы посолиднееКогда мы прошли через эти страдания и настроили доступы, приступаем к настройке инфраструктуры.Инфраструктура нового кластераДля начала нам нужно создать пользователя kafka на новых виртуалкахДалее очень важный пункт - необходимо прописать разрешенное количество открытых файлов пользователю kafka. А именно, в файле /etc/security/limits.conf дописываем строки
kafka hard nofile 262144
kafka soft nofile 262144
Если мы этого не сделаем, то это грозит нам внезапным падением брокера, так как кафка по своей природе любит открывать много файлов, поэтому надо подготовиться к ее аппетитам.Честно говоря, число 262144, взято просто потому, что на старом кластере было установлено такое значение (исторически сложилось). Хотя у нас кафка открывает не больше 10 тысяч файлов, поэтому вы можете провести анализ и выставить более подходящее.Далее нам надо прописать переменную среды к javaв файл /home/kafka/.bash_profile дописываем
export JAVA_HOME=/opt/java
export PATH=JAVA_HOME/bin:PATH
Соответственно у вас должна лежать jre в папке /opt/javaДалее раскидываем папки по нужным директориям и устанавливаем праваНиже описан скрипт, реализующий, то что я описал выше. Его надо выполнить на каждом новом брокере.setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt
ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java
chown -R kafka:kafka /opt
chmod -R 700 /opt
#env_var start ------------------------------->
kafkaProfile=/home/kafka/.bash_profile
homeVar="export JAVA_HOME=/opt/java"
javaHome=$(cat $kafkaProfile | grep "$homeVar")
if [ "$javaHome" != "$homeVar" ]; then
echo -e "\n$homeVar\n" >> $kafkaProfile
fi
pathVar="export PATH=\$JAVA_HOME/bin:\$PATH"
path=$(cat $kafkaProfile | grep "$pathVar")
if [ "$path" != "$pathVar" ]; then
echo -e "\n$pathVar\n" >> $kafkaProfile
fi
#env_var end --------------------------------->
#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
limitsFile=/etc/security/limits.conf
soft="kafka soft nofile 262144"
limitSoft=$(cat $limitsFile | grep "$soft")
if [ "$limitSoft" != "$soft" ]; then
echo -e "\n$soft\n" >> $limitsFile
fi
hard="kafka hard nofile 262144"
limitHard=$(cat $limitsFile | grep "$hard")
if [ "$limitHard" != "$hard" ]; then
echo -e "\n$hard\n" >> $limitsFile
fi
#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
Далее из под пользователя kafka проверяем что у нас все установлено правильно
ulimit -n = 262144
echo $JAVA_HOME = /opt/java
echo $PATH должен содержать /opt/java/bin
Новый кластер зукиперовНеобходимо проверить, что в конфиге новых зукиперов прописаны все участники ансабля, а также указана директория, для работы зукипера, где хранится файл myid, в котором прописаны id зукиперов.ниже кусок файла конфигурации зукипера /opt/zookeeper/current/conf/zoo.cfgzoo.cfg
dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888
server.4=server4:2888:3888
server.5=server5:2888:3888
server.6=server6:2888:3888
Стартуем все зукиперы /opt/zookeeper/current/bin/zkServer.sh startДалее нам надо добавить в конфиг старых зукиперов новых участников. Для этого на старых серверах выполняем такой скриптzk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
Теперь нужно определить кто из зукиперов был лидером, и перезапустить все зукиперы. Лидера в последнюю очередь.
/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
Наконец проверяем, что все зукиперы увидели друг друга и выбрали лидера. Везде выполняем
.../zkServer.sh status
Настройка ансамбля зукиперов завершена.Поднятие новых брокеровТеперь можно заняться запуском новых брокеров, но перед этим нужно убедиться что в server.properties у нас проставлен broker.id и zookeeper.connectПример для 4 брокера
broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name
Здесь мы указали только новых зукиперов, но так как они находятся в ансамбле, то новые брокеры узнают через них о старом кластере.Также необходимо там же в server.properties прописать текущую версию протокола взаимодействия между брокерами (2.0.0). Иначе мы не сможем переместить партиции.
inter.broker.protocol.version=2.0.0
Запускаем новых брокеров
nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
Теперь необходимо проверить что новые брокеры присоединились к кластеру. Для этого заходим в cli любого из зукиперов.
/opt/zookeeper/current/bin/zkCli.sh
и выполняем там команду
ls /cluster_name/brokers/ids
Должен быть выведен такой ответ
[1,2,3,4,5,6]
Это значит что в кластере у нас все 6 штук брокеров, чего мы и добивались.Перемещение партицийСейчас начинается самое интересное. Изначально где-то сидело в голове, а иногда и подкреплялось чатиками по кафке, что партиции сами переедут, нужно только поднять новых брокеров и поочереди гасить старые. replication.factor должен сработать и реплицировать все на новых брокеров. В действительности все оказалось не так просто. Текущие партиции мертво приклеены к старым брокерам и только создание новых топиков приводит к появлению партиций на новых брокерах.Заглянув в документацию я увидел , что перемещение партиций "автоматическое", но надо проделать немалое количество манипуляций, чтобы запустить все это дело. Сначала нужно сгенерировать json в котором надо указать все топики, которые будут перемещаться.
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
Так как топиков у нас было много, я сгенерил несколько таких файлов Далее все эти json файлы надо скормить утилите kafka-reassign-partitions.sh с аргументом --generate и указать id брокеров куда мы хотим переехать --broker-list "4,5,6".У нас получится несколько файлов такого видаgenerate1.json
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
{"topic":"foo1","partition":0,"replicas":[3,2,1]},
{"topic":"foo2","partition":2,"replicas":[1,2,3]},
{"topic":"foo2","partition":0,"replicas":[3,2,1]},
{"topic":"foo1","partition":1,"replicas":[2,3,1]},
{"topic":"foo2","partition":1,"replicas":[2,3,1]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
{"topic":"foo1","partition":0,"replicas":[4,5,6]},
{"topic":"foo2","partition":2,"replicas":[6,4,5]},
{"topic":"foo2","partition":0,"replicas":[4,5,6]},
{"topic":"foo1","partition":1,"replicas":[5,4,6]},
{"topic":"foo2","partition":1,"replicas":[4,5,6]}]
}
Все что идет до Proposed partition reassignment configuration это текущее распределение партиций (может пригодиться для роллбека). Все что после - распределение после переезда. Соответственно нам надо собрать все эти нижние половины со всех файлов, сформировать новые json. Так как все это делать руками ну ооочень долго. я "быстро" накатал скрипт, который вызывает все эти команды и режет файлы как нам надо. За резку файлов отвечает джарник kafka-reassign-helper.jar код можно глянуть тут.Ниже скрипт подготовки файлов для переезда партицийprepare-for-reassignment.sh
#параметр отвечает за количество топиков в одном файле. у нас было 20
if [ "$#" -eq 0 ]
then
echo "no arguments"
exit 1
fi
echo "Start reassignment preparing"
/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt
echo created topics.txt
java -jar kafka-reassign-helper.jar generate topics.txt $1
fileCount=$(ls -dq generate*.json | wc -l)
echo "created $fileCount file for topics to move"
echo -e "\nCreating generated files\n"
mkdir -p generated
for ((i = 1; i < $fileCount+1; i++ ))
do
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"
echo "generated/generated$i.txt" created
done
echo -e "\nCreating execute/rollback files"
java -jar kafka-reassign-helper.jar execute $fileCount
echo -e "\nexecute/rollback files created"
echo -e "\nPreparing finished successfully!"
После выполнения скрипта скармливаем сгенеренные файлы по очереди утилите kafka-reassign-partitions.sh, но на этот раз с параметром --executemove-partitions.sh
#параметр отвечает за номер файла execute1.json, execute2.json ....
if [ "$#" -eq 0 ]
then
echo "no arguments"
exit 1
fi
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute
Вот теперь кафка начнет перемещать партиции. В логе новых брокеров судорожно забегают строки, как и в логах старых. Пока все это происходит, чтобы не скучать, нам предоставили третий аргумент kafka-reassign-partitions.sh, а именно --verify, который предоставляет узнать текущий статус переезда. Опять же в силу лени, чтобы постоянно не дергать руками эту команду, был написан очередной скрипт.reassign-verify.sh
progress=-1
while [ $progress != 0 ]
do
progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c)
complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c)
failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c)
echo "In progress:" $progress;
echo "Is complete:" $complete;
echo "Failed:" $failed;
sleep 2s
done
Вызываем и ждем пока In progress станет равным 0. а Is complete примет какое-то конечное значение. И надеемся что Failed так и останется 0.Когда все закончилось вызываем снова move-partitions.sh и reassign-verify.sh для следующего файла, до тех пор пока не переберем все наши файлы миграции.После обработки всех файлов идем в log.dirs старых брокеров и убеждаемся, что там остались только технические логи самой кафки - значит все партиции точно на новом кластере.Отключение старого кластераТут все просто. выполняем kafka-server-stop.sh и zkStop.sh для брокеров и зукиперов соответственно.Чистим конфиги нового кластераНачинаем с зукиперов. Из файлов zoo.cfg удаляем лишние записиzk-remove-old-servers.sh
sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg
Далее, как обычно, у зукиперов определяем кто лидер и перезапускаем по очереди, лидера в последнюю очередь. После перезапуска проверяем статусы: 2 фолловера и 1 лидер.Удаляем из server.properties брокеров старый протокол общенияremove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties
Теперь вроде как можно перезапустить брокеров, но надо убедиться, что все партиции находятся в состоянии insync, причем не в количестве min.insync.replicas (у нас равно 2), а в количестве default.replication.factor (у нас 3)Иначе если где-то будет 2 реплики, а не 3, то при перезапуске брокера, на котором лежит одна из этих двух реплик, мы получим ошибку на клиенте, что не хватает insync реплик.Ниже скриптик для проверкиcheck-insync.sh
input="check-topics.txt"
rm -f $input
/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txt
checkPerIter=100
i=0
list=""
notInsync=0
while IFS= read -r line
do
((i=i+1))
list+="${line}|"
if [ $i -eq $checkPerIter ]
then
list=${list::${#list}-1}
echo "checking $list"
count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c)
if [ "$count" -ne 0 ]
then
/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$"
fi
((notInsync=notInsync+count))
list=""
i=0
fi
done < "$input"
echo "not insync: $notInsync"
Если на выходе получаем not insync: 0, то можно по очереди перезапустить брокеров.Вот и все. Миграция брокеров на этом завершена, если не считать последущей перенастройки мониторинга и прочих вспомогательных дел.Вот как выглядит инструкция, которую я отправил админам, которые все это проворачивали на бою. На удивление справились с первого раза и без вопросов.README.txt
Инструкция по миграции кафка 2.0.0 -> 2.6.0
1 этап. Подготовка инфраструктуры
1.1 Скопировать архивы для миграции на сервера кафки
NEW4.tar.gz -> адрес 4 брокера
migration.tar.gz -> адрес 4 брокера
NEW5.tar.gz -> адрес 5 брокера
NEW6.tar.gz -> адрес 6 брокера
1.2 Разархивировать архивы
tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka
1.3 От пользователя root выполнить скрипт на каждом новом сервере (установка файлов в нужные директории, прав и лимитов)
/home/kafka/scripts/setup.sh
1.4 Перейти на пользователя kafka (на каждом новом сервере)
1.5 Проверить следующие параметры среды (на каждом новом сервере)
ulimit -n = 262144
echo $JAVA_HOME = /opt/java
echo $PATH должен содержать /opt/java/bin
содержимое папки /opt должно принадлежать kafka kafka
2 этап. Запуск зукиперов и брокеров
2.1 Перейти под пользователя kafka (на каждом новом сервере)
2.2 Выполнить скрипт на каждом новом сервере (запускаются новые зукиперы)
/opt/scripts/zkStart.sh
2.3 Скопировать архивы на старые сервера кафки
OLD1.tar.gz -> адрес 1 брокера
OLD2.tar.gz -> адрес 2 брокера
OLD3.tar.gz -> адрес 3 брокера
2.4 Разархивировать архивы
tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka
2.5 От пользователя root выполнить скрипт на каждом старом сервере (установка прав на скрипты миграции)
/home/kafka/scripts/setup-old.sh
2.6 Перейти под пользователя kafka на каждом старом сервере
2.7 Выполнить скрипт на каждом старом сервере (добавляем новые зукиперы в конфиг старых зукиперов)
/home/kafka/scripts/zk-add-new-servers.sh
2.8 Выполнить скрипт на каждом старом сервере (проверка статуса зукиперов)
/home/kafka/scripts/zkStatus.sh
определить какой из старых серверов является лидером
2.9 Перезапустить по очереди зукиперы на старых серверах
лидера перезапускать в последнюю очередь
/home/kafka/scripts/zkRestart.sh
2.10 Проверить что кластер зукиперов подхватил новые сервера
для этого на всех серверах выполнить
/home/kafka/scripts/zkStatus.sh (на старых)
/opt/scripts/zkStatus.sh (на новых)
все сервера кроме одного должны быть в состоянии follower
один сервер leader
2.11 запуск новых брокеров
на новых серверах выполнить скрипт
/opt/kafka-start.sh
2.12 проверить что новые брокеры в кластере
выполнить на новом сервере
/home/kafka/migration/zkCli.sh
ls /cluster_name/brokers/ids
должно быть выведено [1,2,3,4,5,6]
3 этап Перемещение партиций на новых брокеров
3.1 Выполнить скрипт (подготовка файлов для перемещения партиций)
/home/kafka/migration/prepare-for-assignment.sh 20
3.2 Выполнить данный пункт по очереди для каждого файла в директории /home/kafka/migration/execute
Пример:
для файла execute1.json
/home/kafka/migration/move-partitions.sh 1
после начала выполнения скрипта выполнить скрипт
Пример:
/home/kafka/migration/reassign-verify.sh 1
когда количество партиций в состоянии "in progress" достигнет 0 дождаться ОК от программиста и приступить к п.3.2 для следующего файла
3.3 После завершения перемещения партиций производится проверка логов на наличие ошибок, а также проверка директории /opt/kafka/kafka-data на всех старых брокерах
Должны остаться только технические логи кафки
3.4 По очереди выключить на старых серверах брокеров, выполнив скрипт
/opt/kafka/current/bin/kafka-server-stop.sh
3.5 По очереди выключить зукиперов на старых серверах, выполнив скрипт
/home/kafka/scripts/zkStop.sh
3.6 На новых серверах выполнить скрипт (удаляем старых зукиперов из конфига новых зукиперов)
/opt/scripts/zk-remove-old-servers.sh
3.7 На новых серверах перезапустить зукиперов
/opt/scripts/zkRestart.sh
3.8 На новых серверах проверить статус зукиперов (один лидер, 2 фоловера)
/opt/scripts/zkStatus.sh
3.9 На новых серверах выполнить скрипт (удаляем из конфига старый протокол общения между брокерами)
/opt/scripts/remove-old-protocol.sh
3.10 Проверить что все реплики в статусе insync
для этого запустить скрипт /home/kafka/migration/check-insync.sh
not insync должно быть равно 0
3.11 По очереди для каждого нового брокера выполнить
/opt/kafka/current/bin/kafka-server-stop.sh
дождаться пока брокер остановиться (ps aux | grep kafka должен показывать только зукипера)
выполнить /opt/kafka-start.sh
Приступить к 3.10 следующего брокера
Миграция завершена!
Надеюсь, что кому-то помог в этом вопросе. Жду ваших комментов и замечаний.Все скрипты и инструкции, в том числе для роллбеков можно найти тут
===========
Источник:
habr.com
===========
Похожие новости:
- [Настройка Linux, *nix, DevOps, Разработка на Raspberry Pi] Установка docker-контейнеров c Zabbix на Raspberry Pi
- [Системное администрирование, *nix, DevOps, Микросервисы, Kubernetes] Ломаем и чиним Kubernetes
- [Настройка Linux, Разработка веб-сайтов, CSS, JavaScript] Просто вертикальный монитор не значит, что я на телефоне (перевод)
- [JavaScript, Монетизация мобильных приложений, Голосовые интерфейсы] Как создавать навыки для виртуальных ассистентов Салют, выйти на многомиллионную аудиторию Сбера и выиграть 2,5 млн руб?
- [JavaScript, ReactJS] createRef, setRef, useRef и зачем нужен current в ref
- [IT-инфраструктура, Сетевые технологии, Будущее здесь, IT-компании] Подводный кабель Google Dunant c пропускной способностью 250 Тбит/с готов к работе
- [Microsoft Azure, DevOps, Облачные сервисы] Основы сервиса Microsoft Azure Blueprints
- [Настройка Linux, Open source, Системное администрирование, IT-инфраструктура, Серверное администрирование] Тиражирование Fedora из-под Fedora
- [Системное администрирование, IT-инфраструктура, Виртуализация] Exchange Server и правила его виртуальной жизни
- [IT-инфраструктура, Исследования и прогнозы в IT, IT-компании] Когда только RPA недостаточно: создание интерфейсов для взаимодействия с роботами с помощью low-code платформы
Теги для поиска: #_java, #_itinfrastruktura (IT-инфраструктура), #_devops, #_kafka (кафка), #_migratsija (миграция), #_devops (девопс), #_java, #_itinfrastruktura (
IT-инфраструктура
), #_devops
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 19:00
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
ВступлениеПривет, Хабр! Работаю java программистом в одной финансовой организации. Решил оставить свой след на хабре и написать первую свою статью. В силу проблем с наличием девопсеров передо мной была поставлена задача обновить кластер кафки с 2.0 до 2.6 без даунтайма и потери сообщений (сами понимаете, никто не любит, когда денежки зависают в воздухе или где-то теряются). Хочу поделиться этим опытом с Вами и получить конструктивный фидбек. Итак, хватит воды, переходим сразу к делу.Схема миграцииЗадача усложнялась тем, что надо было мигрировать со старых виртуалок на новые, поэтому вариант с тем, чтобы выключить брокера, поменять бинарники и запустить, мне не подходил.Ниже представлена схема миграции. Имеем 3 брокеров на старых ВМ, 3 брокеров на новых ВМ, также для каждого брокера имеется свой zookeeper. План миграцииДля того чтобы нам мигрировать так, чтобы этого никто не заметил, придется "немного" попотеть. Ниже приведен общий план.
kafka hard nofile 262144
kafka soft nofile 262144 export JAVA_HOME=/opt/java
export PATH=JAVA_HOME/bin:PATH tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt mv /home/kafka/zookeeper /opt mv /home/kafka/kafka-start.sh /opt mv /home/kafka/scripts /opt ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current ln -sfn /opt/jdk1.8.0_181/ /opt/java chown -R kafka:kafka /opt chmod -R 700 /opt #env_var start -------------------------------> kafkaProfile=/home/kafka/.bash_profile homeVar="export JAVA_HOME=/opt/java" javaHome=$(cat $kafkaProfile | grep "$homeVar") if [ "$javaHome" != "$homeVar" ]; then echo -e "\n$homeVar\n" >> $kafkaProfile fi pathVar="export PATH=\$JAVA_HOME/bin:\$PATH" path=$(cat $kafkaProfile | grep "$pathVar") if [ "$path" != "$pathVar" ]; then echo -e "\n$pathVar\n" >> $kafkaProfile fi #env_var end ---------------------------------> #ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> limitsFile=/etc/security/limits.conf soft="kafka soft nofile 262144" limitSoft=$(cat $limitsFile | grep "$soft") if [ "$limitSoft" != "$soft" ]; then echo -e "\n$soft\n" >> $limitsFile fi hard="kafka hard nofile 262144" limitHard=$(cat $limitsFile | grep "$hard") if [ "$limitHard" != "$hard" ]; then echo -e "\n$hard\n" >> $limitsFile fi #ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ulimit -n = 262144
echo $JAVA_HOME = /opt/java echo $PATH должен содержать /opt/java/bin dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888 server.2=server2:2888:3888 server.3=server3:2888:3888 server.4=server4:2888:3888 server.5=server5:2888:3888 server.6=server6:2888:3888 echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg /opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart
.../zkServer.sh status
broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name inter.broker.protocol.version=2.0.0
nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &
/opt/zookeeper/current/bin/zkCli.sh
ls /cluster_name/brokers/ids
[1,2,3,4,5,6]
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}], "version":1 } {"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]}, {"topic":"foo1","partition":0,"replicas":[3,2,1]}, {"topic":"foo2","partition":2,"replicas":[1,2,3]}, {"topic":"foo2","partition":0,"replicas":[3,2,1]}, {"topic":"foo1","partition":1,"replicas":[2,3,1]}, {"topic":"foo2","partition":1,"replicas":[2,3,1]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]}, {"topic":"foo1","partition":0,"replicas":[4,5,6]}, {"topic":"foo2","partition":2,"replicas":[6,4,5]}, {"topic":"foo2","partition":0,"replicas":[4,5,6]}, {"topic":"foo1","partition":1,"replicas":[5,4,6]}, {"topic":"foo2","partition":1,"replicas":[4,5,6]}] } #параметр отвечает за количество топиков в одном файле. у нас было 20
if [ "$#" -eq 0 ] then echo "no arguments" exit 1 fi echo "Start reassignment preparing" /opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt echo created topics.txt java -jar kafka-reassign-helper.jar generate topics.txt $1 fileCount=$(ls -dq generate*.json | wc -l) echo "created $fileCount file for topics to move" echo -e "\nCreating generated files\n" mkdir -p generated for ((i = 1; i < $fileCount+1; i++ )) do /opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt" echo "generated/generated$i.txt" created done echo -e "\nCreating execute/rollback files" java -jar kafka-reassign-helper.jar execute $fileCount echo -e "\nexecute/rollback files created" echo -e "\nPreparing finished successfully!" #параметр отвечает за номер файла execute1.json, execute2.json ....
if [ "$#" -eq 0 ] then echo "no arguments" exit 1 fi /opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute progress=-1
while [ $progress != 0 ] do progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c) complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c) failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c) echo "In progress:" $progress; echo "Is complete:" $complete; echo "Failed:" $failed; sleep 2s done sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfg sed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties
input="check-topics.txt"
rm -f $input /opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txt checkPerIter=100 i=0 list="" notInsync=0 while IFS= read -r line do ((i=i+1)) list+="${line}|" if [ $i -eq $checkPerIter ] then list=${list::${#list}-1} echo "checking $list" count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c) if [ "$count" -ne 0 ] then /opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" fi ((notInsync=notInsync+count)) list="" i=0 fi done < "$input" echo "not insync: $notInsync" Инструкция по миграции кафка 2.0.0 -> 2.6.0
1 этап. Подготовка инфраструктуры 1.1 Скопировать архивы для миграции на сервера кафки NEW4.tar.gz -> адрес 4 брокера migration.tar.gz -> адрес 4 брокера NEW5.tar.gz -> адрес 5 брокера NEW6.tar.gz -> адрес 6 брокера 1.2 Разархивировать архивы tar -xf NEW4.tar.gz -C /home/kafka tar -xf NEW5.tar.gz -C /home/kafka tar -xf NEW6.tar.gz -C /home/kafka 1.3 От пользователя root выполнить скрипт на каждом новом сервере (установка файлов в нужные директории, прав и лимитов) /home/kafka/scripts/setup.sh 1.4 Перейти на пользователя kafka (на каждом новом сервере) 1.5 Проверить следующие параметры среды (на каждом новом сервере) ulimit -n = 262144 echo $JAVA_HOME = /opt/java echo $PATH должен содержать /opt/java/bin содержимое папки /opt должно принадлежать kafka kafka 2 этап. Запуск зукиперов и брокеров 2.1 Перейти под пользователя kafka (на каждом новом сервере) 2.2 Выполнить скрипт на каждом новом сервере (запускаются новые зукиперы) /opt/scripts/zkStart.sh 2.3 Скопировать архивы на старые сервера кафки OLD1.tar.gz -> адрес 1 брокера OLD2.tar.gz -> адрес 2 брокера OLD3.tar.gz -> адрес 3 брокера 2.4 Разархивировать архивы tar -xf OLD1.tar.gz -C /home/kafka tar -xf OLD2.tar.gz -C /home/kafka tar -xf OLD3.tar.gz -C /home/kafka 2.5 От пользователя root выполнить скрипт на каждом старом сервере (установка прав на скрипты миграции) /home/kafka/scripts/setup-old.sh 2.6 Перейти под пользователя kafka на каждом старом сервере 2.7 Выполнить скрипт на каждом старом сервере (добавляем новые зукиперы в конфиг старых зукиперов) /home/kafka/scripts/zk-add-new-servers.sh 2.8 Выполнить скрипт на каждом старом сервере (проверка статуса зукиперов) /home/kafka/scripts/zkStatus.sh определить какой из старых серверов является лидером 2.9 Перезапустить по очереди зукиперы на старых серверах лидера перезапускать в последнюю очередь /home/kafka/scripts/zkRestart.sh 2.10 Проверить что кластер зукиперов подхватил новые сервера для этого на всех серверах выполнить /home/kafka/scripts/zkStatus.sh (на старых) /opt/scripts/zkStatus.sh (на новых) все сервера кроме одного должны быть в состоянии follower один сервер leader 2.11 запуск новых брокеров на новых серверах выполнить скрипт /opt/kafka-start.sh 2.12 проверить что новые брокеры в кластере выполнить на новом сервере /home/kafka/migration/zkCli.sh ls /cluster_name/brokers/ids должно быть выведено [1,2,3,4,5,6] 3 этап Перемещение партиций на новых брокеров 3.1 Выполнить скрипт (подготовка файлов для перемещения партиций) /home/kafka/migration/prepare-for-assignment.sh 20 3.2 Выполнить данный пункт по очереди для каждого файла в директории /home/kafka/migration/execute Пример: для файла execute1.json /home/kafka/migration/move-partitions.sh 1 после начала выполнения скрипта выполнить скрипт Пример: /home/kafka/migration/reassign-verify.sh 1 когда количество партиций в состоянии "in progress" достигнет 0 дождаться ОК от программиста и приступить к п.3.2 для следующего файла 3.3 После завершения перемещения партиций производится проверка логов на наличие ошибок, а также проверка директории /opt/kafka/kafka-data на всех старых брокерах Должны остаться только технические логи кафки 3.4 По очереди выключить на старых серверах брокеров, выполнив скрипт /opt/kafka/current/bin/kafka-server-stop.sh 3.5 По очереди выключить зукиперов на старых серверах, выполнив скрипт /home/kafka/scripts/zkStop.sh 3.6 На новых серверах выполнить скрипт (удаляем старых зукиперов из конфига новых зукиперов) /opt/scripts/zk-remove-old-servers.sh 3.7 На новых серверах перезапустить зукиперов /opt/scripts/zkRestart.sh 3.8 На новых серверах проверить статус зукиперов (один лидер, 2 фоловера) /opt/scripts/zkStatus.sh 3.9 На новых серверах выполнить скрипт (удаляем из конфига старый протокол общения между брокерами) /opt/scripts/remove-old-protocol.sh 3.10 Проверить что все реплики в статусе insync для этого запустить скрипт /home/kafka/migration/check-insync.sh not insync должно быть равно 0 3.11 По очереди для каждого нового брокера выполнить /opt/kafka/current/bin/kafka-server-stop.sh дождаться пока брокер остановиться (ps aux | grep kafka должен показывать только зукипера) выполнить /opt/kafka-start.sh Приступить к 3.10 следующего брокера Миграция завершена! =========== Источник: habr.com =========== Похожие новости:
IT-инфраструктура ), #_devops |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 19:00
Часовой пояс: UTC + 5