关于websocket多节点分布式问题的解决方案
- 发表于:2021-12-17 12:22
- 阅读(3479)
在开始思考分布式会有什么问题时,先来回答一个问题: 服务端如何与客户端交流?
在 ws 服务端,当与客户端连接成功后,会生成一个对象 connection,ws 会维护一个与客户端所有连接的 connections。如果想要主动推送消息到客户端,只需要调用API connection.sendText(message)。
那如何给所有人广播消息呢?
服务器只需要与它自身的所有连接 server.connections 挨个发消息就是广播,所以它只是一个伪广播:我要给群里所有人发消息,但我不能在群里发,只能挨个私发。
(1)websocket多节点分布式问题
如果我们的系统是一个简单的单体应用,项目部署跑在一台服务器上,这样我们简单地使用websocket貌似也没什么问题,但是对于一些复杂的大型应用,通过会将项目部署在多台服务器上,或者拆分成模块部署在多个服务器。
通过nginx请求转发和负载均衡,每个进来的用户请求会被轮询转发到不同的服务器上,也就是说,当客户端与服务端建立websocket连接的时候,只是和这个服务器集群中的其中某一个服务器节点连接而已,这样就会产生一个问题:
如果其中某一台服务端要推送消息给客户端,只能推送到那台服务器订阅的客户端上,而没有订阅该服务器的客户端就不会收到消息,所以这样是有问题的,可能到时会有很多用户抱怨没有受到消息通知的问题。
所以,我们的需求就是:
当其中某一台服务器要推送消息给客户端的时候,我们需要把消息同时发送给其他所有服务器节点,这样所有的客户端才能收到消息。
我们要求不管增加多少台服务器,都要让所有订阅消息通道的客户端能接收到消息,那我们要如何实现呢?
(2)针对分布式问题的解决方案
我们可以借助消息中间件,例如:redis、rabbitmq、kafka等消息中间件。
也就是说,当我们需要给所有客户端推送消息的时候,我们通过负载均衡到其中一台服务器,这台服务器把需要推送的消息输出到rabbitmq等消息中间件,这样监听了对应消息通道的所有服务器都能监听到mq输入消息,然后在监听处理方法中推送改消息给所有客户端。
这样,就能解决分布式服务器集群产生的部分客户端收不到消息的问题。
流程也就是如下图所示:
图片来源于我下面参考的博客
(3)使用Rabbitmq需要注意的地方
主要参考:spring-cloud-stream 整合rabbitmq 消息分组
Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不允许的,我们这时候需要给消息消费者加个分组信息,这样多个消费者实例在一个组下面就不会再出现消息重复消费。
单生产多消费相比于单生产单消费来说只是多了一个消费者而已,需要注意的是,这两个消费者都是监听一个消息队列并且默认情况下针对生产者发布的消息是进行分摊的,而不是同时拥有消费,因为rabbitmq默认的消费确认原则是生产者发布了一个消息,只要有一个消费者消费了,那么此消息会立即从队列中移除,不会管这个消息的消费时长以及是否成功。这种方式存在着一个问题就是,如果此消费者拿到此消息之后,处理超时异常了等,那么就造成了此消息没有被成功消费而丢失,想要解决这个问题可以关闭rabbitMQ的自动消息确认功能,从而在代码中进行手动确认。手动确认一定要确保消息处理后消息回执,否则将造成消息二次消费以及消息积压到队列中。举个栗子:默认情况下如果生产者发送了一条消息到消息队列中,两个消费者只能有一个消费者能够消费到此消息。
如果有多个消息消费者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统做集群部署,都会从 RabbitMQ 中获取订单信息,如果一个订单消息同时被两个服务消费,系统肯定会出现问题。为了避免这种情况,Stream 提供了消息分组来解决该问题。
在 Stream 中处于同一个 group 中的多个消费者是竞争关系,能够保证消息只会被其中一个应用消费。不同的组是可以消费的,同一个组会发生竞争关系,只有其中一个可以消费。通过 spring.cloud.stream.bindings..group 属性指定组名。
主要是修改application.yml的配置文件。
通过结果可以看到消息被两个消费者同时消费了,原因是因为它们属于不同的分组,默认情况下分组名称是随机生成的。
简单说,生产者发送10条,两个消费者都会收到10条。
如果把他们配置成同一个分组,此时多个消息消费者只有其中一个可以消费(同一个group,多个消费者之间是竞争的关系)。
简单说,生产者发送10条,其中一个消费者收到4条,另一个消费者可能收到6条
参考:
聊聊分布式下的WebSocket解决方案
关于一个 websocket 多节点分布式问题的头条面试题
【Q023】websocket 服务多节点部署时会有什么问题,怎么解决 #24
spring-cloud-stream 整合rabbitmq 消息分组
Spring Cloud Stream与RabbitMQ 消费者 消息分组
spring-cloud-stream-rabbit的一个topic对应多组消费者实例
spring-boot整合RabbitMQ(单生产以及多消费)
相关推荐
-
mac安装subversion,并使用svn命令检出服务器上的代码库项目
mac安装svn只要通过Homebrew安装即可,不需要下载额外的安装包手动安装,Homebrew类似一个软件库,我们可以通过brew命令实现一键下载并安装我们所需要的常用软件。
-
Java如何获取泛型类T的Class
我们平时在封装接口或抽象类的时候经常会用到Java的泛型,经常会在传入一个泛型类T,然后封装一些抽象的方法,泛型的好处就是在编译的时候检查类型安全,并且所有的强制类型转换都是隐式和自动的,这样可以提高代码的通用性。但是我们有时候需要获取泛型类的Class,那可以如何获取到呢?
-
springboot项目事务报错:Transaction synchronization is not active
这几天在使用spring声明式事务的时候突然报了一个错误:Transaction synchronization is not active,之前使用的都是好好的,为什么这次就不行了呢?不就是加一个 @Transactional 而已嘛???
-
mysql应该如何在where语句中添加if语句进行条件判断?where if 语句应该如何使用
我们在平时的项目开发中,有时候会遇到复杂一点的需求,需要我们手动编写复杂的SQL语句,并且有时候需要根据每条表记录的实际情况进行判断,根据每条记录动态添加不同的where条件,这个时候我们就可以在where语句中使用if语句进行条件判断,那么where if应该如何正确使用呢?
-
关于websocket多节点分布式问题的解决方案
websocket是一种在单个TCP连接上进行双全工通信的协议,使用websocket,我们可以实现服务端主动向各个订阅消息通道的客户端推送消息。这点比传统的http轮询请求要更好一点,避免一些无用的请求,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
-
mac如何把文件压缩成tar、zip以及如何解压tar、zip?
有时候我们需要把文件压缩成一个tar文件或zip文件,发送给别人,那么在macos系统应该如何压缩和解压缩呢?
-
openjdk和jdk有什么区别,应该如何选择?
我们一开始学习java的时候,安装的都是从sun官网或oracle官网下载的jdk安装包,但其实还有另外一个来源可以获取到jdk安装包,那就是openjdk,它和jdk基本一样,推荐使用openjdk。
-
如果git仓库发生变更,IDEA如何直接修改git远程仓库地址?
有时候我们整理远程仓库代码的时候,会修改远程仓库的名称,或者所属分组,这个时候在IDEA由于还是使用原先拉取的旧仓库地址,导致本地代码会提交不了,也更新不了远程最新代码,那么这个时候要如何修改IDEA当前的git远程仓库地址呢?如何无缝修改,修改完之后就能和原来一样更新提交,并且以前的提交记录也保留呢?
-
微信公众号自定义菜单报错:no permission to use weapp in menu rid:xxxxxxx
昨晚公司系统添加微信公众号菜单突然报错:{"errcode":45064,"errmsg":"no permission to use weapp in menu rid: 60311f70-0736ff08-29143906"}
-
Java如何使用stream流对List列表数据进行自定义排序
我们一般做排序功能都是通过在mysql数据库中的表中定义好排序字段,然后使用升序或降序来进行排序,复杂一点的话就配合多个排序字段进行排序,但是如果碰到那种无法使用表的字段进行排序的情况,我们需要先从数据库中取出列表数据,然后再通过业务代码对列表进行排序,这个时候我们就可以使用redis或Java的stream流。
-
微信企业付款到零钱报错:此请求可能存在风险,已被微信拦截
具体错误信息:com.github.binarywang.wxpay.exception.WxPayException: 返回代码:[SUCCESS],返回信息:[支付失败],结果代码:[FAIL],错误代码:[NO_AUTH],错误详情:[此请求可能存在风险,已被微信拦截。]
-
springboot项目使用@Transactional注解如何避免长事务问题
在springboot项目中,我们开启事务是非常简单的,使用注解的方式就是在需要开启事务的方法上添加@Transactional,这样就可以实现这个方法里面的所有操作和调用方法的操作都绑定在一个事务上面,要么全部一起执行成功,要么全部一起执行失败,如果其中有某个地方抛了异常,则整个方法涉及的事务操作都会回滚,但是如果随意滥用@Transactional,又有可能引发长事务问题,导致数据库死锁、数据库连接池占满等问题。
-
css实现“展开阅读全文”功能
最近发现很多博客网站,资讯网站喜欢把资讯博文,内容等这些大文本的信息在页面显示的时候都会有个“展开阅读全文”的按钮,点击这个按钮即可展开显示所有的内容,不然一开始就显示那么长的篇幅相对来说既不美观,又对用户体验不好。现在就让我来仿照这类网站实现一个“展开阅读全文”功能。这里主要用到的前端技术是html+jquery+css,只做展开功能,没做收起功能(收起功能没必要吧,谁会去收起呀???)。
-
关于编程中面向对象的理解,什么是面向对象
面向对象设计相对于结构化程序设计可以说是一种更优秀的程序设计方法。它的基本思想是使用类、对象、继承、封装、消息等基本概念进行程序设计。它是从现实世界中客观存在的事物(即对象)出发来构造软件系统,并在软件系统构造中尽可能运用人类的自然思维方式,强调直接以现实世界中的事物(即对象)为中心来思考,认识问题。
-
我的linux操作命令总结,记录常用linux操作命令
平常本地开发项目使用的系统基本都是window系统,而且都是图形化操作,非常方便,window也是越做越好了,项目部署到生产环境一般都是选择linux系统(当然window server系列也可以),而linux一般则选择centOS居多,这里记录一下linux常用命令,以免老是过几天就忘了,后续不断补充。