diff --git a/version6/krpc-common/src/main/java/common/message/RequestType.java b/version6/krpc-common/src/main/java/common/message/RequestType.java new file mode 100644 index 0000000..d1eef1e --- /dev/null +++ b/version6/krpc-common/src/main/java/common/message/RequestType.java @@ -0,0 +1,18 @@ +package common.message; + +import lombok.AllArgsConstructor; + +/** + * @author wxx + * @version 1.0 + * @create 2025/2/28 18:32 + */ +@AllArgsConstructor +public enum RequestType { + NORMAL(0), HEARTBEAT(1); + private int code; + + public int getCode() { + return code; + } +} diff --git a/version6/krpc-common/src/main/java/common/message/RpcRequest.java b/version6/krpc-common/src/main/java/common/message/RpcRequest.java index b44a931..2c711a1 100644 --- a/version6/krpc-common/src/main/java/common/message/RpcRequest.java +++ b/version6/krpc-common/src/main/java/common/message/RpcRequest.java @@ -20,6 +20,8 @@ import java.io.Serializable; @Data @Builder public class RpcRequest implements Serializable { + //v6新增:请求类型 + private RequestType type=RequestType.NORMAL; //接口名、方法名、参数列表参数类型 private String interfaceName; @@ -28,4 +30,8 @@ public class RpcRequest implements Serializable { private Object[] params; private Class[] paramsType; + public static RpcRequest heartBeat() { + return RpcRequest.builder().type(RequestType.HEARTBEAT).build(); + } + } diff --git a/version6/krpc-core/src/main/java/com/kama/client/netty/HeartbeatHandler.java b/version6/krpc-core/src/main/java/com/kama/client/netty/HeartbeatHandler.java index c4587fb..dcdb627 100644 --- a/version6/krpc-core/src/main/java/com/kama/client/netty/HeartbeatHandler.java +++ b/version6/krpc-core/src/main/java/com/kama/client/netty/HeartbeatHandler.java @@ -1,15 +1,20 @@ package com.kama.client.netty; +import common.message.RpcRequest; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import lombok.extern.slf4j.Slf4j; + +import java.lang.ref.ReferenceQueue; /** * @author wxx * @version 1.0 * @create 2025/2/13 15:01 */ +@Slf4j public class HeartbeatHandler extends ChannelDuplexHandler { @Override @@ -20,10 +25,11 @@ public class HeartbeatHandler extends ChannelDuplexHandler { IdleState idleState = idleStateEvent.state(); if(idleState == IdleState.WRITER_IDLE) { - ctx.writeAndFlush("两秒没有写数据,发送心跳包\n"); - System.out.println("超过两秒没有写数据,发送心跳包"); + ctx.writeAndFlush(RpcRequest.heartBeat()); + log.info("超过8秒没有写数据,发送心跳包"); } - + }else { + super.userEventTriggered(ctx, evt); } } } diff --git a/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientHandler.java b/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientHandler.java index 15c98ff..7975d5f 100644 --- a/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientHandler.java +++ b/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientHandler.java @@ -23,7 +23,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler AttributeKey RESPONSE_KEY = AttributeKey.valueOf("RPCResponse"); // 将响应存入 Channel 属性 ctx.channel().attr(RESPONSE_KEY).set(response); - ctx.channel().close(); + //ctx.channel().close(); } @Override diff --git a/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java b/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java index 521aa27..8483a58 100644 --- a/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java +++ b/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java @@ -35,9 +35,9 @@ public class NettyClientInitializer extends ChannelInitializer { pipeline.addLast(new MyDecoder()); pipeline.addLast(new NettyClientHandler()); pipeline.addLast(new MDCChannelHandler()); - // 客户端只关注写事件,如果超过2秒没有发送数据,则发送心跳包 - //pipeline.addLast(new IdleStateHandler(0, 2, 0, TimeUnit.SECONDS)); - //pipeline.addLast(new HeartbeatHandler()); + // 客户端只关注写事件,如果超过8秒没有发送数据,则发送心跳包 + pipeline.addLast(new IdleStateHandler(0, 8, 0, TimeUnit.SECONDS)); + pipeline.addLast(new HeartbeatHandler()); log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).getType()); } catch (Exception e) { log.error("Error initializing Netty client pipeline", e); diff --git a/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java b/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java index f0b7acc..8e07ea2 100644 --- a/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java +++ b/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java @@ -8,6 +8,7 @@ import com.kama.client.rpcclient.impl.NettyRpcClient; import com.kama.client.servicecenter.ServiceCenter; import com.kama.client.servicecenter.ZKServiceCenter; import com.kama.trace.interceptor.ClientTraceInterceptor; +import common.message.RequestType; import common.message.RpcRequest; import common.message.RpcResponse; import common.trace.TraceContext; @@ -46,6 +47,7 @@ public class ClientProxy implements InvocationHandler { //System.out.println(TraceContext.getTraceId() +";"+ TraceContext.getSpanId()); //构建request RpcRequest request = RpcRequest.builder() + .type(RequestType.NORMAL) .interfaceName(method.getDeclaringClass().getName()) .methodName(method.getName()) .params(args).paramsType(method.getParameterTypes()).build(); diff --git a/version6/krpc-core/src/main/java/com/kama/server/netty/HeartbeatHandler.java b/version6/krpc-core/src/main/java/com/kama/server/netty/HeartbeatHandler.java index 045bea1..277f216 100644 --- a/version6/krpc-core/src/main/java/com/kama/server/netty/HeartbeatHandler.java +++ b/version6/krpc-core/src/main/java/com/kama/server/netty/HeartbeatHandler.java @@ -4,35 +4,36 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import lombok.extern.slf4j.Slf4j; /** * @author wxx * @version 1.0 * @create 2025/2/13 15:27 */ +@Slf4j public class HeartbeatHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + try { // 处理IdleState.READER_IDLE时间 if(evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; IdleState idleState = ((IdleStateEvent) evt).state(); - // 如果是触发的是读空闲时间,说明已经超过n秒没有收到客户端心跳包 if(idleState == IdleState.READER_IDLE) { - System.out.println("超过n秒没有收到客户端心跳, channel: " + ctx.channel()); - + log.info("超过10秒没有收到客户端心跳, channel: " + ctx.channel()); + // 关闭channel,避免造成更多资源占用 + ctx.close(); + }else if(idleState ==IdleState.WRITER_IDLE){ + log.info("超过20s没有写数据,channel: " + ctx.channel()); // 关闭channel,避免造成更多资源占用 ctx.close(); } - + }}catch (Exception e){ + log.error("处理事件发生异常"+e); } } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("接收到客户端数据, channel: " + ctx.channel() + ", 数据: " + msg.toString()); - } } \ No newline at end of file diff --git a/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java b/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java index f12a92d..5a40409 100644 --- a/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java +++ b/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java @@ -4,6 +4,7 @@ package com.kama.server.netty; import com.kama.server.provider.ServiceProvider; import com.kama.server.ratelimit.RateLimit; import com.kama.trace.interceptor.ServerTraceInterceptor; +import common.message.RequestType; import common.message.RpcRequest; import common.message.RpcResponse; import io.netty.channel.ChannelFutureListener; @@ -34,16 +35,22 @@ public class NettyRpcServerHandler extends SimpleChannelInboundHandler { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - // 服务端只关注读事件,如果3秒内没有收到客户端的消息,将会触发IdleState.READER_IDLE事件,将由HeartbeatHandler进行处理 - //pipeline.addLast(new IdleStateHandler(3, 0, 0 ,TimeUnit.SECONDS)); - //pipeline.addLast(new HeartbeatHandler()); + // 服务端关注读事件和写事件,如果10秒内没有收到客户端的消息,将会触发IdleState.READER_IDLE事件,将由HeartbeatHandler进行处理 + pipeline.addLast(new IdleStateHandler(10, 20, 0 ,TimeUnit.SECONDS)); + pipeline.addLast(new HeartbeatHandler()); //使用自定义的编/解码器 pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3))); pipeline.addLast(new MyDecoder()); diff --git a/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class b/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class index 15eeb31..e6d9a30 100644 Binary files a/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class and b/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class differ diff --git a/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class b/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class index ce4c829..b138af1 100644 Binary files a/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class and b/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class differ diff --git a/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class b/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class index 525d979..0f74e48 100644 Binary files a/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class and b/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class differ diff --git a/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class b/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class index bb69e2c..ecd2890 100644 Binary files a/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class and b/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class differ diff --git a/version6/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class b/version6/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class index 499dd03..52a1470 100644 Binary files a/version6/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class and b/version6/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class differ