commit
e3985168bf
@ -0,0 +1,13 @@
|
||||
package com.kama.annotation;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.METHOD)
|
||||
public @interface Retryable {
|
||||
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package com.kama.service;
|
||||
|
||||
|
||||
import com.kama.annotation.Retryable;
|
||||
import com.kama.pojo.User;
|
||||
|
||||
/**
|
||||
@ -12,8 +13,12 @@ import com.kama.pojo.User;
|
||||
*/
|
||||
|
||||
public interface UserService {
|
||||
|
||||
// 查询
|
||||
@Retryable
|
||||
User getUserByUserId(Integer id);
|
||||
|
||||
// 新增
|
||||
@Retryable
|
||||
Integer insertUserId(User user);
|
||||
}
|
||||
|
||||
@ -53,7 +53,7 @@ public class JsonSerializer implements Serializer {
|
||||
}
|
||||
Class<?> dataType = response.getDataType();
|
||||
//判断转化后的response对象中的data的类型是否正确
|
||||
if(!dataType.isAssignableFrom(response.getData().getClass())){
|
||||
if(response.getData() != null && !dataType.isAssignableFrom(response.getData().getClass())){
|
||||
response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
|
||||
}
|
||||
obj = response;
|
||||
|
||||
@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* @ClassName ClientProxy
|
||||
@ -34,7 +35,6 @@ public class ClientProxy implements InvocationHandler {
|
||||
|
||||
public ClientProxy() throws InterruptedException {
|
||||
serviceCenter = new ZKServiceCenter();
|
||||
rpcClient = new NettyRpcClient(serviceCenter);
|
||||
circuitBreakerProvider = new CircuitBreakerProvider();
|
||||
}
|
||||
|
||||
@ -58,13 +58,17 @@ public class ClientProxy implements InvocationHandler {
|
||||
RpcResponse response;
|
||||
//后续添加逻辑:为保持幂等性,只对白名单上的服务进行重试
|
||||
// 如果启用重试机制,先检查是否需要重试
|
||||
if (serviceCenter.checkRetry(request.getInterfaceName())) {
|
||||
String methodSignature = getMethodSignature(request.getInterfaceName(), method);
|
||||
log.info("方法签名: " + methodSignature);
|
||||
InetSocketAddress serviceAddress = serviceCenter.serviceDiscovery(request);
|
||||
rpcClient = new NettyRpcClient(serviceAddress);
|
||||
if (serviceCenter.checkRetry(serviceAddress, methodSignature)) {
|
||||
//调用retry框架进行重试操作
|
||||
try {
|
||||
log.info("尝试重试调用服务: {}", request.getInterfaceName());
|
||||
log.info("尝试重试调用服务: {}", methodSignature);
|
||||
response = new GuavaRetry().sendServiceWithRetry(request, rpcClient);
|
||||
} catch (Exception e) {
|
||||
log.error("重试调用失败: {}", request.getInterfaceName(), e);
|
||||
log.error("重试调用失败: {}", methodSignature, e);
|
||||
circuitBreaker.recordFailure();
|
||||
throw e; // 将异常抛给调用者
|
||||
}
|
||||
@ -89,4 +93,20 @@ public class ClientProxy implements InvocationHandler {
|
||||
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
|
||||
return (T) o;
|
||||
}
|
||||
|
||||
// 根据接口名字和方法获取方法签名
|
||||
private String getMethodSignature(String interfaceName, Method method) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(interfaceName).append("#").append(method.getName()).append("(");
|
||||
Class<?>[] parameterTypes = method.getParameterTypes();
|
||||
for (int i = 0; i < parameterTypes.length; i++) {
|
||||
sb.append(parameterTypes[i].getName());
|
||||
if (i < parameterTypes.length - 1) {
|
||||
sb.append(",");
|
||||
} else{
|
||||
sb.append(")");
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,10 +31,10 @@ public class NettyRpcClient implements RpcClient {
|
||||
private static final Bootstrap bootstrap;
|
||||
private static final EventLoopGroup eventLoopGroup;
|
||||
|
||||
private ServiceCenter serviceCenter;
|
||||
private final InetSocketAddress address;
|
||||
|
||||
public NettyRpcClient(ServiceCenter serviceCenter) throws InterruptedException {
|
||||
this.serviceCenter = serviceCenter;
|
||||
public NettyRpcClient(InetSocketAddress serviceAddress) {
|
||||
this.address = serviceAddress;
|
||||
}
|
||||
|
||||
//netty客户端初始化
|
||||
@ -48,7 +48,6 @@ public class NettyRpcClient implements RpcClient {
|
||||
@Override
|
||||
public RpcResponse sendRequest(RpcRequest request) {
|
||||
//从注册中心获取host,post
|
||||
InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
|
||||
if (address == null) {
|
||||
log.error("服务发现失败,返回的地址为 null");
|
||||
return RpcResponse.fail("服务发现失败,地址为 null");
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package com.kama.client.servicecenter;
|
||||
|
||||
|
||||
import common.message.RpcRequest;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
@ -13,8 +15,8 @@ import java.net.InetSocketAddress;
|
||||
|
||||
public interface ServiceCenter {
|
||||
// 查询:根据服务名查找地址
|
||||
InetSocketAddress serviceDiscovery(String serviceName);
|
||||
InetSocketAddress serviceDiscovery(RpcRequest request);
|
||||
|
||||
//判断是否可重试
|
||||
boolean checkRetry(String serviceName);
|
||||
boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature);
|
||||
}
|
||||
|
||||
@ -4,7 +4,7 @@ import com.kama.client.cache.ServiceCache;
|
||||
import com.kama.client.servicecenter.ZKWatcher.watchZK;
|
||||
import com.kama.client.servicecenter.balance.LoadBalance;
|
||||
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
|
||||
import com.kama.client.servicecenter.balance.impl.RandomLoadBalance;
|
||||
import common.message.RpcRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.curator.RetryPolicy;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
@ -57,7 +57,8 @@ public class ZKServiceCenter implements ServiceCenter {
|
||||
|
||||
//根据服务名(接口名)返回地址
|
||||
@Override
|
||||
public InetSocketAddress serviceDiscovery(String serviceName) {
|
||||
public InetSocketAddress serviceDiscovery(RpcRequest request) {
|
||||
String serviceName = request.getInterfaceName();
|
||||
try {
|
||||
//先从本地缓存中找
|
||||
List<String> addressList = cache.getServiceListFromCache(serviceName);
|
||||
@ -89,20 +90,23 @@ public class ZKServiceCenter implements ServiceCenter {
|
||||
//保证线程安全使用CopyOnWriteArraySet
|
||||
private Set<String> retryServiceCache = new CopyOnWriteArraySet<>();
|
||||
//写一个白名单缓存,优化性能
|
||||
public boolean checkRetry(String serviceName) {
|
||||
// 如果缓存为空,则从 Zookeeper 中加载白名单
|
||||
@Override
|
||||
public boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature) {
|
||||
if (retryServiceCache.isEmpty()) {
|
||||
try {
|
||||
// 获取 Zookeeper 上的 /RETRY 路径下的所有子节点(服务名称)
|
||||
List<String> serviceList = client.getChildren().forPath("/" + RETRY);
|
||||
// 将从 Zookeeper 获取到的服务名称列表添加到缓存中
|
||||
retryServiceCache.addAll(serviceList);
|
||||
CuratorFramework rootClient = client.usingNamespace(RETRY);
|
||||
List<String> retryableMethods = rootClient.getChildren().forPath("/" + getServiceAddress(serviceAddress));
|
||||
retryServiceCache.addAll(retryableMethods);
|
||||
} catch (Exception e) {
|
||||
log.error("检查重试失败,服务名:{}", serviceName, e);
|
||||
log.error("检查重试失败,方法签名:{}", methodSignature, e);
|
||||
}
|
||||
}
|
||||
// 判断服务是否在缓存的白名单中
|
||||
return retryServiceCache.contains(serviceName);
|
||||
return retryServiceCache.contains(methodSignature);
|
||||
}
|
||||
|
||||
// 将InetSocketAddress解析为格式为ip:port的字符串
|
||||
private String getServiceAddress(InetSocketAddress serverAddress){
|
||||
return serverAddress.getHostName() + ":" + serverAddress.getPort();
|
||||
}
|
||||
|
||||
// 字符串解析为地址
|
||||
|
||||
@ -38,7 +38,7 @@ public class ServiceProvider {
|
||||
this.rateLimitProvider = new RateLimitProvider();
|
||||
}
|
||||
|
||||
public void provideServiceInterface(Object service, boolean canRetry) {
|
||||
public void provideServiceInterface(Object service) {
|
||||
String serviceName = service.getClass().getName();
|
||||
Class<?>[] interfaceName = service.getClass().getInterfaces();
|
||||
|
||||
@ -46,7 +46,7 @@ public class ServiceProvider {
|
||||
//本机的映射表
|
||||
interfaceProvider.put(clazz.getName(), service);
|
||||
//在注册中心注册服务
|
||||
serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port), canRetry);
|
||||
serviceRegister.register(clazz, new InetSocketAddress(host, port));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -12,5 +12,5 @@ import java.net.InetSocketAddress;
|
||||
*/
|
||||
|
||||
public interface ServiceRegister {
|
||||
void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry);
|
||||
void register(Class<?> clazz, InetSocketAddress serviceAddress);
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.kama.server.serviceRegister.impl;
|
||||
|
||||
import com.kama.annotation.Retryable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.curator.RetryPolicy;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
@ -10,7 +11,10 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import com.kama.server.serviceRegister.ServiceRegister;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @ClassName ZKServiceRegister
|
||||
@ -38,7 +42,8 @@ public class ZKServiceRegister implements ServiceRegister {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry) {
|
||||
public void register(Class<?> clazz, InetSocketAddress serviceAddress) {
|
||||
String serviceName = clazz.getName();
|
||||
try {
|
||||
if (client.checkExists().forPath("/" + serviceName) == null) {
|
||||
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
|
||||
@ -53,10 +58,12 @@ public class ZKServiceRegister implements ServiceRegister {
|
||||
log.info("服务地址 {} 已经存在,跳过注册", path);
|
||||
}
|
||||
|
||||
if (canRetry) {
|
||||
path = "/" + RETRY + "/" + serviceName;
|
||||
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
|
||||
log.info("重试标识 {} 注册成功", path);
|
||||
// 注册白名单
|
||||
List<String> retryableMethods = getRetryableMethod(clazz);
|
||||
log.info("可重试的方法: {}", retryableMethods);
|
||||
CuratorFramework rootClient = client.usingNamespace(RETRY);
|
||||
for (String retryableMethod : retryableMethods) {
|
||||
rootClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/" + getServiceAddress(serviceAddress) + "/" + retryableMethod);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("服务注册失败,服务名:{},错误信息:{}", serviceName, e.getMessage(), e);
|
||||
@ -71,4 +78,31 @@ public class ZKServiceRegister implements ServiceRegister {
|
||||
private String getServiceAddress(InetSocketAddress serverAddress) {
|
||||
return serverAddress.getHostName() + ":" + serverAddress.getPort();
|
||||
}
|
||||
|
||||
// 判断一个方法是否加了Retryable注解
|
||||
private List<String> getRetryableMethod(Class<?> clazz){
|
||||
List<String> retryableMethods = new ArrayList<>();
|
||||
for (Method method : clazz.getDeclaredMethods()) {
|
||||
if (method.isAnnotationPresent(Retryable.class)) {
|
||||
String methodSignature = getMethodSignature(clazz, method);
|
||||
retryableMethods.add(methodSignature);
|
||||
}
|
||||
}
|
||||
return retryableMethods;
|
||||
}
|
||||
|
||||
private String getMethodSignature(Class<?> clazz, Method method) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(clazz.getName()).append("#").append(method.getName()).append("(");
|
||||
Class<?>[] parameterTypes = method.getParameterTypes();
|
||||
for (int i = 0; i < parameterTypes.length; i++) {
|
||||
sb.append(parameterTypes[i].getName());
|
||||
if (i < parameterTypes.length - 1) {
|
||||
sb.append(",");
|
||||
} else{
|
||||
sb.append(")");
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,16 +20,18 @@ public class ProviderTest {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
KRpcApplication.initialize();
|
||||
String ip = System.getProperty("ip");
|
||||
int port = Integer.parseInt(System.getProperty("port"));
|
||||
// 创建 UserService 实例
|
||||
UserService userService = new UserServiceImpl();
|
||||
ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 9999);
|
||||
ServiceProvider serviceProvider = new ServiceProvider(ip, port);
|
||||
// 发布服务接口到 ServiceProvider
|
||||
serviceProvider.provideServiceInterface(userService, true); // 可以设置是否支持重试
|
||||
serviceProvider.provideServiceInterface(userService); // 可以设置是否支持重试
|
||||
|
||||
// 启动 RPC 服务器并监听端口
|
||||
RpcServer rpcServer = new NettyRpcServer(serviceProvider);
|
||||
rpcServer.start(9999); // 启动 Netty RPC 服务,监听 9999 端口
|
||||
log.info("RPC 服务端启动,监听端口 9999");
|
||||
rpcServer.start(port); // 启动 Netty RPC 服务,监听 port 端口
|
||||
log.info("RPC 服务端启动,监听端口" + port);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user