Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 702|回复: 1

[默认分类] Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性測试,以及各种坑

[复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2018-3-26 09:42:19 | 显示全部楼层 |阅读模式

      
       
       Kafka 0.9版本号对java client的api做出了较大调整,本文主要总结了Kafka 0.9在集群搭建、高可用性、新API方面的相关过程和细节,以及本人在安装调试过程中踩出的各种坑。
        
       关于Kafka的结构、功能、特点、适用场景等,网上到处都是,我就不再赘述了,直接进入正文
        
       Kafka 0.9集群安装配置
        
       操作系统:CentOS 6.5
        
        
       1. 安装Java环境
           ZooKeeper和Kafka的执行都须要Java环境。所以先安装JRE。Kafka默认使用G1垃圾回收器,假设不更改垃圾回收器,官方推荐使用 7u51以上版本号的JRE。假设你使用老版本号的JRE。须要更改Kafka的启动脚本,指定G1以外的垃圾回收器。
           Java环境的安装过程在此不赘述了。
       
        
        
       2. Zookeeper集群搭建
           Kafka依赖Zookeeper管理自身集群(Broker、Offset、Producer、Consumer等),所以先要安装 Zookeeper。自然,为了达到高可用的目的,Zookeeper自身也不能是单点,接下来就介绍怎样搭建一个最小的Zookeeper集群(3个 zk节点)
           此处选用Zookeeper的版本号是3.4.6。此为Kafka0.9中推荐的Zookeeper版本号。
          
           首先解压
       
      
    1. tar -xzvf zookeeper-3.4.6.tar.gz
    复制代码
       

       
           进入zookeeper的conf文件夹,将zoo_sample.cfg复制一份,命名为zoo.cfg,此即为Zookeeper的配置文件
       
      
    1. cp zoo_sample.cfg zoo.cfg
    复制代码
       

       
           编辑zoo.cfg
       
       
    1. # The number of milliseconds of each tick
    2. tickTime=2000
    3. # The number of ticks that the initial
    4. # synchronization phase can take
    5. initLimit=10
    6. # The number of ticks that can pass between
    7. # sending a request and getting an acknowledgement
    8. syncLimit=5
    9. # the directory where the snapshot is stored.
    10. dataDir=/data/zk/zk0/data
    11. dataLogDir=/data/zk/zk0/logs
    12. # the port at which the clients will connect
    13. clientPort=2181
    14. server.0=10.0.0.100:4001:4002
    15. server.1=10.0.0.101:4001:4002
    16. server.2=10.0.0.102:4001:4002
    复制代码
       
       
        dataDir和dataLogDir的路径须要在启动前创建好
        clientPort为zookeeper的服务端
        server.0/1/2为zk集群中三个node的信息,定义格式为hostname:port1:port2,当中port1是node间通信使用的端口,port2是node选举使用的端口,需确保三台主机的这两个端口都是互通的
       
           在另外两台主机上执行相同的操作,安装并配置zookeeper
           分别在三台主机的dataDir路径下创建一个文件名称为myid的文件。文件内容为该zk节点的编号。
       比如在第一台主机上建立的myid文件内容是0,第二台是1。
        
           接下来,启动三台主机上的zookeeper服务:
       
      
    1. bin/zkServer.sh start
    复制代码
       

       
           3个节点都启动完毕后,可依次执行例如以下命令查看集群状态:
       
      
    1. bin/zkServer.sh status
    复制代码
       

       
           命令输出例如以下:
       
      
    1. Mode: leader 或 Mode: follower
    复制代码

       
           3个节点中。应有1个leader和两个follower
        
           验证zookeeper集群高可用性:
           假设眼下3个zk节点中。server0为leader,server1和server2为follower
           我们停掉server0上的zookeeper服务:
       
      
    1. bin/zkServer.sh stop
    复制代码
       

       
           再到server1和server2上查看集群状态。会发现此时server1(也有可能是server2)为leader,还有一个为follower。
       

           再次启动server0的zookeeper服务,执行zkServer.sh status检查。发现新启动的server0也为follower
           至此。zookeeper集群的安装和高可用性验证完毕。
       
        
           附:Zookeeper默认会将控制台信息输出到启动路径下的zookeeper.out中。显然在生产环境中我们不能同意Zookeeper这样做,通过例如以下方法,能够让Zookeeper输出按尺寸切分的日志文件:
           改动conf/log4j.properties文件,将
           zookeeper.root.logger=INFO, CONSOLE
           改为
           zookeeper.root.logger=INFO, ROLLINGFILE
           改动bin/zkEnv.sh文件。将
           ZOO_LOG4J_PROP="INFO,CONSOLE"
           改为
           ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
           然后重新启动zookeeper,就ok了
        
        
       3. Kafka集群搭建
           此例中,我们会安装配置一个有两个Broker组成的Kafka集群。并在其上创建一个两个分区的Topic
           本例中使用Kafka最新版本号0.9.0.1
        
           首先解压
       
      
    1. tar -xzvf kafka_2.11-0.9.0.1.tgz
    复制代码
       

       
           编辑config/server.properties文件,以下列出关键的參数
       
      
    1. #此Broker的ID。集群中每一个Broker的ID不可相同
    2. broker.id=0
    3. #监听器,端口号与port一致就可以
    4. listeners=PLAINTEXT://:9092
    5. #Broker监听的端口
    6. port=9092
    7. #Broker的Hostname,填主机IP就可以
    8. host.name=10.0.0.100
    9. #向Producer和Consumer建议连接的Hostname和port(此处有坑,具体见后)
    10. advertised.host.name=10.0.0.100
    11. advertised.port=9092
    12. #进行IO的线程数。应大于主机磁盘数
    13. num.io.threads=8
    14. #消息文件存储的路径
    15. log.dirs=/data/kafka-logs
    16. #消息文件清理周期。即清理x小时前的消息记录
    17. log.retention.hours=168
    18. #每一个Topic默认的分区数,一般在创建Topic时都会指定分区数,所以这个配成1就可以了
    19. num.partitions=1
    20. #Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口就可以
    21. zookeeper.connect=10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
    复制代码
       

       
           配置项的具体说明请见官方文档:http://kafka.apache.org/documentation.HTML#brokerconfigs
        
           此处的坑:
       
       
         依照官方文档的说法。advertised.host.name和advertised.port这两个參数用于定义集群向Producer和 Consumer广播的节点host和port,假设不定义的话。会默认使用host.name和port的定义。
         
         但在实际应用中,我发现假设不定义 advertised.host.name參数。使用Java客户端从远端连接集群时,会发生连接超时,抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired

    经过debug发现,连接到集群是成功的。但连接到集群后更新回来的集群meta信息却是错误的:

       

       
         
         

         
       
         能够看到,metadata中的Cluster信息。节点的hostname是iZ25wuzqk91Z这种一串数字,而不是实际的ip地址 10.0.0.100和101。iZ25wuzqk91Z事实上是远端主机的hostname,这说明在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name。而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的。要解决这一问题,把host.name和advertised.host.name都配置成绝对 的ip地址就能够了。
         
         
       

       
       
     
           接下来。我们在还有一台主机也完毕Kafka的安装和配置。然后在两台主机上分别启动Kafka:
       
      
    1. bin/kafka-server-start.sh -daemon config/server.properties
    复制代码
       

       
           此处的坑:
      

       
         官方给出的后台启动kafka的方法是:
         

       
    1. bin/kafka-server-start.sh config/server.properties &
    复制代码

         但用这种方式启动后,仅仅要断开Shell或登出,Kafka服务就会自己主动shutdown,不知是OS的问题还是SSH的问题还是Kafka自己的问题,总之我改用-daemon方式启动Kafka才不会在断开shell后自己主动shutdown。
          
         
       

       

        
           接下来,我们创建一个名为test,拥有两个分区,两个副本的Topic:
      
    1. bin/kafka-topics.sh --create --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --replication-factor 2 --partitions 2 --topic test
    复制代码

       
           创建完毕后,使用例如以下命令查看Topic状态:
       
      
    1. bin/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test
    复制代码
       

       
           输出:
       
      
    1. Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
    2.      Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
    3.      Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
    复制代码

       
           解读:test这个topic,当前有2个分区,分别为0和1,分区0的Leader是1(这个1是broker.id)。分区0有两个 Replica(副本),各自是1和0,这两个副本中。Isr(In-sync)的是0和1。分区2的Leader是0,也有两个Replica,相同也 是两个replica都是in-sync状态
       

       至此。Kafka 0.9集群的搭建工作就完毕了。接下来我们将介绍新的Java API的使用,以及集群高可用性的验证測试。
       
       
      
       

       
       

        4. 使用Kafka的Producer API来完毕消息的推送
         
        1) Kafka 0.9.0.1的java client依赖:
         
       
    1.         <dependency>
    2.             <groupId>org.apache.kafka</groupId>
    3.             <artifactId>kafka-clients</artifactId>
    4.             <version>0.9.0.1</version>
    5.         </dependency>
    复制代码

         
        &nbsp;
        2) 写一个KafkaUtil工具类,用于构造Kafka Client
         
       
    1. public class KafkaUtil {
    2.         private static KafkaProducer<String, String> kp;
    3.         public static KafkaProducer<String, String> getProducer() {
    4.                 if (kp == null) {
    5.                         Properties props = new Properties();
    6.                         props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
    7.                         props.put("acks", "1");
    8.                         props.put("retries", 0);
    9.                         props.put("batch.size", 16384);
    10.                         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    11.                         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    12.                         kp = new KafkaProducer<String, String>(props);
    13.                 }
    14.                 return kp;
    15.         }
    16. }
    复制代码
       

       

         
        &nbsp; KafkaProducer<K,V>的K代表每条消息的key类型,V代表消息类型。
        消息的key用于决定此条消息由哪一个partition接收,所以我们须要保证每条消息的key是不同的。
        &nbsp; Producer端的经常使用配置
       
         bootstrap.servers:Kafka集群连接串,能够由多个host:port组成
         acks:broker消息确认的模式,有三种:
    0:不进行消息接收确认,即Client端发送完毕后不会等待Broker的确认
    1:由Leader确认,Leader接收到消息后会马上返回确认信息
    all:集群完整确认。Leader会等待全部in-sync的follower节点都确认收到消息后,再返回确认信息
    我们能够依据消息的重要程度,设置不同的确认模式。默觉得1
         retries:发送失败时Producer端的重试次数。默觉得0
         batch.size:当同一时候有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。假设设置为0。则每条消息都独立发送。默觉得16384字节
         linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下。配置linger.ms能够让Producer在发送消息前等待一定时间。以积累很多其它的消息打包发送。达到节省网络资源的目的。默觉得0
         key.serializer/value.serializer:消息key/value的序列器Class,依据key和value的类型决定
         buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中。假设消息产生的速度大于消息发送的速度。那么缓冲池满后发送消息的请求会被堵塞。默认33554432字节(32MB)
       
        &nbsp; 很多其它的Producer配置见官网:http://kafka.apache.org/documentation.html#producerconfigs
        &nbsp;
        &nbsp; 3) 写一个简单的Producer端,每隔1秒向Kafka集群发送一条消息:
       
    1. public class KafkaTest {
    2.         public static void main(String[] args) throws Exception{
    3.                 Producer<String, String> producer = KafkaUtil.getProducer();
    4.                 int i = 0;
    5.                 while(true) {
    6.                         ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
    7.                         producer.send(record, new Callback() {
    8.                                 public void onCompletion(RecordMetadata metadata, Exception e) {
    9.                                         if (e != null)
    10.                                                 e.printStackTrace();
    11.                                         System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
    12.                                 }
    13.                         });
    14.                         i++;
    15.                         Thread.sleep(1000);
    16.                 }
    17.         }
    18. }
    复制代码

        &nbsp;
        &nbsp; 在调用KafkaProducer的send方法时,能够注冊一个回调方法,在Producer端完毕发送后会触发回调逻辑。在回调方法的 metadata对象中,我们能够获取到已发送消息的offset和落在的分区等信息。注意,假设acks配置为0。依旧会触发回调逻辑,仅仅是拿不到 offset和消息落地的分区信息。
        &nbsp;&nbsp;&nbsp; 跑一下。输出是这种:
       

         
          message send to partition 0, offset: 28
          
    message send to partition 1, offset: 26
          
    message send to partition 0, offset: 29
          
    message send to partition 1, offset: 27
          
    message send to partition 1, offset: 28
          
    message send to partition 0, offset: 30
          
    message send to partition 0, offset: 31
          
    message send to partition 1, offset: 29
          
    message send to partition 1, offset: 30
          
    message send to partition 1, offset: 31
          
    message send to partition 0, offset: 32
          
    message send to partition 0, offset: 33
          
    message send to partition 0, offset: 34
          
    message send to partition 1, offset: 32
          
       

        &nbsp; 乍一看似乎offset乱掉了,但事实上这是由于消息分布在了两个分区上,每一个分区上的offset事实上是正确递增的。
        &nbsp;
        5. 使用Kafka的Consumer API来完毕消息的消费
        &nbsp;
        1) 改造一下KafkaUtil类,增加Consumer client的构造。
         
       
    1. public class KafkaUtil {
    2.         private static KafkaProducer<String, String> kp;
    3.         private static KafkaConsumer<String, String> kc;
    4.         public static KafkaProducer<String, String> getProducer() {
    5.                 if (kp == null) {
    6.                         Properties props = new Properties();
    7.                         props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
    8.                         props.put("acks", "1");
    9.                         props.put("retries", 0);
    10.                         props.put("batch.size", 16384);
    11.                         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    12.                         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    13.                         kp = new KafkaProducer<String, String>(props);
    14.                 }
    15.                 return kp;
    16.         }
    17.        
    18.         public static KafkaConsumer<String, String> getConsumer() {
    19.                 if(kc == null) {
    20.                         Properties props = new Properties();
    21.                         props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
    22.                         props.put("group.id", "1");
    23.                         props.put("enable.auto.commit", "true");
    24.                         props.put("auto.commit.interval.ms", "1000");
    25.                         props.put("session.timeout.ms", "30000");
    26.                         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    27.                         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    28.                         kc = new KafkaConsumer<String, String>(props);
    29.                 }
    30.                 return kc;
    31.         }
    32. }
    复制代码

        &nbsp; 相同,我们介绍一下Consumer经常使用配置
       
         bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含义一样,不再赘述
         fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer会等待消息积累到一定尺寸后进行批量拉取。默觉得1,代表有一条就拉一条
         max.partition.fetch.bytes:每次从单个分区中拉取的消息最大尺寸(byte),默觉得1M
         group.id:Consumer的group id,同一个group下的多个Consumer不会拉取到反复的消息,不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。
         enable.auto.commit:是否自己主动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动改动offset)。默觉得true
         auto.commit.interval.ms:自己主动提交offset的间隔毫秒数,默认5000。
       
        &nbsp; 全部的Consumer配置见官方文档:http://kafka.apache.org/documentation.html#newconsumerconfigs
        &nbsp;
        2) 编写Consumer端:
       
    1. public class KafkaTest {
    2.         public static void main(String[] args) throws Exception{
    3.                 KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
    4.                 consumer.subscribe(Arrays.asList("test"));
    5.                 while(true) {
    6.                         ConsumerRecords<String, String> records = consumer.poll(1000);
    7.                         for(ConsumerRecord<String, String> record : records) {
    8.                                 System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
    9.                         }
    10.                 }
    11.         }
    12. }
    复制代码

        &nbsp;
        &nbsp; 执行输出:
       

         
          fetched from partition 0, offset: 28, message: this is message0
          
    fetched from partition 0, offset: 29, message: this is message2
          
    fetched from partition 0, offset: 30, message: this is message5
          
    fetched from partition 0, offset: 31, message: this is message6
          
    fetched from partition 0, offset: 32, message: this is message10
          
    fetched from partition 0, offset: 33, message: this is message11
          
    fetched from partition 0, offset: 34, message: this is message12
          
    fetched from partition 1, offset: 26, message: this is message1
          
    fetched from partition 1, offset: 27, message: this is message3
          
    fetched from partition 1, offset: 28, message: this is message4
          
    fetched from partition 1, offset: 29, message: this is message7
          
    fetched from partition 1, offset: 30, message: this is message8
          
    fetched from partition 1, offset: 31, message: this is message9
          
    fetched from partition 1, offset: 32, message: this is message13
          
       

        &nbsp;
        说明:
       
         KafkaConsumer的poll方法即是从Broker拉取消息。在poll之前首先要用subscribe方法订阅一个Topic。
         poll方法的入參是拉取超时毫秒数,假设没有新的消息可供拉取。consumer会等待指定的毫秒数,到达超时时间后会直接返回一个空的结果集。
         如 果Topic有多个partition。KafkaConsumer会在多个partition间以轮询方式实现负载均衡。假设启动了多个 Consumer线程。Kafka也能够通过zookeeper实现多个Consumer间的调度。保证同一组下的Consumer不会反复消费消息。注 意。Consumer数量不能超过partition数。超出部分的Consumer无法拉取到不论什么数据。
         能够看出。拉取到的消息并非全然顺序化的。kafka仅仅能保证一个partition内的消息先进先出,所以在跨partition的情况下,消息的顺序是没有保证的。
         本 例中採用的是自己主动提交offset,Kafka client会启动一个线程定期将offset提交至broker。假设在自己主动提交的间隔内发生问题(比方整个JVM进程死掉)。那么有一部分消息是会被 反复消费的。要避免这一问题,可使用手动提交offset的方式。构造consumer时将enable.auto.commit设为false,并在代 码中用consumer.commitSync()来手动提交。
       
        假设不想让kafka控制consumer拉取数据时在partition间的负载均衡,也能够手工控制:
       
    1.         public static void main(String[] args) throws Exception{
    2.                 KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
    3.             String topic = "test";
    4.             TopicPartition partition0 = new TopicPartition(topic, 0);
    5.             TopicPartition partition1 = new TopicPartition(topic, 1);
    6.             consumer.assign(Arrays.asList(partition0, partition1));
    7.                 while(true) {
    8.                         ConsumerRecords<String, String> records = consumer.poll(100);
    9.                         for(ConsumerRecord<String, String> record : records) {
    10.                                 System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
    11.                         }
    12.                         consumer.commitSync();
    13.                 }
    14.         }
    复制代码

        &nbsp;使用consumer.assign()方法为consumer线程指定1个或多个partition。
         
        &nbsp;
        &nbsp; 此处的坑:
       
         在測试中我发现,假设用手工指定partition的方法拉取消息,不知为何kafka的自己主动提交offset机制会失效。必须使用手动方式才干正确提交已消费的消息offset。
         
        &nbsp;
        &nbsp; 题外话:
       
         在 真正的应用环境中。Consumer端将消息拉取下来后要做的肯定不止是输出出来这么简单,在消费消息时非常有可能须要花掉很多其它的时间。
         
         1个 Consumer线程消费消息的速度非常有可能是赶不上Producer产生消息的速度。所以我们不得不考虑Consumer端採用多线程模型来消费消息。
    然而KafkaConsumer并非线程安全的,多个线程操作同一个KafkaConsumer实例会出现各种问题,Kafka官方对于Consumer端的多线程处理给出的指导建议例如以下:

    1. 每一个线程都持有一个KafkaConsumer对象
    优点:

         
          实现简单
          不须要线程间的协作,效率最高
          最easy实现每一个Partition内消息的顺序处理
         
         弊端:
         
          每一个KafkaConsumer都要与集群保持一个TCP连接
          线程数不能超过Partition数
          每一batch拉取的数据量会变小,对吞吐量有一定影响
         
         2. 解耦,1个Consumer线程负责拉取消息。数个Worker线程负责消费消息
    优点:
         
          可自由控制Worker线程的数量,不受Partition数量限制
         
         弊端:
         
          消息消费的顺序无法保证
          难以控制手动提交offset的时机
         
         个人觉得另外一种方式更加可取,consumer数不能超过partition数这个限制是非常要命的。不可能为了提高Consumer消费消息的效率而把Topic分成很多其它的partition,partition越多。集群的高可用性就越低。
         
        &nbsp;
        &nbsp;
        6. Kafka集群高可用性測试
        &nbsp;
        1) 查看当前Topic的状态:
       
    1. /kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test
    复制代码

        &nbsp; 输出:
       
       
    1. Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
    2.    Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
    3.    Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
    复制代码
         
        &nbsp; 能够看到。partition0的leader是broker1,parition1的leader是broker0
        &nbsp;
        2) 启动Producer向Kafka集群发送消息
        &nbsp; 输出:
       

         
          message send to partition 0, offset: 35
          
    message send to partition 1, offset: 33
          
    message send to partition 0, offset: 36
          
    message send to partition 1, offset: 34
          
    message send to partition 1, offset: 35
          
    message send to partition 0, offset: 37
          
    message send to partition 0, offset: 38
          
    message send to partition 1, offset: 36
          
    message send to partition 1, offset: 37
          
       

        &nbsp;
        3) 登录SSH将broker0。也就是partition 1的leader kill掉
        &nbsp;
        &nbsp; 再次查看Topic状态:
         
       
    1. Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
    2.   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
    3.   Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1
    复制代码

         
        &nbsp; 能够看到,当前parition0和parition1的leader都是broker1了
        &nbsp;
        &nbsp; 此时再去看Producer的输出:
       

         
          [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with /10.0.0.100 disconnected
          

          java.net.ConnectException: Connection refused: no further information
          

          &nbsp;&nbsp; &nbsp;at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
          

          &nbsp;&nbsp; &nbsp;at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
          

          &nbsp;&nbsp; &nbsp;at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
          

          &nbsp;&nbsp; &nbsp;at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
          

          &nbsp;&nbsp; &nbsp;at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
          

          &nbsp;&nbsp; &nbsp;at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
          

          &nbsp;&nbsp; &nbsp;at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
          

          &nbsp;&nbsp; &nbsp;at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
          

          &nbsp;&nbsp; &nbsp;at java.lang.Thread.run(Thread.java:745)
    [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 7 to Cluster(nodes = [Node(1, 10.0.0.101, 9092)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test, partition = 0, leader = 1, replicas = [1,], isr = [1,]])

          
       

        &nbsp; 能看到Producer端的DEBUG日志显示与broker0的链接断开了,此时Kafka立马開始更新集群metadata,更新后的metadata表示broker1如今是两个partition的leader。Producer进程非常快就恢复继续执行,没有漏发不论什么消息。能够看出Kafka集群的故障切换机制还是非常厉害的
        &nbsp;
        4) 我们再把broker0启动起来
       
    1. bin/kafka-server-start.sh -daemon config/server.properties
    复制代码

        &nbsp; 然后再次检查Topic状态:
       
       
    1. Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
    2.    Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
    3.    Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
    复制代码
         

         
        &nbsp; 我们看到。broker0启动起来了。而且已经是in-sync状态(注意Isr从1变成了1,0),但此时两个partition的leader还都是 broker1,也就是说当前broker1会承载全部的发送和拉取请求。这显然是不行的,我们要让集群恢复到负载均衡的状态。
       

        &nbsp; 这时候。须要使用Kafka的选举工具触发一次选举:
         
       
    1. bin/kafka-preferred-replica-election.sh --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
    复制代码
       

         
        &nbsp; 选举完毕后,再次查看Topic状态:
       
       
    1. Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
    2.    Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
    3.    Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0
    复制代码
         

         
        &nbsp; 能够看到。集群又一次回到了broker0挂掉之前的状态
        &nbsp; 但此时,Producer端产生了异常:
       

         
          org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
          
       

        &nbsp; 原因是Producer端在尝试向broker1的parition0发送消息时,partition0的leader已经切换成了broker0,所以消息发送失败。
        &nbsp; 此时用Consumer去消费消息,会发现消息的编号不连续了。确实漏发了一条消息。这是由于我们在构造Producer时设定了retries=0,所以在发送失败时Producer端不会尝试重发。
         
        &nbsp; 将retries改为3后再次尝试,会发现leader切换时再次发生了相同的问题,但Producer的重发机制起了作用,消息重发成功,启动Consumer端检查也证实了全部消息都发送成功了。
        &nbsp;
       
         &nbsp; 每次集群单点发生问题恢复后。都须要进行又一次选举才干彻底恢复集群的leader分配,假设嫌每次这样做非常麻烦,能够在broker的配置文件(即 server.properties)中配置auto.leader.rebalance.enable=true,这样broker在启动后就会自己主动进行又一次选举
         
        &nbsp;
        至此。我们通过測试证实了集群出现单点故障和恢复的过程中。Producer端能够保持正确运转。接下来我们看一下Consumer端的表现:
        &nbsp;
       

        5) 同一时候启动Producer进程和Consumer进程
        &nbsp; 此时Producer一边在生产消息,Consumer一边在消费消息
        &nbsp;
        6) 把broker0干掉。观察Consumer端的输出:
        能看到,在broker0挂掉后,consumer也端产生了一系列INFO和WARN输出,但同Producer端一样。若干秒后自己主动恢复,消息仍然是连续的,并未出现断点。
        &nbsp;
        7) 再次把broker0启动。并触发又一次选举,然后观察输出:
       

         
          fetched from partition 0, offset: 418, message: this is message48
          
    fetched from partition 0, offset: 419, message: this is message49
          

          [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
          

          [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead.
          

          [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group.
          
    fetched from partition 1, offset: 392, message: this is message50
          
    fetched from partition 0, offset: 420, message: this is message51
          
       

        &nbsp; 能看到,重选举后Consumer端也输出了一些日志。意思是在提交offset时发现当前的调度器已经失效了,但非常快就又一次获取了新的有效调度器,恢复 了offset的自己主动提交,验证已提交offset的值也证明了offset提交并未因leader切换而错误发生。
        &nbsp;
        &nbsp; 如上。我们也通过測试证实了Kafka集群出现单点故障时,Consumer端的功能正确性。
       

       

        至此。Kafka+Zookeeper集群的安装配置、高可用性验证、Java Client的使用介绍就结束了。本人也是初用Kafka不久,有错误之处望拍砖,在使用Kafka集群时遇到了什么问题,也欢迎交流分享

       
       
      
    回复

    使用道具 举报

  • TA的每日心情
    奋斗
    2024-1-9 12:28
  • 签到天数: 25 天

    [LV.4]偶尔看看III

    发表于 2018-3-27 08:42:12 | 显示全部楼层
    太长,我搭建只要20分钟
    回复 支持 反对

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-21 06:40 , Processed in 0.386535 second(s), 38 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表