diff --git a/version4/pom.xml b/version4/pom.xml index f0dc119..254fdb9 100644 --- a/version4/pom.xml +++ b/version4/pom.xml @@ -13,5 +13,52 @@ 8 UTF-8 + + + org.projectlombok + lombok + 1.18.30 + compile + + + org.slf4j + slf4j-log4j12 + 1.7.25 + + + org.apache.logging.log4j + log4j-1.2-api + 2.8.2 + + + io.netty + netty-all + 4.1.51.Final + compile + + + + org.apache.curator + curator-recipes + 5.1.0 + + + com.alibaba + fastjson + 1.2.83 + + + com.github.rholder + guava-retrying + 2.0.0 + + + com.github.rholder + guava-retrying + 2.0.0 + compile + + + \ No newline at end of file diff --git a/version4/src/main/java/part1/Client/TestClient.java b/version4/src/main/java/part1/Client/TestClient.java new file mode 100644 index 0000000..705f383 --- /dev/null +++ b/version4/src/main/java/part1/Client/TestClient.java @@ -0,0 +1,27 @@ +package part1.Client; + + +import part1.Client.proxy.ClientProxy; +import part1.common.pojo.User; +import part1.common.service.UserService; + + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 18:39 + */ + +public class TestClient { + public static void main(String[] args) throws InterruptedException { + ClientProxy clientProxy=new ClientProxy(); + UserService proxy=clientProxy.getProxy(UserService.class); + + User user = proxy.getUserByUserId(1); + System.out.println("从服务端得到的user="+user.toString()); + + User u=User.builder().id(100).userName("wxx").sex(true).build(); + Integer id = proxy.insertUserId(u); + System.out.println("向服务端插入user的id"+id); + } +} diff --git a/version4/src/main/java/part1/Client/cache/serviceCache.java b/version4/src/main/java/part1/Client/cache/serviceCache.java new file mode 100644 index 0000000..807d818 --- /dev/null +++ b/version4/src/main/java/part1/Client/cache/serviceCache.java @@ -0,0 +1,44 @@ +package part1.Client.cache; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/4 0:57 + */ +public class serviceCache { + //key: serviceName 服务名 + //value: addressList 服务提供者列表 + private static Map> cache=new HashMap<>(); + + //添加服务 + public void addServcieToCache(String serviceName,String address){ + if(cache.containsKey(serviceName)){ + List addressList = cache.get(serviceName); + addressList.add(address); + System.out.println("将name为"+serviceName+"和地址为"+address+"的服务添加到本地缓存中"); + }else { + List addressList=new ArrayList<>(); + addressList.add(address); + cache.put(serviceName,addressList); + } + } + //从缓存中取服务地址列表 + public List getServiceListFromCache(String serviceName){ + if(!cache.containsKey(serviceName)) { + return null; + } + return cache.get(serviceName); + } + + //从缓存中删除服务地址 + public void delete(String serviceName,String address){ + List addressList = cache.get(serviceName); + addressList.remove(address); + System.out.println("将name为"+serviceName+"和地址为"+address+"的服务从本地缓存中删除"); + } +} diff --git a/version4/src/main/java/part1/Client/circuitBreaker/CircuitBreaker.java b/version4/src/main/java/part1/Client/circuitBreaker/CircuitBreaker.java new file mode 100644 index 0000000..d7b15d5 --- /dev/null +++ b/version4/src/main/java/part1/Client/circuitBreaker/CircuitBreaker.java @@ -0,0 +1,87 @@ +package part1.Client.circuitBreaker; + +/** + * @author wxx + * @version 1.0 + * @create 2024/7/2 21:49 + */ +import java.util.concurrent.atomic.AtomicInteger; + +public class CircuitBreaker { + //当前状态 + private CircuitBreakerState state = CircuitBreakerState.CLOSED; + private AtomicInteger failureCount = new AtomicInteger(0); + private AtomicInteger successCount = new AtomicInteger(0); + private AtomicInteger requestCount = new AtomicInteger(0); + //失败次数阈值 + private final int failureThreshold; + //半开启-》关闭状态的成功次数比例 + private final double halfOpenSuccessRate; + //恢复时间 + private final long retryTimePeriod; + //上一次失败时间 + private long lastFailureTime = 0; + + public CircuitBreaker(int failureThreshold, double halfOpenSuccessRate,long retryTimePeriod) { + this.failureThreshold = failureThreshold; + this.halfOpenSuccessRate = halfOpenSuccessRate; + this.retryTimePeriod = retryTimePeriod; + } + //查看当前熔断器是否允许请求通过 + public synchronized boolean allowRequest() { + long currentTime = System.currentTimeMillis(); + switch (state) { + case OPEN: + if (currentTime - lastFailureTime > retryTimePeriod) { + state = CircuitBreakerState.HALF_OPEN; + resetCounts(); + return true; + } + return false; + case HALF_OPEN: + requestCount.incrementAndGet(); + return true; + case CLOSED: + default: + return true; + } + } + //记录成功 + public synchronized void recordSuccess() { + if (state == CircuitBreakerState.HALF_OPEN) { + successCount.incrementAndGet(); + if (successCount.get() >= halfOpenSuccessRate * requestCount.get()) { + state = CircuitBreakerState.CLOSED; + resetCounts(); + } + } else { + resetCounts(); + } + } + //记录失败 + public synchronized void recordFailure() { + failureCount.incrementAndGet(); + lastFailureTime = System.currentTimeMillis(); + if (state == CircuitBreakerState.HALF_OPEN) { + state = CircuitBreakerState.OPEN; + lastFailureTime = System.currentTimeMillis(); + } else if (failureCount.get() >= failureThreshold) { + state = CircuitBreakerState.OPEN; + } + } + //重置次数 + private void resetCounts() { + failureCount.set(0); + successCount.set(0); + requestCount.set(0); + } + + public CircuitBreakerState getState() { + return state; + } +} + +enum CircuitBreakerState { + //关闭,开启,半开启 + CLOSED, OPEN, HALF_OPEN +} \ No newline at end of file diff --git a/version4/src/main/java/part1/Client/circuitBreaker/CircuitBreakerProvider.java b/version4/src/main/java/part1/Client/circuitBreaker/CircuitBreakerProvider.java new file mode 100644 index 0000000..7ce3649 --- /dev/null +++ b/version4/src/main/java/part1/Client/circuitBreaker/CircuitBreakerProvider.java @@ -0,0 +1,23 @@ +package part1.Client.circuitBreaker; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2024/7/3 0:22 + */ +public class CircuitBreakerProvider { + private Map circuitBreakerMap=new HashMap<>(); + + public CircuitBreaker getCircuitBreaker(String serviceName){ + CircuitBreaker circuitBreaker; + if(circuitBreakerMap.containsKey(serviceName)){ + circuitBreaker=circuitBreakerMap.get(serviceName); + }else { + circuitBreaker=new CircuitBreaker(3,0.5,10000); + } + return circuitBreaker; + } +} diff --git a/version4/src/main/java/part1/Client/netty/handler/NettyClientHandler.java b/version4/src/main/java/part1/Client/netty/handler/NettyClientHandler.java new file mode 100644 index 0000000..31858c6 --- /dev/null +++ b/version4/src/main/java/part1/Client/netty/handler/NettyClientHandler.java @@ -0,0 +1,27 @@ +package part1.Client.netty.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.AttributeKey; +import part1.common.Message.RpcResponse; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 17:29 + */ +public class NettyClientHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { + // 接收到response, 给channel设计别名,让sendRequest里读取response + AttributeKey key = AttributeKey.valueOf("RPCResponse"); + ctx.channel().attr(key).set(response); + ctx.channel().close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/version4/src/main/java/part1/Client/netty/nettyInitializer/NettyClientInitializer.java b/version4/src/main/java/part1/Client/netty/nettyInitializer/NettyClientInitializer.java new file mode 100644 index 0000000..e0b520a --- /dev/null +++ b/version4/src/main/java/part1/Client/netty/nettyInitializer/NettyClientInitializer.java @@ -0,0 +1,26 @@ +package part1.Client.netty.nettyInitializer; + + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import part1.Client.netty.handler.NettyClientHandler; +import part1.common.serializer.myCode.MyDecoder; +import part1.common.serializer.myCode.MyEncoder; +import part1.common.serializer.mySerializer.JsonSerializer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 17:26 + */ +public class NettyClientInitializer extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + //使用自定义的编/解码器 + pipeline.addLast(new MyEncoder(new JsonSerializer())); + pipeline.addLast(new MyDecoder()); + pipeline.addLast(new NettyClientHandler()); + } +} diff --git a/version4/src/main/java/part1/Client/proxy/ClientProxy.java b/version4/src/main/java/part1/Client/proxy/ClientProxy.java new file mode 100644 index 0000000..5de3e11 --- /dev/null +++ b/version4/src/main/java/part1/Client/proxy/ClientProxy.java @@ -0,0 +1,73 @@ +package part1.Client.proxy; + + +import part1.Client.circuitBreaker.CircuitBreaker; +import part1.Client.circuitBreaker.CircuitBreakerProvider; +import part1.Client.retry.guavaRetry; +import part1.Client.rpcClient.RpcClient; +import part1.Client.rpcClient.impl.NettyRpcClient; +import part1.Client.serviceCenter.ServiceCenter; +import part1.Client.serviceCenter.ZKServiceCenter; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 16:49 + */ +public class ClientProxy implements InvocationHandler { + //传入参数service接口的class对象,反射封装成一个request + + private RpcClient rpcClient; + private ServiceCenter serviceCenter; + private CircuitBreakerProvider circuitBreakerProvider; + public ClientProxy() throws InterruptedException { + serviceCenter=new ZKServiceCenter(); + rpcClient=new NettyRpcClient(serviceCenter); + circuitBreakerProvider=new CircuitBreakerProvider(); + } + + //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端) + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + //构建request + RpcRequest request=RpcRequest.builder() + .interfaceName(method.getDeclaringClass().getName()) + .methodName(method.getName()) + .params(args).paramsType(method.getParameterTypes()).build(); + //获取熔断器 + CircuitBreaker circuitBreaker=circuitBreakerProvider.getCircuitBreaker(method.getName()); + //判断熔断器是否允许请求经过 + if (!circuitBreaker.allowRequest()){ + //这里可以针对熔断做特殊处理,返回特殊值 + return null; + } + //数据传输 + RpcResponse response; + //后续添加逻辑:为保持幂等性,只对白名单上的服务进行重试 + if (serviceCenter.checkRetry(request.getInterfaceName())){ + //调用retry框架进行重试操作 + response=new guavaRetry().sendServiceWithRetry(request,rpcClient); + }else { + //只调用一次 + response= rpcClient.sendRequest(request); + } + //记录response的状态,上报给熔断器 + if (response.getCode() ==200){ + circuitBreaker.recordSuccess(); + } + if (response.getCode()==500){ + circuitBreaker.recordFailure(); + } + return response.getData(); + } + public T getProxy(Class clazz){ + Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); + return (T)o; + } +} diff --git a/version4/src/main/java/part1/Client/retry/guavaRetry.java b/version4/src/main/java/part1/Client/retry/guavaRetry.java new file mode 100644 index 0000000..62e70c3 --- /dev/null +++ b/version4/src/main/java/part1/Client/retry/guavaRetry.java @@ -0,0 +1,43 @@ +package part1.Client.retry; + +import com.github.rholder.retry.*; +import part1.Client.rpcClient.RpcClient; +import part1.common.Message.RpcResponse; +import part1.common.Message.RpcRequest; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/20 19:28 + */ +public class guavaRetry { + private RpcClient rpcClient; + public RpcResponse sendServiceWithRetry(RpcRequest request, RpcClient rpcClient) { + this.rpcClient=rpcClient; + Retryer retryer = RetryerBuilder.newBuilder() + //无论出现什么异常,都进行重试 + .retryIfException() + //返回结果为 error时进行重试 + .retryIfResult(response -> Objects.equals(response.getCode(), 500)) + //重试等待策略:等待 2s 后再进行重试 + .withWaitStrategy(WaitStrategies.fixedWait(2, TimeUnit.SECONDS)) + //重试停止策略:重试达到 3 次 + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + System.out.println("RetryListener: 第" + attempt.getAttemptNumber() + "次调用"); + } + }) + .build(); + try { + return retryer.call(() -> rpcClient.sendRequest(request)); + } catch (Exception e) { + e.printStackTrace(); + } + return RpcResponse.fail(); + } +} \ No newline at end of file diff --git a/version4/src/main/java/part1/Client/rpcClient/RpcClient.java b/version4/src/main/java/part1/Client/rpcClient/RpcClient.java new file mode 100644 index 0000000..0268128 --- /dev/null +++ b/version4/src/main/java/part1/Client/rpcClient/RpcClient.java @@ -0,0 +1,15 @@ +package part1.Client.rpcClient; + +import part1.common.Message.RpcResponse; +import part1.common.Message.RpcRequest; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 18:55 + */ +public interface RpcClient { + + //定义底层通信的方法 + RpcResponse sendRequest(RpcRequest request); +} diff --git a/version4/src/main/java/part1/Client/rpcClient/impl/NettyRpcClient.java b/version4/src/main/java/part1/Client/rpcClient/impl/NettyRpcClient.java new file mode 100644 index 0000000..bca9e15 --- /dev/null +++ b/version4/src/main/java/part1/Client/rpcClient/impl/NettyRpcClient.java @@ -0,0 +1,67 @@ +package part1.Client.rpcClient.impl; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.AttributeKey; +import part1.Client.netty.nettyInitializer.NettyClientInitializer; +import part1.Client.rpcClient.RpcClient; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; +import part1.Client.serviceCenter.ServiceCenter; + +import java.net.InetSocketAddress; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 19:40 + */ +public class NettyRpcClient implements RpcClient { + + private static final Bootstrap bootstrap; + private static final EventLoopGroup eventLoopGroup; + + private ServiceCenter serviceCenter; + public NettyRpcClient(ServiceCenter serviceCenter) throws InterruptedException { + this.serviceCenter=serviceCenter; + } + + //netty客户端初始化 + static { + eventLoopGroup = new NioEventLoopGroup(); + bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) + .handler(new NettyClientInitializer()); + } + @Override + public RpcResponse sendRequest(RpcRequest request) { + //从注册中心获取host,post + InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName()); + String host = address.getHostName(); + int port = address.getPort(); + try { + ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); + Channel channel = channelFuture.channel(); + // 发送数据 + channel.writeAndFlush(request); + //sync()堵塞获取结果 + channel.closeFuture().sync(); + // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置) + // AttributeKey是,线程隔离的,不会由线程安全问题。 + // 当前场景下选择堵塞获取结果 + // 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener... + AttributeKey key = AttributeKey.valueOf("RPCResponse"); + RpcResponse response = channel.attr(key).get(); + + System.out.println(response); + return response; + } catch (InterruptedException e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/version4/src/main/java/part1/Client/rpcClient/impl/SimpleSocketRpcCilent.java b/version4/src/main/java/part1/Client/rpcClient/impl/SimpleSocketRpcCilent.java new file mode 100644 index 0000000..99c8c85 --- /dev/null +++ b/version4/src/main/java/part1/Client/rpcClient/impl/SimpleSocketRpcCilent.java @@ -0,0 +1,41 @@ +package part1.Client.rpcClient.impl; + +import part1.Client.rpcClient.RpcClient; +import part1.common.Message.RpcResponse; +import part1.common.Message.RpcRequest; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 18:58 + */ +public class SimpleSocketRpcCilent implements RpcClient { + private String host; + private int port; + public SimpleSocketRpcCilent(String host,int port){ + this.host=host; + this.port=port; + } + @Override + public RpcResponse sendRequest(RpcRequest request) { + try { + Socket socket=new Socket(host, port); + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + + oos.writeObject(request); + oos.flush(); + + RpcResponse response=(RpcResponse) ois.readObject(); + return response; + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/version4/src/main/java/part1/Client/serviceCenter/ServiceCenter.java b/version4/src/main/java/part1/Client/serviceCenter/ServiceCenter.java new file mode 100644 index 0000000..57a250d --- /dev/null +++ b/version4/src/main/java/part1/Client/serviceCenter/ServiceCenter.java @@ -0,0 +1,16 @@ +package part1.Client.serviceCenter; + +import java.net.InetSocketAddress; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 21:42 + */ +//服务中心接口 +public interface ServiceCenter { + // 查询:根据服务名查找地址 + InetSocketAddress serviceDiscovery(String serviceName); + //判断是否可重试 + boolean checkRetry(String serviceName) ; +} diff --git a/version4/src/main/java/part1/Client/serviceCenter/ZKServiceCenter.java b/version4/src/main/java/part1/Client/serviceCenter/ZKServiceCenter.java new file mode 100644 index 0000000..d549d18 --- /dev/null +++ b/version4/src/main/java/part1/Client/serviceCenter/ZKServiceCenter.java @@ -0,0 +1,93 @@ +package part1.Client.serviceCenter; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import part1.Client.serviceCenter.ZkWatcher.watchZK; +import part1.Client.cache.serviceCache; +import part1.Client.serviceCenter.balance.impl.ConsistencyHashBalance; + +import java.net.InetSocketAddress; +import java.util.List; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 21:41 + */ +public class ZKServiceCenter implements ServiceCenter{ + // curator 提供的zookeeper客户端 + private CuratorFramework client; + //zookeeper根路径节点 + private static final String ROOT_PATH = "MyRPC"; + private static final String RETRY = "CanRetry"; + //serviceCache + private serviceCache cache; + + //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接 + public ZKServiceCenter() throws InterruptedException { + // 指数时间重试 + RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); + // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接 + // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系, + // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍 + // 使用心跳监听状态 + this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") + .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build(); + this.client.start(); + System.out.println("zookeeper 连接成功"); + //初始化本地缓存 + cache=new serviceCache(); + //加入zookeeper事件监听器 + watchZK watcher=new watchZK(client,cache); + //监听启动 + watcher.watchToUpdate(ROOT_PATH); + } + //根据服务名(接口名)返回地址 + @Override + public InetSocketAddress serviceDiscovery(String serviceName) { + try { + //先从本地缓存中找 + List addressList=cache.getServiceListFromCache(serviceName); + //如果找不到,再去zookeeper中找 + //这种i情况基本不会发生,或者说只会出现在初始化阶段 + if(addressList==null) { + addressList=client.getChildren().forPath("/" + serviceName); + } + // 负载均衡得到地址 + String address = new ConsistencyHashBalance().balance(addressList); + return parseAddress(address); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + // + public boolean checkRetry(String serviceName) { + boolean canRetry =false; + try { + List serviceList = client.getChildren().forPath("/" + RETRY); + for(String s:serviceList){ + if(s.equals(serviceName)){ + System.out.println("服务"+serviceName+"在白名单上,可进行重试"); + canRetry=true; + } + } + }catch (Exception e) { + e.printStackTrace(); + } + return canRetry; + } + // 地址 -> XXX.XXX.XXX.XXX:port 字符串 + private String getServiceAddress(InetSocketAddress serverAddress) { + return serverAddress.getHostName() + + ":" + + serverAddress.getPort(); + } + // 字符串解析为地址 + private InetSocketAddress parseAddress(String address) { + String[] result = address.split(":"); + return new InetSocketAddress(result[0], Integer.parseInt(result[1])); + } +} diff --git a/version4/src/main/java/part1/Client/serviceCenter/ZkWatcher/watchZK.java b/version4/src/main/java/part1/Client/serviceCenter/ZkWatcher/watchZK.java new file mode 100644 index 0000000..7861c1b --- /dev/null +++ b/version4/src/main/java/part1/Client/serviceCenter/ZkWatcher/watchZK.java @@ -0,0 +1,82 @@ +package part1.Client.serviceCenter.ZkWatcher; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import part1.Client.cache.serviceCache; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/4 1:00 + */ +public class watchZK { + // curator 提供的zookeeper客户端 + private CuratorFramework client; + //本地缓存 + serviceCache cache; + + public watchZK(CuratorFramework client,serviceCache cache){ + this.client=client; + this.cache=cache; + } + + /** + * 监听当前节点和子节点的 更新,创建,删除 + * @param path + */ + public void watchToUpdate(String path) throws InterruptedException { + CuratorCache curatorCache = CuratorCache.build(client, "/"); + curatorCache.listenable().addListener(new CuratorCacheListener() { + @Override + public void event(Type type, ChildData childData, ChildData childData1) { + // 第一个参数:事件类型(枚举) + // 第二个参数:节点更新前的状态、数据 + // 第三个参数:节点更新后的状态、数据 + // 创建节点时:节点刚被创建,不存在 更新前节点 ,所以第二个参数为 null + // 删除节点时:节点被删除,不存在 更新后节点 ,所以第三个参数为 null + // 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据 + switch (type.name()) { + case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件 + //获取更新的节点的路径 + String path=new String(childData1.getPath()); + //按照格式 ,读取 + String[] pathList= path.split("/"); + if(pathList.length<=2) break; + else { + String serviceName=pathList[1]; + String address=pathList[2]; + //将新注册的服务加入到本地缓存中 + cache.addServcieToCache(serviceName,address); + } + break; + case "NODE_CHANGED": // 节点更新 + if (childData.getData() != null) { + System.out.println("修改前的数据: " + new String(childData.getData())); + } else { + System.out.println("节点第一次赋值!"); + } + System.out.println("修改后的数据: " + new String(childData1.getData())); + break; + case "NODE_DELETED": // 节点删除 + String path_d=new String(childData.getPath()); + //按照格式 ,读取 + String[] pathList_d= path_d.split("/"); + if(pathList_d.length<=2) break; + else { + String serviceName=pathList_d[1]; + String address=pathList_d[2]; + //将新注册的服务加入到本地缓存中 + cache.delete(serviceName,address); + } + break; + default: + break; + } + } + }); + //开启监听 + curatorCache.start(); + } +} \ No newline at end of file diff --git a/version4/src/main/java/part1/Client/serviceCenter/balance/LoadBalance.java b/version4/src/main/java/part1/Client/serviceCenter/balance/LoadBalance.java new file mode 100644 index 0000000..03d3f7a --- /dev/null +++ b/version4/src/main/java/part1/Client/serviceCenter/balance/LoadBalance.java @@ -0,0 +1,15 @@ +package part1.Client.serviceCenter.balance; + +import java.util.List; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/19 21:00 + * 给服务地址列表,根据不同的负载均衡策略选择一个 + */ +public interface LoadBalance { + String balance(List addressList); + void addNode(String node) ; + void delNode(String node); +} \ No newline at end of file diff --git a/version4/src/main/java/part1/Client/serviceCenter/balance/impl/ConsistencyHashBalance.java b/version4/src/main/java/part1/Client/serviceCenter/balance/impl/ConsistencyHashBalance.java new file mode 100644 index 0000000..f1f315b --- /dev/null +++ b/version4/src/main/java/part1/Client/serviceCenter/balance/impl/ConsistencyHashBalance.java @@ -0,0 +1,119 @@ +package part1.Client.serviceCenter.balance.impl; + +import part1.Client.serviceCenter.balance.LoadBalance; + +import java.util.*; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/19 21:16 + * 一致性哈希算法 负载均衡 + */ +public class ConsistencyHashBalance implements LoadBalance { + // 虚拟节点的个数 + private static final int VIRTUAL_NUM = 5; + + // 虚拟节点分配,key是hash值,value是虚拟节点服务器名称 + private static SortedMap shards = new TreeMap(); + + // 真实节点列表 + private static List realNodes = new LinkedList(); + + //模拟初始服务器 + private static String[] servers =null; + + private static void init(List serviceList) { + for (String server :serviceList) { + realNodes.add(server); + System.out.println("真实节点[" + server + "] 被添加"); + for (int i = 0; i < VIRTUAL_NUM; i++) { + String virtualNode = server + "&&VN" + i; + int hash = getHash(virtualNode); + shards.put(hash, virtualNode); + System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加"); + } + } + } + /** + * 获取被分配的节点名 + * + * @param node + * @return + */ + public static String getServer(String node,List serviceList) { + init(serviceList); + int hash = getHash(node); + Integer key = null; + SortedMap subMap = shards.tailMap(hash); + if (subMap.isEmpty()) { + key = shards.lastKey(); + } else { + key = subMap.firstKey(); + } + String virtualNode = shards.get(key); + return virtualNode.substring(0, virtualNode.indexOf("&&")); + } + + /** + * 添加节点 + * + * @param node + */ + public void addNode(String node) { + if (!realNodes.contains(node)) { + realNodes.add(node); + System.out.println("真实节点[" + node + "] 上线添加"); + for (int i = 0; i < VIRTUAL_NUM; i++) { + String virtualNode = node + "&&VN" + i; + int hash = getHash(virtualNode); + shards.put(hash, virtualNode); + System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加"); + } + } + } + + /** + * 删除节点 + * + * @param node + */ + public void delNode(String node) { + if (realNodes.contains(node)) { + realNodes.remove(node); + System.out.println("真实节点[" + node + "] 下线移除"); + for (int i = 0; i < VIRTUAL_NUM; i++) { + String virtualNode = node + "&&VN" + i; + int hash = getHash(virtualNode); + shards.remove(hash); + System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被移除"); + } + } + } + + /** + * FNV1_32_HASH算法 + */ + private static int getHash(String str) { + final int p = 16777619; + int hash = (int) 2166136261L; + for (int i = 0; i < str.length(); i++) + hash = (hash ^ str.charAt(i)) * p; + hash += hash << 13; + hash ^= hash >> 7; + hash += hash << 3; + hash ^= hash >> 17; + hash += hash << 5; + // 如果算出来的值为负数则取其绝对值 + if (hash < 0) + hash = Math.abs(hash); + return hash; + } + + @Override + public String balance(List addressList) { + String random= UUID.randomUUID().toString(); + return getServer(random,addressList); + } + +} diff --git a/version4/src/main/java/part1/Client/serviceCenter/balance/impl/RandomLoadBalance.java b/version4/src/main/java/part1/Client/serviceCenter/balance/impl/RandomLoadBalance.java new file mode 100644 index 0000000..9776d6b --- /dev/null +++ b/version4/src/main/java/part1/Client/serviceCenter/balance/impl/RandomLoadBalance.java @@ -0,0 +1,24 @@ +package part1.Client.serviceCenter.balance.impl; + +import part1.Client.serviceCenter.balance.LoadBalance; + +import java.util.List; +import java.util.Random; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/19 21:20 + * 随机 负载均衡 + */ +public class RandomLoadBalance implements LoadBalance { + @Override + public String balance(List addressList) { + Random random=new Random(); + int choose = random.nextInt(addressList.size()); + System.out.println("负载均衡选择了"+choose+"服务器"); + return null; + } + public void addNode(String node){} ; + public void delNode(String node){}; +} diff --git a/version4/src/main/java/part1/Client/serviceCenter/balance/impl/RoundLoadBalance.java b/version4/src/main/java/part1/Client/serviceCenter/balance/impl/RoundLoadBalance.java new file mode 100644 index 0000000..9f90d59 --- /dev/null +++ b/version4/src/main/java/part1/Client/serviceCenter/balance/impl/RoundLoadBalance.java @@ -0,0 +1,24 @@ +package part1.Client.serviceCenter.balance.impl; + +import part1.Client.serviceCenter.balance.LoadBalance; + +import java.util.List; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/19 21:21 + * 轮询 负载均衡 + */ +public class RoundLoadBalance implements LoadBalance { + private int choose=-1; + @Override + public String balance(List addressList) { + choose++; + choose=choose%addressList.size(); + System.out.println("负载均衡选择了"+choose+"服务器"); + return addressList.get(choose); + } + public void addNode(String node) {}; + public void delNode(String node){}; +} diff --git a/version4/src/main/java/part1/Server/TestServer.java b/version4/src/main/java/part1/Server/TestServer.java new file mode 100644 index 0000000..747e8c6 --- /dev/null +++ b/version4/src/main/java/part1/Server/TestServer.java @@ -0,0 +1,27 @@ +package part1.Server; + + +import part1.Server.server.impl.NettyRPCRPCServer; +import part1.Server.provider.ServiceProvider; +import part1.Server.server.RpcServer; +import part1.common.service.Impl.UserServiceImpl; +import part1.common.service.UserService; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/11 19:39 + */ + +public class TestServer { + public static void main(String[] args) throws InterruptedException { + UserService userService=new UserServiceImpl(); + + ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999); + + serviceProvider.provideServiceInterface(userService,true); + + RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider); + rpcServer.start(9999); + } +} diff --git a/version4/src/main/java/part1/Server/netty/handler/NettyRPCServerHandler.java b/version4/src/main/java/part1/Server/netty/handler/NettyRPCServerHandler.java new file mode 100644 index 0000000..db474d0 --- /dev/null +++ b/version4/src/main/java/part1/Server/netty/handler/NettyRPCServerHandler.java @@ -0,0 +1,59 @@ +package part1.Server.netty.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.AllArgsConstructor; +import part1.Server.provider.ServiceProvider; +import part1.Server.ratelimit.RateLimit; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 16:40 + * 因为是服务器端,我们知道接受到请求格式是RPCRequest + * Object类型也行,强制转型就行 + */ +@AllArgsConstructor +public class NettyRPCServerHandler extends SimpleChannelInboundHandler { + private ServiceProvider serviceProvider; + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { + RpcResponse response = getResponse(request); + ctx.writeAndFlush(response); + ctx.close(); + } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + private RpcResponse getResponse(RpcRequest rpcRequest){ + //得到服务名 + String interfaceName=rpcRequest.getInterfaceName(); + //接口限流降级 + RateLimit rateLimit=serviceProvider.getRateLimitProvider().getRateLimit(interfaceName); + if(!rateLimit.getToken()){ + //如果获取令牌失败,进行限流降级,快速返回结果 + return RpcResponse.fail(); + } + + //得到服务端相应服务实现类 + Object service = serviceProvider.getService(interfaceName); + //反射调用方法 + Method method=null; + try { + method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType()); + Object invoke=method.invoke(service,rpcRequest.getParams()); + return RpcResponse.sussess(invoke); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + System.out.println("方法执行错误"); + return RpcResponse.fail(); + } + } +} diff --git a/version4/src/main/java/part1/Server/netty/nettyInitializer/NettyServerInitializer.java b/version4/src/main/java/part1/Server/netty/nettyInitializer/NettyServerInitializer.java new file mode 100644 index 0000000..6518280 --- /dev/null +++ b/version4/src/main/java/part1/Server/netty/nettyInitializer/NettyServerInitializer.java @@ -0,0 +1,30 @@ +package part1.Server.netty.nettyInitializer; + + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import lombok.AllArgsConstructor; +import part1.Server.netty.handler.NettyRPCServerHandler; +import part1.Server.provider.ServiceProvider; +import part1.common.serializer.myCode.MyDecoder; +import part1.common.serializer.myCode.MyEncoder; +import part1.common.serializer.mySerializer.JsonSerializer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 16:15 + */ +@AllArgsConstructor +public class NettyServerInitializer extends ChannelInitializer { + private ServiceProvider serviceProvider; + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + //使用自定义的编/解码器 + pipeline.addLast(new MyEncoder(new JsonSerializer())); + pipeline.addLast(new MyDecoder()); + pipeline.addLast(new NettyRPCServerHandler(serviceProvider)); + } +} diff --git a/version4/src/main/java/part1/Server/provider/ServiceProvider.java b/version4/src/main/java/part1/Server/provider/ServiceProvider.java new file mode 100644 index 0000000..30064d1 --- /dev/null +++ b/version4/src/main/java/part1/Server/provider/ServiceProvider.java @@ -0,0 +1,53 @@ +package part1.Server.provider; + +import part1.Server.ratelimit.provider.RateLimitProvider; +import part1.Server.serviceRegister.impl.ZKServiceRegister; +import part1.Server.serviceRegister.ServiceRegister; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/16 17:35 + */ +public class ServiceProvider { + private Map interfaceProvider; + + private int port; + private String host; + //注册服务类 + private ServiceRegister serviceRegister; + //限流器 + private RateLimitProvider rateLimitProvider; + public ServiceProvider(String host,int port){ + //需要传入服务端自身的网络地址 + this.host=host; + this.port=port; + this.interfaceProvider=new HashMap<>(); + this.serviceRegister=new ZKServiceRegister(); + this.rateLimitProvider=new RateLimitProvider(); + } + + public void provideServiceInterface(Object service,boolean canRetry){ + String serviceName=service.getClass().getName(); + Class[] interfaceName=service.getClass().getInterfaces(); + + for (Class clazz:interfaceName){ + //本机的映射表 + interfaceProvider.put(clazz.getName(),service); + //在注册中心注册服务 + serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port),canRetry); + } + } + + public Object getService(String interfaceName){ + return interfaceProvider.get(interfaceName); + } + + public RateLimitProvider getRateLimitProvider(){ + return rateLimitProvider; + } +} diff --git a/version4/src/main/java/part1/Server/ratelimit/RateLimit.java b/version4/src/main/java/part1/Server/ratelimit/RateLimit.java new file mode 100644 index 0000000..08c4254 --- /dev/null +++ b/version4/src/main/java/part1/Server/ratelimit/RateLimit.java @@ -0,0 +1,11 @@ +package part1.Server.ratelimit; + +/** + * @author wxx + * @version 1.0 + * @create 2024/7/2 1:43 + */ +public interface RateLimit { + //获取访问许可 + boolean getToken(); +} diff --git a/version4/src/main/java/part1/Server/ratelimit/impl/TokenBucketRateLimitImpl.java b/version4/src/main/java/part1/Server/ratelimit/impl/TokenBucketRateLimitImpl.java new file mode 100644 index 0000000..6ea71df --- /dev/null +++ b/version4/src/main/java/part1/Server/ratelimit/impl/TokenBucketRateLimitImpl.java @@ -0,0 +1,49 @@ +package part1.Server.ratelimit.impl; + +import part1.Server.ratelimit.RateLimit; + +/** + * @author wxx + * @version 1.0 + * @create 2024/7/2 1:44 + */ +public class TokenBucketRateLimitImpl implements RateLimit { + //令牌产生速率(单位为ms) + private static int RATE; + //桶容量 + private static int CAPACITY; + //当前桶容量 + private volatile int curCapcity; + //时间戳 + private volatile long timeStamp=System.currentTimeMillis(); + public TokenBucketRateLimitImpl(int rate,int capacity){ + RATE=rate; + CAPACITY=capacity; + curCapcity=capacity; + } + @Override + public synchronized boolean getToken() { + //如果当前桶还有剩余,就直接返回 + if(curCapcity>0){ + curCapcity--; + return true; + } + //如果桶无剩余, + long current=System.currentTimeMillis(); + //如果距离上一次的请求的时间大于RATE的时间 + if(current-timeStamp>=RATE){ + //计算这段时间间隔中生成的令牌,如果>2,桶容量加上(计算的令牌-1) + //如果==1,就不做操作(因为这一次操作要消耗一个令牌) + if((current-timeStamp)/RATE>=2){ + curCapcity+=(int)(current-timeStamp)/RATE-1; + } + //保持桶内令牌容量<=10 + if(curCapcity>CAPACITY) curCapcity=CAPACITY; + //刷新时间戳为本次请求 + timeStamp=current; + return true; + } + //获得不到,返回false + return false; + } +} diff --git a/version4/src/main/java/part1/Server/ratelimit/provider/RateLimitProvider.java b/version4/src/main/java/part1/Server/ratelimit/provider/RateLimitProvider.java new file mode 100644 index 0000000..2d3cf86 --- /dev/null +++ b/version4/src/main/java/part1/Server/ratelimit/provider/RateLimitProvider.java @@ -0,0 +1,25 @@ +package part1.Server.ratelimit.provider; + +import part1.Server.ratelimit.RateLimit; +import part1.Server.ratelimit.impl.TokenBucketRateLimitImpl; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2024/7/2 1:45 + */ +public class RateLimitProvider { + private Map rateLimitMap=new HashMap<>(); + + public RateLimit getRateLimit(String interfaceName){ + if(!rateLimitMap.containsKey(interfaceName)){ + RateLimit rateLimit=new TokenBucketRateLimitImpl(100,10); + rateLimitMap.put(interfaceName,rateLimit); + return rateLimit; + } + return rateLimitMap.get(interfaceName); + } +} \ No newline at end of file diff --git a/version4/src/main/java/part1/Server/server/RpcServer.java b/version4/src/main/java/part1/Server/server/RpcServer.java new file mode 100644 index 0000000..7ed9584 --- /dev/null +++ b/version4/src/main/java/part1/Server/server/RpcServer.java @@ -0,0 +1,11 @@ +package part1.Server.server; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:26 + */ +public interface RpcServer { + void start(int port); + void stop(); +} diff --git a/version4/src/main/java/part1/Server/server/impl/NettyRPCRPCServer.java b/version4/src/main/java/part1/Server/server/impl/NettyRPCRPCServer.java new file mode 100644 index 0000000..514b857 --- /dev/null +++ b/version4/src/main/java/part1/Server/server/impl/NettyRPCRPCServer.java @@ -0,0 +1,48 @@ +package part1.Server.server.impl; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.AllArgsConstructor; +import part1.Server.netty.nettyInitializer.NettyServerInitializer; +import part1.Server.provider.ServiceProvider; +import part1.Server.server.RpcServer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 14:01 + */ +@AllArgsConstructor +public class NettyRPCRPCServer implements RpcServer { + private ServiceProvider serviceProvider; + @Override + public void start(int port) { + // netty 服务线程组boss负责建立连接, work负责具体的请求 + NioEventLoopGroup bossGroup = new NioEventLoopGroup(); + NioEventLoopGroup workGroup = new NioEventLoopGroup(); + System.out.println("netty服务端启动了"); + try { + //启动netty服务器 + ServerBootstrap serverBootstrap = new ServerBootstrap(); + //初始化 + serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer(serviceProvider)); + //同步堵塞 + ChannelFuture channelFuture=serverBootstrap.bind(port).sync(); + //死循环监听 + channelFuture.channel().closeFuture().sync(); + }catch (InterruptedException e){ + e.printStackTrace(); + }finally { + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version4/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java b/version4/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java new file mode 100644 index 0000000..bbab53f --- /dev/null +++ b/version4/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java @@ -0,0 +1,39 @@ +package part1.Server.server.impl; + + +import lombok.AllArgsConstructor; +import part1.Server.server.RpcServer; +import part1.Server.server.work.WorkThread; +import part1.Server.provider.ServiceProvider; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:37 + */ +@AllArgsConstructor +public class SimpleRPCRPCServer implements RpcServer { + private ServiceProvider serviceProvide; + @Override + public void start(int port) { + try { + ServerSocket serverSocket=new ServerSocket(port); + System.out.println("服务器启动了"); + while (true) { + Socket socket = serverSocket.accept(); + new Thread(new WorkThread(socket,serviceProvide)).start(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version4/src/main/java/part1/Server/server/work/WorkThread.java b/version4/src/main/java/part1/Server/server/work/WorkThread.java new file mode 100644 index 0000000..064b9c2 --- /dev/null +++ b/version4/src/main/java/part1/Server/server/work/WorkThread.java @@ -0,0 +1,58 @@ +package part1.Server.server.work; + + +import lombok.AllArgsConstructor; +import part1.Server.provider.ServiceProvider; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/14 17:39 + */ +@AllArgsConstructor +public class WorkThread implements Runnable{ + private Socket socket; + private ServiceProvider serviceProvide; + @Override + public void run() { + try { + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + //读取客户端传过来的request + RpcRequest rpcRequest = (RpcRequest) ois.readObject(); + //反射调用服务方法获取返回值 + RpcResponse rpcResponse=getResponse(rpcRequest); + //向客户端写入response + oos.writeObject(rpcResponse); + oos.flush(); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + private RpcResponse getResponse(RpcRequest rpcRequest){ + //得到服务名 + String interfaceName=rpcRequest.getInterfaceName(); + //得到服务端相应服务实现类 + Object service = serviceProvide.getService(interfaceName); + //反射调用方法 + Method method=null; + try { + method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType()); + Object invoke=method.invoke(service,rpcRequest.getParams()); + return RpcResponse.sussess(invoke); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + System.out.println("方法执行错误"); + return RpcResponse.fail(); + } + } +} diff --git a/version4/src/main/java/part1/Server/serviceRegister/ServiceRegister.java b/version4/src/main/java/part1/Server/serviceRegister/ServiceRegister.java new file mode 100644 index 0000000..0f94e4d --- /dev/null +++ b/version4/src/main/java/part1/Server/serviceRegister/ServiceRegister.java @@ -0,0 +1,15 @@ +package part1.Server.serviceRegister; + +import java.net.InetSocketAddress; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 16:58 + */ +// 服务注册接口 +public interface ServiceRegister { + // 注册:保存服务与地址。 + void register(String serviceName, InetSocketAddress serviceAddress,boolean canRetry); + +} diff --git a/version4/src/main/java/part1/Server/serviceRegister/impl/ZKServiceRegister.java b/version4/src/main/java/part1/Server/serviceRegister/impl/ZKServiceRegister.java new file mode 100644 index 0000000..6d4bb6f --- /dev/null +++ b/version4/src/main/java/part1/Server/serviceRegister/impl/ZKServiceRegister.java @@ -0,0 +1,68 @@ +package part1.Server.serviceRegister.impl; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; +import part1.Server.serviceRegister.ServiceRegister; + +import java.net.InetSocketAddress; +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 17:28 + */ +public class ZKServiceRegister implements ServiceRegister { + // curator 提供的zookeeper客户端 + private CuratorFramework client; + //zookeeper根路径节点 + private static final String ROOT_PATH = "MyRPC"; + private static final String RETRY = "CanRetry"; + + //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接 + public ZKServiceRegister(){ + // 指数时间重试 + RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); + // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接 + // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系, + // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍 + // 使用心跳监听状态 + this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") + .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build(); + this.client.start(); + System.out.println("zookeeper 连接成功"); + } + //注册服务到注册中心 + @Override + public void register(String serviceName, InetSocketAddress serviceAddress,boolean canRetry) { + try { + // serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址 + if(client.checkExists().forPath("/" + serviceName) == null){ + client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName); + } + // 路径地址,一个/代表一个节点 + String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress); + // 临时节点,服务器下线就删除节点 + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); + //如果这个服务是幂等性,就增加到节点中 + if (canRetry){ + path ="/"+RETRY+"/"+serviceName; + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); + } + } catch (Exception e) { + System.out.println("此服务已存在"); + } + } + // 地址 -> XXX.XXX.XXX.XXX:port 字符串 + private String getServiceAddress(InetSocketAddress serverAddress) { + return serverAddress.getHostName() + + ":" + + serverAddress.getPort(); + } + // 字符串解析为地址 + private InetSocketAddress parseAddress(String address) { + String[] result = address.split(":"); + return new InetSocketAddress(result[0], Integer.parseInt(result[1])); + } +} diff --git a/version4/src/main/java/part1/common/Message/MessageType.java b/version4/src/main/java/part1/common/Message/MessageType.java new file mode 100644 index 0000000..ebbcc83 --- /dev/null +++ b/version4/src/main/java/part1/common/Message/MessageType.java @@ -0,0 +1,17 @@ +package part1.common.Message; + +import lombok.AllArgsConstructor; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/2 22:29 + */ +@AllArgsConstructor +public enum MessageType { + REQUEST(0),RESPONSE(1); + private int code; + public int getCode(){ + return code; + } +} \ No newline at end of file diff --git a/version4/src/main/java/part1/common/Message/RpcRequest.java b/version4/src/main/java/part1/common/Message/RpcRequest.java new file mode 100644 index 0000000..2642f38 --- /dev/null +++ b/version4/src/main/java/part1/common/Message/RpcRequest.java @@ -0,0 +1,29 @@ +package part1.common.Message; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 18:30 + * 定义发送的消息格式 + */ +@NoArgsConstructor +@AllArgsConstructor +@Data +@Builder +public class RpcRequest implements Serializable { + //服务类名,客户端只知道接口 + private String interfaceName; + //调用的方法名 + private String methodName; + //参数列表 + private Object[] params; + //参数类型 + private Class[] paramsType; +} diff --git a/version4/src/main/java/part1/common/Message/RpcResponse.java b/version4/src/main/java/part1/common/Message/RpcResponse.java new file mode 100644 index 0000000..7a44b45 --- /dev/null +++ b/version4/src/main/java/part1/common/Message/RpcResponse.java @@ -0,0 +1,35 @@ +package part1.common.Message; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 19:18 + */ +@NoArgsConstructor +@AllArgsConstructor +@Data +@Builder +public class RpcResponse implements Serializable { + //状态信息 + private int code; + private String message; + //更新:加入传输数据的类型,以便在自定义序列化器中解析 + private Class dataType; + //具体数据 + private Object data; + + public static RpcResponse sussess(Object data){ + return RpcResponse.builder().code(200).dataType(data.getClass()).data(data).build(); + } + public static RpcResponse fail(){ + return RpcResponse.builder().code(500).message("服务器发生错误").build(); + } +} + diff --git a/version4/src/main/java/part1/common/pojo/User.java b/version4/src/main/java/part1/common/pojo/User.java new file mode 100644 index 0000000..d7cef8c --- /dev/null +++ b/version4/src/main/java/part1/common/pojo/User.java @@ -0,0 +1,25 @@ +package part1.common.pojo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 17:50 + */ +@Builder +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User implements Serializable { + // 客户端和服务端共有的 + private Integer id; + private String userName; + private Boolean sex; +} + diff --git a/version4/src/main/java/part1/common/serializer/myCode/MyDecoder.java b/version4/src/main/java/part1/common/serializer/myCode/MyDecoder.java new file mode 100644 index 0000000..78ed394 --- /dev/null +++ b/version4/src/main/java/part1/common/serializer/myCode/MyDecoder.java @@ -0,0 +1,41 @@ +package part1.common.serializer.myCode; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import part1.common.Message.MessageType; +import part1.common.serializer.mySerializer.Serializer; + +import java.util.List; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/2 22:24 + * 按照自定义的消息格式解码数据 + */ +public class MyDecoder extends ByteToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List out) throws Exception { + //1.读取消息类型 + short messageType = in.readShort(); + // 现在还只支持request与response请求 + if(messageType != MessageType.REQUEST.getCode() && + messageType != MessageType.RESPONSE.getCode()){ + System.out.println("暂不支持此种数据"); + return; + } + //2.读取序列化的方式&类型 + short serializerType = in.readShort(); + Serializer serializer = Serializer.getSerializerByCode(serializerType); + if(serializer == null) + throw new RuntimeException("不存在对应的序列化器"); + //3.读取序列化数组长度 + int length = in.readInt(); + //4.读取序列化数组 + byte[] bytes=new byte[length]; + in.readBytes(bytes); + Object deserialize= serializer.deserialize(bytes, messageType); + out.add(deserialize); + } +} diff --git a/version4/src/main/java/part1/common/serializer/myCode/MyEncoder.java b/version4/src/main/java/part1/common/serializer/myCode/MyEncoder.java new file mode 100644 index 0000000..7ce0fd4 --- /dev/null +++ b/version4/src/main/java/part1/common/serializer/myCode/MyEncoder.java @@ -0,0 +1,41 @@ +package part1.common.serializer.myCode; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import lombok.AllArgsConstructor; +import part1.common.Message.MessageType; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; +import part1.common.serializer.mySerializer.Serializer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/2 22:24 + * 依次按照自定义的消息格式写入,传入的数据为request或者response + * 需要持有一个serialize器,负责将传入的对象序列化成字节数组 + */ +@AllArgsConstructor +public class MyEncoder extends MessageToByteEncoder { + private Serializer serializer; + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + System.out.println(msg.getClass()); + //1.写入消息类型 + if(msg instanceof RpcRequest){ + out.writeShort(MessageType.REQUEST.getCode()); + } + else if(msg instanceof RpcResponse){ + out.writeShort(MessageType.RESPONSE.getCode()); + } + //2.写入序列化方式 + out.writeShort(serializer.getType()); + //得到序列化数组 + byte[] serializeBytes = serializer.serialize(msg); + //3.写入长度 + out.writeInt(serializeBytes.length); + //4.写入序列化数组 + out.writeBytes(serializeBytes); + } +} diff --git a/version4/src/main/java/part1/common/serializer/mySerializer/JsonSerializer.java b/version4/src/main/java/part1/common/serializer/mySerializer/JsonSerializer.java new file mode 100644 index 0000000..39240cf --- /dev/null +++ b/version4/src/main/java/part1/common/serializer/mySerializer/JsonSerializer.java @@ -0,0 +1,65 @@ +package part1.common.serializer.mySerializer; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/2 22:31 + */ +public class JsonSerializer implements Serializer { + @Override + public byte[] serialize(Object obj) { + byte[] bytes = JSONObject.toJSONBytes(obj); + return bytes; + } + + @Override + public Object deserialize(byte[] bytes, int messageType) { + Object obj = null; + // 传输的消息分为request与response + switch (messageType){ + case 0: + RpcRequest request = JSON.parseObject(bytes, RpcRequest.class); + Object[] objects = new Object[request.getParams().length]; + // 把json字串转化成对应的对象, fastjson可以读出基本数据类型,不用转化 + // 对转换后的request中的params属性逐个进行类型判断 + for(int i = 0; i < objects.length; i++){ + Class paramsType = request.getParamsType()[i]; + //判断每个对象类型是否和paramsTypes中的一致 + if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())){ + //如果不一致,就行进行类型转换 + objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i],request.getParamsType()[i]); + }else{ + //如果一致就直接赋给objects[i] + objects[i] = request.getParams()[i]; + } + } + request.setParams(objects); + obj = request; + break; + case 1: + RpcResponse response = JSON.parseObject(bytes, RpcResponse.class); + Class dataType = response.getDataType(); + //判断转化后的response对象中的data的类型是否正确 + if(! dataType.isAssignableFrom(response.getData().getClass())){ + response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType)); + } + obj = response; + break; + default: + System.out.println("暂时不支持此种消息"); + throw new RuntimeException(); + } + return obj; + } + + //1 代表json序列化方式 + @Override + public int getType() { + return 1; + } +} diff --git a/version4/src/main/java/part1/common/serializer/mySerializer/ObjectSerializer.java b/version4/src/main/java/part1/common/serializer/mySerializer/ObjectSerializer.java new file mode 100644 index 0000000..3eec03c --- /dev/null +++ b/version4/src/main/java/part1/common/serializer/mySerializer/ObjectSerializer.java @@ -0,0 +1,54 @@ +package part1.common.serializer.mySerializer; + +import java.io.*; + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/2 22:36 + */ +public class ObjectSerializer implements Serializer { + //利用Java io 对象 -》字节数组 + @Override + public byte[] serialize(Object obj) { + byte[] bytes=null; + ByteArrayOutputStream bos=new ByteArrayOutputStream(); + try { + //是一个对象输出流,用于将 Java 对象序列化为字节流,并将其连接到bos上 + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + //刷新 ObjectOutputStream,确保所有缓冲区中的数据都被写入到底层流中。 + oos.flush(); + //将bos其内部缓冲区中的数据转换为字节数组 + bytes = bos.toByteArray(); + oos.close(); + bos.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return bytes; + } + + //字节数组 -》对象 + @Override + public Object deserialize(byte[] bytes, int messageType) { + Object obj = null; + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + try { + ObjectInputStream ois = new ObjectInputStream(bis); + obj = ois.readObject(); + ois.close(); + bis.close(); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + return obj; + } + + //0 代表Java 原生序列器 + @Override + public int getType() { + return 0; + } +} + diff --git a/version4/src/main/java/part1/common/serializer/mySerializer/Serializer.java b/version4/src/main/java/part1/common/serializer/mySerializer/Serializer.java new file mode 100644 index 0000000..e0829d0 --- /dev/null +++ b/version4/src/main/java/part1/common/serializer/mySerializer/Serializer.java @@ -0,0 +1,29 @@ +package part1.common.serializer.mySerializer; + + +/** + * @author wxx + * @version 1.0 + * @create 2024/6/2 22:31 + */ +public interface Serializer { + // 把对象序列化成字节数组 + byte[] serialize(Object obj); + // 从字节数组反序列化成消息, 使用java自带序列化方式不用messageType也能得到相应的对象(序列化字节数组里包含类信息) + // 其它方式需指定消息格式,再根据message转化成相应的对象 + Object deserialize(byte[] bytes, int messageType); + // 返回使用的序列器,是哪个 + // 0:java自带序列化方式, 1: json序列化方式 + int getType(); + // 根据序号取出序列化器,暂时有两种实现方式,需要其它方式,实现这个接口即可 + static Serializer getSerializerByCode(int code){ + switch (code){ + case 0: + return new ObjectSerializer(); + case 1: + return new JsonSerializer(); + default: + return null; + } + } +} diff --git a/version4/src/main/java/part1/common/service/Impl/UserServiceImpl.java b/version4/src/main/java/part1/common/service/Impl/UserServiceImpl.java new file mode 100644 index 0000000..373243b --- /dev/null +++ b/version4/src/main/java/part1/common/service/Impl/UserServiceImpl.java @@ -0,0 +1,32 @@ +package part1.common.service.Impl; + + +import part1.common.pojo.User; +import part1.common.service.UserService; + +import java.util.Random; +import java.util.UUID; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:28 + */ +public class UserServiceImpl implements UserService { + @Override + public User getUserByUserId(Integer id) { + System.out.println("客户端查询了"+id+"的用户"); + // 模拟从数据库中取用户的行为 + Random random = new Random(); + User user = User.builder().userName(UUID.randomUUID().toString()) + .id(id) + .sex(random.nextBoolean()).build(); + return user; + } + + @Override + public Integer insertUserId(User user) { + System.out.println("插入数据成功"+user.getUserName()); + return user.getId(); + } +} \ No newline at end of file diff --git a/version4/src/main/java/part1/common/service/UserService.java b/version4/src/main/java/part1/common/service/UserService.java new file mode 100644 index 0000000..0e539bd --- /dev/null +++ b/version4/src/main/java/part1/common/service/UserService.java @@ -0,0 +1,16 @@ +package part1.common.service; + + +import part1.common.pojo.User; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:27 + */ +public interface UserService { + // 客户端通过这个接口调用服务端的实现类 + User getUserByUserId(Integer id); + //新增一个功能 + Integer insertUserId(User user); +} diff --git a/version4/src/main/resources/log4j.properties b/version4/src/main/resources/log4j.properties new file mode 100644 index 0000000..4e012fe --- /dev/null +++ b/version4/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +log4j.rootLogger=ERROR, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender + +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n