在进行性能测试时,如果Kafka消息队列堆积过长,等待程序自行消费完毕可能需要耗费大量时间。为了解决这一问题,可以通过备份和还原Kafka及Zookeeper文件夹的方式快速清理队列。以下是实现该功能的脚本化方案。
### 1. 环境清理脚本 (clean_environment.sh)
此脚本用于一次性清理并备份Kafka和Zookeeper环境。请确保路径配置正确,并且仅在没有备份文件时执行。
#!/bin/bash
# 导出Kafka Topic(可根据需求添加)
echo "Stopping Zookeeper..."
/app/zookeeper/bin/zkServer.sh stop
ps -ef | grep zookeeper | grep -v grep | awk '{print $2}' | xargs kill -9
sleep 5
# 清理Zookeeper数据
rm -rf /app/zookeeper/data/version-2/
rm -rf /app/zookeeper/logs/*
rm -rf /app/zookeeper_backup
rm -rf /app/zookeeper_org
# 备份Zookeeper文件
cp -rp /app/zookeeper/ /app/zookeeper_backup
cp -rp /app/zookeeper/ /app/zookeeper_org
# 停止Kafka
echo "Stopping Kafka..."
/app/kafka_cluster/bin/kafka-server-stop.sh
ps -ef | grep kafka_cluster | grep -v grep | awk '{print $2}' | xargs kill -9
sleep 5
# 清理Kafka日志目录
rm -rf /app/kafka_cluster/kafka-logs/*
rm -rf /app/kafka_cluster/logs/*
rm -rf /app/kafka_cluster_org
rm -rf /app/kafka_cluster_backup
# 备份Kafka文件
cp -rp /app/kafka_cluster/ /app/kafka_cluster_org
cp -rp /app/kafka_cluster/ /app/kafka_cluster_backup
### 2. 初始化Zookeeper脚本 (init_zk.sh)
此脚本用于停止、清理并重新启动Zookeeper服务。
#!/bin/bash
echo "Stopping Zookeeper service..."
/app/zookeeper/bin/zkServer.sh stop
sleep 3
echo "Initializing Zookeeper installation directory..."
rm -rf /app/zookeeper
cp -rp /app/zookeeper_backup /app/zookeeper
echo "Starting Zookeeper service..."
/app/zookeeper/bin/zkServer.sh start
PID=$(ps -ef | grep "/app/zookeeper/" | grep -v grep | awk '{print $2}')
echo -e "\033[42;30m Zookeeper process ID: $PID \033[0m"
### 3. 初始化Kafka脚本 (init_kafka.sh)
此脚本用于停止、清理并重新启动Kafka服务。
#!/bin/bash
KAFKA_DIR="/app/kafka_cluster"
BACKUP_DIR="${KAFKA_DIR}_backup"
echo "Stopping Kafka service..."
${KAFKA_DIR}/bin/kafka-server-stop.sh
sleep 10
ps -ef | grep kafka_cluster | grep -v grep | awk '{print $2}' | xargs kill -9
if [[ -d ${BACKUP_DIR} && ${#KAFKA_DIR} -ge 8 ]]; then
echo "Deleting Kafka folder: ${KAFKA_DIR}"
rm -rf ${KAFKA_DIR}
echo "Restoring from backup folder: ${BACKUP_DIR}"
cp -rp ${BACKUP_DIR} ${KAFKA_DIR}
else
echo "Backup folder not found: ${BACKUP_DIR}, exiting..."
exit 1
fi
echo "Starting Kafka service..."
${KAFKA_DIR}/bin/kafka-server-start.sh -daemon ${KAFKA_DIR}/config/server.properties
PID=$(ps -ef | grep kafka_cluster | grep -v grep | awk '{print $2}')
echo -e "\033[42;30m Kafka process ID: $PID \033[0m"
### 4. 创建Topic脚本 (create_topics.sh)
此脚本根据指定文件创建Kafka Topics。
#!/bin/bash
TOPIC_FILE="/home/root/topics.txt"
ZK_HOSTS="192.168.53.125:2181,192.168.53.126:2181,192.168.53.127:2181"
CMD="/app/kafka_cluster/bin/kafka-topics.sh"
while read -r topic; do
${CMD} --create --zookeeper ${ZK_HOSTS} --replication-factor 3 --partitions 8 --topic ${topic}
done < "${TOPIC_FILE}"
### Topic文件示例 (topics.txt)
以下为`topics.txt`文件内容:
### 执行步骤
按照以下顺序执行清理和恢复操作:
- 停止Kafka Manager:`ps -ef | grep manager | grep -v grep | awk '{print $2}' | xargs kill -9`
- 运行初始化Zookeeper脚本:`./init_zk.sh`
- 运行初始化Kafka脚本:`./init_kafka.sh`
- 启动Kafka Manager:`nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &`
- 创建Topics:`./create_topics.sh`