Netty + RabbitMQ + MongoDB 实现在线聊天室

master
chenyuepan 2 weeks ago
parent d3b643c363
commit 516a676e66

@ -217,26 +217,18 @@
<artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
<version>4.5.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-actuator</artifactId>-->
<!-- </dependency>-->
<!-- Spring Boot Admin Client -->
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>3.0.2</version>
</dependency>
<!-- Metrics -->
<!-- <dependency>-->
<!-- <groupId>io.micrometer</groupId>-->
<!-- <artifactId>micrometer-registry-prometheus</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>io.springboot.ai</groupId>-->
<!-- <artifactId>spring-ai-openai-spring-boot-starter</artifactId>-->
<!-- <version>1.0.0</version> &lt;!&ndash; 使用最新版本 &ndash;&gt;-->
<!-- </dependency>-->
<!-- MongoDB -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
</dependencies>
<!-- 全局 build -->
<build>

@ -1,6 +1,9 @@
package com.example.venue_reservation_service.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -32,4 +35,23 @@ public class RabbitMQConfig {
.with("delay.routing.key")
.noargs();
}
// 添加聊天消息队列
@Bean
public Queue chatMessageQueue() {
return new Queue("chat.message.queue");
}
// 配置RabbitTemplate用于JSON消息转换
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

@ -64,12 +64,12 @@ public class UserController {
}
@Operation(
summary = "管理员登录",
description = "场馆管理员登录系统,验证账号密码",
summary = "账号密码登录",
description = "用户、管理员以及超级管理员登录系统,验证账号密码",
parameters = {
@Parameter(
name = "dto",
description = "管理员登录DTO包含账号和密码",
description = "登录DTO包含账号和密码",
required = true,
schema = @Schema(implementation = AdminDTO.class)
)
@ -114,22 +114,6 @@ public class UserController {
return userService.getUserList(dto);
}
@Operation(
summary = "校验用户身份",
description = "校验用户提交的身份信息(如等)",
parameters = {
@Parameter(
name = "dto",
description = "身份校验DTO包含用户身份信息",
required = true,
schema = @Schema(implementation = CheckDTO.class)
)
}
)
@PostMapping("/check")
public Result check(@RequestBody CheckDTO dto) {
return userService.userCheck(dto);
}
@Operation(
summary = "获取用户认证信息",

@ -1,13 +0,0 @@
package com.example.venue_reservation_service.dto;
import lombok.Data;
@Data
public class CheckDTO {
private Integer userId;
private String schoolId;
private String passwd;
}

@ -0,0 +1,24 @@
package com.example.venue_reservation_service.entity;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import java.time.LocalDateTime;
@Document(collection = "chat_messages")
@Data
public class ChatMessage {
@Id
private String id;
private String sender;
private Integer senderRole;
private String content;
private LocalDateTime timestamp;
public ChatMessage(String sender, Integer senderRole, String content) {
this.sender = sender;
this.senderRole = senderRole;
this.content = content;
this.timestamp = LocalDateTime.now();
}
}

@ -1,6 +1,9 @@
package com.example.venue_reservation_service.netty;
import com.example.venue_reservation_service.entity.ChatMessage;
import com.example.venue_reservation_service.repository.ChatMessageRepository;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
@ -9,18 +12,28 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.data.domain.Page;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
@Component
@Scope("prototype")
@ChannelHandler.Sharable
public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static final Map<String, Channel> userChannelMap = new HashMap<>();
private static final Map<Integer, String> roleMap = new HashMap<>();
@Autowired
private ChatMessageRepository chatMessageRepository;
static {
roleMap.put(0, "普通用户");
roleMap.put(1, "普通用户");
@ -41,7 +54,6 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
Channel incoming = ctx.channel();
String username = null;
// 查找并移除用户
for (Map.Entry<String, Channel> entry : userChannelMap.entrySet()) {
if (entry.getValue().equals(incoming)) {
username = entry.getKey();
@ -71,6 +83,8 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
handleLogin(incoming, jsonObject);
} else if ("message".equals(type)) {
handleMessage(incoming, jsonObject);
} else if ("loadHistory".equals(type)) {
handleLoadHistory(incoming, jsonObject);
} else {
sendErrorMessage(incoming, "未知消息类型: " + type);
}
@ -115,6 +129,10 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
return;
}
// 保存消息到MongoDB
ChatMessage chatMessage = new ChatMessage(username, userRole, content);
chatMessageRepository.save(chatMessage);
String roleName = roleMap.getOrDefault(userRole, "未知角色");
String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
@ -129,6 +147,32 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
broadcastMessage(messageJson.toJSONString());
}
private void handleLoadHistory(Channel incoming, JSONObject jsonObject) {
String username = jsonObject.getString("username");
long lastTimestamp = jsonObject.getLongValue("lastTimestamp");
int pageSize = jsonObject.getIntValue("pageSize");
if (!userChannelMap.containsKey(username) || !userChannelMap.get(username).equals(incoming)) {
sendErrorMessage(incoming, "用户未登录");
return;
}
LocalDateTime beforeDate = lastTimestamp == 0 ? LocalDateTime.now()
: LocalDateTime.ofEpochSecond(lastTimestamp/1000, 0, java.time.ZoneOffset.UTC);
Page<ChatMessage> messages = chatMessageRepository.findByTimestampBeforeOrderByTimestampDesc(
beforeDate,
org.springframework.data.domain.PageRequest.of(0, pageSize)
);
JSONObject response = new JSONObject();
response.put("type", "history");
response.put("messages", messages.getContent());
response.put("hasMore", !messages.isLast());
incoming.writeAndFlush(new TextWebSocketFrame(response.toJSONString()));
}
private void broadcastMessage(String message) {
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame(message));
@ -158,7 +202,6 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
Channel channel = ctx.channel();
String username = null;
// 查找并移除用户
for (Map.Entry<String, Channel> entry : userChannelMap.entrySet()) {
if (entry.getValue().equals(channel)) {
username = entry.getKey();

@ -15,6 +15,7 @@ import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketSe
import io.netty.handler.stream.ChunkedWriteHandler;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -24,6 +25,9 @@ public class NettyChatServer {
@Value("${server.netty.port}")
private int port;
@Autowired
private ChatServerHandler chatServerHandler; // 确保正确注入
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@ -46,7 +50,7 @@ public class NettyChatServer {
new ChunkedWriteHandler(),
new WebSocketServerCompressionHandler(),
new WebSocketServerProtocolHandler("/ws", null, true),
new ChatServerHandler()
chatServerHandler // 使用注入的共享handler
);
}
});

@ -0,0 +1,11 @@
package com.example.venue_reservation_service.repository;
import com.example.venue_reservation_service.entity.ChatMessage;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.MongoRepository;
import java.time.LocalDateTime;
public interface ChatMessageRepository extends MongoRepository<ChatMessage, String> {
Page<ChatMessage> findByTimestampBeforeOrderByTimestampDesc(LocalDateTime before, Pageable pageable);
}

@ -21,8 +21,6 @@ public interface UserService extends IService<User> {
Result getUserList(UserDTO dto);
Result userCheck(CheckDTO dto);
Result getSchool(Integer userId);
Result userRecharge(Integer userId, Double amount);

@ -66,6 +66,7 @@ public class QuestionServiceImpl extends ServiceImpl<QuestionMapper, Question>
question.setCreatedTime(LocalDateTime.now());
}
question.setToUser(1);
question.setToAdmin(1);
saveOrUpdate(question);
return Result.ok(question).message("保存问题成功");
}
@ -108,10 +109,18 @@ public class QuestionServiceImpl extends ServiceImpl<QuestionMapper, Question>
if (!Objects.equals(question.getUserId(), userId)) {
return Result.fail().message("非本人提问,不可删除");
}
if (question.getStatus() == 1) {
replyMapper.delete(Wrappers.<Reply>lambdaQuery().eq(Reply::getQuestionId, id));
User user = userMapper.selectById(userId);
if(user.getType() == 3){
question.setToAdmin(0);
}else{
question.setToUser(0);
}
if (question.getToUser() == 0 && question.getToAdmin() == 0 && question.getStatus() == 1) {
replyMapper.delete(Wrappers.<Reply>lambdaQuery().eq(Reply::getQuestionId, id));
removeById(id);
}else{
updateById(question);
}
return Result.ok(question).message("移除成功");
}

@ -85,20 +85,29 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User>
return Result.fail().message("登录失败");
}
User user = getOne(Wrappers.<User>lambdaQuery().eq(User::getOpenId, openid));
LoginVo loginVo = new LoginVo();
loginVo.setIsFirst(0);
if(Optional.ofNullable(user).isEmpty()){
// 如果当前用户信息不存在,进行注册
user = new User();
// 生成默认用户名
user.setUsername("用户" + System.currentTimeMillis());
// 设置OpenId作唯一标识
user.setOpenId(openid);
user.setType(0); // 表示普通用户
user.setAccount(LocalDate.now().getYear() +"-"+ System.currentTimeMillis());
// 表示普通用户
user.setType(0);
// 设置用户账号(系统生成)
user.setAccount(PasswordGenerator.generateAccount());
// 设置用户初始密码
user.setAccessCode(PasswordGenerator.generatePassword());
user.setRegisterTime(LocalDateTime.now());
user.setIsUpload(0);
save(user);
// 添加新的账户余额信息
Balance balance = new Balance(0.0, user.getId());
balanceService.save(balance);
loginVo.setIsFirst(1);
loginVo.setAccessCode(user.getAccessCode());
}
// 获取用户账号余额数据
Double money = balanceService.getOne(Wrappers.<Balance>lambdaQuery().eq(Balance::getUserId, user.getId()))
@ -107,7 +116,8 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User>
user.setAvatar(imgUrl + user.getAvatar());
}
user.setBalance(money);
LoginVo loginVo = new LoginVo(user, JwtUtil.createToken(user.getId()));
loginVo.setUser(user);
loginVo.setToken(JwtUtil.createToken(user.getId()));
return Result.ok(loginVo).message("登录成功");
}
@ -117,34 +127,66 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User>
LambdaQueryWrapper<User> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(User::getAccount, account);
User user = getOne(wrapper);
if (getOne(wrapper) == null) {
return Result.fail().message("管理员账号不存在");
if (user == null) { // 修正:避免重复查询
return Result.fail().message("用户账号不存在");
}
String password = dto.getPasswd();
String redisKey = "login_lock:" + account; // 使用固定前缀
if (!user.getAccessCode().equals(password)) {
// 查询该账号在五分钟内的登录次数
String key = name + dto.getAccount();
if (Optional.ofNullable(redisUtil.get(key)).isEmpty()) {
// 若该 key 值未出现过或者过期了则存入缓存同时设置存活时间为5分钟
redisUtil.set(key, 1+"-"+ System.currentTimeMillis(), 5 * 60);
// 处理密码错误的情况
if (redisUtil.hasKey(redisKey)) {
String[] parts = String.valueOf(redisUtil.get(redisKey)).split("-");
int failCount = Integer.parseInt(parts[0]);
long firstFailTime = Long.parseLong(parts[1]);
// 错误次数已达上限
if (failCount >= 3) {
long lockTime = 5 * 60 * 1000; // 5分钟毫秒数
long remainMillis = lockTime - (System.currentTimeMillis() - firstFailTime);
if (remainMillis > 0) {
return Result.fail().message("该账号已被锁定,请在 " + (remainMillis / 1000) + " 秒后重新登录");
} else {
String[] split = String.valueOf(redisUtil.get(key)).split("-");
// 获取次数
int i = Integer.parseInt(split[0]);
if(i == 3){
return Result.fail().message("该账号已被锁定,请在 "+ DateUtil.calc(split[1])+" 秒后,再进行登录");
}else{
i++;
redisUtil.set(key, i+"-"+String.valueOf(System.currentTimeMillis()), 5 * 60 );
// 锁定已过期,重置计数器
redisUtil.remove(redisKey);
}
}
}
// 更新错误计数器(包含首次错误情况)
int newCount = 1;
long timestamp = System.currentTimeMillis();
if (redisUtil.hasKey(redisKey)) {
String[] parts = String.valueOf(redisUtil.get(redisKey)).split("-");
newCount = Integer.parseInt(parts[0]) + 1;
timestamp = Long.parseLong(parts[1]); // 保持第一次错误时间
}
// 更新Redis仅首次设置过期时间
String value = newCount + "-" + timestamp;
if (newCount == 1) {
redisUtil.set(redisKey, value, 5 * 60);
} else {
redisUtil.set(redisKey, value); // 更新次数但保留原TTL
}
return Result.fail().message("账号密码不匹配");
}
// 密码正确,登录成功
// 清除登录错误计数
if (redisUtil.hasKey(redisKey)) {
redisUtil.remove(redisKey);
}
user.setAccessCode("");
if (user.getIsUpload() == 1) {
user.setAvatar(imgUrl + user.getAvatar());
}
LoginVo vo = new LoginVo(user, JwtUtil.createToken(user.getId()));
LoginVo vo = new LoginVo();
vo.setUser(user);
vo.setToken(JwtUtil.createToken(user.getId()));
return Result.ok(vo).message("登录成功");
}
@ -204,11 +246,6 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User>
return Result.ok(vo).message("ok");
}
@Override
public Result userCheck(CheckDTO dto) {
return Result.ok( ).message("身份信息认证成功");
}
@Override
public Result getSchool(Integer userId) {
School one = schoolService.getOne(Wrappers.<School>lambdaQuery()

@ -0,0 +1,40 @@
package com.example.venue_reservation_service.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class PasswordGenerator {
private static final char[] ALPHANUMERIC_CHARS =
"0123456789abcdefghijklmnopqrstuvwxyz".toCharArray();
public static String generatePassword() {
Random random = new Random();
char[] password = new char[6];
for (int i = 0; i < 6; i++) {
int randomIndex = random.nextInt(ALPHANUMERIC_CHARS.length);
password[i] = ALPHANUMERIC_CHARS[randomIndex];
}
return new String(password);
}
public static String generateAccount() {
// 1. 获取当前日期yymmdd格式
SimpleDateFormat dateFormat = new SimpleDateFormat("yyMMdd");
String datePart = dateFormat.format(new Date());
// 2. 获取时间戳毫秒值并取最后6位
long timestamp = System.currentTimeMillis();
String timestampStr = String.valueOf(timestamp);
// 确保获取最后6位数字
String timePart = timestampStr.substring(
Math.max(0, timestampStr.length() - 6));
// 3. 组合成账号
return datePart + timePart;
}
}

@ -5,10 +5,13 @@ import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class LoginVo {
private User user;
private String token;
private Integer isFirst;
private String accessCode;
}

@ -35,6 +35,13 @@ spring:
max-wait: -1
max-idle: 5
min-idle: 0
mongodb:
host: ${localhosturl}
port: 27017
database: chat_db
username: mongo
password: mongo
authentication-database: admin
jackson:
date-format: yyyy-MM-dd HH:mm:ss
rabbitmq:
@ -58,7 +65,7 @@ spring:
boot:
admin:
client:
url: http://localhost:10020/ # 服务端地址
url: http://119.29.191.232:10020 # 服务端地址
username: admin # 服务端用户名
password: admin123! # 服务端密码
@ -90,7 +97,9 @@ knife4j:
# username: admin
# password: 123456
chat:
# MongoDB消息保留天数
message-retention-days: 30
weixin:
app_id: wxce0025f2c0b10a8e
secret: 686870f1b1a875369ee979e0648fa950

Loading…
Cancel
Save