version6:step2-日志链路追踪实现

This commit is contained in:
Wxx 2025-02-27 18:45:06 +08:00
parent bbe88333df
commit 9cf25a765d
32 changed files with 357 additions and 20 deletions

View File

@ -4,10 +4,12 @@ package common.serializer.mycoder;
import common.exception.SerializeException;
import common.message.MessageType;
import common.serializer.myserializer.Serializer;
import common.trace.TraceContext;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.util.Arrays;
import java.util.List;
@ -27,7 +29,12 @@ public class MyDecoder extends ByteToMessageDecoder {
if (in.readableBytes() < 6) { // messageType + serializerType + length
return;
}
//1.读取消息类型
//1.读取traceMsg
int traceLength=in.readInt();
byte[] traceBytes=new byte[traceLength];
in.readBytes(traceBytes);
serializeTraceMsg(traceBytes);
//2.读取消息类型
short messageType = in.readShort();
// 现在还只支持request与response请求
if (messageType != MessageType.REQUEST.getCode() &&
@ -35,19 +42,19 @@ public class MyDecoder extends ByteToMessageDecoder {
log.warn("暂不支持此种数据, messageType: {}", messageType);
return;
}
//2.读取序列化的方式&类型
//3.读取序列化的方式&类型
short serializerType = in.readShort();
Serializer serializer = Serializer.getSerializerByCode(serializerType);
if (serializer == null) {
log.error("不存在对应的序列化器, serializerType: {}", serializerType);
throw new SerializeException("不存在对应的序列化器, serializerType: " + serializerType);
}
//3.读取序列化数组长度
//4.读取序列化数组长度
int length = in.readInt();
if (in.readableBytes() < length) {
return; // 数据不完整等待更多数据
}
//4.读取序列化数组
//5.读取序列化数组
byte[] bytes = new byte[length];
in.readBytes(bytes);
log.debug("Received bytes: {}", Arrays.toString(bytes));
@ -55,4 +62,11 @@ public class MyDecoder extends ByteToMessageDecoder {
out.add(deserialize);
}
//解析并存储traceMsg
private void serializeTraceMsg(byte[] traceByte){
String traceMsg=new String(traceByte);
String[] msgs=traceMsg.split(";");
if(!msgs[0].equals("")) TraceContext.setTraceId(msgs[0]);
if(!msgs[1].equals("")) TraceContext.setParentSpanId(msgs[1]);
}
}

View File

@ -5,11 +5,13 @@ import common.message.MessageType;
import common.message.RpcRequest;
import common.message.RpcResponse;
import common.serializer.myserializer.Serializer;
import common.trace.TraceContext;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
/**
* @ClassName MyEncoder
@ -26,7 +28,15 @@ public class MyEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
log.debug("Encoding message of type: {}", msg.getClass());
//1.写入消息类型
//1.写入trace消息头
String traceMsg= TraceContext.getTraceId() +";"+TraceContext.getSpanId();
byte[] traceBytes=traceMsg.getBytes();
// 1.1写入traceMsg长度
out.writeInt(traceBytes.length);
// 1.2写入traceBytes
out.writeBytes(traceBytes);
//2.写入消息类型
if (msg instanceof RpcRequest) {
out.writeShort(MessageType.REQUEST.getCode());
} else if (msg instanceof RpcResponse) {
@ -35,16 +45,16 @@ public class MyEncoder extends MessageToByteEncoder {
log.error("Unknown message type: {}", msg.getClass());
throw new IllegalArgumentException("Unknown message type: " + msg.getClass());
}
//2.写入序列化方式
//3.写入序列化方式
out.writeShort(serializer.getType());
//得到序列化数组
byte[] serializeBytes = serializer.serialize(msg);
if (serializeBytes == null || serializeBytes.length == 0) {
throw new IllegalArgumentException("Serialized message is empty");
}
//3.写入长度
//4.写入长度
out.writeInt(serializeBytes.length);
//4.写入序列化数组
//5.写入序列化数组
out.writeBytes(serializeBytes);
}
}

View File

@ -0,0 +1,57 @@
package common.trace;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2025/2/18 18:37
*/
@Slf4j
public class TraceContext {
public static void setTraceId(String traceId) {
MDC.put("traceId",traceId);
}
public static String getTraceId() {
return MDC.get("traceId");
}
public static void setSpanId(String spanId) {
MDC.put("spanId",spanId);
}
public static String getSpanId() {
return MDC.get("spanId");
}
public static void setParentSpanId(String parentSpanId) {
MDC.put("parentSpanId",parentSpanId);
}
public static String getParentSpanId() {
return MDC.get("parentSpanId");
}
public static void setStartTimestamp(String startTimestamp) {
MDC.put("startTimestamp",startTimestamp);
}
public static String getStartTimestamp() {
return MDC.get("startTimestamp");
}
public static Map<String,String> getCopy(){
return MDC.getCopyOfContextMap();
}
public static void clone(Map<String,String> context){
for(Map.Entry<String,String> entry:context.entrySet()){
System.out.println(entry.getKey()+":"+entry.getValue());
MDC.put(entry.getKey(),entry.getValue());
}
}
public static void clear() {
MDC.clear();
}
}

View File

@ -18,7 +18,7 @@ import java.util.concurrent.Executors;
@Slf4j
public class ConsumerTest {
private static final int THREAD_POOL_SIZE = 20;
private static final int THREAD_POOL_SIZE = 30;
private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public static void main(String[] args) throws InterruptedException {

View File

@ -111,5 +111,20 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-okhttp3</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,42 @@
package com.kama.client.netty;
import common.trace.TraceContext;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2025/2/20 16:56
*/
@Slf4j
public class MDCChannelHandler extends ChannelOutboundHandlerAdapter {
public static final AttributeKey<Map<String, String>> TRACE_CONTEXT_KEY = AttributeKey.valueOf("TraceContext");
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 从Channel属性中获取Trace上下文
Map<String, String> traceContext =
ctx.channel().attr(TRACE_CONTEXT_KEY).get();
if (traceContext != null) {
// 设置到当前线程的TraceContext或MDC
TraceContext.clone(traceContext);
log.info("已绑定Trace上下文: {}", traceContext);
} else {
log.error("Trace上下文未设置!");
}
// 继续传递请求
super.write(ctx, msg, promise);
}
}

View File

@ -4,12 +4,15 @@ package com.kama.client.netty;
import common.serializer.mycoder.MyDecoder;
import common.serializer.mycoder.MyEncoder;
import common.serializer.myserializer.Serializer;
import common.trace.TraceContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.MDC;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@ -21,22 +24,21 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
public NettyClientInitializer(){}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 使用自定义的编码器和解码器
try {
// 根据传入的序列化器类型初始化编码器
pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3)));
pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyClientHandler());
pipeline.addLast(new MDCChannelHandler());
// 客户端只关注写事件如果超过2秒没有发送数据则发送心跳包
pipeline.addLast(new IdleStateHandler(0, 2, 0, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).toString());
//pipeline.addLast(new IdleStateHandler(0, 2, 0, TimeUnit.SECONDS));
//pipeline.addLast(new HeartbeatHandler());
log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).getType());
} catch (Exception e) {
log.error("Error initializing Netty client pipeline", e);
throw e; // 重新抛出异常确保管道初始化失败时处理正确

View File

@ -7,8 +7,10 @@ import com.kama.client.rpcclient.RpcClient;
import com.kama.client.rpcclient.impl.NettyRpcClient;
import com.kama.client.servicecenter.ServiceCenter;
import com.kama.client.servicecenter.ZKServiceCenter;
import com.kama.trace.interceptor.ClientTraceInterceptor;
import common.message.RpcRequest;
import common.message.RpcResponse;
import common.trace.TraceContext;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
@ -39,6 +41,9 @@ public class ClientProxy implements InvocationHandler {
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//trace记录
ClientTraceInterceptor.beforeInvoke();
//System.out.println(TraceContext.getTraceId() +";"+ TraceContext.getSpanId());
//构建request
RpcRequest request = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
@ -83,6 +88,8 @@ public class ClientProxy implements InvocationHandler {
}
log.info("收到响应: {} 状态码: {}", request.getInterfaceName(), response.getCode());
}
//trace上报
ClientTraceInterceptor.afterInvoke(method.getName());
return response != null ? response.getData() : null;
}

View File

@ -1,9 +1,11 @@
package com.kama.client.rpcclient.impl;
import com.kama.client.netty.MDCChannelHandler;
import com.kama.client.netty.NettyClientInitializer;
import com.kama.client.rpcclient.RpcClient;
import common.message.RpcRequest;
import common.message.RpcResponse;
import common.trace.TraceContext;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -12,8 +14,10 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.net.InetSocketAddress;
import java.util.Map;
/**
* @ClassName NettyRpcClient
@ -44,6 +48,7 @@ public class NettyRpcClient implements RpcClient {
@Override
public RpcResponse sendRequest(RpcRequest request) {
Map<String,String> mdcContextMap=TraceContext.getCopy();
//从注册中心获取host,post
if (address == null) {
log.error("服务发现失败,返回的地址为 null");
@ -55,6 +60,9 @@ public class NettyRpcClient implements RpcClient {
// 连接到远程服务
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 将当前Trace上下文保存到Channel属性
channel.attr(MDCChannelHandler.TRACE_CONTEXT_KEY).set(mdcContextMap);
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果

View File

@ -3,6 +3,7 @@ package com.kama.server.netty;
import com.kama.server.provider.ServiceProvider;
import com.kama.server.ratelimit.RateLimit;
import com.kama.trace.interceptor.ServerTraceInterceptor;
import common.message.RpcRequest;
import common.message.RpcResponse;
import io.netty.channel.ChannelFutureListener;
@ -33,8 +34,15 @@ public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcReques
log.error("接收到非法请求RpcRequest 为空");
return;
}
//trace记录
ServerTraceInterceptor.beforeHandle();
RpcResponse response = getResponse(request);
//ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
//trace上报
ServerTraceInterceptor.afterHandle(request.getMethodName());
ctx.writeAndFlush(response);
}

View File

@ -29,8 +29,8 @@ public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 服务端只关注读事件如果3秒内没有收到客户端的消息将会触发IdleState.READER_IDLE事件将由HeartbeatHandler进行处理
pipeline.addLast(new IdleStateHandler(3, 0, 0 ,TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
//pipeline.addLast(new IdleStateHandler(3, 0, 0 ,TimeUnit.SECONDS));
//pipeline.addLast(new HeartbeatHandler());
//使用自定义的编/解码器
pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3)));
pipeline.addLast(new MyDecoder());

View File

@ -19,13 +19,13 @@ public class RateLimitProvider {
private final Map<String, RateLimit> rateLimitMap = new ConcurrentHashMap<>();
// 默认的限流桶容量和令牌生成速率
private static final int DEFAULT_CAPACITY = 100;
private static final int DEFAULT_RATE = 10;
private static final int DEFAULT_CAPACITY = 10;
private static final int DEFAULT_RATE = 100;
// 提供限流实例
public RateLimit getRateLimit(String interfaceName) {
return rateLimitMap.computeIfAbsent(interfaceName, key -> {
RateLimit rateLimit = new TokenBucketRateLimitImpl(DEFAULT_CAPACITY, DEFAULT_RATE);
RateLimit rateLimit = new TokenBucketRateLimitImpl(DEFAULT_RATE, DEFAULT_CAPACITY);
log.info("为接口 [{}] 创建了新的限流策略: {}", interfaceName, rateLimit);
return rateLimit;
});

View File

@ -0,0 +1,22 @@
package com.kama.trace;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2025/2/16 23:05
*/
public class TraceIdGenerator {
public static String generateTraceId() {
UUID uuid = UUID.randomUUID();
String uuidString = uuid.toString();
// 去掉连字符
String uuidWithoutHyphens = uuidString.replace("-", "");
return uuidWithoutHyphens;
}
public static String generateSpanId() {
return String.valueOf(System.currentTimeMillis());
}
}

View File

@ -0,0 +1,48 @@
package com.kama.trace;
/**
* @author wxx
* @version 1.0
* @create 2025/2/18 16:41
*/
import lombok.extern.slf4j.Slf4j;
import zipkin2.Span;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.okhttp3.OkHttpSender;
import java.util.Map;
@Slf4j
public class ZipkinReporter {
private static final String ZIPKIN_URL = "http://localhost:9411/api/v2/spans"; // Zipkin 服务器地址
private static final AsyncReporter<Span> reporter;
static {
// 初始化 Zipkin 上报器
OkHttpSender sender = OkHttpSender.create(ZIPKIN_URL);
reporter = AsyncReporter.create(sender);
}
/**
* 上报 Span 数据到 Zipkin
*/
public static void reportSpan(String traceId, String spanId, String parentSpanId,
String name, long startTimestamp, long duration,
String serviceName,String type) {
Span span = Span.newBuilder()
.traceId(traceId)
.id(spanId)
.parentId(parentSpanId)
.name(name)
.timestamp(startTimestamp * 1000) // Zipkin 使用微秒
.duration(duration * 1000) // Zipkin 使用微秒
.putTag("service",serviceName)
.putTag("type",type)
.build();
reporter.report(span);
log.info("当前traceId:{}正在上报日志-----",traceId);
}
public static void close() {
reporter.close();
}
}

View File

@ -0,0 +1,47 @@
package com.kama.trace.interceptor;
import com.kama.trace.TraceIdGenerator;
import com.kama.trace.ZipkinReporter;
import common.trace.TraceContext;
/**
* @author wxx
* @version 1.0
* @create 2025/2/18 18:30
*/
public class ClientTraceInterceptor {
public static void beforeInvoke() {
String traceId = TraceContext.getTraceId();
if (traceId == null) {
traceId = TraceIdGenerator.generateTraceId();
TraceContext.setTraceId(traceId);
}
String spanId = TraceIdGenerator.generateSpanId();
TraceContext.setSpanId(spanId);
// 记录客户端 Span
long startTimestamp = System.currentTimeMillis();
TraceContext.setStartTimestamp(String.valueOf(startTimestamp));
}
public static void afterInvoke(String serviceName) {
long endTimestamp = System.currentTimeMillis();
long startTimestamp = Long.valueOf(TraceContext.getStartTimestamp());
long duration = endTimestamp - startTimestamp;
// 上报客户端 Span
ZipkinReporter.reportSpan(
TraceContext.getTraceId(),
TraceContext.getSpanId(),
TraceContext.getParentSpanId(),
"client-" + serviceName,
startTimestamp,
duration,
serviceName,
"client"
);
// 清理 TraceContext
TraceContext.clear();
}
}

View File

@ -0,0 +1,47 @@
package com.kama.trace.interceptor;
import com.kama.trace.TraceIdGenerator;
import com.kama.trace.ZipkinReporter;
import common.trace.TraceContext;
import org.slf4j.MDC;
/**
* @author wxx
* @version 1.0
* @create 2025/2/18 18:31
*/
public class ServerTraceInterceptor {
public static void beforeHandle() {
String traceId = TraceContext.getTraceId();
String parentSpanId =TraceContext.getParentSpanId();
String spanId = TraceIdGenerator.generateSpanId();
TraceContext.setTraceId(traceId);
TraceContext.setSpanId(spanId);
TraceContext.setParentSpanId(parentSpanId);
// 记录服务端 Span
long startTimestamp = System.currentTimeMillis();
TraceContext.setStartTimestamp(String.valueOf(startTimestamp));
}
public static void afterHandle(String serviceName) {
long endTimestamp = System.currentTimeMillis();
long startTimestamp = Long.valueOf(TraceContext.getStartTimestamp());
long duration = endTimestamp - startTimestamp;
// 上报服务端 Span
ZipkinReporter.reportSpan(
TraceContext.getTraceId(),
TraceContext.getSpanId(),
TraceContext.getParentSpanId(),
"server-" + serviceName,
startTimestamp,
duration,
serviceName,
"server"
);
// 清理 TraceContext
TraceContext.clear();
}
}

View File

@ -74,6 +74,16 @@
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>