Kafka 海滩拾贝


written by Alex Stocks on 2017/02/02,版权所有,无授权不得转载

0 引言


大年初三(2017-01-30)下午15:56公司线上kafka集群(3 instances)挂了一台,导致整个线上服务瘫痪,由于正处于假期时间,用手机联系了相关同事手工重启系统且待系统服务正常后就暂时弃置一边。

今日稍有闲暇,赶往公司想把事故复盘一遍,以追踪事故原因。下面分别列出相关问题,并记录解决方法。

1 kafka启动与无法连接broker问题若干


由于测试环境机器数目有限,我便在一个测试机器启动了3个kafka实例(kafka_2.11-0.10.1.1)和1个zk实例(zookeeper-3.4.9),并写了相关python程序去连接kafka集群。

Q1 kafka broker无法启动

broker无法启动大致有两个原因:第一是内存不足,第二是jmx无法启动。
可以通过修改kafka-server-start.sh如下一行代码来确定broker修改JVM HEAP size:

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

把kafka-run-class.sh如下代码删除掉就可以关闭kafka的JMX:

# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
  KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
fi

# JMX port to use
if [  $JMX_PORT ]; then
  KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi

把JMX关掉的坏处在于一些依赖于JMX(KafkaOffsetMonitor)就无法工作了。

Q2 python程序无法连接kafka broker

程序一直报如下错误:

kafka.errors.NoBrokersAvailable: NoBrokersAvailable

首先查看了kafka集群的网络监听情况。执行命令 netstat -nlp | grep 9092 得到如下结果:

tcp6   0      0 127.0.0.1:19092         :::*     LISTEN      18782/java
tcp6   0      0 127.0.0.1:29092         :::*     LISTEN      19111/java
tcp6   0      0 127.0.0.1:9092          :::*     LISTEN      18406/java

注意到了kafka实例使用的tcp协议的版本是tcp6,google一番后发现解决方法是把如下语句加入你的bash启动脚本(.bash_profile or .bashrc):

export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"

再次执行上面的命令查验后,结果如下:

tcp   0      0 127.0.0.1:19092  0.0.0.0:*               LISTEN   25551/java
tcp   0      0 127.0.0.1:29092  0.0.0.0:*               LISTEN   25842/java
tcp   0      0 127.0.0.1:9092   0.0.0.0:*               LISTEN   25254/java

客户端程序是kafka python(https://github.com/dpkp/kafka-python)写的,再次启动后报如下错误:

Traceback (most recent call last):
File "producer.py", line 34, in <module>
producer_timings['python_kafka_producer'] = python_kafka_producer_performance()
File "producer.py", line 21, in python_kafka_producer_performance
producer = KafkaProducer(bootstrap_servers=brokers)
File "/usr/local/lib/python2.7/dist-packages/kafka/producer/kafka.py", line 328, in __init__
**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 791, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

再次google后,在producer的参数里加上api_conf字段解决问题,修改后的代码如下:

brokers = bootstrap_servers.split(',')
producer = KafkaProducer(
    bootstrap_servers=brokers,
    api_version = (0, 10))

2 kafka集群稳定性测试


测试环境:

测试流程:

case 1 kill全部kafka实例然后30s内再全部重启

P与C依然能正常工作,但丢失消息若干且部分乱序。

case 2 kill一个kafka实例然后重启之

重启kafka之前,P与C都能正常工作, 但又部分消息乱序。重启kafka实例之后,60S内P与C都与新实例建立了正常连接,且partition2以新实例为leader。

case 3 kill一个kafka实例,kill P然后重启P,再kill C再重启C

kill P且重启之后,P与C都可以正常工作。干掉C又重启之后,P与C依然能正常工作,但丢失消息若干且部分乱序。

case 4 新建一个topic,其partition为3,其replica为1,然后kill掉两个kafka实例

kill掉一个kafka实例后,这个topic的信息如下图:

kafka-topic-one-replica

所以kafka中topic的replica应该大于1。

上面程序的相关代码详见kafka failure test

不改变测试环境其他条件,仅改变topic的replica为1的情况下,再次以下测试:

case 1 kill全部kafka实例,3分钟后再全部重启

P与C依然能正常工作,但丢失消息若干且部分乱序。但如果P为confluent_kafka(以下简称CK)实现,则仅仅有消息乱序现象。

case 2 kill全部kafka实例,48分钟后再全部重启

P与C依然能正常工作,但丢失消息若干。

3 线上kafka集群服务恢复


第一次把线上那台死掉的机器重启后,它不断在重建数据,大约10分钟后仍然没有启动成功,目测是数据彻底乱掉了。于是我们把其数目录清空,然后再启动就成功了。

整个kafka集群服务恢复后,发现服务仍然很慢,通过日志发现这个kafka实例是在复制数据。这台机器从当天17:00pm开始一直到第二天08:00am才把数据重建成功,数据量约为598G,复制速率约为40G/H = 11.38KB/s。

到线上发现kafka数据保存时间配置如下:log.retention.hours=168,也就是保存了7天的数据。

参考上面的case4和这个参数,大约就知道优化方向了。

4 kafka消费者与broker连接不断挂掉


在上海一家做wifi软件的公司工作的时候遇到这样一个问题:kafka consumer(Java)与broker之间的连接总是不断挂掉,查看了consumer的源码(主要是poll函数)后,发现主要原因是:

consumer是单线程程序,从broker批量取出一批消息后处理,处理完毕后向broker汇报心跳,即messge process逻辑和heartbeat逻辑在一个线程上。

解决方法是:设置max.partition.fetch.bytes=4096(kafka v0.9.0.0)或者max.poll.records=10(kafka v0.10.0.1),这两个参数是用来设置每次拉取消息的最大量。

通过缩小batch message size来缩短message process时间,从而不阻塞hearbeat上报时间,后面这种现象就再也没有发生了。

5 kafka使用建议及相关参数


5.1 kafka使用建议


5.1.1 broker

5.2 kafka最优参数


在Kafka Beijing Meetup(3rd)上,胡夕给出了下图各个参数集:

下面分别解释下各个参数的意义。

5.2.1 Broker

5.2.2 Producer

5.2.3 Consumer

参照MirrorMaker,这两个要求很好理解。
* “当消费端成为瓶颈时,最简单的方法是增加 topic的分区数量,从而增加消费端的并行度。” - 参考文档15。

5.2.4 Os & Jvm & Mem

5.2.5 CPU

5.2.6 disk

6 kafka lastest feature list


6.1 broker


6.2 consumer


7 kafka toolset


kafka自身不建议跨IDC部署,可以使用confluent的官方工具MirrorMaker或者uber开源的工具uReplicator把数据在跨IDC之间进行复制。uReplicator是基于MirrorMaker做的二次开发,但是uReplicator不能很好的支持kafka 0.10以上版本,暂不考虑。

7.1 MirrorMaker


MirrorMaker自身支持把多个kafka集群的数据复制到一个目的地kafka集群中。关于MirrorMake怎么使用请详细参考8到13,但是请注意,这里面很多参数是针对kafka 0.9的,在kafka 0.10里面已经无用,具体哪些无用,自己多测试即可。

下面重点说明conf/mirror-producer.properties的“partition.assignment.strategy”和MirrorMaker的”num.streams”。

“partition.assignment.strategy”关系到consumer的负载均衡策略,默认的range策略会导致负载不均衡且consumer group内consumer的个数多于topic partition的数目时多余的consumer处于idle状态无法工作, 参考文档6参考文档11 都提到这个问题,具体原因参见 参考文档7参考文档14。采用RoundRobin策略后,所有的consumer都会处于工作状态,加快数据复制速度。

num.streams则是指定consumers的个数。经个人测试,共两个kafka topic,每个topic partition数目为36时,实际测试环境中num.streams给定值为40,而MirrorMaker的实际consumer的个数为72,把num.streams改为100后consumer个数也是100,所以二者不一定一致。而当consumer个数为100,只消费一个topic的时候,有36个consumer线程处于工作状态,其他线程都处于空闲状态(如下图)。所以num.streams的值应该设置为所有topic的partition数目之和。

consumer-partition

至于MirrorMaker的性能还可以,想kafka集群写入13993021个长度为100B的kv后,MirrorMaker两分钟内复制完毕。

至于MirrorMaker的缺点,参考文档13提到 当 MirrorMaker 节点发生故障时,数据复制延迟较大,对于动态添加 topic 则需要重启进程、黑白名单管理完全静态等。虽然 uReplicator 针对 MirrorMaker 进行了大量优化,但在我们的大量测试之后仍遇到众多问题,我们需要具备动态管理 MirrorMaker 进程的能力,同时我们也不希望每次都重启 MirrorMaker进程,至于MirrorMaker有哪些深坑,囿于测试条件限制,个人无法给出定论。

个人的看法是,一个topic启动一个MirrorMaker,参考文档12说为了系统稳定性和高可用,建议“Always run more than one mirror-maker processes. Make sure you use the same groupId in consumer config.”。

至于MirrorMaker 集群自身的稳定性,参考文档12 认为可以通过检查MirrorMaker的consumer group的lag值来验证,确实是一个好办法。

参考文档

Payment

Timeline