目录

消息队列


消息队列

走进消息队列

场景

/img/Go/message-queue/image1.png
  • 案例一:有一天晚上我们上完课,回到宿舍,想着新出的游戏机,但又摸了摸钱包,太贵了买不起,这个时候你突然想到,今天抖音直播搞活动,瞬间你掏出了手机打开抖音搜索,找到直播间以后,你打开了心心念念的游戏机详情页,看到价格只要500。
    • 这个时候我们分析一下,就我们上面这几步操作,在我们的程序背后,做了什么事情。
/img/Go/message-queue/image2.png
  • 案例二:服务能力有限(到点上车,无数用户在对面狂点手机+黄牛的脚本)
/img/Go/message-queue/image3.png
  • 案例三:链路耗时长尾(下单之后一直在转圈圈)
/img/Go/message-queue/image4.png
  • 案例四:日志存储

    • 如果完事之后发现本地日志丢掉了怎么办?
  • 面对上面的四个场景,我们有什么解决办法吗?

解决问题

  • 案例一
    • 解决方案:解耦
/img/Go/message-queue/image5.png
  • 类似生产者消费者模型(或者是gochannel),即使数据库方面宕机但是消息仍然被保存下来,部分请求还是可以被处理掉

  • 案例二

    • 解决方案:削峰
/img/Go/message-queue/image6.png
  • 加了一层缓冲机制

  • 案例三

    • 解决方案:异步
/img/Go/message-queue/image7.png
  • 前两个请求处理完成之后用户会马上得到响应,不再干等着,耗时的通知商家响应方是商家,所以慢一点问题不太大

  • 三个过程从原来的单线程顺序处理变成多线程异步处理

  • 案例四

/img/Go/message-queue/image8.png

什么是消息队列?

消息队列(MQ),指保存消息的一个容器,本质是个队列。但是这个队列需要满足高吞吐,高并发,并且高可用

业界消息队列对比

/img/Go/message-queue/image9.png

消息队列Kafka

  • 使用场景:离线的消息处理当中(日志信息,Metrics数据,用户行为 $\Longrightarrow$ 搜索,点赞,评论,收藏)

如何使用Kafka

  • 基本概念
    • Topic:逻辑队列,不同的Topic可以建立不同的Topic
    • Cluster:物理集群,每个集群中可以建立多个不同的Topic
    • Producer:生产者,负责将业务消息发送到Topic
    • Consumer:消费者,负责消费Topic中的消息
    • ConsumerGroup:消费者组,不同组Consumer互不干涉
    • 同一个Topic中的Partion可以并发处理,提高吞吐量
/img/Go/message-queue/image10.png

  • Offset

Offset:消息在Partion内的相对位置信息,可以理解为唯一ID,在partion内部是严格递增的


  • Replica

每个分片有多个ReplicaLeader Replica将会从$LSR$中选出

/img/Go/message-queue/image11.png
  • Leader:对外进行读写

  • Foller:不断将数据从Leader上面拉取下来,努力和Leader保持一个一致的状态,如果和Leader差距过大会被踢出ISR(参考Replica3)

  • ISR的作用:如果ISR里面的leader对应的机器发生了宕机等故障,ISR会从里面的Foller中选择一个让其重新成为Leader,保证服务器继续运行,保证了高可用性


  • 数据复制
/img/Go/message-queue/image12.png
  • 其中有一个Broker同时也扮演了Controller的角色。Controller是整个集群的大脑,负责对副本和Broker的分配

  • Kafka架构
/img/Go/message-queue/image13.png

消息的处理流程

发送消息
  • 如果发送一条消息,等待其成功后再发送下一条消息会有什么问题?

  • 批量发送消息可以增加吞吐量,但是如果网络带宽不够怎么办
  • 解决方法:进行压缩,减少消息大小,目前支持的有:Snappy,Gzip,LZ4,ZSTD压缩算法
存储消息
  • 如何存储到磁盘?

  • Borker的文件结构

/img/Go/message-queue/image14.png
  • 磁盘结构:由于寻道成本比较高,所以顺序写入可以减少寻道带来的时间成本

  • Broker-顺序写
  • 采用顺序写的方式进行写入(磁盘末尾追加,减少动磁头的次数),以提高写入效率

  • Broker-如何找到消息

  • Consumer 通过发送FetchRequest请求消息数据,Broker 会将指定Offset处的消息,按照时间窗口和消息大小窗口发送给 Consumer寻找数据这个细节是如何做到的呢?

  • 例子:寻找offer=28

    • 二分法查找小于目标offer的最大文件

    • 接着通过偏移量去对应的地址里面继续查找

/img/Go/message-queue/image15.png

  • Broker-时间戳索引文件
    • 二分法找到小于目标时间戳的最大索引位置,再通过offset的方式找到最终数据
/img/Go/message-queue/image16.png