version6:step3-心跳检测优化

This commit is contained in:
Wxx 2025-03-01 21:22:59 +08:00
parent 9cf25a765d
commit 206a56c07e
14 changed files with 66 additions and 26 deletions

View File

@ -0,0 +1,18 @@
package common.message;
import lombok.AllArgsConstructor;
/**
* @author wxx
* @version 1.0
* @create 2025/2/28 18:32
*/
@AllArgsConstructor
public enum RequestType {
NORMAL(0), HEARTBEAT(1);
private int code;
public int getCode() {
return code;
}
}

View File

@ -20,6 +20,8 @@ import java.io.Serializable;
@Data @Data
@Builder @Builder
public class RpcRequest implements Serializable { public class RpcRequest implements Serializable {
//v6新增:请求类型
private RequestType type=RequestType.NORMAL;
//接口名方法名参数列表参数类型 //接口名方法名参数列表参数类型
private String interfaceName; private String interfaceName;
@ -28,4 +30,8 @@ public class RpcRequest implements Serializable {
private Object[] params; private Object[] params;
private Class<?>[] paramsType; private Class<?>[] paramsType;
public static RpcRequest heartBeat() {
return RpcRequest.builder().type(RequestType.HEARTBEAT).build();
}
} }

View File

@ -1,15 +1,20 @@
package com.kama.client.netty; package com.kama.client.netty;
import common.message.RpcRequest;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import java.lang.ref.ReferenceQueue;
/** /**
* @author wxx * @author wxx
* @version 1.0 * @version 1.0
* @create 2025/2/13 15:01 * @create 2025/2/13 15:01
*/ */
@Slf4j
public class HeartbeatHandler extends ChannelDuplexHandler { public class HeartbeatHandler extends ChannelDuplexHandler {
@Override @Override
@ -20,10 +25,11 @@ public class HeartbeatHandler extends ChannelDuplexHandler {
IdleState idleState = idleStateEvent.state(); IdleState idleState = idleStateEvent.state();
if(idleState == IdleState.WRITER_IDLE) { if(idleState == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("两秒没有写数据,发送心跳包\n"); ctx.writeAndFlush(RpcRequest.heartBeat());
System.out.println("超过两秒没有写数据,发送心跳包"); log.info("超过8秒没有写数据,发送心跳包");
} }
}else {
super.userEventTriggered(ctx, evt);
} }
} }
} }

View File

@ -23,7 +23,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse>
AttributeKey<RpcResponse> RESPONSE_KEY = AttributeKey.valueOf("RPCResponse"); AttributeKey<RpcResponse> RESPONSE_KEY = AttributeKey.valueOf("RPCResponse");
// 将响应存入 Channel 属性 // 将响应存入 Channel 属性
ctx.channel().attr(RESPONSE_KEY).set(response); ctx.channel().attr(RESPONSE_KEY).set(response);
ctx.channel().close(); //ctx.channel().close();
} }
@Override @Override

View File

@ -35,9 +35,9 @@ public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
pipeline.addLast(new MyDecoder()); pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyClientHandler()); pipeline.addLast(new NettyClientHandler());
pipeline.addLast(new MDCChannelHandler()); pipeline.addLast(new MDCChannelHandler());
// 客户端只关注写事件如果超过2秒没有发送数据则发送心跳包 // 客户端只关注写事件如果超过8秒没有发送数据则发送心跳包
//pipeline.addLast(new IdleStateHandler(0, 2, 0, TimeUnit.SECONDS)); pipeline.addLast(new IdleStateHandler(0, 8, 0, TimeUnit.SECONDS));
//pipeline.addLast(new HeartbeatHandler()); pipeline.addLast(new HeartbeatHandler());
log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).getType()); log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).getType());
} catch (Exception e) { } catch (Exception e) {
log.error("Error initializing Netty client pipeline", e); log.error("Error initializing Netty client pipeline", e);

View File

@ -8,6 +8,7 @@ import com.kama.client.rpcclient.impl.NettyRpcClient;
import com.kama.client.servicecenter.ServiceCenter; import com.kama.client.servicecenter.ServiceCenter;
import com.kama.client.servicecenter.ZKServiceCenter; import com.kama.client.servicecenter.ZKServiceCenter;
import com.kama.trace.interceptor.ClientTraceInterceptor; import com.kama.trace.interceptor.ClientTraceInterceptor;
import common.message.RequestType;
import common.message.RpcRequest; import common.message.RpcRequest;
import common.message.RpcResponse; import common.message.RpcResponse;
import common.trace.TraceContext; import common.trace.TraceContext;
@ -46,6 +47,7 @@ public class ClientProxy implements InvocationHandler {
//System.out.println(TraceContext.getTraceId() +";"+ TraceContext.getSpanId()); //System.out.println(TraceContext.getTraceId() +";"+ TraceContext.getSpanId());
//构建request //构建request
RpcRequest request = RpcRequest.builder() RpcRequest request = RpcRequest.builder()
.type(RequestType.NORMAL)
.interfaceName(method.getDeclaringClass().getName()) .interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName()) .methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build(); .params(args).paramsType(method.getParameterTypes()).build();

View File

@ -4,35 +4,36 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/** /**
* @author wxx * @author wxx
* @version 1.0 * @version 1.0
* @create 2025/2/13 15:27 * @create 2025/2/13 15:27
*/ */
@Slf4j
public class HeartbeatHandler extends ChannelDuplexHandler { public class HeartbeatHandler extends ChannelDuplexHandler {
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
try {
// 处理IdleState.READER_IDLE时间 // 处理IdleState.READER_IDLE时间
if(evt instanceof IdleStateEvent) { if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt; IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
IdleState idleState = ((IdleStateEvent) evt).state(); IdleState idleState = ((IdleStateEvent) evt).state();
// 如果是触发的是读空闲时间说明已经超过n秒没有收到客户端心跳包 // 如果是触发的是读空闲时间说明已经超过n秒没有收到客户端心跳包
if(idleState == IdleState.READER_IDLE) { if(idleState == IdleState.READER_IDLE) {
System.out.println("超过n秒没有收到客户端心跳 channel: " + ctx.channel()); log.info("超过10秒没有收到客户端心跳 channel: " + ctx.channel());
// 关闭channel避免造成更多资源占用
ctx.close();
}else if(idleState ==IdleState.WRITER_IDLE){
log.info("超过20s没有写数据,channel: " + ctx.channel());
// 关闭channel避免造成更多资源占用 // 关闭channel避免造成更多资源占用
ctx.close(); ctx.close();
} }
}}catch (Exception e){
log.error("处理事件发生异常"+e);
} }
} }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收到客户端数据, channel: " + ctx.channel() + ", 数据: " + msg.toString());
}
} }

View File

@ -4,6 +4,7 @@ package com.kama.server.netty;
import com.kama.server.provider.ServiceProvider; import com.kama.server.provider.ServiceProvider;
import com.kama.server.ratelimit.RateLimit; import com.kama.server.ratelimit.RateLimit;
import com.kama.trace.interceptor.ServerTraceInterceptor; import com.kama.trace.interceptor.ServerTraceInterceptor;
import common.message.RequestType;
import common.message.RpcRequest; import common.message.RpcRequest;
import common.message.RpcResponse; import common.message.RpcResponse;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -34,16 +35,22 @@ public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcReques
log.error("接收到非法请求RpcRequest 为空"); log.error("接收到非法请求RpcRequest 为空");
return; return;
} }
//trace记录 if(request.getType() == RequestType.HEARTBEAT){
ServerTraceInterceptor.beforeHandle(); log.info("接收到来自客户端的心跳包");
return;
}
if(request.getType() == RequestType.NORMAL) {
//trace记录
ServerTraceInterceptor.beforeHandle();
RpcResponse response = getResponse(request); RpcResponse response = getResponse(request);
//ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); //ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
//trace上报 //trace上报
ServerTraceInterceptor.afterHandle(request.getMethodName()); ServerTraceInterceptor.afterHandle(request.getMethodName());
ctx.writeAndFlush(response); ctx.writeAndFlush(response);
}
} }
@Override @Override

View File

@ -28,9 +28,9 @@ public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
// 服务端只关注读事件如果3秒内没有收到客户端的消息将会触发IdleState.READER_IDLE事件将由HeartbeatHandler进行处理 // 服务端关注读事件和写事件如果10秒内没有收到客户端的消息将会触发IdleState.READER_IDLE事件将由HeartbeatHandler进行处理
//pipeline.addLast(new IdleStateHandler(3, 0, 0 ,TimeUnit.SECONDS)); pipeline.addLast(new IdleStateHandler(10, 20, 0 ,TimeUnit.SECONDS));
//pipeline.addLast(new HeartbeatHandler()); pipeline.addLast(new HeartbeatHandler());
//使用自定义的编/解码器 //使用自定义的编/解码器
pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3))); pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3)));
pipeline.addLast(new MyDecoder()); pipeline.addLast(new MyDecoder());