diff --git a/version5/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class b/version5/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class index d8bfe2c..4fad818 100644 Binary files a/version5/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class and b/version5/krpc-core/target/classes/com/kama/server/ratelimit/provider/RateLimitProvider.class differ diff --git a/version6/krpc-common/src/main/java/common/serializer/mycoder/MyDecoder.java b/version6/krpc-common/src/main/java/common/serializer/mycoder/MyDecoder.java index 16c2d26..fce0114 100644 --- a/version6/krpc-common/src/main/java/common/serializer/mycoder/MyDecoder.java +++ b/version6/krpc-common/src/main/java/common/serializer/mycoder/MyDecoder.java @@ -4,10 +4,12 @@ package common.serializer.mycoder; import common.exception.SerializeException; import common.message.MessageType; import common.serializer.myserializer.Serializer; +import common.trace.TraceContext; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; import java.util.Arrays; import java.util.List; @@ -27,7 +29,12 @@ public class MyDecoder extends ByteToMessageDecoder { if (in.readableBytes() < 6) { // messageType + serializerType + length return; } - //1.读取消息类型 + //1.读取traceMsg + int traceLength=in.readInt(); + byte[] traceBytes=new byte[traceLength]; + in.readBytes(traceBytes); + serializeTraceMsg(traceBytes); + //2.读取消息类型 short messageType = in.readShort(); // 现在还只支持request与response请求 if (messageType != MessageType.REQUEST.getCode() && @@ -35,19 +42,19 @@ public class MyDecoder extends ByteToMessageDecoder { log.warn("暂不支持此种数据, messageType: {}", messageType); return; } - //2.读取序列化的方式&类型 + //3.读取序列化的方式&类型 short serializerType = in.readShort(); Serializer serializer = Serializer.getSerializerByCode(serializerType); if (serializer == null) { log.error("不存在对应的序列化器, serializerType: {}", serializerType); throw new SerializeException("不存在对应的序列化器, serializerType: " + serializerType); } - //3.读取序列化数组长度 + //4.读取序列化数组长度 int length = in.readInt(); if (in.readableBytes() < length) { return; // 数据不完整,等待更多数据 } - //4.读取序列化数组 + //5.读取序列化数组 byte[] bytes = new byte[length]; in.readBytes(bytes); log.debug("Received bytes: {}", Arrays.toString(bytes)); @@ -55,4 +62,11 @@ public class MyDecoder extends ByteToMessageDecoder { out.add(deserialize); } + //解析并存储traceMsg + private void serializeTraceMsg(byte[] traceByte){ + String traceMsg=new String(traceByte); + String[] msgs=traceMsg.split(";"); + if(!msgs[0].equals("")) TraceContext.setTraceId(msgs[0]); + if(!msgs[1].equals("")) TraceContext.setParentSpanId(msgs[1]); + } } diff --git a/version6/krpc-common/src/main/java/common/serializer/mycoder/MyEncoder.java b/version6/krpc-common/src/main/java/common/serializer/mycoder/MyEncoder.java index 862796f..01efe20 100644 --- a/version6/krpc-common/src/main/java/common/serializer/mycoder/MyEncoder.java +++ b/version6/krpc-common/src/main/java/common/serializer/mycoder/MyEncoder.java @@ -5,11 +5,13 @@ import common.message.MessageType; import common.message.RpcRequest; import common.message.RpcResponse; import common.serializer.myserializer.Serializer; +import common.trace.TraceContext; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; /** * @ClassName MyEncoder @@ -26,7 +28,15 @@ public class MyEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { log.debug("Encoding message of type: {}", msg.getClass()); - //1.写入消息类型 + //1.写入trace消息头 + String traceMsg= TraceContext.getTraceId() +";"+TraceContext.getSpanId(); + byte[] traceBytes=traceMsg.getBytes(); + // 1.1写入traceMsg长度 + out.writeInt(traceBytes.length); + // 1.2写入traceBytes + out.writeBytes(traceBytes); + + //2.写入消息类型 if (msg instanceof RpcRequest) { out.writeShort(MessageType.REQUEST.getCode()); } else if (msg instanceof RpcResponse) { @@ -35,16 +45,16 @@ public class MyEncoder extends MessageToByteEncoder { log.error("Unknown message type: {}", msg.getClass()); throw new IllegalArgumentException("Unknown message type: " + msg.getClass()); } - //2.写入序列化方式 + //3.写入序列化方式 out.writeShort(serializer.getType()); //得到序列化数组 byte[] serializeBytes = serializer.serialize(msg); if (serializeBytes == null || serializeBytes.length == 0) { throw new IllegalArgumentException("Serialized message is empty"); } - //3.写入长度 + //4.写入长度 out.writeInt(serializeBytes.length); - //4.写入序列化数组 + //5.写入序列化数组 out.writeBytes(serializeBytes); } } diff --git a/version6/krpc-common/src/main/java/common/trace/TraceContext.java b/version6/krpc-common/src/main/java/common/trace/TraceContext.java new file mode 100644 index 0000000..22c83e3 --- /dev/null +++ b/version6/krpc-common/src/main/java/common/trace/TraceContext.java @@ -0,0 +1,57 @@ +package common.trace; + +import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; + +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2025/2/18 18:37 + */ +@Slf4j +public class TraceContext { + + public static void setTraceId(String traceId) { + MDC.put("traceId",traceId); + } + + public static String getTraceId() { + return MDC.get("traceId"); + } + + public static void setSpanId(String spanId) { + MDC.put("spanId",spanId); + } + + public static String getSpanId() { + return MDC.get("spanId"); + } + public static void setParentSpanId(String parentSpanId) { + MDC.put("parentSpanId",parentSpanId); + } + + public static String getParentSpanId() { + return MDC.get("parentSpanId"); + } + public static void setStartTimestamp(String startTimestamp) { + MDC.put("startTimestamp",startTimestamp); + } + + public static String getStartTimestamp() { + return MDC.get("startTimestamp"); + } + public static Map getCopy(){ + return MDC.getCopyOfContextMap(); + } + public static void clone(Map context){ + for(Map.Entry entry:context.entrySet()){ + System.out.println(entry.getKey()+":"+entry.getValue()); + MDC.put(entry.getKey(),entry.getValue()); + } + } + public static void clear() { + MDC.clear(); + } +} \ No newline at end of file diff --git a/version6/krpc-common/target/classes/common/serializer/mycoder/MyDecoder.class b/version6/krpc-common/target/classes/common/serializer/mycoder/MyDecoder.class index 44eb61a..28629ad 100644 Binary files a/version6/krpc-common/target/classes/common/serializer/mycoder/MyDecoder.class and b/version6/krpc-common/target/classes/common/serializer/mycoder/MyDecoder.class differ diff --git a/version6/krpc-common/target/classes/common/serializer/mycoder/MyEncoder.class b/version6/krpc-common/target/classes/common/serializer/mycoder/MyEncoder.class index cbdc4b0..5ac5416 100644 Binary files a/version6/krpc-common/target/classes/common/serializer/mycoder/MyEncoder.class and b/version6/krpc-common/target/classes/common/serializer/mycoder/MyEncoder.class differ diff --git a/version6/krpc-common/target/classes/common/trace/TraceContext.class b/version6/krpc-common/target/classes/common/trace/TraceContext.class new file mode 100644 index 0000000..2f2f46c Binary files /dev/null and b/version6/krpc-common/target/classes/common/trace/TraceContext.class differ diff --git a/version6/krpc-consumer/src/main/java/com/kama/consumer/ConsumerTest.java b/version6/krpc-consumer/src/main/java/com/kama/consumer/ConsumerTest.java index 759db0e..5de60e3 100644 --- a/version6/krpc-consumer/src/main/java/com/kama/consumer/ConsumerTest.java +++ b/version6/krpc-consumer/src/main/java/com/kama/consumer/ConsumerTest.java @@ -18,7 +18,7 @@ import java.util.concurrent.Executors; @Slf4j public class ConsumerTest { - private static final int THREAD_POOL_SIZE = 20; + private static final int THREAD_POOL_SIZE = 30; private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE); public static void main(String[] args) throws InterruptedException { diff --git a/version6/krpc-consumer/target/classes/com/kama/consumer/ConsumerTest.class b/version6/krpc-consumer/target/classes/com/kama/consumer/ConsumerTest.class index 9905858..eb527ca 100644 Binary files a/version6/krpc-consumer/target/classes/com/kama/consumer/ConsumerTest.class and b/version6/krpc-consumer/target/classes/com/kama/consumer/ConsumerTest.class differ diff --git a/version6/krpc-core/pom.xml b/version6/krpc-core/pom.xml index 6d3cf94..fdffc2b 100644 --- a/version6/krpc-core/pom.xml +++ b/version6/krpc-core/pom.xml @@ -111,5 +111,20 @@ junit junit + + io.zipkin.zipkin2 + zipkin + 3.4.0 + + + io.zipkin.reporter2 + zipkin-reporter + 3.4.0 + + + io.zipkin.reporter2 + zipkin-sender-okhttp3 + 3.4.0 + \ No newline at end of file diff --git a/version6/krpc-core/src/main/java/com/kama/client/netty/MDCChannelHandler.java b/version6/krpc-core/src/main/java/com/kama/client/netty/MDCChannelHandler.java new file mode 100644 index 0000000..ce40c5f --- /dev/null +++ b/version6/krpc-core/src/main/java/com/kama/client/netty/MDCChannelHandler.java @@ -0,0 +1,42 @@ +package com.kama.client.netty; + +import common.trace.TraceContext; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.util.AttributeKey; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2025/2/20 16:56 + */ +@Slf4j +public class MDCChannelHandler extends ChannelOutboundHandlerAdapter { + public static final AttributeKey> TRACE_CONTEXT_KEY = AttributeKey.valueOf("TraceContext"); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + // 从Channel属性中获取Trace上下文 + Map traceContext = + ctx.channel().attr(TRACE_CONTEXT_KEY).get(); + + if (traceContext != null) { + // 设置到当前线程的TraceContext或MDC + TraceContext.clone(traceContext); + log.info("已绑定Trace上下文: {}", traceContext); + } else { + log.error("Trace上下文未设置!"); + } + + // 继续传递请求 + super.write(ctx, msg, promise); + } + +} \ No newline at end of file diff --git a/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java b/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java index f64c74d..521aa27 100644 --- a/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java +++ b/version6/krpc-core/src/main/java/com/kama/client/netty/NettyClientInitializer.java @@ -4,12 +4,15 @@ package com.kama.client.netty; import common.serializer.mycoder.MyDecoder; import common.serializer.mycoder.MyEncoder; import common.serializer.myserializer.Serializer; +import common.trace.TraceContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.log4j.MDC; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -21,22 +24,21 @@ import java.util.concurrent.TimeUnit; */ @Slf4j public class NettyClientInitializer extends ChannelInitializer { - - + public NettyClientInitializer(){} @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - // 使用自定义的编码器和解码器 try { // 根据传入的序列化器类型初始化编码器 pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3))); pipeline.addLast(new MyDecoder()); pipeline.addLast(new NettyClientHandler()); + pipeline.addLast(new MDCChannelHandler()); // 客户端只关注写事件,如果超过2秒没有发送数据,则发送心跳包 - pipeline.addLast(new IdleStateHandler(0, 2, 0, TimeUnit.SECONDS)); - pipeline.addLast(new HeartbeatHandler()); - log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).toString()); + //pipeline.addLast(new IdleStateHandler(0, 2, 0, TimeUnit.SECONDS)); + //pipeline.addLast(new HeartbeatHandler()); + log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).getType()); } catch (Exception e) { log.error("Error initializing Netty client pipeline", e); throw e; // 重新抛出异常,确保管道初始化失败时处理正确 diff --git a/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java b/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java index a5361ca..f0b7acc 100644 --- a/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java +++ b/version6/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java @@ -7,8 +7,10 @@ import com.kama.client.rpcclient.RpcClient; import com.kama.client.rpcclient.impl.NettyRpcClient; import com.kama.client.servicecenter.ServiceCenter; import com.kama.client.servicecenter.ZKServiceCenter; +import com.kama.trace.interceptor.ClientTraceInterceptor; import common.message.RpcRequest; import common.message.RpcResponse; +import common.trace.TraceContext; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; @@ -39,6 +41,9 @@ public class ClientProxy implements InvocationHandler { //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端) @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + //trace记录 + ClientTraceInterceptor.beforeInvoke(); + //System.out.println(TraceContext.getTraceId() +";"+ TraceContext.getSpanId()); //构建request RpcRequest request = RpcRequest.builder() .interfaceName(method.getDeclaringClass().getName()) @@ -83,6 +88,8 @@ public class ClientProxy implements InvocationHandler { } log.info("收到响应: {} 状态码: {}", request.getInterfaceName(), response.getCode()); } + //trace上报 + ClientTraceInterceptor.afterInvoke(method.getName()); return response != null ? response.getData() : null; } diff --git a/version6/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java b/version6/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java index 177b7f3..cbd8d12 100644 --- a/version6/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java +++ b/version6/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java @@ -1,9 +1,11 @@ package com.kama.client.rpcclient.impl; +import com.kama.client.netty.MDCChannelHandler; import com.kama.client.netty.NettyClientInitializer; import com.kama.client.rpcclient.RpcClient; import common.message.RpcRequest; import common.message.RpcResponse; +import common.trace.TraceContext; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -12,8 +14,10 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; import java.net.InetSocketAddress; +import java.util.Map; /** * @ClassName NettyRpcClient @@ -44,6 +48,7 @@ public class NettyRpcClient implements RpcClient { @Override public RpcResponse sendRequest(RpcRequest request) { + Map mdcContextMap=TraceContext.getCopy(); //从注册中心获取host,post if (address == null) { log.error("服务发现失败,返回的地址为 null"); @@ -55,6 +60,9 @@ public class NettyRpcClient implements RpcClient { // 连接到远程服务 ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); Channel channel = channelFuture.channel(); + // 将当前Trace上下文保存到Channel属性 + channel.attr(MDCChannelHandler.TRACE_CONTEXT_KEY).set(mdcContextMap); + // 发送数据 channel.writeAndFlush(request); //sync()堵塞获取结果 diff --git a/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java b/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java index 9c26832..f12a92d 100644 --- a/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java +++ b/version6/krpc-core/src/main/java/com/kama/server/netty/NettyRpcServerHandler.java @@ -3,6 +3,7 @@ package com.kama.server.netty; import com.kama.server.provider.ServiceProvider; import com.kama.server.ratelimit.RateLimit; +import com.kama.trace.interceptor.ServerTraceInterceptor; import common.message.RpcRequest; import common.message.RpcResponse; import io.netty.channel.ChannelFutureListener; @@ -33,8 +34,15 @@ public class NettyRpcServerHandler extends SimpleChannelInboundHandler { protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 服务端只关注读事件,如果3秒内没有收到客户端的消息,将会触发IdleState.READER_IDLE事件,将由HeartbeatHandler进行处理 - pipeline.addLast(new IdleStateHandler(3, 0, 0 ,TimeUnit.SECONDS)); - pipeline.addLast(new HeartbeatHandler()); + //pipeline.addLast(new IdleStateHandler(3, 0, 0 ,TimeUnit.SECONDS)); + //pipeline.addLast(new HeartbeatHandler()); //使用自定义的编/解码器 pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3))); pipeline.addLast(new MyDecoder()); diff --git a/version6/krpc-core/src/main/java/com/kama/server/ratelimit/provider/RateLimitProvider.java b/version6/krpc-core/src/main/java/com/kama/server/ratelimit/provider/RateLimitProvider.java index b2f7d01..325baea 100644 --- a/version6/krpc-core/src/main/java/com/kama/server/ratelimit/provider/RateLimitProvider.java +++ b/version6/krpc-core/src/main/java/com/kama/server/ratelimit/provider/RateLimitProvider.java @@ -19,13 +19,13 @@ public class RateLimitProvider { private final Map rateLimitMap = new ConcurrentHashMap<>(); // 默认的限流桶容量和令牌生成速率 - private static final int DEFAULT_CAPACITY = 100; - private static final int DEFAULT_RATE = 10; + private static final int DEFAULT_CAPACITY = 10; + private static final int DEFAULT_RATE = 100; // 提供限流实例 public RateLimit getRateLimit(String interfaceName) { return rateLimitMap.computeIfAbsent(interfaceName, key -> { - RateLimit rateLimit = new TokenBucketRateLimitImpl(DEFAULT_CAPACITY, DEFAULT_RATE); + RateLimit rateLimit = new TokenBucketRateLimitImpl(DEFAULT_RATE, DEFAULT_CAPACITY); log.info("为接口 [{}] 创建了新的限流策略: {}", interfaceName, rateLimit); return rateLimit; }); diff --git a/version6/krpc-core/src/main/java/com/kama/trace/TraceIdGenerator.java b/version6/krpc-core/src/main/java/com/kama/trace/TraceIdGenerator.java new file mode 100644 index 0000000..1050485 --- /dev/null +++ b/version6/krpc-core/src/main/java/com/kama/trace/TraceIdGenerator.java @@ -0,0 +1,22 @@ +package com.kama.trace; + +import java.util.UUID; + +/** + * @author wxx + * @version 1.0 + * @create 2025/2/16 23:05 + */ +public class TraceIdGenerator { + public static String generateTraceId() { + UUID uuid = UUID.randomUUID(); + String uuidString = uuid.toString(); + // 去掉连字符 + String uuidWithoutHyphens = uuidString.replace("-", ""); + return uuidWithoutHyphens; + } + + public static String generateSpanId() { + return String.valueOf(System.currentTimeMillis()); + } +} diff --git a/version6/krpc-core/src/main/java/com/kama/trace/ZipkinReporter.java b/version6/krpc-core/src/main/java/com/kama/trace/ZipkinReporter.java new file mode 100644 index 0000000..1d527f7 --- /dev/null +++ b/version6/krpc-core/src/main/java/com/kama/trace/ZipkinReporter.java @@ -0,0 +1,48 @@ +package com.kama.trace; + +/** + * @author wxx + * @version 1.0 + * @create 2025/2/18 16:41 + */ +import lombok.extern.slf4j.Slf4j; +import zipkin2.Span; +import zipkin2.reporter.AsyncReporter; +import zipkin2.reporter.okhttp3.OkHttpSender; + +import java.util.Map; +@Slf4j +public class ZipkinReporter { + private static final String ZIPKIN_URL = "http://localhost:9411/api/v2/spans"; // Zipkin 服务器地址 + private static final AsyncReporter reporter; + + static { + // 初始化 Zipkin 上报器 + OkHttpSender sender = OkHttpSender.create(ZIPKIN_URL); + reporter = AsyncReporter.create(sender); + } + + /** + * 上报 Span 数据到 Zipkin + */ + public static void reportSpan(String traceId, String spanId, String parentSpanId, + String name, long startTimestamp, long duration, + String serviceName,String type) { + Span span = Span.newBuilder() + .traceId(traceId) + .id(spanId) + .parentId(parentSpanId) + .name(name) + .timestamp(startTimestamp * 1000) // Zipkin 使用微秒 + .duration(duration * 1000) // Zipkin 使用微秒 + .putTag("service",serviceName) + .putTag("type",type) + .build(); + reporter.report(span); + log.info("当前traceId:{}正在上报日志-----",traceId); + } + + public static void close() { + reporter.close(); + } +} \ No newline at end of file diff --git a/version6/krpc-core/src/main/java/com/kama/trace/interceptor/ClientTraceInterceptor.java b/version6/krpc-core/src/main/java/com/kama/trace/interceptor/ClientTraceInterceptor.java new file mode 100644 index 0000000..2c4168b --- /dev/null +++ b/version6/krpc-core/src/main/java/com/kama/trace/interceptor/ClientTraceInterceptor.java @@ -0,0 +1,47 @@ +package com.kama.trace.interceptor; + +import com.kama.trace.TraceIdGenerator; +import com.kama.trace.ZipkinReporter; +import common.trace.TraceContext; + +/** + * @author wxx + * @version 1.0 + * @create 2025/2/18 18:30 + */ +public class ClientTraceInterceptor { + public static void beforeInvoke() { + String traceId = TraceContext.getTraceId(); + if (traceId == null) { + traceId = TraceIdGenerator.generateTraceId(); + TraceContext.setTraceId(traceId); + } + String spanId = TraceIdGenerator.generateSpanId(); + TraceContext.setSpanId(spanId); + + // 记录客户端 Span + long startTimestamp = System.currentTimeMillis(); + TraceContext.setStartTimestamp(String.valueOf(startTimestamp)); + } + + public static void afterInvoke(String serviceName) { + long endTimestamp = System.currentTimeMillis(); + long startTimestamp = Long.valueOf(TraceContext.getStartTimestamp()); + long duration = endTimestamp - startTimestamp; + + // 上报客户端 Span + ZipkinReporter.reportSpan( + TraceContext.getTraceId(), + TraceContext.getSpanId(), + TraceContext.getParentSpanId(), + "client-" + serviceName, + startTimestamp, + duration, + serviceName, + "client" + ); + + // 清理 TraceContext + TraceContext.clear(); + } +} \ No newline at end of file diff --git a/version6/krpc-core/src/main/java/com/kama/trace/interceptor/ServerTraceInterceptor.java b/version6/krpc-core/src/main/java/com/kama/trace/interceptor/ServerTraceInterceptor.java new file mode 100644 index 0000000..1260b12 --- /dev/null +++ b/version6/krpc-core/src/main/java/com/kama/trace/interceptor/ServerTraceInterceptor.java @@ -0,0 +1,47 @@ +package com.kama.trace.interceptor; + +import com.kama.trace.TraceIdGenerator; +import com.kama.trace.ZipkinReporter; +import common.trace.TraceContext; +import org.slf4j.MDC; + +/** + * @author wxx + * @version 1.0 + * @create 2025/2/18 18:31 + */ +public class ServerTraceInterceptor { + public static void beforeHandle() { + String traceId = TraceContext.getTraceId(); + String parentSpanId =TraceContext.getParentSpanId(); + String spanId = TraceIdGenerator.generateSpanId(); + TraceContext.setTraceId(traceId); + TraceContext.setSpanId(spanId); + TraceContext.setParentSpanId(parentSpanId); + + // 记录服务端 Span + long startTimestamp = System.currentTimeMillis(); + TraceContext.setStartTimestamp(String.valueOf(startTimestamp)); + } + + public static void afterHandle(String serviceName) { + long endTimestamp = System.currentTimeMillis(); + long startTimestamp = Long.valueOf(TraceContext.getStartTimestamp()); + long duration = endTimestamp - startTimestamp; + + // 上报服务端 Span + ZipkinReporter.reportSpan( + TraceContext.getTraceId(), + TraceContext.getSpanId(), + TraceContext.getParentSpanId(), + "server-" + serviceName, + startTimestamp, + duration, + serviceName, + "server" + ); + + // 清理 TraceContext + TraceContext.clear(); + } +} diff --git a/version6/krpc-core/target/classes/com/kama/client/netty/MDCChannelHandler.class b/version6/krpc-core/target/classes/com/kama/client/netty/MDCChannelHandler.class new file mode 100644 index 0000000..104e12f Binary files /dev/null and b/version6/krpc-core/target/classes/com/kama/client/netty/MDCChannelHandler.class differ diff --git a/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class b/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class index f3db506..15eeb31 100644 Binary files a/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class and b/version6/krpc-core/target/classes/com/kama/client/netty/NettyClientInitializer.class differ diff --git a/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class b/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class index 74b4edd..ce4c829 100644 Binary files a/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class and b/version6/krpc-core/target/classes/com/kama/client/proxy/ClientProxy.class differ diff --git a/version6/krpc-core/target/classes/com/kama/client/rpcclient/impl/NettyRpcClient.class b/version6/krpc-core/target/classes/com/kama/client/rpcclient/impl/NettyRpcClient.class index 9e3cf2d..df3f6bd 100644 Binary files a/version6/krpc-core/target/classes/com/kama/client/rpcclient/impl/NettyRpcClient.class and b/version6/krpc-core/target/classes/com/kama/client/rpcclient/impl/NettyRpcClient.class differ diff --git a/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class b/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class index 2a99cc4..525d979 100644 Binary files a/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class and b/version6/krpc-core/target/classes/com/kama/server/netty/NettyRpcServerHandler.class differ diff --git a/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class b/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class index 2639814..bb69e2c 100644 Binary files a/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class and b/version6/krpc-core/target/classes/com/kama/server/netty/NettyServerInitializer.class differ diff --git a/version6/krpc-core/target/classes/com/kama/trace/TraceIdGenerator.class b/version6/krpc-core/target/classes/com/kama/trace/TraceIdGenerator.class new file mode 100644 index 0000000..5bbe787 Binary files /dev/null and b/version6/krpc-core/target/classes/com/kama/trace/TraceIdGenerator.class differ diff --git a/version6/krpc-core/target/classes/com/kama/trace/ZipkinReporter.class b/version6/krpc-core/target/classes/com/kama/trace/ZipkinReporter.class new file mode 100644 index 0000000..40440e7 Binary files /dev/null and b/version6/krpc-core/target/classes/com/kama/trace/ZipkinReporter.class differ diff --git a/version6/krpc-core/target/classes/com/kama/trace/interceptor/ClientTraceInterceptor.class b/version6/krpc-core/target/classes/com/kama/trace/interceptor/ClientTraceInterceptor.class new file mode 100644 index 0000000..2f69e95 Binary files /dev/null and b/version6/krpc-core/target/classes/com/kama/trace/interceptor/ClientTraceInterceptor.class differ diff --git a/version6/krpc-core/target/classes/com/kama/trace/interceptor/ServerTraceInterceptor.class b/version6/krpc-core/target/classes/com/kama/trace/interceptor/ServerTraceInterceptor.class new file mode 100644 index 0000000..bee7402 Binary files /dev/null and b/version6/krpc-core/target/classes/com/kama/trace/interceptor/ServerTraceInterceptor.class differ diff --git a/version6/pom.xml b/version6/pom.xml index 8bf3dbd..27e0731 100644 --- a/version6/pom.xml +++ b/version6/pom.xml @@ -74,6 +74,16 @@ guava-retrying 2.0.0 + + io.zipkin.zipkin2 + zipkin + 3.4.0 + + + io.zipkin.reporter2 + zipkin-reporter + 3.4.0 +