Browse Source

消息推送基础代码

wangwl 1 month ago
parent
commit
edc8004359

+ 5 - 0
ruoyi-equity/pom.xml

@@ -30,6 +30,11 @@
             <artifactId>springfox-boot-starter</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
 
     </dependencies>
 

+ 18 - 0
ruoyi-equity/src/main/java/com/ruoyi/equity/config/WebSocketConfig.java

@@ -0,0 +1,18 @@
+package com.ruoyi.equity.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig {
+
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+
+}

+ 35 - 0
ruoyi-equity/src/main/java/com/ruoyi/equity/domain/vo/MsgVO.java

@@ -0,0 +1,35 @@
+package com.ruoyi.equity.domain.vo;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @Param   推送到前端的消息格式
+ * @Author wangwl
+ * @Date 2024/7/17 14:08
+ **/
+@Data
+@Builder
+public class MsgVO<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 服务器响应给前端的消息类型 msgUnreadCount:未读消息数量
+     */
+    @ApiModelProperty("消息类型")
+    private String type;
+    @ApiModelProperty("承载数据")
+    private T data;
+
+    public MsgVO() {
+    }
+
+    public MsgVO(String type, T data) {
+        this.type = type;
+        this.data = data;
+    }
+}

+ 49 - 0
ruoyi-equity/src/main/java/com/ruoyi/equity/domain/vo/UserInfoVO.java

@@ -0,0 +1,49 @@
+package com.ruoyi.equity.domain.vo;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * @Param   封装用户的系统-项目-合同-id信息
+ * @Author wangwl
+ * @Date 2024/7/18 16:26
+ **/
+@Data
+@Builder
+public class UserInfoVO implements Serializable {
+
+    @ApiModelProperty("userId")
+    private Long userId;
+
+    @ApiModelProperty("msg")
+    private String msg;
+
+    public UserInfoVO() {
+    }
+
+    public UserInfoVO(Long userId) {
+        this.userId = userId;
+    }
+
+    public UserInfoVO(Long userId, String msg) {
+        this.userId = userId;
+        this.msg = msg;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        UserInfoVO that = (UserInfoVO) o;
+        return Objects.equals(userId, that.userId) && Objects.equals(msg, that.msg);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(userId, msg);
+    }
+}

+ 19 - 0
ruoyi-equity/src/main/java/com/ruoyi/equity/event/WebsocketEvent.java

@@ -0,0 +1,19 @@
+package com.ruoyi.equity.event;
+
+
+import com.ruoyi.equity.domain.vo.UserInfoVO;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.context.ApplicationEvent;
+
+@Getter
+@Setter
+public class WebsocketEvent extends ApplicationEvent {
+ 
+    private UserInfoVO userInfoVO;
+
+    public WebsocketEvent(Object source) {
+        super(source);
+    }
+
+}

+ 18 - 0
ruoyi-equity/src/main/java/com/ruoyi/equity/event/WebsocketEventListener.java

@@ -0,0 +1,18 @@
+package com.ruoyi.equity.event;
+
+import com.ruoyi.equity.service.IEquityMessageRecordService;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class WebsocketEventListener implements ApplicationListener<WebsocketEvent> {
+
+    private final IEquityMessageRecordService equityMessageRecordService;
+    @Override
+    public void onApplicationEvent(@NonNull WebsocketEvent event) {
+        equityMessageRecordService.getUnreadMessageCount(event.getUserInfoVO());
+    }
+}

+ 11 - 0
ruoyi-equity/src/main/java/com/ruoyi/equity/service/IEquityMessageRecordService.java

@@ -7,6 +7,7 @@ import com.ruoyi.equity.domain.EquityMessageRecord;
 import com.ruoyi.equity.domain.EquityTransferRecord;
 import com.ruoyi.equity.domain.dto.EquityMessageRecordDTO;
 import com.ruoyi.equity.domain.vo.EquityMessageRecordVO;
+import com.ruoyi.equity.domain.vo.UserInfoVO;
 
 /**
  * 股权消息通知Service接口
@@ -64,5 +65,15 @@ public interface IEquityMessageRecordService extends IService<EquityMessageRecor
      */
     public int deleteEquityMessageRecordByEquityMessageId(Long equityMessageId);
 
+    /**
+     * 获取未读消息
+     * @return
+     */
     Integer getUnreadMessageCount();
+
+    /**
+     * 获取未读消息
+     * @param vo
+     */
+    void getUnreadMessageCount(UserInfoVO vo);
 }

+ 23 - 6
ruoyi-equity/src/main/java/com/ruoyi/equity/service/impl/EquityMessageRecordServiceImpl.java

@@ -4,16 +4,18 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 
+import cn.hutool.json.JSONUtil;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.ruoyi.common.utils.DateUtils;
 import com.ruoyi.common.utils.SecurityUtils;
-import com.ruoyi.common.utils.StringUtils;
-import com.ruoyi.equity.domain.EquityTransferRecord;
 import com.ruoyi.equity.domain.dto.EquityMessageRecordDTO;
 import com.ruoyi.equity.domain.vo.EquityMessageRecordVO;
-import com.ruoyi.equity.mapper.EquityTransferRecordMapper;
-import org.springframework.beans.factory.annotation.Autowired;
+import com.ruoyi.equity.domain.vo.MsgVO;
+import com.ruoyi.equity.domain.vo.UserInfoVO;
+import com.ruoyi.equity.socket.WebSocketService;
+import lombok.RequiredArgsConstructor;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import com.ruoyi.equity.mapper.EquityMessageRecordMapper;
 import com.ruoyi.equity.domain.EquityMessageRecord;
@@ -26,9 +28,12 @@ import com.ruoyi.equity.service.IEquityMessageRecordService;
  * @date 2025-03-05
  */
 @Service
+@RequiredArgsConstructor
 public class EquityMessageRecordServiceImpl extends ServiceImpl<EquityMessageRecordMapper, EquityMessageRecord> implements IEquityMessageRecordService {
-    @Autowired
-    private EquityMessageRecordMapper equityMessageRecordMapper;
+
+    private final EquityMessageRecordMapper equityMessageRecordMapper;
+
+    private final WebSocketService webSocketService;
 
     /**
      * 查询股权消息通知
@@ -112,4 +117,16 @@ public class EquityMessageRecordServiceImpl extends ServiceImpl<EquityMessageRec
                 .eq(EquityMessageRecord::getUserId, userId)
                 .eq(EquityMessageRecord::getIsRead, "0")));
     }
+    @Override
+    public void getUnreadMessageCount(UserInfoVO vo) {
+        //获取当前用户未读消息数量
+        int total = Math.toIntExact(this.count(new LambdaQueryWrapper<EquityMessageRecord>()
+                .eq(EquityMessageRecord::getUserId, vo.getUserId())
+                .eq(EquityMessageRecord::getIsRead, "0")));
+        //格式转换
+        MsgVO<Integer> msgVO = MsgVO.<Integer>builder().type("msgUnreadCount").data(total).build();
+        vo.setMsg(JSONUtil.toJsonStr(msgVO));
+        webSocketService.sendMessage(vo);
+    }
+
 }

+ 183 - 0
ruoyi-equity/src/main/java/com/ruoyi/equity/socket/WebSocketService.java

@@ -0,0 +1,183 @@
+package com.ruoyi.equity.socket;
+
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.StrUtil;
+import com.ruoyi.common.utils.SecurityUtils;
+import com.ruoyi.equity.domain.vo.MsgVO;
+import com.ruoyi.equity.domain.vo.UserInfoVO;
+import com.ruoyi.equity.event.WebsocketEvent;
+import com.ruoyi.equity.facade.EquityMessageRecordFacade;
+import com.ruoyi.equity.service.IEquityMessageRecordService;
+import lombok.Data;
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Data
+@Component
+@ServerEndpoint(value = "/websocket/{userId}")
+public class WebSocketService implements ApplicationContextAware {
+
+    private static ApplicationContext applicationContext;
+
+    @Override
+    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
+        WebSocketService.applicationContext = applicationContext;
+    }
+
+
+    /**日志*/
+    private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);
+
+    /**静态变量,用来记录当前在线用户数。应该把它设计成线程安全的。*/
+    private static int onlineUserCount = 0;
+
+    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
+    private static int onlineLinkCount = 0;
+
+    /**用户锁的映射表*/
+    private static final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();
+
+    /**concurrent包的线程安全Set,用来存放每个用户对应的session对象*/
+    public static final ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> userSessionMap =  new ConcurrentHashMap<>();
+
+    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
+    private Session session;
+    /**用户id*/
+    private Long userId;
+
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("userId") Long userId) {
+        ReentrantLock lock = locks.computeIfAbsent(userId, k -> new ReentrantLock());
+        lock.lock();
+        CopyOnWriteArrayList<Session> sessions = userSessionMap.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>());
+        if (sessions.isEmpty()){
+            addOnlineUserCount();
+        }
+        addOnlineLinkCount();
+        sessions.add(session);
+        this.session = session;
+        this.userId= userId;
+        log.info("----------------------建立连接-------------------------------");
+        log.info(StrUtil.format("用户{}连接成功,当前用户总连接数:{}", userId, sessions.size()));
+        log.info(StrUtil.format("当前在线人数为:{},当前总连接数:{}", getOnlineUserCount(),getOnlineLinkCount()));
+        UserInfoVO vo = UserInfoVO.builder().userId(userId).build();
+        try {
+            WebsocketEvent event = new WebsocketEvent("websocketService");
+            event.setUserInfoVO(vo);
+            applicationContext.publishEvent(event);
+        }catch (Exception e){
+            log.error("用户:"+userId+",网络异常:"+e.getMessage());
+        }finally {
+            lock.unlock();
+        }
+
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        log.info("----------------------关闭连接-------------------------------");
+        ReentrantLock lock = locks.computeIfAbsent(userId, k -> new ReentrantLock());
+        lock.lock();
+        try {
+            CopyOnWriteArrayList<Session> sessions = userSessionMap.get(userId);
+            if (CollUtil.isEmpty(sessions)) {
+                throw new RuntimeException("sessions中未找到当前连接");
+            }
+            sessions.remove(session);
+            subOnlineLinkCount();
+            if (CollUtil.isEmpty(sessions)){
+                userSessionMap.remove(userId);
+                subOnlineUserCount();
+                log.info(StrUtil.format("用户{}退出,当前在线人数为:{}", userId, getOnlineUserCount()));
+            }else {
+                log.info(StrUtil.format("用户{}断开连接,用户当前剩余连接:{},当前在线人数为:{}", userId, sessions.size(),getOnlineUserCount()));
+            }
+        }catch (Exception e){
+            log.info("用户"+userId + "断开连接时发生异常:"+e.getMessage());
+        }finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     * @param message 客户端发送过来的消息*/
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        log.info("收到用户消息:"+userId+",报文:"+message);
+    }
+
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
+        error.printStackTrace();
+    }
+
+    public void sendMessage(UserInfoVO vo){
+        List<Session> sessions = userSessionMap.get(vo.getUserId());
+        if(sessions != null && sessions.size() > 0){
+            /** 此处不能使用普通集合,否则在推送消息时,当前用户又开启一个页面,集合发生变化迭代器会报错ConcurrentModificationException*/
+            for (Session s : sessions) {
+                if (s == null){
+                    continue;
+                }
+                RemoteEndpoint.Basic basicRemote = s.getBasicRemote();
+                /** 推送消息时可能因为nginx配置的连接时间,或者用户刚好断开连接,
+                 * 导致通道关闭,但是数组中使用的是快照,还存在这个连接,此时跳过推送给这个窗口*/
+                if (basicRemote != null){
+                    try {
+                        basicRemote.sendText(vo.getMsg());
+                    } catch (IOException e) {
+                        log.error(vo.getUserId()+"推送消息失败,消息内容["+ vo.getMsg()+"],原因:"+e.getMessage());
+                    }
+                }
+            }
+        }
+    }
+
+    public static synchronized void addOnlineUserCount() {
+        WebSocketService.onlineUserCount++;
+    }
+
+    public static synchronized void subOnlineUserCount() {
+        WebSocketService.onlineUserCount--;
+    }
+
+
+    public static synchronized void addOnlineLinkCount() {
+        WebSocketService.onlineLinkCount++;
+    }
+
+    public static synchronized void subOnlineLinkCount() {
+        WebSocketService.onlineLinkCount--;
+    }
+
+    public static synchronized int getOnlineUserCount() {
+        return onlineUserCount;
+    }
+
+    public static synchronized int getOnlineLinkCount() {
+        return onlineLinkCount;
+    }
+}

+ 1 - 1
ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java

@@ -111,7 +111,7 @@ public class SecurityConfig
             .authorizeHttpRequests((requests) -> {
                 permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());
                 // 对于登录login 注册register 验证码captchaImage 允许匿名访问
-                requests.antMatchers("/login", "/register", "/captchaImage").permitAll()
+                requests.antMatchers("/login", "/register", "/captchaImage","/websocket/*").permitAll()
                     // 静态资源,可匿名访问
                     .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll()
                     .antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**").permitAll()