本文共 4153 字,大约阅读时间需要 13 分钟。
在某次运维发现线上的kafka server集群的默认配置的size太小,不能满足业务发送数据的要求,导致业务阻塞,于是,更改了kafka server的某项参数的size大小之后,并重启了线上kafka server集群。
在重启集群之后,线上实时业务消费kafka topic的消费者开始报错,在消费端的错误信息为:"Container exception":org.apache.kafka.common.errors.TimeoutException: Timeout of 6000ms expected expired before successfully committing offsets{orders-5=OffsetAndMetadata{offset=197572354, leaderEpoch=null, metadata=''}}
[Consumer clientId=consumer-25, groupId=orderconsumer-my-consumer] Offset commit failed on partition order-3 at offset 197449610: The coordinator is loading and hence can't process requests.
输出日志信息待补充
在kafka server 集群重启后,业务这边消费端的偏移量无法提交,并不断报上述的错误,导致某些线上业务出现异常。我们第一直觉上,是认为可能是kafak 集群重启后,导致消费端和kafka server 集群的网络通信有问题,因此,在线上业务反馈异常后马上重启了线上的消费端的java进程服务,之后业务正常消费并提交偏移量offset数据。
事后,我们通过分析上述的日志,主要问题是kafka server集群的组协调器在loading __consumer_offsets这个topic的数据,而我们的这个topic的数据量有接近七八百兆,因此,整个loading处理数据完成大概要15分钟左右。因此,上面我们通过重启业务消费端的机器其实是无效的,因为我们重启完成后,kafka server的loading也差不多操作完成。
在kafka 0.9.0版本之后,消费端consumer的偏移量每次提交是保存在kafka的一个特殊的topic中,即__consumer_offsets这个topic中,而这个topic的配置需要特殊的配置。
在与运维沟通过程中,发现运维一直以为有在server.conf配置文件中配置这个topic的过期时间以及压缩策略,…
log.dirs=/data/kafka-logs log.cleaner.enable=true log.cleanup.policy=delete // delete | compact log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 …
以上配置对于特殊的topic,比如__consumer_offsets不一定有效,可以通过bin目录自带的kafka-config.sh脚本查看,操作如下:
./kafka-configs.sh --zookeeper 172.19.228.188:2181 --entity-type topics --entity-name __consumer_offsets --describe
显示如下:
Configs for topic
__consumer_offsets
are segment.bytes=104857600,clieanup.policy=compact,compression.type=producer
看的出来,segment.bytes、cleanup.policy、compression.type这三个配置项是针对topic,server.conf配置log.cleanup.policy,log.segment.bytes没有效果。
./kafka-configs.sh --zookeeper 172.19.228.188:2181 --entity-type topics --entity-name __consumer_offsets --alter--delete-config cleanup.policy
默认状况下,_consumer_offsets有50个分区。
查看offset内容的数据:bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
输出:
...[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434][console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246][console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246][console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246][console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436][console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436][console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436] ...
在_consumer_offsets topic中的每一项日志格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
offset是从coordinator的缓存中读取的 To avoid re-processing the last message read if a consumer is restarted, the commited offset should be the next message your application should consume, i.e:: lastoffset+1。 即kafak提交的位移是下一条消费记录的位移。当期__consumer_offsets的清理策略为compact,日志保留周期为24小时,但是系统默认的log.cleaner.enable为false
,导致kafka不会对超过保留周期的数据进行压缩处理,topic保留了系统上线以来的所有历史数据。
https://www.oreilly.com/library/view/apache-kafka-cookbook/9781785882449/ch02s04.html
https://dbaplus.cn/news-73-1202-1.html
转载地址:http://zmomi.baihongyu.com/