Spring Boot 使用 SSE 方式向前端推送数据详解

目录
  • 前言
  • 服务端
  • SSE工具类
  • 在Controller层创建 SSEController.java
  • 前端代码

前言

SSE简单的来说就是服务器主动向前端推送数据的一种技术,它是单向的,也就是说前端是不能向服务器发送数据的。SSE适用于消息推送,监控等只需要服务器推送数据的场景中,下面是使用Spring Boot 来实现一个简单的模拟向前端推动进度数据,前端页面接受后展示进度条。

服务端

在Spring Boot中使用时需要注意,最好使用Spring Web 提供的SseEmitter这个类来进行操作,我在刚开始时使用网上说的将Content-Type设置为text-stream这种方式发现每次前端每次都会重新创建接。最后参考该文实现了最终想要的效果:

SSE工具类

SSEServer.java

package vip.huhailong.catchat.sse;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * @author Huhailong
 */
@Slf4j
public class SSEServer {

    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    public static SseEmitter connect(String userId){
        //设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常
        SseEmitter sseEmitter = new SseEmitter(0L);
        //注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeOutCallBack(userId));
        sseEmitterMap.put(userId,sseEmitter);
        //数量+1
        count.getAndIncrement();
        log.info("create new sse connect ,current user:{}",userId);
        return sseEmitter;
    }
    /**
     * 给指定用户发消息
     */
    public static void sendMessage(String userId, String message){
        if(sseEmitterMap.containsKey(userId)){
            try{
                sseEmitterMap.get(userId).send(message);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",userId,e.getMessage());
                e.printStackTrace();
            }
        }
    }

    /**
     * 想多人发送消息,组播
     */
    public static void groupSendMessage(String groupId, String message){
        if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
            sseEmitterMap.forEach((k,v) -> {
                try{
                    if(k.startsWith(groupId)){
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                }catch (IOException e){
                    log.error("user id:{}, send message error:{}",groupId,message);
                    removeUser(k);
                }
            });
        }
    }
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k,v)->{
            try{
                v.send(message,MediaType.APPLICATION_JSON);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",k,e.getMessage());
                removeUser(k);
            }
        });
    }
    /**
     * 群发消息
     */
    public static void batchSendMessage(String message, Set<String> userIds){
        userIds.forEach(userId->sendMessage(userId,message));
    }
    public static void removeUser(String userId){
        sseEmitterMap.remove(userId);
        //数量-1
        count.getAndDecrement();
        log.info("remove user id:{}",userId);
    }
    public static List<String> getIds(){
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    public static int getUserCount(){
        return count.intValue();
    }
    private static Runnable completionCallBack(String userId) {
        return () -> {
            log.info("结束连接,{}",userId);
            removeUser(userId);
        };
    }
    private static Runnable timeOutCallBack(String userId){
        return ()->{
            log.info("连接超时,{}",userId);
            removeUser(userId);
        };
    }
    private static Consumer<Throwable> errorCallBack(String userId){
        return throwable -> {
            log.error("连接异常,{}",userId);
            removeUser(userId);
        };
    }
}

上面这个类可以把它当作一个SSE的工具类,下面我们使用一下它

在Controller层创建 SSEController.java

package vip.huhailong.catchat.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.huhailong.catchat.sse.SSEServer;

/**
 * @author Huhailong
 */
@Slf4j
@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SSEController {

    @GetMapping("/connect/{userId}")
    public SseEmitter connect(@PathVariable String userId){
        return SSEServer.connect(userId);
    }

    @GetMapping("/process")
    public void sendMessage() throws InterruptedException {
        for(int i=0; i<=100; i++){
            if(i>50&&i<70){
                Thread.sleep(500L);
            }else{
                Thread.sleep(100L);
            }
            SSEServer.batchSendMessage(String.valueOf(i));
        }
    }
}

上面的connect是用来连接sse的,它返回一个SseEmitter实例,这时候连接就已经创建了,然后下面的process接口是用来推送数据的,我这里是准备让前端实现一个进度条的效果,所以推送的是数字,为了效果明显,我在推送到50到70的时候速度放慢,其余都是100ms

前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Home</title>
    <script>
        let data = new EventSource("/cat-chat/sse/connect/huhailong")
        data.onmessage = function(event){
            console.log("test=>",event)
            document.getElementById("result").innerText = event.data+'%';
            document.getElementById("my-progress").value = event.data;
        }
    </script>
</head>
<body>
    <div id="result"></div>
    <progress style="width: 300px" id="my-progress" value="0" max="100"></progress>
</body>
</html>

最终效果:

到此这篇关于Spring Boot 使用 SSE 方式向前端推送数据详解的文章就介绍到这了,更多相关Spring Boot SSE内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2022-08-10

maven插件assembly使用及springboot启动脚本start.sh和停止脚本 stop.sh

我们在项目中都会遇到项目打包,可以通过assembly对我们的项目进行打包. 1.首先看一下在打包前的项目文件结构. 2.在pom.xml中配置assembly插件 <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plug

Springboot如何基于assembly服务化实现打包

(1)首先我们编辑 assembly.xml 配置文件,在前文的基础上新增第三方依赖设置(高亮部分),实现将第三方的 jar 包添加到压缩包里的 lib 目录: <?xml version="1.0" encoding="UTF-8"?> <assembly> <!-- 必须写,否则打包时会有 assembly ID must be present and non-empty 错误 这个名字最终会追加到打包的名字的末尾,如项目的名字为

Springboot基于assembly的服务化打包方案及spring boot部署方式

在使用assembly来打包springboot微服务项目前,我想说一说,目前springboot项目的几种常见的部署方式. 使用docker容器去部署,将springboot的应用构建成一个docker image,然后通过容器去启动镜像 ,这种方式在需要部署大规模的应用和应用扩展时是非常方便的,属于目前工业级的部署方案,但是需要掌握docker的生态圈技术. 使用fatjar直接部署启动,这是很多初学者或者极小规模情况下的一个简单应用部署方式. 本文主要针对第二种部署方式提供一种更加友好的打

spring boot测试打包部署的方法

有很多网友会时不时的问我,spring boot项目如何测试,如何部署,在生产中有什么好的部署方案吗?这篇文章就来介绍一下spring boot 如何开发.调试.打包到最后的投产上线. 开发阶段 单元测试 在开发阶段的时候最重要的是单元测试了,springboot对单元测试的支持已经很完善了. 1.在pom包中添加spring-boot-starter-test包引用 <dependency> <groupId>org.springframework.boot</groupI

Spring Boot Maven 打包可执行Jar文件的实现方法

Maven pom.xml 必须包含 <packaging>jar</packaging> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <fork>

详解基于Spring Boot与Spring Data JPA的多数据源配置

由于项目需要,最近研究了一下基于spring Boot与Spring Data JPA的多数据源配置问题.以下是传统的单数据源配置代码.这里使用的是Spring的Annotation在代码内部直接配置的方式,没有使用任何XML文件. @Configuration @EnableJpaRepositories(basePackages = "org.lyndon.repository") @EnableTransactionManagement @PropertySource("

springboot 使用Spring Boot Actuator监控应用小结

微服务的特点决定了功能模块的部署是分布式的,大部分功能模块都是运行在不同的机器上,彼此通过服务调用进行交互,前后台的业务流会经过很多个微服务的处理和传递,出现了异常如何快速定位是哪个环节出现了问题? 在这种框架下,微服务的监控显得尤为重要.本文主要结合Spring Boot Actuator,跟大家一起分享微服务Spring Boot Actuator的常见用法,方便我们在日常中对我们的微服务进行监控治理. Actuator监控 Spring Boot使用"习惯优于配置的理念",采用包

Springboot基于maven打包分离lib及resource

之前在部署Spring Boot项目时,经常因为只修改了一小处代码.或者只更新了某个jar包,但是却需要将整个项目重新打包.上传.部署,整个包一般都会达到40-60M,每次都重复这个操作真的很耗费时间,因此就想是否能够将依赖lib与项目代码分离出来,每次部署只需要发布代码即可. 项目发版,为了应对更新多变的依赖jar包,实现增量或替换依赖jar包,越来越多的企业实现源代码和依赖jar包和依赖配置分离,更好的应对复杂多变的现场和生产环境,使用maven打包配置如下: <build> <pl

springboot基于Mybatis mysql实现读写分离

近日工作任务较轻,有空学习学习技术,遂来研究如果实现读写分离.这里用博客记录下过程,一方面可备日后查看,同时也能分享给大家(网上的资料真的大都是抄来抄去,,还不带格式的,看的真心难受). 完整代码:https://github.com/FleyX/demo-project/tree/master/dxfl 1.背景 一个项目中数据库最基础同时也是最主流的是单机数据库,读写都在一个库中.当用户逐渐增多,单机数据库无法满足性能要求时,就会进行读写分离改造(适用于读多写少),写操作一个库,读操作多个库

浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

通过zookeeper实现分布式锁 1.创建zookeeper的client 首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean { private static final Logger LOGG

Android使用美团多渠道打包方案详解

Andorid渠道市场有多分散呢?分散到比Android碎片化还严重,你还在为多渠道打包而头疼吗?美团提供了速度快到白驹过隙的多渠道打包方案.说的有点夸张,对,虽然夸张,但是确实很快,不夸张不足以形容其快.废话不多说,先讲原理,再讲实践方法. 新旧打包方法原理对比讲解 传统方式 在AndroidManifest定义渠道的年代,多渠道打包无非以下两种方案: 方案一:完全的重新编译,即在代码重新编译打包之前,在AndroidManifest中修改渠道标示: 方案二:通过ApkTool进行解包,然后修

SpringBoot基于数据库实现定时任务过程解析

这篇文章主要介绍了SpringBoot基于数据库实现定时任务过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在我们平时开发的项目中,定时任务基本属于必不可少的功能,那大家都是怎么做的呢?但我知道的大多都是静态定时任务实现. 基于注解来创建定时任务非常简单,只需几行代码便可完成.实现如下: @Configuration @EnableScheduling public class SimpleScheduleTask { //10秒钟执行