From 9c088aab99cd5c83124e6bf68289bf4253cea761 Mon Sep 17 00:00:00 2001 From: ShiYue1026 Date: Sun, 26 Jan 2025 15:00:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=9F=BA=E4=BA=8E=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E7=99=BD=E5=90=8D=E5=8D=95=E7=9A=84=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/kama/annotation/Retryable.java | 13 ++++++ .../java/com/kama/service/UserService.java | 5 +++ .../myserializer/JsonSerializer.java | 2 +- .../com/kama/client/proxy/ClientProxy.java | 28 ++++++++++-- .../client/rpcclient/impl/NettyRpcClient.java | 7 ++- .../client/servicecenter/ServiceCenter.java | 6 ++- .../client/servicecenter/ZKServiceCenter.java | 26 ++++++----- .../kama/server/provider/ServiceProvider.java | 4 +- .../serviceRegister/ServiceRegister.java | 2 +- .../impl/ZKServiceRegister.java | 44 ++++++++++++++++--- .../java/com/kama/provider/ProviderTest.java | 10 +++-- 11 files changed, 113 insertions(+), 34 deletions(-) create mode 100644 version5/krpc-api/src/main/java/com/kama/annotation/Retryable.java diff --git a/version5/krpc-api/src/main/java/com/kama/annotation/Retryable.java b/version5/krpc-api/src/main/java/com/kama/annotation/Retryable.java new file mode 100644 index 0000000..1b6a6c4 --- /dev/null +++ b/version5/krpc-api/src/main/java/com/kama/annotation/Retryable.java @@ -0,0 +1,13 @@ +package com.kama.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Retryable { + +} + diff --git a/version5/krpc-api/src/main/java/com/kama/service/UserService.java b/version5/krpc-api/src/main/java/com/kama/service/UserService.java index 388978b..d79510c 100644 --- a/version5/krpc-api/src/main/java/com/kama/service/UserService.java +++ b/version5/krpc-api/src/main/java/com/kama/service/UserService.java @@ -1,6 +1,7 @@ package com.kama.service; +import com.kama.annotation.Retryable; import com.kama.pojo.User; /** @@ -12,8 +13,12 @@ import com.kama.pojo.User; */ public interface UserService { + // 查询 + @Retryable User getUserByUserId(Integer id); + // 新增 + @Retryable Integer insertUserId(User user); } diff --git a/version5/krpc-common/src/main/java/common/serializer/myserializer/JsonSerializer.java b/version5/krpc-common/src/main/java/common/serializer/myserializer/JsonSerializer.java index fab6b97..2dabd30 100644 --- a/version5/krpc-common/src/main/java/common/serializer/myserializer/JsonSerializer.java +++ b/version5/krpc-common/src/main/java/common/serializer/myserializer/JsonSerializer.java @@ -53,7 +53,7 @@ public class JsonSerializer implements Serializer { } Class dataType = response.getDataType(); //判断转化后的response对象中的data的类型是否正确 - if(!dataType.isAssignableFrom(response.getData().getClass())){ + if(response.getData() != null && !dataType.isAssignableFrom(response.getData().getClass())){ response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType)); } obj = response; diff --git a/version5/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java b/version5/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java index ef3f6e8..e4f9f60 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java +++ b/version5/krpc-core/src/main/java/com/kama/client/proxy/ClientProxy.java @@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; /** * @ClassName ClientProxy @@ -34,7 +35,6 @@ public class ClientProxy implements InvocationHandler { public ClientProxy() throws InterruptedException { serviceCenter = new ZKServiceCenter(); - rpcClient = new NettyRpcClient(serviceCenter); circuitBreakerProvider = new CircuitBreakerProvider(); } @@ -58,13 +58,17 @@ public class ClientProxy implements InvocationHandler { RpcResponse response; //后续添加逻辑:为保持幂等性,只对白名单上的服务进行重试 // 如果启用重试机制,先检查是否需要重试 - if (serviceCenter.checkRetry(request.getInterfaceName())) { + String methodSignature = getMethodSignature(request.getInterfaceName(), method); + log.info("方法签名: " + methodSignature); + InetSocketAddress serviceAddress = serviceCenter.serviceDiscovery(request); + rpcClient = new NettyRpcClient(serviceAddress); + if (serviceCenter.checkRetry(serviceAddress, methodSignature)) { //调用retry框架进行重试操作 try { - log.info("尝试重试调用服务: {}", request.getInterfaceName()); + log.info("尝试重试调用服务: {}", methodSignature); response = new GuavaRetry().sendServiceWithRetry(request, rpcClient); } catch (Exception e) { - log.error("重试调用失败: {}", request.getInterfaceName(), e); + log.error("重试调用失败: {}", methodSignature, e); circuitBreaker.recordFailure(); throw e; // 将异常抛给调用者 } @@ -89,4 +93,20 @@ public class ClientProxy implements InvocationHandler { Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); return (T) o; } + + // 根据接口名字和方法获取方法签名 + private String getMethodSignature(String interfaceName, Method method) { + StringBuilder sb = new StringBuilder(); + sb.append(interfaceName).append("#").append(method.getName()).append("("); + Class[] parameterTypes = method.getParameterTypes(); + for (int i = 0; i < parameterTypes.length; i++) { + sb.append(parameterTypes[i].getName()); + if (i < parameterTypes.length - 1) { + sb.append(","); + } else{ + sb.append(")"); + } + } + return sb.toString(); + } } diff --git a/version5/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java b/version5/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java index 7bd2e75..7347161 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java +++ b/version5/krpc-core/src/main/java/com/kama/client/rpcclient/impl/NettyRpcClient.java @@ -31,10 +31,10 @@ public class NettyRpcClient implements RpcClient { private static final Bootstrap bootstrap; private static final EventLoopGroup eventLoopGroup; - private ServiceCenter serviceCenter; + private final InetSocketAddress address; - public NettyRpcClient(ServiceCenter serviceCenter) throws InterruptedException { - this.serviceCenter = serviceCenter; + public NettyRpcClient(InetSocketAddress serviceAddress) { + this.address = serviceAddress; } //netty客户端初始化 @@ -48,7 +48,6 @@ public class NettyRpcClient implements RpcClient { @Override public RpcResponse sendRequest(RpcRequest request) { //从注册中心获取host,post - InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName()); if (address == null) { log.error("服务发现失败,返回的地址为 null"); return RpcResponse.fail("服务发现失败,地址为 null"); diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ServiceCenter.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ServiceCenter.java index f1aa801..c077265 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ServiceCenter.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ServiceCenter.java @@ -1,6 +1,8 @@ package com.kama.client.servicecenter; +import common.message.RpcRequest; + import java.net.InetSocketAddress; /** @@ -13,8 +15,8 @@ import java.net.InetSocketAddress; public interface ServiceCenter { // 查询:根据服务名查找地址 - InetSocketAddress serviceDiscovery(String serviceName); + InetSocketAddress serviceDiscovery(RpcRequest request); //判断是否可重试 - boolean checkRetry(String serviceName); + boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature); } diff --git a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java index 93f2701..c974f90 100644 --- a/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java +++ b/version5/krpc-core/src/main/java/com/kama/client/servicecenter/ZKServiceCenter.java @@ -4,7 +4,7 @@ import com.kama.client.cache.ServiceCache; import com.kama.client.servicecenter.ZKWatcher.watchZK; import com.kama.client.servicecenter.balance.LoadBalance; import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance; -import com.kama.client.servicecenter.balance.impl.RandomLoadBalance; +import common.message.RpcRequest; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -57,7 +57,8 @@ public class ZKServiceCenter implements ServiceCenter { //根据服务名(接口名)返回地址 @Override - public InetSocketAddress serviceDiscovery(String serviceName) { + public InetSocketAddress serviceDiscovery(RpcRequest request) { + String serviceName = request.getInterfaceName(); try { //先从本地缓存中找 List addressList = cache.getServiceListFromCache(serviceName); @@ -89,20 +90,23 @@ public class ZKServiceCenter implements ServiceCenter { //保证线程安全使用CopyOnWriteArraySet private Set retryServiceCache = new CopyOnWriteArraySet<>(); //写一个白名单缓存,优化性能 - public boolean checkRetry(String serviceName) { - // 如果缓存为空,则从 Zookeeper 中加载白名单 + @Override + public boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature) { if (retryServiceCache.isEmpty()) { try { - // 获取 Zookeeper 上的 /RETRY 路径下的所有子节点(服务名称) - List serviceList = client.getChildren().forPath("/" + RETRY); - // 将从 Zookeeper 获取到的服务名称列表添加到缓存中 - retryServiceCache.addAll(serviceList); + CuratorFramework rootClient = client.usingNamespace(RETRY); + List retryableMethods = rootClient.getChildren().forPath("/" + getServiceAddress(serviceAddress)); + retryServiceCache.addAll(retryableMethods); } catch (Exception e) { - log.error("检查重试失败,服务名:{}", serviceName, e); + log.error("检查重试失败,方法签名:{}", methodSignature, e); } } - // 判断服务是否在缓存的白名单中 - return retryServiceCache.contains(serviceName); + return retryServiceCache.contains(methodSignature); + } + + // 将InetSocketAddress解析为格式为ip:port的字符串 + private String getServiceAddress(InetSocketAddress serverAddress){ + return serverAddress.getHostName() + ":" + serverAddress.getPort(); } // 字符串解析为地址 diff --git a/version5/krpc-core/src/main/java/com/kama/server/provider/ServiceProvider.java b/version5/krpc-core/src/main/java/com/kama/server/provider/ServiceProvider.java index 4bbde28..5a1857a 100644 --- a/version5/krpc-core/src/main/java/com/kama/server/provider/ServiceProvider.java +++ b/version5/krpc-core/src/main/java/com/kama/server/provider/ServiceProvider.java @@ -38,7 +38,7 @@ public class ServiceProvider { this.rateLimitProvider = new RateLimitProvider(); } - public void provideServiceInterface(Object service, boolean canRetry) { + public void provideServiceInterface(Object service) { String serviceName = service.getClass().getName(); Class[] interfaceName = service.getClass().getInterfaces(); @@ -46,7 +46,7 @@ public class ServiceProvider { //本机的映射表 interfaceProvider.put(clazz.getName(), service); //在注册中心注册服务 - serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port), canRetry); + serviceRegister.register(clazz, new InetSocketAddress(host, port)); } } diff --git a/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/ServiceRegister.java b/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/ServiceRegister.java index dd0398d..3a9d4eb 100644 --- a/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/ServiceRegister.java +++ b/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/ServiceRegister.java @@ -12,5 +12,5 @@ import java.net.InetSocketAddress; */ public interface ServiceRegister { - void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry); + void register(Class clazz, InetSocketAddress serviceAddress); } diff --git a/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/impl/ZKServiceRegister.java b/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/impl/ZKServiceRegister.java index ce952e6..9414457 100644 --- a/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/impl/ZKServiceRegister.java +++ b/version5/krpc-core/src/main/java/com/kama/server/serviceRegister/impl/ZKServiceRegister.java @@ -1,5 +1,6 @@ package com.kama.server.serviceRegister.impl; +import com.kama.annotation.Retryable; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -10,7 +11,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.kama.server.serviceRegister.ServiceRegister; +import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; /** * @ClassName ZKServiceRegister @@ -38,7 +42,8 @@ public class ZKServiceRegister implements ServiceRegister { } @Override - public void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry) { + public void register(Class clazz, InetSocketAddress serviceAddress) { + String serviceName = clazz.getName(); try { if (client.checkExists().forPath("/" + serviceName) == null) { client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName); @@ -53,10 +58,12 @@ public class ZKServiceRegister implements ServiceRegister { log.info("服务地址 {} 已经存在,跳过注册", path); } - if (canRetry) { - path = "/" + RETRY + "/" + serviceName; - client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); - log.info("重试标识 {} 注册成功", path); + // 注册白名单 + List retryableMethods = getRetryableMethod(clazz); + log.info("可重试的方法: {}", retryableMethods); + CuratorFramework rootClient = client.usingNamespace(RETRY); + for (String retryableMethod : retryableMethods) { + rootClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/" + getServiceAddress(serviceAddress) + "/" + retryableMethod); } } catch (Exception e) { log.error("服务注册失败,服务名:{},错误信息:{}", serviceName, e.getMessage(), e); @@ -71,4 +78,31 @@ public class ZKServiceRegister implements ServiceRegister { private String getServiceAddress(InetSocketAddress serverAddress) { return serverAddress.getHostName() + ":" + serverAddress.getPort(); } + + // 判断一个方法是否加了Retryable注解 + private List getRetryableMethod(Class clazz){ + List retryableMethods = new ArrayList<>(); + for (Method method : clazz.getDeclaredMethods()) { + if (method.isAnnotationPresent(Retryable.class)) { + String methodSignature = getMethodSignature(clazz, method); + retryableMethods.add(methodSignature); + } + } + return retryableMethods; + } + + private String getMethodSignature(Class clazz, Method method) { + StringBuilder sb = new StringBuilder(); + sb.append(clazz.getName()).append("#").append(method.getName()).append("("); + Class[] parameterTypes = method.getParameterTypes(); + for (int i = 0; i < parameterTypes.length; i++) { + sb.append(parameterTypes[i].getName()); + if (i < parameterTypes.length - 1) { + sb.append(","); + } else{ + sb.append(")"); + } + } + return sb.toString(); + } } diff --git a/version5/krpc-provider/src/main/java/com/kama/provider/ProviderTest.java b/version5/krpc-provider/src/main/java/com/kama/provider/ProviderTest.java index f55742b..5d3fc79 100644 --- a/version5/krpc-provider/src/main/java/com/kama/provider/ProviderTest.java +++ b/version5/krpc-provider/src/main/java/com/kama/provider/ProviderTest.java @@ -20,16 +20,18 @@ public class ProviderTest { public static void main(String[] args) throws InterruptedException { KRpcApplication.initialize(); + String ip = System.getProperty("ip"); + int port = Integer.parseInt(System.getProperty("port")); // 创建 UserService 实例 UserService userService = new UserServiceImpl(); - ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 9999); + ServiceProvider serviceProvider = new ServiceProvider(ip, port); // 发布服务接口到 ServiceProvider - serviceProvider.provideServiceInterface(userService, true); // 可以设置是否支持重试 + serviceProvider.provideServiceInterface(userService); // 可以设置是否支持重试 // 启动 RPC 服务器并监听端口 RpcServer rpcServer = new NettyRpcServer(serviceProvider); - rpcServer.start(9999); // 启动 Netty RPC 服务,监听 9999 端口 - log.info("RPC 服务端启动,监听端口 9999"); + rpcServer.start(port); // 启动 Netty RPC 服务,监听 port 端口 + log.info("RPC 服务端启动,监听端口" + port); } }