网易乐得技术团队

kafka producer(0.8.1.1) 使用过程中遇到的问题及解决

1. 前言

在我们的项目中,不同系统之间的数据传递,基本都是基于kafka消息队列。尤其是生产者producer,使用起来也比较简单,所以一直也没有深入研究。直到最近接入新产品,使用kafka producer遇到了一系列严重问题,才逐步引起我们的重视。

本文主要讲述了kafka producer (版本 0.8.1.1)使用过程中遇到的问题及解决过程。

2. kafka producer 在我们项目中的配置及使用

目前我们的项目中,生产者producer使用的版本都是 0.8.1.1

1
2
3
4
5
6
//maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>

我们的应用场景是url访问监控,特点是访问量比较大,所以为了增大吞吐量使用了如下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
//kafka producer参数配置
//消息序列化类型 String
serializer.class=kafka.serializer.StringEncode
//broker信息,服务器需要配置hosts指向真正IP
metadata.broker.list=fa-common1.ip1.lede.com:9093,fa-common2.ip2.lede.com:9093
//异步模式,增大吞吐量
producer.type=async
//producer不会等待broker的ack,可能存在数据丢失,但是我们是允许的
request.required.acks=0
//在异步模式下,producer端允许buffer的最大消息数
queue.buffering.max.messages=10000
//queue.buffering.max.messages 配置队列满了,直接抛弃
queue.enqueue.timeout.ms=0

使用上述配置,我们的系统一直可以正常运行,且未出现问题。
但是新产品在接入使用时,却遇到了问题,下面将详细说明。

3. 问题分析及解决

问题1:消息无法发送成功。

现象及分析

产品接入方使用的kakfa客户端版本也是 0.8.1,在配置了fa-common1.ip1.lede.com和fa-common2.ip2.lede.com的host后,依然无法成功发送kafka消息,但是端口是通的。
后来经过使用方同学抓包分析,发现kafka消息的发送地址,并不是配置文件中 metadata.broker.list 的列表,而是下图中的地址:
alt

为什么会出现上面的状况,这就需要深入了解kafka producer 消息发送机制了。
从0.8.1.1 版本开始,producer 除了提供scala版本的代码,也提供了 java版本的client代码。我们这里都是针对scala版本的代码来讲的。

下面的图来源于网络,能够更加直观的说明异步发送模式:
alt

消息在发送时,会先写入缓存队列,同时启动一个消息发送线程( SendThread )批量对消息发送;
消息发送线程的 handle() 处理方法中,在调用send() 发送消息前,会定期调用brokerPartitionInfo.updateInfo() 方法,刷新 kafka broker 的metaData ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
//scala 代码
def handle(events: Seq[KeyedMessage[K,V]]) {
. . .
if (topicMetadataRefreshInterval >= 0 &&
SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
topicMetadataToRefresh.clear
lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
. . .
}

由于lastTopicMetadataRefreshTime 初始为0,所以第一次发送消息时,会触发一次 kafka broker 的metaData 信息的更新;后续send()方法发送消息时,使用的地址也是从kafka broker的metaData获取的。

小结

  • kafka producer发送消息的地址,并不是直接使用metadata.broker.list 中的内容;
  • producer 可以根据metadata.broker.list 中的配置,获取kafka broker 的完整metaData 信息。这样设计的目的很明显,是为了在kafka broker运行过程中,在出现故障、或增加节点时,能使 producer 平滑适应。
  • 这也表示在配置 metadata.broker.list 时,并不需要配置集群的完整broker,因为producer 会根据 metadata.broker.list自动去获取集群的完整broker 信息;

了解了消息的发送机制后,再看我们之前的问题,为什么从抓包中看到kafka消息发到一个新地址 hz-10.ip1.lede.com?

答: kafka broker 的metaData 信息返回的地址就是 hz-10.ip1.lede.com,那这又是什么原因?

通过查询相关资料,发现这个与kafka集群的server端配置有关,相关配置说明如下:

1
2
3
4
5
6
7
8
9
10
//Socket Server Settings
listeners=PLAINTEXT://:9092
# The port the socket server listens on
# port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

当Kafka broker启动时,它会在ZK上注册自己的IP和端口号,客户端就通过这个IP和端口号来连接。如果broker如果没有配置advertised.host.name,会默认使用 host.name注册,如果host.name 也没有,则使用 java.net.InetAddress.getCanonicalHostName() 的返回值。

而我们的kafka server配置如下,未配置advertised.host.name 和 host.name,所以会默认使用机器名去ZK注册。

1
2
3
4
5
broker.id=1
inter.broker.protocol.version=0.10.1.0
log.message.format.version=0.10.1.0
port=9093
#listeners=PLAINTEXT://:9093

在服务器上,使用 hostname命令,查询机器的名称,得到和 kafka消息的真实发送地址一致。
alt

至此,无法成功发送kafka消息的原因终于清楚了。产品接入方由于与我们不在一个网段,属于外网,无法通过机器名直接解析地址。

解决方案

解决方法有两种:

(1)使用方在服务器上,不仅需要配置 metadata.broker.list 中地址的host,还需要配置kafka消息发送时,真正使用的地址的host;

(2)需要kafka server 调整配置文件server.properties。配置 advertised.host.name 地址为外网可以访问的IP,这样kafka broker注册ZK的时候,会直接用IP注册,producer 可以直接使用ip发送消息。

问题2:由kafka broker 服务宕机, 引发的Tomcat服务不可用。

现象及分析

原以为解决问题1,基本就可以高枕无忧了,没想到过了几天又出现了一个新更严重的问题:kafka消息发送方反馈,他们测试环境linux 服务器出现了文件描述符超过上限,服务不可用状况,且怀疑与kafka有关。

我们紧急查看了kafka集群状态,发现那段时间,broker全挂了,即整个kafka都无法使用了。最终经过分析,发现这居然是kafka客户端 0.8.1.1的一个bug。

如前文所说,生产者会定期更新broker metadata 信息,默认10分钟一次,成功后会修改最后更新时间。但是如果更新失败了,会导致每发送一个消息,都触发一次更新(由于最后成功更新时间一直未变)。而获取集群 metadata 信息会产生网络请求,在kafka客户端 0.8.1.1版本,未对失败请求做关闭连接操作。而且获取失败,还伴随遍历broker list多次获取等操作、所以每发送一个消息,并不只建立一个网络连接。
随着消息的增加,出现大量未关闭的连接,最终导致linux服务器出现了文件描述符超过上限,服务不可用。

解决方案

在kafka client Version 0.8.2.0 版本后修复的这个问题,所以为了解决这个问题,只能升级版本了。
(详细也可参考 https://issues.apache.org/jira/browse/KAFKA-1041)

下面是其他人对这个问题的描述,基本和我们的症状一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Number of file handles increases indefinitely in producer if broker host is unresolvable

Description
We found a issue that if broker host is un resolvable, the number of file handle keep increasing for every message we produce and eventually it uses up all available files handles in operating system. If broker itself is not running and broker host name is resolvable, open file handles count stays flat.
lsof output shows number of these open file handles continue to grow for every message we produce.
java 19631 relango 81u sock 0,6 0t0 196966526 can't identify protocol
I can easily reproduce this with console producer, If I run console producer with right hostname and if broker is not running, the console producer will exit after three tries. But If I run console producer with unresolvable broker, it throws below exception and continues to wait for user input, every time I enter new message, it opens socket and file handle count keeps increasing..
Here is Exception in producer
ERROR fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test-1378245487417)] from broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
at kafka.utils.Utils$.swallow(Utils.scala:186)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:30)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:59)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:151)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37)
... 12 more

0.8.2.0版本修复上述问题,连接异常会释放:
alt

4. 结束语

目前kafka 客户端版本已经更新到 0.11.0.0 ,每个版本的更新,都会出现一些新特征,并修复不少bug。
我们这边相关同事,已经开始着手研究新版本,并后期长期跟进,从而减少系统出现问题的概率。