kafka消费者怎么样接受消息_kafka接收消息命令

1.Golang kafka简述和操作(sarama同步异步和消费组)

2.Kafka 设计详解之队列

3.kafka消费者java版本读取不到消息怎么办

4.kafka-docker上使用+常用指令

5.java kafka

6.Kafka总结

7.Kafka consumer 解析

8.kafka总结

9.Kafka消费者总结

kafka消费者怎么样接受消息_kafka接收消息命令

Golang kafka简述和操作(sarama同步异步和消费组)

       一、Kafka简述

        1. 为什么需要用到消息队列

        异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

        解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

        缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

        2.为什么选择kafka呢?

        这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

        kafka的优点:

        1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群

        kafka的缺点:

        1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

        3. Golang 操作kafka

        3.1. kafka的环境

        网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

        3.2. 第三方库

        github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组

        3.3. 消费者

        单个消费者

        funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{ ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{ ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList { ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{ fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return} ? wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) }deferpc.AsyncClose() wg.Done() ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}

        消费组

        funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{ ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){ ? errors := c.Errors() ? noti := c.Notifications()for{select{caseerr := <-errors: glog.Errorln(err)case<-noti: } ? }? }(c)formsg :=rangec.Messages() { ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) ? c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}

        3.4. 生产者

        同步生产者

        packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := &sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{ ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{ ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

        异步生产者

        funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){ ? errors := p.Errors() ? success := p.Successes()for{select{caseerr := <-errors:iferr !=nil{ ? glog.Errorln(err) }case<-success: } ? }? }(p)for{ ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) ? fmt.Fprintln(os.Stdout, v) ? msg := &sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), ? } ? p.Input() <- msg ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{ ? }}

        3.5. 结果展示->

        同步生产打印:

        分区ID:0,offset:90

        消费打印:

        Partition:0,Offset:90,key:,value:Hello World!

        异步生产打印:

        async:7272async:7616async:998

        消费打印:

        Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998

Kafka 设计详解之队列

       对于每个主题,Kafka群集都会维护一个分区日志,如下所示:

        每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到结构化的提交日志中。分区中的每个记录均分配有一个称为偏移的顺序ID号,该ID 唯一地标识分区中的每个记录。

        每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。(类似于游标指针的方式顺序处理数据,并且该指标可以任意移动)

        分区的设计结构

        生产者分区策略是 决定生产者将消息发送到哪个分区的算法, 主要有以下几种:

        kafka消息的有序性,是采用消息键保序策略来实现的。 一个topic,一个partition(分割),一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue。

        kafka发送进行消息压缩有两个地方,分别是生产端压缩和Broker端压缩。

        生产者端压缩 生产者压缩通常采用的GZIP算法这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。 配置参数:

        Broker压缩 大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但以下情况会引发Broker压缩

        消费者端解压 Kafka 会将启用了哪种压缩算法封装进消息集合中,在Consummer中进行解压操作。

        kafka提供以下特性来保证其消息的不丢失,从而保证消息的可靠性

        生产者确认机制 当 Kafka 的若干个 Broker(根据配置策略,可以是一个,也可以是ALL) 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

        生产者失败回调机制 生产者不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。producer.send(msg, callback) 采用异步的方式,当发生失败时会调用callback方法。

        失败重试机制 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

        消费者确认机制 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

        副本机制 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

        限定Broker选取Leader机制 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

        由于kafka生产者确认机制、失败重试机制的存在,kafka的消息不会丢失但是存在由于网络延迟等原因造成重复发送的可能性。 所以我们要考虑消息幂等性的设计。 kafka提供了幂等性Producer的方式来保证消息幂等性。使用 ****的方式开启幂等性。

        幂等性 Producer 的作用范围:

        Kafka事务 事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。 同样使用 的方式开启事务。

        consumer group是kafka提供的可扩展且具有容错性的消费者机制。它是由一个或者多个消费者组成,它们共享同一个Group ID. 组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

        consummer group有以下的特性:

        消费者位置 消费者位置,即位移。 消费者在消费的过程中需要记录自己消费了多少数据。 位移提交有自动、手动两种方式进行位移提交。

        Kafka通过一个内置Topic(__consumer_offsets)来管理消费者位移。

        rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

        Kafka提供了一个角色:coordinator来执行对于consumer group的管理。 Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。

        Rebalance 过程分为两步:Join 和 Sync。 Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

        Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

kafka消费者java版本读取不到消息怎么办

        在 上文 中我们介绍了 Kafka 的网络通信,本文打算详细分析 Kafka 的核心 — 队列 的设计和实现,来对 Kafka 进行更深一步的了解。

        队列是一种先进先出(FIFO)的数据结构,它是 Kafka 中最重要的部分,负责收集生产者生产的消息,并将这些消息传递给消费者。要实现一个队列有多种方式,Kafka 作为一个消息队列中间件,在设计队列时主要要考虑两个问题:

        乍一看到这个问题,我们会想,内存的读取速度远快于磁盘,如果追求性能,内存也充足的话,当然是将生产者产生的消息数据写到内存(比如用一个数组或者链表来存储队列数据),供消费者消费。真的是这样吗?

        下面我们依次分析下写内存和写磁盘文件的优缺点,首先,内存的优点是读写速度非常快,但是,如果我们的目标是设计「大数据量」下的「高吞吐量」的消息队列,会有以下几个问题:

        接下来我们来分析一下磁盘,写磁盘文件方式存储队列数据的优点就是能规避上述内存的缺点,但其有很严重的缺点,就是读写速度慢,如果纯依靠磁盘,那消息队列肯定做不到「高吞吐量」这个目标。

        分析了内存跟磁盘的优缺点,好像我们还是只能选写内存,但我们忽视了磁盘的两个情况:一是磁盘慢是慢在随机读写,如果是顺序读写,他的速度能达到 600MB/sec(RAID-5 磁盘阵列),并不慢,如果我们尽可能地将数据的读写设计成顺序的,可以大大提升性能。二是现代的操作系统会(尽可能地)将磁盘里的文件进行缓存

        有了操作系统级别的文件缓存,那用磁盘存储队列数据的方式就变得有优势了。首先,磁盘文件的数据会有文件缓存,所以不必担心随机读写的性能;其次,同样是使用内存,磁盘文件使用的是操作系统级别的内存,相比于在 Java 内存堆中存储队列,它没有 GC 问题,也没有 Java 对象的额外内存开销,更可以规避应用重启后的内存 load 数据耗时的问题,而且,文件缓存是操作系统提供的,因为我们只要简单的写磁盘文件,系统复杂性大大降低。

        因此,Kafka 直接使用磁盘来存储消息队列的数据。

        刚才我们已经决定用磁盘文件来存储队列数据,那么要如何选择数据结构呢?一般情况下,如果需要查找数据并随机访问,我们会用 B+ 树来存储数据,但其时间复杂度是 O(log N),由于我们设计的是消息队列,我们可以完全顺序的写收到的生产者消息,消费者消费时,只要记录下消费者当前消费的位置,往后消费就可以了,这样可以对文件尽可能的进行顺序读写,同时,时间复杂度是O(1)。其实,这跟我们写日志的方式很像,每条日志顺序 append 到日志文件。

        之前我们已经确定采用直接顺序写磁盘文件的方式来存储队列数据,下面我们来剖析下具体的实现细节。

        在 Kafka 中,用一个文件夹存储一条消息队列,成为一个 Log,每条消息队列由多个文件组成,每个文件称为一个 LogSegment,每当一个 LogSegment 的大小到达阈值,系统就会重新生成一个 LogSegment;当旧的 LogSegment 过期需要清理时(虽然磁盘空间相对于内存会宽裕很多,我们可以保存更长时间的消息数据,比如一周,以供消费者更灵活的使用,但还是需要定期清理太老的数据),系统会根据清理策略删除这些文件。

        现在我们知道一个队列(Log)是由多个队列段文件(LogSegment)组成的,那么 Kafka 是如何将这些文件逻辑上连接从而组成一条有序队列的呢?在生成每个队列段文件时,Kafka 用该段的初始位移来对其命名,如在新建一个队列时,会初始化第一个队列段文件,那么其文件名就是0,假设每个段的大小是固定值 L,那么第二个段文件名就是 L,第 N 个就是 (N - 1)* L。这样,我们就可以根据文件名对段文件进行排序,排序后的顺序就是整个队列的逻辑顺序。

        了解了队列的基本实现,下面我们就来分析下队列的核心操作—读和写。

        写操作发生在生产者向队列生产消息时,在上篇文章讲网络通信时我们已经说到,所有的客户端请求会根据协议转到一个 Handler 来具体处理,负责写操作的 Handler 叫 ProducerHandler,整个写请求的流程如下:

        之前我们说过,如果是顺序写,由于省掉了磁头寻址的时间,磁盘的性能还是很高的,我们看到 Kakfa 队列是以顺序方式写的,所以性能很高。但是,如果一台 Kafka 服务器有很多个队列,而硬盘的磁头是有限的,所以还是得在不同的队列直接来回切换寻址,性能会有所下降。

        队列的读操作发送在消费者消费队列数据时,由于队列是线性的,只需要记录消费者上次消费到了哪里(offset),接下去消费就好了。那么首先会有一个问题,由谁来记消费者到底消费到哪里了?

        一般情况下,我们会想到让服务端来记录各个消费者当前的消费位置,当消费者来拉数据,根据记录的消费位置和队列的当前位置,要么返回新的待消费数据,要么返回空。让服务端记录消费位置,当遇到网络异常时会有一些问题,比如服务端将消息发给消费者后,如果网络异常消费者没有收到消息,那么这条消息就被「跳过」了,当然我们可以借鉴二阶段提交的思想,服务端将消息发送给消费者后,标记状态为「已发送」,等消费者消费成功后,返回一个 ack 给服务端,服务端再将其标记为「成功消费」。不过这样设计还是会有一个问题,如果消费者没有返回 ack 给服务端,此时这条消息可能在已经被消费也可能还没被消费,服务端无从得知,只能根据人为策略跳过(可能会漏消息)或者重发(可能存在重复数据)。另一个问题是,如果有很多消费者,服务端需要记录每条消息的每个消费者的消费状态,这在大数据的场景下,非常消耗性能和内存。

        Kafka 将每个消费者的消费状态记录在消费者本身(隔一段时间将最新消费状态同步到 zookeeper),每次消费者要拉数据,就给服务端传递一个 offset,告诉服务端从队列的哪个位置开始给我数据,以及一个参数 length,告诉服务端最多给我多大的数据(批量顺序读数据,更高性能),这样就能使服务端的设计复杂度大大降低。当然这解决不了一致性的问题,不过消费者可以根据自己程序特点,更灵活地处理事务。

        下面就来分析整个读的流程:

        分布式系统中不可避免的会遇到一致性问题,主要是两块:生产者与队列服务端之间的一致性问题、消费者与队列服务端之间的一致性问题,下面依次展开。

        当生产者向服务端投递消息时,可能会由于网络或者其他问题失败,如果要保证一致性,需要生产者在失败后重试,不过重试又会导致消息重复的问题,一个解决方案是每个消息给一个唯一的 id,通过服务端的主动去重来避免重复消息的问题,不过这一机制目前 Kafka 还未实现。目前 Kafka 提供配置,供用户不同场景下选择允许漏消息(失败后不重试)还是允许重复消息(失败后重试)。

        由于在消费者里我们可以自己控制消费位置,就可以更灵活的进行个性化设计。如果我们在拉取到消息后,先增加 offset,然后再进行消息的后续处理,如果在消息还未处理完消费者就挂掉,就会存在消息遗漏的问题;如果我们在拉取到消息后,先进行消息处理,处理成功后再增加 offset,那么如果消息处理一半消费者挂掉,会存在重复消息的问题。要做到完全一致,最好的办法是将 offset 的存储与消费者放一起,每消费一条数据就将 offset+1。

        本文介绍了 Kafka 的队列实现以及其读写过程。Kafka 认为操作系统级别的文件缓存比 Java 的堆内存更省空间和高效,如果生产者消费者之间比较「和谐」的话,大部分的读写操作都会落在文件缓存,且在顺序读写的情况下,硬盘的速度并不慢,因此选择直接写磁盘文件的方式存储队列。在队列的读写过程中,Kafka 尽可能地使用顺序读写,并使用零拷贝来优化性能。最后,Kafka 让消费者自己控制消费位置,提供了更加灵活的数据消费方式。

kafka-docker上使用+常用指令

       3. 启动服务

       3.1 启动zookeeper

       启动zk有两种方式,第一种是使用kafka自己带的一个zk。

       bin/zookeeper-server-start.sh config/zookeeper.properties&

       另一种是使用其它的zookeeper,可以位于本机也可以位于其它地址。这种情况需要修改config下面的sercer.properties里面的zookeeper地址

       。例如zookeeper.connect=10.202.4.179:2181

       3.2 启动 kafka

       bin/kafka-server-start.sh config/server.properties

       4.创建topic

       bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test

       创建一个名为test的topic,只有一个副本,一个分区。

       通过list命令查看刚刚创建的topic

       bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181

       5.启动producer并发送消息启动producer

       bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

       启动之后就可以发送消息了

       比如

       test

       hello boy

       按Ctrl+C退出发送消息

       6.启动consumer

       bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning

       启动consumer之后就可以在console中看到producer发送的消息了

       可以开启两个终端,一个发送消息,一个接受消息。

       如果这样都不行的话,查看zookeeper进程和kafka的topic,一步步排查原因吧。

java kafka

        生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion

        启动镜像

        启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/server.properties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考

        其中172.17.0.3需要改成自己docker的网桥连接地址

        查看已启动容器

        查看所有容器

        启动未启动的容器

        进入kafka容器

        创建主题

        主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分)

        查看topic信息

        集群部署

        可以通过compose集群化部署过es,这里通过创建另一个compose.yml文件来部署kafka,配置文件参考 docker-compose集群部署

        生产者:

        消费者:

        方式一:从当前主题的迁移量位置+1开始取数据

        方式二:从当前主题第一条消息开始消费

        生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的

        单播消息:

        当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息

        多播消息:

        当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息

        查看消费组详细信息:

        CURRENT-OFFSET:最后被消费的偏移量

        LOG-END-OFFSET:消息总量(最后一条消息的偏移量)

        LAG :积压了多少条消息

        常见问题:

        1、如何防止消息丢失

        生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2

        消费者:把自动提交改成手动提交

        2、如何防止消息的重复消费

        针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;

        3、消息积压

        消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。

        ps:

        缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker

        参考链接:

        1、kafka安装教程--推荐

        2、kafka配置文件server.properties参数说明

        3、创建主题分区数

        4、解决docker容器启动不了的问题

        5、通过docker-compose集群部署

        6、学习视频

Kafka总结

       java kafka是什么,让我们一起了解一下?

        kafka是由Scala和Java编写,Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据,这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

        kafka的简单应用有哪些?

        Broker : Kafka集群包含一个或多个服务器,这种服务器被称为broker。

        Topic : 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

        Partition : Partition是物理上的概念,每个Topic包含一个或多个Partition。

       Producer : 负责发布消息到Kafka broker。

        Consumer : 消息消费者,向Kafka broker读取消息的客户端。

        Consumer Group : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

        kafka如何应用?

        通过对消费者项目的编写,案例代码操作如下: package?com.jbit.util; import?org.apache.kafka.clients.consumer.ConsumerRecord; import?org.springframework.kafka.annotation.KafkaListener; import?org.springframework.stereotype.Component; import?java.util.Optional; @Component public?class?MyKafkaListener?{ @KafkaListener(topics?=?{"message"}) public?void?listen(ConsumerRecord?record)?{ Optional?kafkaMessage?=?Optional.ofNullable(record.value()); if?(kafkaMessage.isPresent())?{ Object?message?=?kafkaMessage.get(); System.out.println("数据接收完毕:"+message); } } }

Kafka consumer 解析

        consumer 采用 pull(拉)模式从 broker 中读取数据。

        push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

        pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数

        据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有

        数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

        分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。

        可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。

        处理顺序 :拦截器->序列化器->分区器

        消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

        整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。

        一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。

        当前消费者需要提交的消费位移是offset+1

        在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。

        Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。

        Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。

        每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。

        日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。

        日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。

        在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

        Kafka 中有多种延时操作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。

        延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。

        为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。

        Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在

        初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

        Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

        生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。

        每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。

        在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中

        一旦上述数据写入操作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

        在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。

        Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所

        有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。

kafka总结

        上一篇说了Kafka consumer的处理逻辑、实现原理及相关的特点,本篇来看看Kafka 另一个client Consumer,作为生产者消费者的另一端,consumer提供了消费消息的能力,下面来看看Kafka中的consumer 应该如何正确使用及实现原理。

        常见的消息引擎中通常有 经典的生产者消费者模式 发布订阅模式 两种

        是一种点对点的方式,消息不会被重复消费,可以粗暴的理解为消息被消费后就被标记删除或者已删除了,这是常见的消息队列通常的模式。比如说进程间通信,这种基于队列实现消息传输服务的。

        相对于生产者 消费者模式,消息可能会被多方消费,可以简单的理解为一份报纸的内容,订阅它的人都可以读到它,当一个人读完之后也就没必要再次去读了。并且在发布订阅模式中,通常有个概念叫做topic,每个topic 有对应的发布者(publisher)、订阅者(subsciber)。

        那Kafka是如何实现生产者消费者两种模式的呢?往后看~

        kafka中有一个概念叫做consumer group,每个group 去订阅对应的topic,topic的每条消息只能发送到订阅它的消费者组的其中一个实例上,并且每个消费者至多使用一个消费者组来标示自己。这样不难得出,当某个topic 仅有一个group来消费时(组内有一个或者多个consumer),这个topic的消息的消费模式就是典型的生产者消费者模式。

        而当某个topic 被多个消费者组订阅,而每个组仅有一个消费者时,每条消息就会被广播到每个消费者上。

        这里需要注意下,还有个叫做独立消费者(standalone consumer)的概念,对于consumer group 是以group 为单位进行消息消费的,而standalone 会单独的执行消费,以consumer 实例为单位进行消费的。

        是时候来看看Kafka consumer 端的实现原理了,先从最基础的group 开始,当前较新版本的consumer是依赖于broker端的coordinator来完成组的管理的(主要是把分配方案通知到每个consumer实例上),当然了这里涉及一个一致性策略,当无法达成这个策略是,就直接抛异常请求人工介入处理了。

        coordinator 实现组的管理,依赖的主要是consumer group的状态,仅有 Empty(组内没有任何active consumer)、PreparingRebalance(group 正在准备进行rebalance)、AwaitingSync(所有组员已经加入组并等待leader consumer发送分区的分配方案)、Stable(group开始正常消费)、Dead(该group 已经被废弃)这五个状态,那他们是如何轮转的可以简单的看一下状态机。

        就整个过程来说,可以大致分为加入组阶段、状态同步阶段。

        加入组阶段:当明确group的coordinator之后,组内成员需要显式的发送JoinGroup请求(主要包括 订阅信息、成员id等元数据信息)给对应的coordinator,然后coordinator选择对应的consumer 作为leader,然后再给其他成员产生响应(一个空数组)。当然啦,如果某个consumer 指定的分配策略是其他consumer 不支持的,那么这个实例是不被接受的。现有的分区策略主要有:range、round-robin、sticky,其中sticky是其中可以最大限度保证分区的负载的均衡分配机rebalance之后的最少分配变动。

        offset 概念这里需要单独抽出来说一下,因为在Kafka 里面存在两个offset的概念,一个指的是consumer 中的offset,一个是broker中的offset

        concumer offset 用来记录当前消费了多少条消息,这个offset的状态是由consumer group来维护的,通过检查点机制对于offset的值进行持久化(内部就是一个map)

        broker offset 消息在broker 端的位移值,根据之前说过的几个概念可以大致的理解为一个<topic,partition,offset>可以唯一的标示到一条消息。

        因为新版本和旧版本Kafka 所采用的offset保存策略是不同的,旧版本中主要依赖于Zookeeper,但是zookeeper不是干这事儿的啊,所以kafka 在数量很大的消费发生时,zookeeper读写会异常的频繁,导致很容易成为整个Kafka系统的瓶颈。所以新版本对这种方式作出了重大更新,不再依赖于Zookeeper 来进行状态的保存,而是在broker 端直接开一个内部使用的topic,也就是_consumer_offsets topic,并且kafka 为了兼容老版本的consumer 还提供了 offsets.storage=kafka这样一个适配参数。

        最后要说的一点就是consumer 端的Rebalance 过程(rebalance是针对consumer group来说的,如果是standalone consumer 则没有这个概念),rebalance也就是如何达成一致来分配订阅topic的所有分区。这个rebalance的代价还是不小的,我们是需要避免高频的rebalance的。常见的rebalance 场景有:新成员加入组、组内成员崩溃(这种场景无法主动通知,需要被动的检测才行,并且需要一个session.timeout 才检测到)、成员主动离组。

        consumer 是可以执行任意次rebalance的,为了区分两次rebalance上的数据(防止无效或者延迟的offset提交),consumer 设计了一个叫做rebalance generation的标示。

        对应常见的rebalance请求有:

        JoinGroup:consumer 请求加入组

        SyncGroup:group leader把分配方案同步给组内所有成员

        Heartbeat:consumer 定期向coordination汇报心跳表示自己还存活

        LeaveGroup:consumer 主动通知coordinator该consumer即将离组

        DescribeGroup:查看组的所有信息。

        Consumer端常见的概念大致就这么多。

Kafka消费者总结

        创建消息。一般情况下一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。通常是通过消息键和分区器来实现,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器。

        读取消息。消费者订阅一个或多个主题,并按照消息生产的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,他是一个不断递增的整数值,在创建消息时,Kafka回吧它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper或者Kafka上,如果消费者关闭或重启,它的读取状态不会丢失。

        kafka通过partition的概念,保证了partition内消息有序性,缓解了上面的问题。partition内消息会复制分发给所有分组 ,每个分组只有一个consumer能消费这条消息。这个语义保证了某个分组消费某个分区的消息,是同步而非并发的。如果一个topic只有一个partition,那么这个topic并发消费有序,否则只是单个partition有序。

        消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用。

        假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快 怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展

        我们可以通过增加消费组的消费者来进行水平扩展提升消费能力 。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

        Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组 。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者

        可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance) 。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。

        消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。

        如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡 。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

        在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

       之后把KafkaListener的信息封装到MethodKafkaListenerEndpoint,再调用this.registrar.registerEndpoint(endpoint, factory);注册:KafkaListenerEndpointRegistrar实现了BeanFactoryAware, InitializingBean两个接口,因此在Spring初始化Bean的时候会遍历InitializingBean的所有实现类,并执行afterPropertiesSet方法

        registerAllEndpoints方法将解析的KafkaListener封装到KafkaListenerEndpointDescriptor,然后注册到list里。registerListenerContainer为每一个KafkaListenerEndpointDescriptor生成一个MessageListenerContainer

        KafkaMessageListenerContainer最终继承了Lifecycle,Spring在遍历所有的LifeStyle,执行start方法时KafkaMessageListenerContainer的dostart方法会被调用,实例化了KafkaListenerConsumer对象,ListenerConsumer实现了

        Runnable,所以可以放入线程池中,这样可以并发执行,但是这里有个问题,就是getConsumerTaskExecutor如果没有配置线程池,默认的线程池是什么?

        执行:

        ListenerConsumer实现了Runnable,所以最终由run方法调用的poll()来拉取消息。

        总结:

        KafkaListener内部也是多线程消费,并且是多线程消费的第一种,一个线程实例化一个KafkaConsumer实例