Commit b1181345 by zzrdark

1.netty websocket 模块

parent 42833488
......@@ -77,6 +77,13 @@
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<!-- FastJSON 依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>
</dependencies>
<build>
......
package com.mx.cneeds.server.datashow;
package com.mx.cneeds.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
......
......@@ -25,6 +25,7 @@ public class OAuth2ResourceServer extends ResourceServerConfigurerAdapter {
"/device/device/getChannel_nums",
"/wechat/wechatFileUpload",
"/wechat/wechatDownload/**",
"/wechat/wechatMessageHttp",
"/logFile/logfile/DeviceLogFileUpload",
"/logFile/logfile/proDeviceUploadLogFile",
// "/user/info",
......
package com.mx.cneeds.server.datashow.web.wechat;
import com.alibaba.fastjson.JSON;
import com.mx.cneeds.common.constant.FilePath;
import com.mx.cneeds.common.result.R;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageDispatcher;
import com.mx.cneeds.server.wechat.common.json.WechatJsonMessage;
import com.mx.hbasefile.hadoop.hdfs.api.HdfsTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;
......@@ -32,6 +36,9 @@ public class WechatController {
@Autowired
private HdfsTemplate hdfsTemplate;
@Autowired
private MessageDispatcher messageDispatcher;
@RequestMapping("/wechatFileUpload")
@ResponseBody
public R upload(HttpServletRequest httpServletRequest, MultipartFile file) throws IOException {
......@@ -76,7 +83,11 @@ public class WechatController {
total-=1024;
}
outputStream.close();
}
@RequestMapping("/wechatMessageHttp")
@ResponseBody
public void wechatMessageHttp(@RequestBody WechatJsonMessage jsonMessage){
messageDispatcher.doWechatDispatcher(jsonMessage);
}
}
package com.mx.cneeds.server.wechat.common.codec;
import com.alibaba.fastjson.JSON;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 通信协议的消息体
*/
public class Invocation {
/**
* 类型
*/
private String type;
/**
* 消息,JSON 格式
*/
private String message;
/**
* 空构造方法
*/
public Invocation() { }
public Invocation(String type, String message) {
this.type = type;
this.message = message;
}
public Invocation(String type, Message message) {
this.type = type;
this.message = JSON.toJSONString(message);
}
public String getType() {
return type;
}
public Invocation setType(String type) {
this.type = type;
return this;
}
public String getMessage() {
return message;
}
public Invocation setMessage(String message) {
this.message = message;
return this;
}
@Override
public String toString() {
return "Invocation{" +
"type='" + type + '\'' +
", message='" + message + '\'' +
'}';
}
}
package com.mx.cneeds.server.wechat.common.codec;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* {@link Invocation} 解码器
*/
public class InvocationDecoder extends ByteToMessageDecoder {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 标记当前读取位置
in.markReaderIndex();
// 判断是否能够读取 length 长度
if (in.readableBytes() <= 4) {
return;
}
// 读取长度
int length = in.readInt();
if (length < 0) {
throw new CorruptedFrameException("negative length: " + length);
}
// 如果 message 不够可读,则退回到原读取位置
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
// 读取内容
byte[] content = new byte[length];
in.readBytes(content);
// 解析成 Invocation
Invocation invocation = JSON.parseObject(content, Invocation.class);
out.add(invocation);
logger.info("[decode][连接({}) 解析到一条消息({})]", ctx.channel().id(), invocation.toString());
}
}
package com.mx.cneeds.server.wechat.common.codec;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link Invocation} 编码器
*/
public class InvocationEncoder extends MessageToByteEncoder<Invocation> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
// 将 Invocation 转换成 byte[] 数组
// byte[] content = JSON.toJSONBytes(invocation);
String toJsonString = JSON.toJSONString(invocation);
System.out.println(toJsonString);
byte[] content = toJsonString.getBytes();
// 写入 length
out.writeInt(content.length);
// 写入内容
out.writeBytes(content);
logger.info("[decode][连接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());
}
}
package com.mx.cneeds.server.wechat.common.codec;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.List;
/**
* @ClassName MessageToWebSocketEncoder
* @Author zzrdark
* @Date 2020-06-23 21:28
* @Description TODO
**/
public class MessageToWebSocketEncoder extends MessageToMessageEncoder<Invocation> {
@Override
protected void encode(ChannelHandlerContext ctx, Invocation invocation, List<Object> list) throws Exception {
String toJsonString = JSON.toJSONString(invocation);
list.add(new TextWebSocketFrame(toJsonString));
}
}
package com.mx.cneeds.server.wechat.common.dispatcher;
/**
* 消息接口
*/
public interface Message {
}
package com.mx.cneeds.server.wechat.common.dispatcher;
import com.alibaba.fastjson.JSON;
import com.mx.cneeds.server.wechat.common.codec.Invocation;
import com.mx.cneeds.server.wechat.common.json.WechatJsonMessage;
import com.mx.cneeds.server.wechat.websocket.config.NettyChannelManager;
import com.mx.cneeds.server.wechat.websocket.message.pojo.wechat.WechatMessageRequest;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
public class MessageDispatcher {
@Autowired
private MessageHandlerContainer messageHandlerContainer;
public void doWechatDispatcher(WechatJsonMessage jsonMessage){
WechatMessageRequest wechatMessageRequest = new WechatMessageRequest();
BeanUtils.copyProperties(jsonMessage,wechatMessageRequest);
WechatMessageRequest.DataBean requestDataBean = new WechatMessageRequest.DataBean();
BeanUtils.copyProperties(jsonMessage.getData(),requestDataBean);
wechatMessageRequest.setData(requestDataBean);
Invocation invocation = new Invocation(WechatMessageRequest.TYPE,wechatMessageRequest);
sendToWechat(invocation);
}
private void sendToWechat(Invocation invocation) {
// 获得 type 对应的 MessageHandler 处理器
MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
// 获得 MessageHandler 处理器 的消息类
Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
// 解析消息
Message message = JSON.parseObject(invocation.getMessage(), messageClass);
// 执行逻辑
// noinspection unchecked
messageHandler.execute(null, message);
}
}
package com.mx.cneeds.server.wechat.common.dispatcher;
import io.netty.channel.Channel;
public interface MessageHandler<T extends Message> {
/**
* 执行处理消息
*
* @param channel 通道
* @param message 消息
*/
void execute(Channel channel, T message);
/**
* @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段
*/
String getType();
}
package com.mx.cneeds.server.wechat.common.dispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public class MessageHandlerContainer implements InitializingBean {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 消息类型与 MessageHandler 的映射
*/
private final Map<String, MessageHandler> handlers = new HashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Override
public void afterPropertiesSet() throws Exception {
// 通过 ApplicationContext 获得所有 MessageHandler Bean
applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean
.forEach(messageHandler -> handlers.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
logger.info("[afterPropertiesSet][消息处理器数量:{}]", handlers.size());
}
/**
* 获得类型对应的 MessageHandler
*
* @param type 类型
* @return MessageHandler
*/
MessageHandler getMessageHandler(String type) {
MessageHandler handler = handlers.get(type);
if (handler == null) {
throw new IllegalArgumentException(String.format("类型(%s) 找不到匹配的 MessageHandler 处理器", type));
}
return handler;
}
/**
* 获得 MessageHandler 处理的消息类
*
* @param handler 处理器
* @return 消息类
*/
static Class<? extends Message> getMessageClass(MessageHandler handler) {
// 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
// 获得接口的 Type 数组
Type[] interfaces = targetClass.getGenericInterfaces();
Class<?> superclass = targetClass.getSuperclass();
while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准
interfaces = superclass.getGenericInterfaces();
superclass = targetClass.getSuperclass();
}
if (Objects.nonNull(interfaces)) {
// 遍历 interfaces 数组
for (Type type : interfaces) {
// 要求 type 是泛型参数
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
// 要求是 MessageHandler 接口
if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
// 取首个元素
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return (Class<Message>) actualTypeArguments[0];
} else {
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}
}
}
}
}
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}
}
package com.mx.cneeds.server.wechat.common.json;
/**
* @ClassName JsonMessage
* @Author zzrdark
* @Date 2020-06-23 17:36
* @Description TODO
**/
public class WechatJsonMessage {
/**
* account : 18351837032
* data : {"content":"私聊文字消息测试","fromUser":"zhongweiyu789","msgId":1114311117,"newMsgId":7032628722018977792,"self":false,"timestamp":1591325316,"toUser":"wxid_fj9ghhitl4qm22","wId":"d6a0cb7d-da8c-488a-a119-f8594984e0c7"}
* messageType : 5
* wcId : wxid_fj9ghhitl4qm22
*/
private String account;
private DataBean data;
private int messageType;
private String wcId;
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public DataBean getData() {
return data;
}
public void setData(DataBean data) {
this.data = data;
}
public int getMessageType() {
return messageType;
}
public void setMessageType(int messageType) {
this.messageType = messageType;
}
public String getWcId() {
return wcId;
}
public void setWcId(String wcId) {
this.wcId = wcId;
}
public static class DataBean {
/**
* content : 私聊文字消息测试
* fromUser : zhongweiyu789
* msgId : 1114311117
* newMsgId : 7032628722018977792
* self : false
* timestamp : 1591325316
* toUser : wxid_fj9ghhitl4qm22
* wId : d6a0cb7d-da8c-488a-a119-f8594984e0c7
*/
private String content;
private String fromGroup;
private String fromUser;
private int msgId;
private long newMsgId;
private boolean self;
private int timestamp;
private String toUser;
private String wId;
private Long voiceLength;
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getFromGroup() {
return fromGroup;
}
public void setFromGroup(String fromGroup) {
this.fromGroup = fromGroup;
}
public String getFromUser() {
return fromUser;
}
public void setFromUser(String fromUser) {
this.fromUser = fromUser;
}
public int getMsgId() {
return msgId;
}
public void setMsgId(int msgId) {
this.msgId = msgId;
}
public long getNewMsgId() {
return newMsgId;
}
public void setNewMsgId(long newMsgId) {
this.newMsgId = newMsgId;
}
public boolean isSelf() {
return self;
}
public void setSelf(boolean self) {
this.self = self;
}
public int getTimestamp() {
return timestamp;
}
public void setTimestamp(int timestamp) {
this.timestamp = timestamp;
}
public String getToUser() {
return toUser;
}
public void setToUser(String toUser) {
this.toUser = toUser;
}
public String getWId() {
return wId;
}
public void setWId(String wId) {
this.wId = wId;
}
public Long getVoiceLength() {
return voiceLength;
}
public void setVoiceLength(Long voiceLength) {
this.voiceLength = voiceLength;
}
}
}
package com.mx.cneeds.server.wechat.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
@Component
public class NettyServer {
private Logger logger = LoggerFactory.getLogger(getClass());
@Value("${netty.port}")
private Integer port;
@Autowired
private NettyServerHandlerInitializer nettyServerHandlerInitializer;
/**
* boss 线程组,用于服务端接受客户端的连接
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup();
/**
* worker 线程组,用于服务端接受客户端的数据读写
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* Netty Server Channel
*/
private Channel channel;
/**
* 启动 Netty Server
*/
@PostConstruct
public void start() throws InterruptedException {
// 创建 ServerBootstrap 对象,用于 Netty Server 启动
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 ServerBootstrap 的各种属性
bootstrap.group(bossGroup, workerGroup)
// 指定 Channel 为服务端 NioServerSocketChannel
.channel(NioServerSocketChannel.class)
// 设置 Netty Server 的端口
.localAddress(new InetSocketAddress(port))
// 服务端 accept 队列的大小
.option(ChannelOption.SO_BACKLOG, 1024)
// TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
// .childOption(ChannelOption.SO_KEEPALIVE, true)
// 允许较小的数据包的发送,降低延迟
// .childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(nettyServerHandlerInitializer);
// 绑定端口,并同步等待成功,即启动服务端
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
channel = future.channel();
logger.info("[start][Netty Server 启动在 {} 端口]", port);
}
}
/**
* 关闭 Netty Server
*/
@PreDestroy
public void shutdown() {
// 关闭 Netty Server
if (channel != null) {
channel.close();
}
// 优雅关闭两个 EventLoopGroup 对象
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
package com.mx.cneeds.server.wechat.websocket;
import com.mx.cneeds.server.wechat.common.codec.InvocationDecoder;
import com.mx.cneeds.server.wechat.common.codec.InvocationEncoder;
import com.mx.cneeds.server.wechat.common.codec.MessageToWebSocketEncoder;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageDispatcher;
import com.mx.cneeds.server.wechat.websocket.handler.NettyServerHandler;
import com.mx.cneeds.server.wechat.websocket.handler.OperateWebsocketHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
/**
* 心跳超时时间
*/
private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;
@Autowired
private NettyServerHandler nettyServerHandler;
@Autowired
private OperateWebsocketHandler operateWebsocketHandler;
@Override
protected void initChannel(Channel ch) {
// 获得 Channel 对应的 ChannelPipeline
ChannelPipeline channelPipeline = ch.pipeline();
// 添加一堆 NettyServerHandler 到 ChannelPipeline 中
channelPipeline
// 空闲检测
// .addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))
//websocket协议本身是基于http协议的,所以这边也要使用http解编码器
.addLast(new HttpServerCodec())
//以块的方式来写的处理器
.addLast(new ChunkedWriteHandler())
.addLast(new HttpObjectAggregator(8192))
// 编码器
// .addLast(new InvocationEncoder())
.addLast(new MessageToWebSocketEncoder())
// 操作
.addLast(operateWebsocketHandler)
// 服务端处理器
.addLast(nettyServerHandler)
.addLast(new WebSocketServerProtocolHandler(
"/ws", null, true, 65536 * 10));
}
}
package com.mx.cneeds.server.wechat.websocket.config;
import com.mx.cneeds.server.wechat.common.codec.Invocation;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 客户端 Channel 管理器。提供两种功能:
* 1. 客户端 Channel 的管理
* 2. 向客户端 Channel 发送消息
*/
@Component
public class NettyChannelManager {
/**
* {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户
*/
private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user");
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* Channel 映射
*/
private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
/**
* 用户与 Channel 的映射。
*
* 通过它,可以获取用户对应的 Channel。这样,我们可以向指定用户发送消息。
*/
private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>();
/**
* 添加 Channel 到 {@link #channels} 中
*
* @param channel Channel
*/
public void add(Channel channel) {
channels.put(channel.id(), channel);
logger.info("[add][一个连接({})加入]", channel.id());
}
/**
* 添加指定用户到 {@link #userChannels} 中
*
* @param channel Channel
* @param user 用户
*/
public void addUser(Channel channel, String user) {
Channel existChannel = channels.get(channel.id());
if (existChannel == null) {
logger.error("[addUser][连接({}) 不存在]", channel.id());
return;
}
// 设置属性
channel.attr(CHANNEL_ATTR_KEY_USER).set(user);
// 添加到 userChannels
userChannels.put(user, channel);
}
/**
* 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除
*
* @param channel Channel
*/
public void remove(Channel channel) {
// 移除 channels
channels.remove(channel.id());
// 移除 userChannels
if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) {
userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get());
}
logger.info("[remove][一个连接({})离开]", channel.id());
}
/**
* 向指定用户发送消息
*
* @param user 用户
* @param invocation 消息体
*/
public void send(String user, Invocation invocation) {
// 获得用户对应的 Channel
Channel channel = userChannels.get(user);
if (channel == null) {
logger.error("[send][连接不存在]");
return;
}
if (!channel.isActive()) {
logger.error("[send][连接({})未激活]", channel.id());
return;
}
// 发送消息
channel.writeAndFlush(invocation);
}
/**
* 向所有用户发送消息
*
* @param invocation 消息体
*/
public void sendAll(Invocation invocation) {
for (Channel channel : channels.values()) {
if (!channel.isActive()) {
logger.error("[send][连接({})未激活]", channel.id());
return;
}
// 发送消息
channel.writeAndFlush(invocation);
}
}
}
package com.mx.cneeds.server.wechat.websocket.config;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageDispatcher;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageHandlerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyServerConfig {
@Bean
public MessageDispatcher messageDispatcher() {
return new MessageDispatcher();
}
@Bean
public MessageHandlerContainer messageHandlerContainer() {
return new MessageHandlerContainer();
}
}
package com.mx.cneeds.server.wechat.websocket.handler;
import com.mx.cneeds.server.wechat.websocket.config.NettyChannelManager;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 服务端 Channel 实现类,提供对客户端 Channel 建立连接、断开连接、异常时的处理
*/
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private NettyChannelManager channelManager;
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 从管理器中添加
channelManager.add(ctx.channel());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
// 从管理器中移除
channelManager.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
// 断开连接
ctx.channel().close();
}
}
package com.mx.cneeds.server.wechat.websocket.handler;
import com.alibaba.fastjson.JSON;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageHandlerContainer;
import com.mx.cneeds.server.wechat.websocket.config.NettyChannelManager;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName OperateWebsocketHandler
* @Author zzrdark
* @Date 2020-06-23 16:11
* @Description TODO
**/
@Component
@ChannelHandler.Sharable
public class OperateWebsocketHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyChannelManager nettyChannelManager;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//首次连接是FullHttpRequest,处理参数
if (null != msg && msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
Map paramMap=getUrlParams(uri);
System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
String wcId = null;
if (paramMap.get("wcId") instanceof String){
wcId = (String) paramMap.get("wcId");
}
// 增加用户信息
nettyChannelManager.addUser(ctx.channel(),wcId);
//如果url包含参数,需要处理
if(uri.contains("?")){
String newUri=uri.substring(0,uri.indexOf("?"));
System.out.println(newUri);
request.setUri(newUri);
}
super.channelRead(ctx, msg);
}else if(msg instanceof TextWebSocketFrame){
//正常的TEXT消息类型
TextWebSocketFrame frame=(TextWebSocketFrame)msg;
channelRead0(ctx,frame);
// System.out.println("客户端收到服务器数据:" +frame.text());
}
}
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
ctx.channel().writeAndFlush(textWebSocketFrame);
}
private static Map getUrlParams(String url){
Map<String,String> map = new HashMap<>();
url = url.replace("?",";");
if (!url.contains(";")){
return map;
}
if (url.split(";").length > 0){
String[] arr = url.split(";")[1].split("&");
for (String s : arr){
String key = s.split("=")[0];
String value = s.split("=")[1];
map.put(key,value);
}
return map;
}else{
return map;
}
}
}
package com.mx.cneeds.server.wechat.websocket.message.handler.auth;
import com.mx.cneeds.server.wechat.common.codec.Invocation;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageHandler;
import com.mx.cneeds.server.wechat.websocket.config.NettyChannelManager;
import com.mx.cneeds.server.wechat.websocket.message.pojo.auth.AuthRequest;
import com.mx.cneeds.server.wechat.websocket.message.pojo.auth.AuthResponse;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class AuthRequestHandler implements MessageHandler<AuthRequest> {
@Autowired
private NettyChannelManager nettyChannelManager;
@Override
public void execute(Channel channel, AuthRequest authRequest) {
// 如果未传递 accessToken
if (StringUtils.isEmpty(authRequest.getAccessToken())) {
AuthResponse authResponse = new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入");
channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
return;
}
// 添加到 WebSocketUtil 中
// 考虑到代码简化,我们先直接使用 accessToken 作为 User
nettyChannelManager.addUser(channel, authRequest.getAccessToken());
// 判断是否认证成功。这里,假装直接成功
AuthResponse authResponse = new AuthResponse().setCode(0);
channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
}
@Override
public String getType() {
return AuthRequest.TYPE;
}
}
package com.mx.cneeds.server.wechat.websocket.message.handler.chat;
import com.mx.cneeds.server.wechat.common.codec.Invocation;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageHandler;
import com.mx.cneeds.server.wechat.websocket.config.NettyChannelManager;
import com.mx.cneeds.server.wechat.websocket.message.pojo.chat.ChatRedirectToUserRequest;
import com.mx.cneeds.server.wechat.websocket.message.pojo.chat.ChatSendResponse;
import com.mx.cneeds.server.wechat.websocket.message.pojo.chat.ChatSendToAllRequest;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> {
@Autowired
private NettyChannelManager nettyChannelManager;
@Override
public void execute(Channel channel, ChatSendToAllRequest message) {
// 这里,假装直接成功
ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));
// 创建转发的消息,并广播发送
ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
nettyChannelManager.sendAll(new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
}
@Override
public String getType() {
return ChatSendToAllRequest.TYPE;
}
}
package com.mx.cneeds.server.wechat.websocket.message.handler.chat;
import com.mx.cneeds.server.wechat.common.codec.Invocation;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageHandler;
import com.mx.cneeds.server.wechat.websocket.config.NettyChannelManager;
import com.mx.cneeds.server.wechat.websocket.message.pojo.chat.ChatRedirectToUserRequest;
import com.mx.cneeds.server.wechat.websocket.message.pojo.chat.ChatSendResponse;
import com.mx.cneeds.server.wechat.websocket.message.pojo.chat.ChatSendToOneRequest;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> {
@Autowired
private NettyChannelManager nettyChannelManager;
@Override
public void execute(Channel channel, ChatSendToOneRequest message) {
// 这里,假装直接成功
/*
ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));
*/
// 创建转发的消息,发送给指定用户
ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
nettyChannelManager.send(message.getToUser(), new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
}
@Override
public String getType() {
return ChatSendToOneRequest.TYPE;
}
}
package com.mx.cneeds.server.wechat.websocket.message.handler.heartbeat;
import com.mx.cneeds.server.wechat.common.codec.Invocation;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageHandler;
import com.mx.cneeds.server.wechat.websocket.message.pojo.heartbeat.HeartbeatRequest;
import com.mx.cneeds.server.wechat.websocket.message.pojo.heartbeat.HeartbeatResponse;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void execute(Channel channel, HeartbeatRequest message) {
logger.info("[execute][收到连接({}) 的心跳请求]", channel.id());
// 响应心跳
HeartbeatResponse response = new HeartbeatResponse();
channel.write(new Invocation(HeartbeatResponse.TYPE, response));
}
@Override
public String getType() {
return HeartbeatRequest.TYPE;
}
}
package com.mx.cneeds.server.wechat.websocket.message.handler.wechat;
import com.mx.cneeds.server.wechat.common.codec.Invocation;
import com.mx.cneeds.server.wechat.common.dispatcher.MessageHandler;
import com.mx.cneeds.server.wechat.websocket.config.NettyChannelManager;
import com.mx.cneeds.server.wechat.websocket.message.pojo.chat.ChatRedirectToUserRequest;
import com.mx.cneeds.server.wechat.websocket.message.pojo.wechat.WechatMessageRequest;
import com.mx.cneeds.server.wechat.websocket.message.pojo.wechat.WechatMessageResponse;
import io.netty.channel.Channel;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName WechatMessageHandler
* @Author zzrdark
* @Date 2020-06-23 18:22
* @Description TODO
**/
@Component
public class WechatMessageHandler implements MessageHandler<WechatMessageRequest> {
@Autowired
private NettyChannelManager nettyChannelManager;
@Override
public void execute(Channel channel, WechatMessageRequest message) {
WechatMessageResponse.DataBean responseDataBean = new WechatMessageResponse.DataBean();
BeanUtils.copyProperties(message.getData(),responseDataBean);
WechatMessageResponse wechatMessageResponse = new WechatMessageResponse(responseDataBean,message.getMessageType());
nettyChannelManager.send(message.getWcId(), new Invocation(ChatRedirectToUserRequest.TYPE, wechatMessageResponse));
}
@Override
public String getType() {
return WechatMessageRequest.TYPE;
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.auth;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 用户认证请求
* @author zzr
*/
public class AuthRequest implements Message {
public static final String TYPE = "AUTH_REQUEST";
/**
* 认证 Token
*/
private String accessToken;
public String getAccessToken() {
return accessToken;
}
public AuthRequest setAccessToken(String accessToken) {
this.accessToken = accessToken;
return this;
}
@Override
public String toString() {
return "AuthRequest{" +
"accessToken='" + accessToken + '\'' +
'}';
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.auth;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 用户认证响应
* @author zzr
*/
public class AuthResponse implements Message {
public static final String TYPE = "AUTH_RESPONSE";
/**
* 响应状态码
*/
private Integer code;
/**
* 响应提示
*/
private String message;
public Integer getCode() {
return code;
}
public AuthResponse setCode(Integer code) {
this.code = code;
return this;
}
public String getMessage() {
return message;
}
public AuthResponse setMessage(String message) {
this.message = message;
return this;
}
@Override
public String toString() {
return "AuthResponse{" +
"code=" + code +
", message='" + message + '\'' +
'}';
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.chat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 转发消息给一个用户的 Message
* @author zzr
*/
public class ChatRedirectToUserRequest implements Message {
public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST";
/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;
public String getMsgId() {
return msgId;
}
public ChatRedirectToUserRequest setMsgId(String msgId) {
this.msgId = msgId;
return this;
}
public String getContent() {
return content;
}
public ChatRedirectToUserRequest setContent(String content) {
this.content = content;
return this;
}
@Override
public String toString() {
return "ChatRedirectToUserRequest{" +
"msgId='" + msgId + '\'' +
", content='" + content + '\'' +
'}';
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.chat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 聊天发送消息结果的 Response
* @author zzr
*/
public class ChatSendResponse implements Message {
public static final String TYPE = "CHAT_SEND_RESPONSE";
/**
* 消息编号
*/
private String msgId;
/**
* 响应状态码
*/
private Integer code;
/**
* 响应提示
*/
private String message;
public String getMsgId() {
return msgId;
}
public ChatSendResponse setMsgId(String msgId) {
this.msgId = msgId;
return this;
}
public Integer getCode() {
return code;
}
public ChatSendResponse setCode(Integer code) {
this.code = code;
return this;
}
public String getMessage() {
return message;
}
public ChatSendResponse setMessage(String message) {
this.message = message;
return this;
}
@Override
public String toString() {
return "ChatSendResponse{" +
"msgId='" + msgId + '\'' +
", code=" + code +
", message='" + message + '\'' +
'}';
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.chat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 发送给所有人的群聊消息的 Message
* @author zzr
*/
public class ChatSendToAllRequest implements Message {
public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";
/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;
public String getContent() {
return content;
}
public ChatSendToAllRequest setContent(String content) {
this.content = content;
return this;
}
public String getMsgId() {
return msgId;
}
public ChatSendToAllRequest setMsgId(String msgId) {
this.msgId = msgId;
return this;
}
@Override
public String toString() {
return "ChatSendToAllRequest{" +
"msgId='" + msgId + '\'' +
", content='" + content + '\'' +
'}';
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.chat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 发送给指定人的私聊消息 Request
* @author zzr
*/
public class ChatSendToOneRequest implements Message {
public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST";
/**
* 发送给的用户
*/
private String toUser;
/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;
public String getToUser() {
return toUser;
}
public ChatSendToOneRequest setToUser(String toUser) {
this.toUser = toUser;
return this;
}
public String getMsgId() {
return msgId;
}
public ChatSendToOneRequest setMsgId(String msgId) {
this.msgId = msgId;
return this;
}
public String getContent() {
return content;
}
public ChatSendToOneRequest setContent(String content) {
this.content = content;
return this;
}
@Override
public String toString() {
return "ChatSendToOneRequest{" +
"toUser='" + toUser + '\'' +
", msgId='" + msgId + '\'' +
", content='" + content + '\'' +
'}';
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.heartbeat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 消息 - 心跳请求
* @author zzr
*/
public class HeartbeatRequest implements Message {
/**
* 类型 - 心跳请求
*/
public static final String TYPE = "HEARTBEAT_REQUEST";
@Override
public String toString() {
return "HeartbeatRequest{}";
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.heartbeat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
/**
* 消息 - 心跳响应
* @author zzr
*/
public class HeartbeatResponse implements Message {
/**
* 类型 - 心跳响应
*/
public static final String TYPE = "HEARTBEAT_RESPONSE";
@Override
public String toString() {
return "HeartbeatResponse{}";
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.wechat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
import com.mx.cneeds.server.wechat.common.json.WechatJsonMessage;
/**
* @ClassName WechatMessageRequest
* @Author zzrdark
* @Date 2020-06-23 18:05
* @Description TODO
**/
public class WechatMessageRequest implements Message {
public final static String TYPE = "WECHAT_MESSAGE_REQUEST";
/**
* account : 18351837032
* data : {"content":"私聊文字消息测试","fromUser":"zhongweiyu789","msgId":1114311117,"newMsgId":7032628722018977792,"self":false,"timestamp":1591325316,"toUser":"wxid_fj9ghhitl4qm22","wId":"d6a0cb7d-da8c-488a-a119-f8594984e0c7"}
* messageType : 5
* wcId : wxid_fj9ghhitl4qm22
*/
private String account;
private DataBean data;
private int messageType;
private String wcId;
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public DataBean getData() {
return data;
}
public void setData(DataBean data) {
this.data = data;
}
public int getMessageType() {
return messageType;
}
public void setMessageType(int messageType) {
this.messageType = messageType;
}
public String getWcId() {
return wcId;
}
public void setWcId(String wcId) {
this.wcId = wcId;
}
public static class DataBean {
/**
* content : 私聊文字消息测试
* fromUser : zhongweiyu789
* msgId : 1114311117
* newMsgId : 7032628722018977792
* self : false
* timestamp : 1591325316
* toUser : wxid_fj9ghhitl4qm22
* wId : d6a0cb7d-da8c-488a-a119-f8594984e0c7
*/
private String content;
private String fromGroup;
private String fromUser;
private int msgId;
private long newMsgId;
private boolean self;
private int timestamp;
private String toUser;
private String wId;
private Long voiceLength;
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getFromGroup() {
return fromGroup;
}
public void setFromGroup(String fromGroup) {
this.fromGroup = fromGroup;
}
public String getFromUser() {
return fromUser;
}
public void setFromUser(String fromUser) {
this.fromUser = fromUser;
}
public int getMsgId() {
return msgId;
}
public void setMsgId(int msgId) {
this.msgId = msgId;
}
public long getNewMsgId() {
return newMsgId;
}
public void setNewMsgId(long newMsgId) {
this.newMsgId = newMsgId;
}
public boolean isSelf() {
return self;
}
public void setSelf(boolean self) {
this.self = self;
}
public int getTimestamp() {
return timestamp;
}
public void setTimestamp(int timestamp) {
this.timestamp = timestamp;
}
public String getToUser() {
return toUser;
}
public void setToUser(String toUser) {
this.toUser = toUser;
}
public String getWId() {
return wId;
}
public void setWId(String wId) {
this.wId = wId;
}
public Long getVoiceLength() {
return voiceLength;
}
public void setVoiceLength(Long voiceLength) {
this.voiceLength = voiceLength;
}
}
}
package com.mx.cneeds.server.wechat.websocket.message.pojo.wechat;
import com.mx.cneeds.server.wechat.common.dispatcher.Message;
import com.mx.cneeds.server.wechat.common.json.WechatJsonMessage;
/**
* @ClassName WechatMessageResponse
* @Author zzrdark
* @Date 2020-06-23 18:24
* @Description TODO
**/
public class WechatMessageResponse implements Message {
public final static String TYPE = "WECHAT_MESSAGE_RESPONSE";
/**
* account : 18351837032
* data : {"content":"私聊文字消息测试","fromUser":"zhongweiyu789","msgId":1114311117,"newMsgId":7032628722018977792,"self":false,"timestamp":1591325316,"toUser":"wxid_fj9ghhitl4qm22","wId":"d6a0cb7d-da8c-488a-a119-f8594984e0c7"}
* messageType : 5
* wcId : wxid_fj9ghhitl4qm22
*/
private DataBean data;
private int messageType;
public WechatMessageResponse(DataBean data, int messageType) {
this.data = data;
this.messageType = messageType;
}
public DataBean getData() {
return data;
}
public void setData(DataBean data) {
this.data = data;
}
public int getMessageType() {
return messageType;
}
public void setMessageType(int messageType) {
this.messageType = messageType;
}
public static class DataBean {
/**
* content : 私聊文字消息测试
* fromUser : zhongweiyu789
* msgId : 1114311117
* newMsgId : 7032628722018977792
* self : false
* timestamp : 1591325316
* toUser : wxid_fj9ghhitl4qm22
* wId : d6a0cb7d-da8c-488a-a119-f8594984e0c7
*/
private String content;
private String fromGroup;
private String fromUser;
private int msgId;
private long newMsgId;
private boolean self;
private int timestamp;
private String toUser;
private String wId;
private Long voiceLength;
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getFromGroup() {
return fromGroup;
}
public void setFromGroup(String fromGroup) {
this.fromGroup = fromGroup;
}
public String getFromUser() {
return fromUser;
}
public void setFromUser(String fromUser) {
this.fromUser = fromUser;
}
public int getMsgId() {
return msgId;
}
public void setMsgId(int msgId) {
this.msgId = msgId;
}
public long getNewMsgId() {
return newMsgId;
}
public void setNewMsgId(long newMsgId) {
this.newMsgId = newMsgId;
}
public boolean isSelf() {
return self;
}
public void setSelf(boolean self) {
this.self = self;
}
public int getTimestamp() {
return timestamp;
}
public void setTimestamp(int timestamp) {
this.timestamp = timestamp;
}
public String getToUser() {
return toUser;
}
public void setToUser(String toUser) {
this.toUser = toUser;
}
public String getWId() {
return wId;
}
public void setWId(String wId) {
this.wId = wId;
}
public Long getVoiceLength() {
return voiceLength;
}
public void setVoiceLength(Long voiceLength) {
this.voiceLength = voiceLength;
}
}
}
......@@ -17,6 +17,6 @@ spring:
multipart:
max-file-size: 100000MB
max-request-size: 100000MB
location: /root/cneeds-server/tmp/
logging:
config: classpath:logback-spring-prod.xml
......@@ -8,3 +8,5 @@ server:
servlet:
context-path: /
netty:
port: 5000
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment