消息队列面试题(爪哇程序员)
消息队列有哪些应用场景?
异步处理、流量控制、服务解耦、消息广播
消息队列的弊端有哪些?
数据延迟;增加系统复杂度;可能产生数据不一致的问题。
使用消息队列,怎么确保消息不丢失?
在生产阶段,你需要捕获消息发送的错误,并重发消息。 在存储阶段,你可以通过配置刷盘和 复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或 者磁盘损坏而丢失。 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。
使用消息队列,如果处理重复消息?
1)利用数据库的唯一约束实现幂等
2)为更新的数据设置前置条件(CAS)
3)记录并检查操 作(在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消 息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。)
Kafka的消息是有序的吗?如果保证Kafka消息的顺序性?
Kafka只能保证局部有序,即只能保证一个分区里的消息有序。而其具体实现是通过生产者为 每个分区的消息维护一个发送队列,我们需要将保证顺序的消息都发送到同一个分区中。并且 由于Kafka会同时发送多个消息,所以还需指定max.in.flight.requests.per.connection为 1,保证前一个消息发送成功,后一个消息才开始发送。
消息如何保证幂等性
例如kafka的offset可能是消费者批量处理后才提交到zk,重启后再消费时就可能会收到重复 消息,需要消费者在处理消息时做幂等性设计,即先判断是否消费过,把已消费的放到本地缓 存或者redis中,每次消费时先做个判断即可。
消息队列积压怎么办
当消费者出现异常,很容易引起队列积压,如果一秒钟1000个消息,那么一个小时就是几千万 的消息积压,是非常可怕的事情,但是生产线上又有可能会出现; 当消息积压来不及处理,rabbitMQ如果设置了消息过期时间,那么就有可能由于积压无法及 时处理而过期,这消息就被丢失了; 解决方法如下:
不建议在生产环境使用数据过期策略,一是数据是否丢失无法控制,二是一旦积压就很有可能丢失;建议数据的处理都有代码来控制;
当出现消息积压时,做法就是临时扩大consumer个数,让消息快速消费,一般都 是通过业务逻辑的手段来完成如下:
rabbitmq解决积压范例
-
修复consumer代码故隙,确保consumer逻辑正确可以消费
-
停止consumer,开启10倍20倍的queue个数
-
创建一个临时的consumer程序,消费积压的queue,并把消息写入到扩建10倍的queue中
- 再开启10倍20倍的consumer对新的扩充后队列进行消费
-
这种做法相当于通过物理资源扩充了10们来快速消费
-
当消费完成后,需要恢复原有架构,开启原来的consumer进行正常消费
kafka解决范例
- 修复consumer代码故障,确保consumer逻辑正确可以消费
-
停止consumer,新建topic,新建10倍20倍的partition个数
-
创建对应原topic的partition个数的临时的consumer程序,消费原来的topic,并把消息写入到扩建的新topic中
- 再开启对应新partition个数的consumer对新的topic进行消费
- 这种做法相当于通过物理资源扩充了10倍来快速消费
各种MQ的比较
特性 | activeMQ | rabbitMQ | rocketMQ | kafka |
---|---|---|---|---|
单机 吞吐量 | 万/秒 | 万/秒 | 10万/秒 | 10万/秒 |
topic 对吞 吐量 的影响 | 无 | 无 | topic达到几百/几千个级 别,吞吐量会有小幅下 降; 这是rocket的最大优 势 所以非常适用于支撑大批量topic场景 | topic可以达到几十/几 百个级别,吞吐量会有 大幅下降 kafka不适用 大批量topic场景,除非加机器 |
时效 性 | 毫秒 | 微秒 这是 rabbit 最 大优势,延迟低 | 毫秒 | 毫秒 |
可用性 | 高。主从架构 | 高。主从架构 | 非常高。分布式。多副本,不会丢数据, | 非常高。分布式。不会数据,不会不可用。 |
可靠 性 | 有较低概 率丢失数据 | —- | 经配置优化可达到0丢失 | 经配置优化可达到0丢 失 |
功能 特性 | 功能齐 全,但已不怎么维护 | erlang开 发,并发强,性能极好,延迟低 | MQ功能较为齐全,扩展好 | 功能简单,主要用于大 数据实时计算和日志采集,事实标准 |
综上,总结如下:
- activeMQ
优点:技术成熟,功能齐全,历史悠久,有大量公司在使用
缺点:偶尔会有较低概率丢失数据,而且社区已经不怎么维护5.15.X版本
使用场景:主要用于系统解耦和异步处理,不适用与大数据量吞吐情况。互联网公司很少适用
- rabitMQ
优点:吞吐量高,功能齐全,管理界面易用,社区活跃,性能极好,;
缺点:吞吐量只是万级,erlang难以二次开发和掌控;集群动态扩展非常麻烦;
使用场景:吞吐量不高而要求低延迟,并且不会频繁调整和扩展的场景。非常适合 国内中小型互联网公司适用,因为管理界面非常友好,可以在界面进行配置和优 化/集群监控。
- rocketMQ
优点:支持百千级大规模topic。吞吐量高(十万级,日处理上百亿)。接口易用。分布式易扩展,阿里支持。java开发易于掌控
缺点:与阿⾥(社区)存在绑定。不兼容M规范。
使用场景:高吞吐量
- kafka
优点:超高吞吐量,超高可用性和可靠性,分布式易扩展
缺点:topic支持少,MQ功能简单,消息可能会重复消费影响数据精确度
使用场景:超高
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万 消息持续积压几小时怎么解决?
(一)、大量消息在mq里积压了几个小时了还没解决
几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,10点多,11点多 这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复consumer 的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。
一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条,所以 如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过 来。
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉。 新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量。 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后 不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。 这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。
(二)、消息队列过期失效问题
假设你用的是rabbitmq,rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中 积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。那这就是第二个坑了。这 就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。 这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大 量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。 就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝 咖啡熬夜到晚上12点以后,用户都睡觉了。
这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重 新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。
假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那 1000个订单给查出来,手动发到mq里去再补一次。
(三)、消息队列满了怎么搞?
如果走的方式是消息积压在mq里,那么如果你很长时间都没处理掉,此时导致mq都快写满 了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接 入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案, 到了晚上再补数据吧。
为什么使用消息队列?
**面试官心理分析
其实面试官主要是想看看:
- 第一,你知不知道你们系统里为什么要用消息队列这个东西?
不少候选人,说自己项目里用了 Redis、MQ,但是其实他并不知道自己为什么要用这个 东西。其实说白了,就是为了用而用,或者是别人设计的架构,他从头到尾都没思考 过。
没有对自己的架构问过为什么的人,一定是平时没有思考的人,面试官对这类候选人印 象通常很不好。因为面试官担心你进了团队之后只会木头木脑的干呆活儿,不会自己思 考。
- 第二,你既然用了消息队列这个东西,你知不知道用了有什么好处&坏处?
你要是没考虑过这个,那你盲目弄个 MQ 进系统里,后面出了问题你是不是就自己溜了 给公司留坑?你要是没考虑过引入一个技术可能存在的弊端和风险,面试官把这类候选 人招进来了,基本可能就是挖坑型选手。就怕你干 1 年挖一堆坑,自己跳槽了,给公司留 下无穷后患。
- 第三,既然你用了 MQ,可能是某一种 MQ,那么你当时做没做过调研?
你别傻乎乎的自己拍脑袋看个人喜好就瞎用了一个 MQ,比如 Kafka,甚至都从没调研 过业界流行的 MQ 到底有哪几种。每一个 MQ 的优点和缺点是什么。每一个 MQ 没有绝 对的好坏,但是就是看用在哪个场景可以扬长避短,利用其优势,规避其劣势。
如果是一个不考虑技术选型的候选人招进了团队,leader 交给他一个任务,去设计个什 么系统,他在里面用一些技术,可能都没考虑过选型,最后选的技术可能并不一定合 适,一样是留坑。
**面试题剖析
其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场 景里用消息队列是什么?
面试官问你这个问题,期望的一个回答是说,你们公司有个什么业务场景,这个业务场景有个 什么技术挑战,如果不用 MQ 可能会很麻烦,但是你现在用了 MQ 之后带给了你很多的好处。
先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个:解耦、异 步、削峰。
**解耦
看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个 数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃……
在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数 据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果 挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!
如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面 消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取 消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维 护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦 了。
面试技巧:你需要去考虑一下你负责的系统中是否有类似的场景,就是一个系统或者一个模 块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用 是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在 你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用 MQ 作解耦。
**异步
再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写 库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求 总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。 用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。
一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完 成,对用户几乎是无感知的。
如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受 一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按 钮,8ms 以后就直接返回了,爽!网站做得真好,真快
**削峰
每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请 求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。
一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。
但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作, 每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。
如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每 秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己 每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。 而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可 能有几十万甚至几百万的请求积压在 MQ 中。
这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系 统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积 压的消息给解决掉。
916.什么是 ActiveMQ?
activeMQ 是一种开源的,实现了 JMS1.1 规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信
917.ActiveMQ 服务器宕机怎么办
这得从 ActiveMQ 的储存机制说起。在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的,它们的最大限制在配置文件的
918.丢消息怎么办
这得从 java 的 java.net.SocketException 异常说起。简单点说就是当网络发送方发送一堆数据,然后调用 close 关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用 read 方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生 SocketException 后,原本缓存区中数据也作废了,此时接收者再次调用 read 方法去读取缓存中的数据,就会报 Software caused connection abort: recv failed 错误。 通过抓包得知,ActiveMQ 会每隔 10 秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。如果你看过上面第一条,就会知道非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续 20 到 30 秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close()时,会期待服务器对于关闭连接的回答,如果超过 15 秒没回答就直接调用 socket层的 close 关闭 tcp 连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了 java.net.SocketException 异常,把缓存里的数据作废了,没处理的消息全部丢失。 解决方案:用持久化消息,或者非持久化消息及时处理不要堆积,或者启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。
919.持久化消息非常慢。
默认的情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有 2 个数量级的提升。所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。
920.消息的不均匀消费。
有时在发送一些消息之后,开启 2 个消费者去处理消息。会发现一个消费者处理了所有的消息,另一个消费者根本没收到消息。原因在于 ActiveMQ 的 prefetch 机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是 1000 条。这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。更通常的情况是,消费这些消息非常耗时,你开了 10 个消费者去处理,结果发现只有一台机器吭哧吭哧处理,另外 9 台啥事不干。 解决方案:将 prefetch 设为 1,每次处理 1 条消息,处理完再去取,这样也慢不了多少。
921.死信队列。
如果你想在消息处理失败后,不被服务器删除,还能被其他消费者处理或重试,可以关闭AUTO_ACKNOWLEDGE,将 ack 交由程序自己处理。那如果使用了 AUTO_ACKNOWLEDGE,消息是什么时候被确认的,还有没有阻止消息确认的方法?有! 消费消息有 2 种方法,一种是调用 consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。另一种方法是采用 listener 回调函数,在有消息到达时,会调用 listener 接口的 onMessage 方法。在这种情况下,在 onMessage 方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。那么问题来了,如果一条消息不能被处理,会被退回服务器重新分配,如果只有一个消费者,该消息又会重新被获取,重新抛异常。就算有多个消费者,往往在一个服务器上不能处理的消息,在另外的服务器上依然不能被处理。难道就这么退回–获取–报错死循环了吗? 在重试 6 次后,ActiveMQ 认为这条消息是“有毒”的,将会把消息丢到死信队列里。如果你的消息不见了,去 ActiveMQ.DLQ 里找找,说不定就躺在那里。
922.ActiveMQ 中的消息重发时间间隔和重发次数吗?
ActiveMQ:是 Apache 出品,最流行的,能力强劲的开源消息总线。是一个完全支持 JMS1.1 和 J2EE1.4 规范的 JMS Provider 实现。JMS(Java 消息服务):是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 首先,我们得大概了解下,在哪些情况下,ActiveMQ 服务器会将消息重发给消费者,这里为简单起见,假定采用的消息发送模式为队列(即消息发送者和消息接收者)。
- 如果消息接收者在处理完一条消息的处理过程后没有对 MOM 进行应答,则该消息将由 MOM 重发.
- 如果我们队某个队列设置了预读参数(consumer.prefetchSize),如果消息接收者在处理第一条消息时(没向 MOM 发送消息接收确认)就宕机了,则预读数量的所有消息都将被重发!
- 如果 Session 是事务的,则只要消息接收者有一条消息没有确认,或发送消息期间 MOM 或客户端某一方突然宕机了,则该事务范围中的所有消息 MOM 都将重发。
- 说到这里,大家可能会有疑问,ActiveMQ 消息服务器怎么知道消费者客户端到底是消息正在处理中还没来得急对消息进行应答还是已经处理完成了没有应答或是宕机了根本没机会应答呢?其实在所有的客户端机器上,内存中都运行着一套客户端的 ActiveMQ 环境,该环境负责缓存发来的消息,负责维持着和 ActiveMQ 服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。 我们可以来对 ActiveMQ 的重发策略(Redelivery Policy)来进行自定义配置,其中的配置参数主要有以下几个:
可用的属性 | 属性 | 默认值 | 说明 |
---|---|---|---|
l | collisionAvoidanceFactor | 默认值 0.15 | 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance 参数时才生效。 |
l | maximumRedeliveries | 默认值 6 | 最大重传次数,达到最大重连次数后抛出异常。为-1 时不限次 |
数,为 0 时表示不进行重传。 | |||
l | maximumRedeliveryDelay | 默认值-1 | 最大传送延迟,只在 useExponentialBackOff 为 true 时有效(V5.5),假设首次重连间隔为 10ms,倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 |
l | initialRedeliveryDelay | 默认值 1000L, 初始重发延迟时间 | |
l | redeliveryDelay | 默认值 1000L | 重发延迟时间,当 initialRedeliveryDelay=0 时生效(v5.4) |
l | useCollisionAvoidance | 默认值 false | , 启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡 broker 处理性能,不会有时很忙,有时很空闲。 |
l | useExponentialBackOff | 默认值 false | 启用指数倍数递增的方式增加延迟时间。 |
l | backOffMultiplier | 默认值 5 | 重连时间间隔递增倍数,只有值大于 1 和启用useExponentialBackOff 参数时才生效 |