实现基于方法白名单的重试机制

This commit is contained in:
ShiYue1026 2025-01-26 15:00:14 +08:00
parent e674e32728
commit 9c088aab99
11 changed files with 113 additions and 34 deletions

View File

@ -0,0 +1,13 @@
package com.kama.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Retryable {
}

View File

@ -1,6 +1,7 @@
package com.kama.service; package com.kama.service;
import com.kama.annotation.Retryable;
import com.kama.pojo.User; import com.kama.pojo.User;
/** /**
@ -12,8 +13,12 @@ import com.kama.pojo.User;
*/ */
public interface UserService { public interface UserService {
// 查询 // 查询
@Retryable
User getUserByUserId(Integer id); User getUserByUserId(Integer id);
// 新增 // 新增
@Retryable
Integer insertUserId(User user); Integer insertUserId(User user);
} }

View File

@ -53,7 +53,7 @@ public class JsonSerializer implements Serializer {
} }
Class<?> dataType = response.getDataType(); Class<?> dataType = response.getDataType();
//判断转化后的response对象中的data的类型是否正确 //判断转化后的response对象中的data的类型是否正确
if(!dataType.isAssignableFrom(response.getData().getClass())){ if(response.getData() != null && !dataType.isAssignableFrom(response.getData().getClass())){
response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType)); response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
} }
obj = response; obj = response;

View File

@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
/** /**
* @ClassName ClientProxy * @ClassName ClientProxy
@ -34,7 +35,6 @@ public class ClientProxy implements InvocationHandler {
public ClientProxy() throws InterruptedException { public ClientProxy() throws InterruptedException {
serviceCenter = new ZKServiceCenter(); serviceCenter = new ZKServiceCenter();
rpcClient = new NettyRpcClient(serviceCenter);
circuitBreakerProvider = new CircuitBreakerProvider(); circuitBreakerProvider = new CircuitBreakerProvider();
} }
@ -58,13 +58,17 @@ public class ClientProxy implements InvocationHandler {
RpcResponse response; RpcResponse response;
//后续添加逻辑为保持幂等性只对白名单上的服务进行重试 //后续添加逻辑为保持幂等性只对白名单上的服务进行重试
// 如果启用重试机制先检查是否需要重试 // 如果启用重试机制先检查是否需要重试
if (serviceCenter.checkRetry(request.getInterfaceName())) { String methodSignature = getMethodSignature(request.getInterfaceName(), method);
log.info("方法签名: " + methodSignature);
InetSocketAddress serviceAddress = serviceCenter.serviceDiscovery(request);
rpcClient = new NettyRpcClient(serviceAddress);
if (serviceCenter.checkRetry(serviceAddress, methodSignature)) {
//调用retry框架进行重试操作 //调用retry框架进行重试操作
try { try {
log.info("尝试重试调用服务: {}", request.getInterfaceName()); log.info("尝试重试调用服务: {}", methodSignature);
response = new GuavaRetry().sendServiceWithRetry(request, rpcClient); response = new GuavaRetry().sendServiceWithRetry(request, rpcClient);
} catch (Exception e) { } catch (Exception e) {
log.error("重试调用失败: {}", request.getInterfaceName(), e); log.error("重试调用失败: {}", methodSignature, e);
circuitBreaker.recordFailure(); circuitBreaker.recordFailure();
throw e; // 将异常抛给调用者 throw e; // 将异常抛给调用者
} }
@ -89,4 +93,20 @@ public class ClientProxy implements InvocationHandler {
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T) o; return (T) o;
} }
// 根据接口名字和方法获取方法签名
private String getMethodSignature(String interfaceName, Method method) {
StringBuilder sb = new StringBuilder();
sb.append(interfaceName).append("#").append(method.getName()).append("(");
Class<?>[] parameterTypes = method.getParameterTypes();
for (int i = 0; i < parameterTypes.length; i++) {
sb.append(parameterTypes[i].getName());
if (i < parameterTypes.length - 1) {
sb.append(",");
} else{
sb.append(")");
}
}
return sb.toString();
}
} }

View File

@ -31,10 +31,10 @@ public class NettyRpcClient implements RpcClient {
private static final Bootstrap bootstrap; private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup; private static final EventLoopGroup eventLoopGroup;
private ServiceCenter serviceCenter; private final InetSocketAddress address;
public NettyRpcClient(ServiceCenter serviceCenter) throws InterruptedException { public NettyRpcClient(InetSocketAddress serviceAddress) {
this.serviceCenter = serviceCenter; this.address = serviceAddress;
} }
//netty客户端初始化 //netty客户端初始化
@ -48,7 +48,6 @@ public class NettyRpcClient implements RpcClient {
@Override @Override
public RpcResponse sendRequest(RpcRequest request) { public RpcResponse sendRequest(RpcRequest request) {
//从注册中心获取host,post //从注册中心获取host,post
InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
if (address == null) { if (address == null) {
log.error("服务发现失败,返回的地址为 null"); log.error("服务发现失败,返回的地址为 null");
return RpcResponse.fail("服务发现失败,地址为 null"); return RpcResponse.fail("服务发现失败,地址为 null");

View File

@ -1,6 +1,8 @@
package com.kama.client.servicecenter; package com.kama.client.servicecenter;
import common.message.RpcRequest;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
/** /**
@ -13,8 +15,8 @@ import java.net.InetSocketAddress;
public interface ServiceCenter { public interface ServiceCenter {
// 查询根据服务名查找地址 // 查询根据服务名查找地址
InetSocketAddress serviceDiscovery(String serviceName); InetSocketAddress serviceDiscovery(RpcRequest request);
//判断是否可重试 //判断是否可重试
boolean checkRetry(String serviceName); boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature);
} }

View File

@ -4,7 +4,7 @@ import com.kama.client.cache.ServiceCache;
import com.kama.client.servicecenter.ZKWatcher.watchZK; import com.kama.client.servicecenter.ZKWatcher.watchZK;
import com.kama.client.servicecenter.balance.LoadBalance; import com.kama.client.servicecenter.balance.LoadBalance;
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance; import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
import com.kama.client.servicecenter.balance.impl.RandomLoadBalance; import common.message.RpcRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -57,7 +57,8 @@ public class ZKServiceCenter implements ServiceCenter {
//根据服务名接口名返回地址 //根据服务名接口名返回地址
@Override @Override
public InetSocketAddress serviceDiscovery(String serviceName) { public InetSocketAddress serviceDiscovery(RpcRequest request) {
String serviceName = request.getInterfaceName();
try { try {
//先从本地缓存中找 //先从本地缓存中找
List<String> addressList = cache.getServiceListFromCache(serviceName); List<String> addressList = cache.getServiceListFromCache(serviceName);
@ -89,20 +90,23 @@ public class ZKServiceCenter implements ServiceCenter {
//保证线程安全使用CopyOnWriteArraySet //保证线程安全使用CopyOnWriteArraySet
private Set<String> retryServiceCache = new CopyOnWriteArraySet<>(); private Set<String> retryServiceCache = new CopyOnWriteArraySet<>();
//写一个白名单缓存优化性能 //写一个白名单缓存优化性能
public boolean checkRetry(String serviceName) { @Override
// 如果缓存为空则从 Zookeeper 中加载白名单 public boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature) {
if (retryServiceCache.isEmpty()) { if (retryServiceCache.isEmpty()) {
try { try {
// 获取 Zookeeper 上的 /RETRY 路径下的所有子节点服务名称 CuratorFramework rootClient = client.usingNamespace(RETRY);
List<String> serviceList = client.getChildren().forPath("/" + RETRY); List<String> retryableMethods = rootClient.getChildren().forPath("/" + getServiceAddress(serviceAddress));
// 将从 Zookeeper 获取到的服务名称列表添加到缓存中 retryServiceCache.addAll(retryableMethods);
retryServiceCache.addAll(serviceList);
} catch (Exception e) { } catch (Exception e) {
log.error("检查重试失败,服务名:{}", serviceName, e); log.error("检查重试失败,方法签名:{}", methodSignature, e);
} }
} }
// 判断服务是否在缓存的白名单中 return retryServiceCache.contains(methodSignature);
return retryServiceCache.contains(serviceName); }
// 将InetSocketAddress解析为格式为ip:port的字符串
private String getServiceAddress(InetSocketAddress serverAddress){
return serverAddress.getHostName() + ":" + serverAddress.getPort();
} }
// 字符串解析为地址 // 字符串解析为地址

View File

@ -38,7 +38,7 @@ public class ServiceProvider {
this.rateLimitProvider = new RateLimitProvider(); this.rateLimitProvider = new RateLimitProvider();
} }
public void provideServiceInterface(Object service, boolean canRetry) { public void provideServiceInterface(Object service) {
String serviceName = service.getClass().getName(); String serviceName = service.getClass().getName();
Class<?>[] interfaceName = service.getClass().getInterfaces(); Class<?>[] interfaceName = service.getClass().getInterfaces();
@ -46,7 +46,7 @@ public class ServiceProvider {
//本机的映射表 //本机的映射表
interfaceProvider.put(clazz.getName(), service); interfaceProvider.put(clazz.getName(), service);
//在注册中心注册服务 //在注册中心注册服务
serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port), canRetry); serviceRegister.register(clazz, new InetSocketAddress(host, port));
} }
} }

View File

@ -12,5 +12,5 @@ import java.net.InetSocketAddress;
*/ */
public interface ServiceRegister { public interface ServiceRegister {
void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry); void register(Class<?> clazz, InetSocketAddress serviceAddress);
} }

View File

@ -1,5 +1,6 @@
package com.kama.server.serviceRegister.impl; package com.kama.server.serviceRegister.impl;
import com.kama.annotation.Retryable;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -10,7 +11,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.kama.server.serviceRegister.ServiceRegister; import com.kama.server.serviceRegister.ServiceRegister;
import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/** /**
* @ClassName ZKServiceRegister * @ClassName ZKServiceRegister
@ -38,7 +42,8 @@ public class ZKServiceRegister implements ServiceRegister {
} }
@Override @Override
public void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry) { public void register(Class<?> clazz, InetSocketAddress serviceAddress) {
String serviceName = clazz.getName();
try { try {
if (client.checkExists().forPath("/" + serviceName) == null) { if (client.checkExists().forPath("/" + serviceName) == null) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName); client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
@ -53,10 +58,12 @@ public class ZKServiceRegister implements ServiceRegister {
log.info("服务地址 {} 已经存在,跳过注册", path); log.info("服务地址 {} 已经存在,跳过注册", path);
} }
if (canRetry) { // 注册白名单
path = "/" + RETRY + "/" + serviceName; List<String> retryableMethods = getRetryableMethod(clazz);
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); log.info("可重试的方法: {}", retryableMethods);
log.info("重试标识 {} 注册成功", path); CuratorFramework rootClient = client.usingNamespace(RETRY);
for (String retryableMethod : retryableMethods) {
rootClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/" + getServiceAddress(serviceAddress) + "/" + retryableMethod);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("服务注册失败,服务名:{},错误信息:{}", serviceName, e.getMessage(), e); log.error("服务注册失败,服务名:{},错误信息:{}", serviceName, e.getMessage(), e);
@ -71,4 +78,31 @@ public class ZKServiceRegister implements ServiceRegister {
private String getServiceAddress(InetSocketAddress serverAddress) { private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() + ":" + serverAddress.getPort(); return serverAddress.getHostName() + ":" + serverAddress.getPort();
} }
// 判断一个方法是否加了Retryable注解
private List<String> getRetryableMethod(Class<?> clazz){
List<String> retryableMethods = new ArrayList<>();
for (Method method : clazz.getDeclaredMethods()) {
if (method.isAnnotationPresent(Retryable.class)) {
String methodSignature = getMethodSignature(clazz, method);
retryableMethods.add(methodSignature);
}
}
return retryableMethods;
}
private String getMethodSignature(Class<?> clazz, Method method) {
StringBuilder sb = new StringBuilder();
sb.append(clazz.getName()).append("#").append(method.getName()).append("(");
Class<?>[] parameterTypes = method.getParameterTypes();
for (int i = 0; i < parameterTypes.length; i++) {
sb.append(parameterTypes[i].getName());
if (i < parameterTypes.length - 1) {
sb.append(",");
} else{
sb.append(")");
}
}
return sb.toString();
}
} }

View File

@ -20,16 +20,18 @@ public class ProviderTest {
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
KRpcApplication.initialize(); KRpcApplication.initialize();
String ip = System.getProperty("ip");
int port = Integer.parseInt(System.getProperty("port"));
// 创建 UserService 实例 // 创建 UserService 实例
UserService userService = new UserServiceImpl(); UserService userService = new UserServiceImpl();
ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 9999); ServiceProvider serviceProvider = new ServiceProvider(ip, port);
// 发布服务接口到 ServiceProvider // 发布服务接口到 ServiceProvider
serviceProvider.provideServiceInterface(userService, true); // 可以设置是否支持重试 serviceProvider.provideServiceInterface(userService); // 可以设置是否支持重试
// 启动 RPC 服务器并监听端口 // 启动 RPC 服务器并监听端口
RpcServer rpcServer = new NettyRpcServer(serviceProvider); RpcServer rpcServer = new NettyRpcServer(serviceProvider);
rpcServer.start(9999); // 启动 Netty RPC 服务监听 9999 端口 rpcServer.start(port); // 启动 Netty RPC 服务监听 port 端口
log.info("RPC 服务端启动,监听端口 9999"); log.info("RPC 服务端启动,监听端口" + port);
} }
} }