From ec1f961669722cb41abb0312b2b2c7ec48433139 Mon Sep 17 00:00:00 2001
From: Wxx <2563806166@qq.com>
Date: Thu, 20 Jun 2024 20:24:50 +0800
Subject: [PATCH] =?UTF-8?q?version3-=E9=85=8D=E7=BD=AE=E4=B8=AD=E5=BF=83?=
=?UTF-8?q?=EF=BC=8C=E8=B6=85=E6=97=B6=E9=87=8D=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
version3/pom.xml | 6 +
.../main/java/part2/Client/TestClient.java | 28 +++++
.../java/part2/Client/cache/serviceCache.java | 44 +++++++
.../netty/handler/NettyClientHandler.java | 27 ++++
.../NettyClientInitializer.java | 26 ++++
.../java/part2/Client/proxy/ClientProxy.java | 55 ++++++++
.../java/part2/Client/retry/guavaRetry.java | 43 +++++++
.../part2/Client/rpcClient/RpcClient.java | 15 +++
.../Client/rpcClient/impl/NettyRpcClient.java | 68 ++++++++++
.../rpcClient/impl/SimpleSocketRpcCilent.java | 41 ++++++
.../Client/serviceCenter/ServiceCenter.java | 16 +++
.../Client/serviceCenter/ZKServiceCenter.java | 93 ++++++++++++++
.../serviceCenter/ZkWatcher/watchZK.java | 82 ++++++++++++
.../serviceCenter/balance/LoadBalance.java | 15 +++
.../balance/impl/ConsistencyHashBalance.java | 119 ++++++++++++++++++
.../balance/impl/RandomLoadBalance.java | 24 ++++
.../balance/impl/RoundLoadBalance.java | 24 ++++
.../main/java/part2/Server/TestServer.java | 27 ++++
.../netty/handler/NettyRPCServerHandler.java | 51 ++++++++
.../NettyServerInitializer.java | 30 +++++
.../Server/provider/ServiceProvider.java | 47 +++++++
.../java/part2/Server/server/RpcServer.java | 11 ++
.../Server/server/impl/NettyRPCRPCServer.java | 48 +++++++
.../server/impl/SimpleRPCRPCServer.java | 39 ++++++
.../part2/Server/server/work/WorkThread.java | 58 +++++++++
.../serviceRegister/ServiceRegister.java | 15 +++
.../impl/ZKServiceRegister.java | 68 ++++++++++
.../part2/common/Message/MessageType.java | 17 +++
.../java/part2/common/Message/RpcRequest.java | 29 +++++
.../part2/common/Message/RpcResponse.java | 35 ++++++
.../src/main/java/part2/common/pojo/User.java | 25 ++++
.../common/serializer/myCode/MyDecoder.java | 41 ++++++
.../common/serializer/myCode/MyEncoder.java | 41 ++++++
.../mySerializer/JsonSerializer.java | 65 ++++++++++
.../mySerializer/ObjectSerializer.java | 54 ++++++++
.../serializer/mySerializer/Serializer.java | 29 +++++
.../common/service/Impl/UserServiceImpl.java | 32 +++++
.../part2/common/service/UserService.java | 16 +++
version4/pom.xml | 17 +++
39 files changed, 1521 insertions(+)
create mode 100644 version3/src/main/java/part2/Client/TestClient.java
create mode 100644 version3/src/main/java/part2/Client/cache/serviceCache.java
create mode 100644 version3/src/main/java/part2/Client/netty/handler/NettyClientHandler.java
create mode 100644 version3/src/main/java/part2/Client/netty/nettyInitializer/NettyClientInitializer.java
create mode 100644 version3/src/main/java/part2/Client/proxy/ClientProxy.java
create mode 100644 version3/src/main/java/part2/Client/retry/guavaRetry.java
create mode 100644 version3/src/main/java/part2/Client/rpcClient/RpcClient.java
create mode 100644 version3/src/main/java/part2/Client/rpcClient/impl/NettyRpcClient.java
create mode 100644 version3/src/main/java/part2/Client/rpcClient/impl/SimpleSocketRpcCilent.java
create mode 100644 version3/src/main/java/part2/Client/serviceCenter/ServiceCenter.java
create mode 100644 version3/src/main/java/part2/Client/serviceCenter/ZKServiceCenter.java
create mode 100644 version3/src/main/java/part2/Client/serviceCenter/ZkWatcher/watchZK.java
create mode 100644 version3/src/main/java/part2/Client/serviceCenter/balance/LoadBalance.java
create mode 100644 version3/src/main/java/part2/Client/serviceCenter/balance/impl/ConsistencyHashBalance.java
create mode 100644 version3/src/main/java/part2/Client/serviceCenter/balance/impl/RandomLoadBalance.java
create mode 100644 version3/src/main/java/part2/Client/serviceCenter/balance/impl/RoundLoadBalance.java
create mode 100644 version3/src/main/java/part2/Server/TestServer.java
create mode 100644 version3/src/main/java/part2/Server/netty/handler/NettyRPCServerHandler.java
create mode 100644 version3/src/main/java/part2/Server/netty/nettyInitializer/NettyServerInitializer.java
create mode 100644 version3/src/main/java/part2/Server/provider/ServiceProvider.java
create mode 100644 version3/src/main/java/part2/Server/server/RpcServer.java
create mode 100644 version3/src/main/java/part2/Server/server/impl/NettyRPCRPCServer.java
create mode 100644 version3/src/main/java/part2/Server/server/impl/SimpleRPCRPCServer.java
create mode 100644 version3/src/main/java/part2/Server/server/work/WorkThread.java
create mode 100644 version3/src/main/java/part2/Server/serviceRegister/ServiceRegister.java
create mode 100644 version3/src/main/java/part2/Server/serviceRegister/impl/ZKServiceRegister.java
create mode 100644 version3/src/main/java/part2/common/Message/MessageType.java
create mode 100644 version3/src/main/java/part2/common/Message/RpcRequest.java
create mode 100644 version3/src/main/java/part2/common/Message/RpcResponse.java
create mode 100644 version3/src/main/java/part2/common/pojo/User.java
create mode 100644 version3/src/main/java/part2/common/serializer/myCode/MyDecoder.java
create mode 100644 version3/src/main/java/part2/common/serializer/myCode/MyEncoder.java
create mode 100644 version3/src/main/java/part2/common/serializer/mySerializer/JsonSerializer.java
create mode 100644 version3/src/main/java/part2/common/serializer/mySerializer/ObjectSerializer.java
create mode 100644 version3/src/main/java/part2/common/serializer/mySerializer/Serializer.java
create mode 100644 version3/src/main/java/part2/common/service/Impl/UserServiceImpl.java
create mode 100644 version3/src/main/java/part2/common/service/UserService.java
create mode 100644 version4/pom.xml
diff --git a/version3/pom.xml b/version3/pom.xml
index 1762da7..2484083 100644
--- a/version3/pom.xml
+++ b/version3/pom.xml
@@ -48,5 +48,11 @@
fastjson
1.2.83
+
+ com.github.rholder
+ guava-retrying
+ 2.0.0
+
+
\ No newline at end of file
diff --git a/version3/src/main/java/part2/Client/TestClient.java b/version3/src/main/java/part2/Client/TestClient.java
new file mode 100644
index 0000000..f679b83
--- /dev/null
+++ b/version3/src/main/java/part2/Client/TestClient.java
@@ -0,0 +1,28 @@
+package part2.Client;
+
+
+import part2.Client.proxy.ClientProxy;
+import part2.common.pojo.User;
+import part2.common.service.UserService;
+
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/6 18:39
+ */
+
+public class TestClient {
+ public static void main(String[] args) throws InterruptedException {
+ ClientProxy clientProxy=new ClientProxy();
+ //ClientProxy clientProxy=new proxy.Client.part1.ClientProxy("127.0.0.1",9999,0);
+ UserService proxy=clientProxy.getProxy(UserService.class);
+
+ User user = proxy.getUserByUserId(1);
+ System.out.println("从服务端得到的user="+user.toString());
+
+ User u=User.builder().id(100).userName("wxx").sex(true).build();
+ Integer id = proxy.insertUserId(u);
+ System.out.println("向服务端插入user的id"+id);
+ }
+}
diff --git a/version3/src/main/java/part2/Client/cache/serviceCache.java b/version3/src/main/java/part2/Client/cache/serviceCache.java
new file mode 100644
index 0000000..4d42097
--- /dev/null
+++ b/version3/src/main/java/part2/Client/cache/serviceCache.java
@@ -0,0 +1,44 @@
+package part2.Client.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/4 0:57
+ */
+public class serviceCache {
+ //key: serviceName 服务名
+ //value: addressList 服务提供者列表
+ private static Map> cache=new HashMap<>();
+
+ //添加服务
+ public void addServcieToCache(String serviceName,String address){
+ if(cache.containsKey(serviceName)){
+ List addressList = cache.get(serviceName);
+ addressList.add(address);
+ System.out.println("将name为"+serviceName+"和地址为"+address+"的服务添加到本地缓存中");
+ }else {
+ List addressList=new ArrayList<>();
+ addressList.add(address);
+ cache.put(serviceName,addressList);
+ }
+ }
+ //从缓存中取服务地址列表
+ public List getServiceListFromCache(String serviceName){
+ if(!cache.containsKey(serviceName)) {
+ return null;
+ }
+ return cache.get(serviceName);
+ }
+
+ //从缓存中删除服务地址
+ public void delete(String serviceName,String address){
+ List addressList = cache.get(serviceName);
+ addressList.remove(address);
+ System.out.println("将name为"+serviceName+"和地址为"+address+"的服务从本地缓存中删除");
+ }
+}
diff --git a/version3/src/main/java/part2/Client/netty/handler/NettyClientHandler.java b/version3/src/main/java/part2/Client/netty/handler/NettyClientHandler.java
new file mode 100644
index 0000000..95eaf28
--- /dev/null
+++ b/version3/src/main/java/part2/Client/netty/handler/NettyClientHandler.java
@@ -0,0 +1,27 @@
+package part2.Client.netty.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeKey;
+import part2.common.Message.RpcResponse;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 17:29
+ */
+public class NettyClientHandler extends SimpleChannelInboundHandler {
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
+ // 接收到response, 给channel设计别名,让sendRequest里读取response
+ AttributeKey key = AttributeKey.valueOf("RPCResponse");
+ ctx.channel().attr(key).set(response);
+ ctx.channel().close();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+}
diff --git a/version3/src/main/java/part2/Client/netty/nettyInitializer/NettyClientInitializer.java b/version3/src/main/java/part2/Client/netty/nettyInitializer/NettyClientInitializer.java
new file mode 100644
index 0000000..175c729
--- /dev/null
+++ b/version3/src/main/java/part2/Client/netty/nettyInitializer/NettyClientInitializer.java
@@ -0,0 +1,26 @@
+package part2.Client.netty.nettyInitializer;
+
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import part2.Client.netty.handler.NettyClientHandler;
+import part2.common.serializer.myCode.MyDecoder;
+import part2.common.serializer.myCode.MyEncoder;
+import part2.common.serializer.mySerializer.JsonSerializer;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 17:26
+ */
+public class NettyClientInitializer extends ChannelInitializer {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ //使用自定义的编/解码器
+ pipeline.addLast(new MyEncoder(new JsonSerializer()));
+ pipeline.addLast(new MyDecoder());
+ pipeline.addLast(new NettyClientHandler());
+ }
+}
diff --git a/version3/src/main/java/part2/Client/proxy/ClientProxy.java b/version3/src/main/java/part2/Client/proxy/ClientProxy.java
new file mode 100644
index 0000000..289c7fc
--- /dev/null
+++ b/version3/src/main/java/part2/Client/proxy/ClientProxy.java
@@ -0,0 +1,55 @@
+package part2.Client.proxy;
+
+
+import part2.Client.retry.guavaRetry;
+import part2.Client.rpcClient.RpcClient;
+import part2.Client.rpcClient.impl.NettyRpcClient;
+import part2.Client.serviceCenter.ServiceCenter;
+import part2.Client.serviceCenter.ZKServiceCenter;
+import part2.common.Message.RpcRequest;
+import part2.common.Message.RpcResponse;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/6 16:49
+ */
+public class ClientProxy implements InvocationHandler {
+ //传入参数service接口的class对象,反射封装成一个request
+
+ private RpcClient rpcClient;
+ private ServiceCenter serviceCenter;
+ public ClientProxy() throws InterruptedException {
+ serviceCenter=new ZKServiceCenter();
+ rpcClient=new NettyRpcClient(serviceCenter);
+ }
+
+ //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ //构建request
+ RpcRequest request=RpcRequest.builder()
+ .interfaceName(method.getDeclaringClass().getName())
+ .methodName(method.getName())
+ .params(args).paramsType(method.getParameterTypes()).build();
+ //数据传输
+ RpcResponse response;
+ //后续添加逻辑:为保持幂等性,只对白名单上的服务进行重试
+ if (serviceCenter.checkRetry(request.getInterfaceName())){
+ //调用retry框架进行重试操作
+ response=new guavaRetry().sendServiceWithRetry(request,rpcClient);
+ }else {
+ //只调用一次
+ response= rpcClient.sendRequest(request);
+ }
+ return response.getData();
+ }
+ public T getProxy(Class clazz){
+ Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
+ return (T)o;
+ }
+}
diff --git a/version3/src/main/java/part2/Client/retry/guavaRetry.java b/version3/src/main/java/part2/Client/retry/guavaRetry.java
new file mode 100644
index 0000000..2fc5bfd
--- /dev/null
+++ b/version3/src/main/java/part2/Client/retry/guavaRetry.java
@@ -0,0 +1,43 @@
+package part2.Client.retry;
+
+import com.github.rholder.retry.*;
+import part2.Client.rpcClient.RpcClient;
+import part2.common.Message.RpcRequest;
+import part2.common.Message.RpcResponse;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/20 19:28
+ */
+public class guavaRetry {
+ private RpcClient rpcClient;
+ public RpcResponse sendServiceWithRetry(RpcRequest request, RpcClient rpcClient) {
+ this.rpcClient=rpcClient;
+ Retryer retryer = RetryerBuilder.newBuilder()
+ //无论出现什么异常,都进行重试
+ .retryIfException()
+ //返回结果为 error时进行重试
+ .retryIfResult(response -> Objects.equals(response.getCode(), 500))
+ //重试等待策略:等待 2s 后再进行重试
+ .withWaitStrategy(WaitStrategies.fixedWait(2, TimeUnit.SECONDS))
+ //重试停止策略:重试达到 3 次
+ .withStopStrategy(StopStrategies.stopAfterAttempt(3))
+ .withRetryListener(new RetryListener() {
+ @Override
+ public void onRetry(Attempt attempt) {
+ System.out.println("RetryListener: 第" + attempt.getAttemptNumber() + "次调用");
+ }
+ })
+ .build();
+ try {
+ return retryer.call(() -> rpcClient.sendRequest(request));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return RpcResponse.fail();
+ }
+}
\ No newline at end of file
diff --git a/version3/src/main/java/part2/Client/rpcClient/RpcClient.java b/version3/src/main/java/part2/Client/rpcClient/RpcClient.java
new file mode 100644
index 0000000..21690b1
--- /dev/null
+++ b/version3/src/main/java/part2/Client/rpcClient/RpcClient.java
@@ -0,0 +1,15 @@
+package part2.Client.rpcClient;
+
+import part2.common.Message.RpcRequest;
+import part2.common.Message.RpcResponse;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/2 18:55
+ */
+public interface RpcClient {
+
+ //定义底层通信的方法
+ RpcResponse sendRequest(RpcRequest request);
+}
diff --git a/version3/src/main/java/part2/Client/rpcClient/impl/NettyRpcClient.java b/version3/src/main/java/part2/Client/rpcClient/impl/NettyRpcClient.java
new file mode 100644
index 0000000..1f5d026
--- /dev/null
+++ b/version3/src/main/java/part2/Client/rpcClient/impl/NettyRpcClient.java
@@ -0,0 +1,68 @@
+package part2.Client.rpcClient.impl;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.AttributeKey;
+import part2.Client.netty.nettyInitializer.NettyClientInitializer;
+import part2.Client.rpcClient.RpcClient;
+import part2.Client.serviceCenter.ServiceCenter;
+import part2.Client.serviceCenter.ZKServiceCenter;
+import part2.common.Message.RpcRequest;
+import part2.common.Message.RpcResponse;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/2 19:40
+ */
+public class NettyRpcClient implements RpcClient {
+
+ private static final Bootstrap bootstrap;
+ private static final EventLoopGroup eventLoopGroup;
+
+ private ServiceCenter serviceCenter;
+ public NettyRpcClient(ServiceCenter serviceCenter) throws InterruptedException {
+ this.serviceCenter=serviceCenter;
+ }
+
+ //netty客户端初始化
+ static {
+ eventLoopGroup = new NioEventLoopGroup();
+ bootstrap = new Bootstrap();
+ bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+ .handler(new NettyClientInitializer());
+ }
+ @Override
+ public RpcResponse sendRequest(RpcRequest request) {
+ //从注册中心获取host,post
+ InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
+ String host = address.getHostName();
+ int port = address.getPort();
+ try {
+ ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
+ Channel channel = channelFuture.channel();
+ // 发送数据
+ channel.writeAndFlush(request);
+ //sync()堵塞获取结果
+ channel.closeFuture().sync();
+ // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
+ // AttributeKey是,线程隔离的,不会由线程安全问题。
+ // 当前场景下选择堵塞获取结果
+ // 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
+ AttributeKey key = AttributeKey.valueOf("RPCResponse");
+ RpcResponse response = channel.attr(key).get();
+
+ System.out.println(response);
+ return response;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+}
diff --git a/version3/src/main/java/part2/Client/rpcClient/impl/SimpleSocketRpcCilent.java b/version3/src/main/java/part2/Client/rpcClient/impl/SimpleSocketRpcCilent.java
new file mode 100644
index 0000000..8faae62
--- /dev/null
+++ b/version3/src/main/java/part2/Client/rpcClient/impl/SimpleSocketRpcCilent.java
@@ -0,0 +1,41 @@
+package part2.Client.rpcClient.impl;
+
+import part2.Client.rpcClient.RpcClient;
+import part2.common.Message.RpcRequest;
+import part2.common.Message.RpcResponse;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/2 18:58
+ */
+public class SimpleSocketRpcCilent implements RpcClient {
+ private String host;
+ private int port;
+ public SimpleSocketRpcCilent(String host,int port){
+ this.host=host;
+ this.port=port;
+ }
+ @Override
+ public RpcResponse sendRequest(RpcRequest request) {
+ try {
+ Socket socket=new Socket(host, port);
+ ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
+ ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
+
+ oos.writeObject(request);
+ oos.flush();
+
+ RpcResponse response=(RpcResponse) ois.readObject();
+ return response;
+ } catch (IOException | ClassNotFoundException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+}
diff --git a/version3/src/main/java/part2/Client/serviceCenter/ServiceCenter.java b/version3/src/main/java/part2/Client/serviceCenter/ServiceCenter.java
new file mode 100644
index 0000000..ab72194
--- /dev/null
+++ b/version3/src/main/java/part2/Client/serviceCenter/ServiceCenter.java
@@ -0,0 +1,16 @@
+package part2.Client.serviceCenter;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 21:42
+ */
+//服务中心接口
+public interface ServiceCenter {
+ // 查询:根据服务名查找地址
+ InetSocketAddress serviceDiscovery(String serviceName);
+ //判断是否可重试
+ boolean checkRetry(String serviceName) ;
+}
diff --git a/version3/src/main/java/part2/Client/serviceCenter/ZKServiceCenter.java b/version3/src/main/java/part2/Client/serviceCenter/ZKServiceCenter.java
new file mode 100644
index 0000000..4befc04
--- /dev/null
+++ b/version3/src/main/java/part2/Client/serviceCenter/ZKServiceCenter.java
@@ -0,0 +1,93 @@
+package part2.Client.serviceCenter;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import part2.Client.cache.serviceCache;
+import part2.Client.serviceCenter.ZkWatcher.watchZK;
+import part2.Client.serviceCenter.balance.impl.ConsistencyHashBalance;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 21:41
+ */
+public class ZKServiceCenter implements ServiceCenter{
+ // curator 提供的zookeeper客户端
+ private CuratorFramework client;
+ //zookeeper根路径节点
+ private static final String ROOT_PATH = "MyRPC";
+ private static final String RETRY = "CanRetry";
+ //serviceCache
+ private serviceCache cache;
+
+ //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接
+ public ZKServiceCenter() throws InterruptedException {
+ // 指数时间重试
+ RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
+ // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接
+ // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
+ // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
+ // 使用心跳监听状态
+ this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
+ .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
+ this.client.start();
+ System.out.println("zookeeper 连接成功");
+ //初始化本地缓存
+ cache=new serviceCache();
+ //加入zookeeper事件监听器
+ watchZK watcher=new watchZK(client,cache);
+ //监听启动
+ watcher.watchToUpdate(ROOT_PATH);
+ }
+ //根据服务名(接口名)返回地址
+ @Override
+ public InetSocketAddress serviceDiscovery(String serviceName) {
+ try {
+ //先从本地缓存中找
+ List addressList=cache.getServiceListFromCache(serviceName);
+ //如果找不到,再去zookeeper中找
+ //这种i情况基本不会发生,或者说只会出现在初始化阶段
+ if(addressList==null) {
+ addressList=client.getChildren().forPath("/" + serviceName);
+ }
+ // 负载均衡得到地址
+ String address = new ConsistencyHashBalance().balance(addressList);
+ return parseAddress(address);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ //
+ public boolean checkRetry(String serviceName) {
+ boolean canRetry =false;
+ try {
+ List serviceList = client.getChildren().forPath("/" + RETRY);
+ for(String s:serviceList){
+ if(s.equals(serviceName)){
+ System.out.println("服务"+serviceName+"在白名单上,可进行重试");
+ canRetry=true;
+ }
+ }
+ }catch (Exception e) {
+ e.printStackTrace();
+ }
+ return canRetry;
+ }
+ // 地址 -> XXX.XXX.XXX.XXX:port 字符串
+ private String getServiceAddress(InetSocketAddress serverAddress) {
+ return serverAddress.getHostName() +
+ ":" +
+ serverAddress.getPort();
+ }
+ // 字符串解析为地址
+ private InetSocketAddress parseAddress(String address) {
+ String[] result = address.split(":");
+ return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
+ }
+}
diff --git a/version3/src/main/java/part2/Client/serviceCenter/ZkWatcher/watchZK.java b/version3/src/main/java/part2/Client/serviceCenter/ZkWatcher/watchZK.java
new file mode 100644
index 0000000..61d6a9c
--- /dev/null
+++ b/version3/src/main/java/part2/Client/serviceCenter/ZkWatcher/watchZK.java
@@ -0,0 +1,82 @@
+package part2.Client.serviceCenter.ZkWatcher;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import part2.Client.cache.serviceCache;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/4 1:00
+ */
+public class watchZK {
+ // curator 提供的zookeeper客户端
+ private CuratorFramework client;
+ //本地缓存
+ serviceCache cache;
+
+ public watchZK(CuratorFramework client,serviceCache cache){
+ this.client=client;
+ this.cache=cache;
+ }
+
+ /**
+ * 监听当前节点和子节点的 更新,创建,删除
+ * @param path
+ */
+ public void watchToUpdate(String path) throws InterruptedException {
+ CuratorCache curatorCache = CuratorCache.build(client, "/");
+ curatorCache.listenable().addListener(new CuratorCacheListener() {
+ @Override
+ public void event(Type type, ChildData childData, ChildData childData1) {
+ // 第一个参数:事件类型(枚举)
+ // 第二个参数:节点更新前的状态、数据
+ // 第三个参数:节点更新后的状态、数据
+ // 创建节点时:节点刚被创建,不存在 更新前节点 ,所以第二个参数为 null
+ // 删除节点时:节点被删除,不存在 更新后节点 ,所以第三个参数为 null
+ // 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据
+ switch (type.name()) {
+ case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
+ //获取更新的节点的路径
+ String path=new String(childData1.getPath());
+ //按照格式 ,读取
+ String[] pathList= path.split("/");
+ if(pathList.length<=2) break;
+ else {
+ String serviceName=pathList[1];
+ String address=pathList[2];
+ //将新注册的服务加入到本地缓存中
+ cache.addServcieToCache(serviceName,address);
+ }
+ break;
+ case "NODE_CHANGED": // 节点更新
+ if (childData.getData() != null) {
+ System.out.println("修改前的数据: " + new String(childData.getData()));
+ } else {
+ System.out.println("节点第一次赋值!");
+ }
+ System.out.println("修改后的数据: " + new String(childData1.getData()));
+ break;
+ case "NODE_DELETED": // 节点删除
+ String path_d=new String(childData.getPath());
+ //按照格式 ,读取
+ String[] pathList_d= path_d.split("/");
+ if(pathList_d.length<=2) break;
+ else {
+ String serviceName=pathList_d[1];
+ String address=pathList_d[2];
+ //将新注册的服务加入到本地缓存中
+ cache.delete(serviceName,address);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ });
+ //开启监听
+ curatorCache.start();
+ }
+}
\ No newline at end of file
diff --git a/version3/src/main/java/part2/Client/serviceCenter/balance/LoadBalance.java b/version3/src/main/java/part2/Client/serviceCenter/balance/LoadBalance.java
new file mode 100644
index 0000000..ef28246
--- /dev/null
+++ b/version3/src/main/java/part2/Client/serviceCenter/balance/LoadBalance.java
@@ -0,0 +1,15 @@
+package part2.Client.serviceCenter.balance;
+
+import java.util.List;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/19 21:00
+ * 给服务地址列表,根据不同的负载均衡策略选择一个
+ */
+public interface LoadBalance {
+ String balance(List addressList);
+ void addNode(String node) ;
+ void delNode(String node);
+}
\ No newline at end of file
diff --git a/version3/src/main/java/part2/Client/serviceCenter/balance/impl/ConsistencyHashBalance.java b/version3/src/main/java/part2/Client/serviceCenter/balance/impl/ConsistencyHashBalance.java
new file mode 100644
index 0000000..a67ecc9
--- /dev/null
+++ b/version3/src/main/java/part2/Client/serviceCenter/balance/impl/ConsistencyHashBalance.java
@@ -0,0 +1,119 @@
+package part2.Client.serviceCenter.balance.impl;
+
+import part2.Client.serviceCenter.balance.LoadBalance;
+
+import java.util.*;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/19 21:16
+ * 一致性哈希算法 负载均衡
+ */
+public class ConsistencyHashBalance implements LoadBalance {
+ // 虚拟节点的个数
+ private static final int VIRTUAL_NUM = 5;
+
+ // 虚拟节点分配,key是hash值,value是虚拟节点服务器名称
+ private static SortedMap shards = new TreeMap();
+
+ // 真实节点列表
+ private static List realNodes = new LinkedList();
+
+ //模拟初始服务器
+ private static String[] servers =null;
+
+ private static void init(List serviceList) {
+ for (String server :serviceList) {
+ realNodes.add(server);
+ System.out.println("真实节点[" + server + "] 被添加");
+ for (int i = 0; i < VIRTUAL_NUM; i++) {
+ String virtualNode = server + "&&VN" + i;
+ int hash = getHash(virtualNode);
+ shards.put(hash, virtualNode);
+ System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加");
+ }
+ }
+ }
+ /**
+ * 获取被分配的节点名
+ *
+ * @param node
+ * @return
+ */
+ public static String getServer(String node,List serviceList) {
+ init(serviceList);
+ int hash = getHash(node);
+ Integer key = null;
+ SortedMap subMap = shards.tailMap(hash);
+ if (subMap.isEmpty()) {
+ key = shards.lastKey();
+ } else {
+ key = subMap.firstKey();
+ }
+ String virtualNode = shards.get(key);
+ return virtualNode.substring(0, virtualNode.indexOf("&&"));
+ }
+
+ /**
+ * 添加节点
+ *
+ * @param node
+ */
+ public void addNode(String node) {
+ if (!realNodes.contains(node)) {
+ realNodes.add(node);
+ System.out.println("真实节点[" + node + "] 上线添加");
+ for (int i = 0; i < VIRTUAL_NUM; i++) {
+ String virtualNode = node + "&&VN" + i;
+ int hash = getHash(virtualNode);
+ shards.put(hash, virtualNode);
+ System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加");
+ }
+ }
+ }
+
+ /**
+ * 删除节点
+ *
+ * @param node
+ */
+ public void delNode(String node) {
+ if (realNodes.contains(node)) {
+ realNodes.remove(node);
+ System.out.println("真实节点[" + node + "] 下线移除");
+ for (int i = 0; i < VIRTUAL_NUM; i++) {
+ String virtualNode = node + "&&VN" + i;
+ int hash = getHash(virtualNode);
+ shards.remove(hash);
+ System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被移除");
+ }
+ }
+ }
+
+ /**
+ * FNV1_32_HASH算法
+ */
+ private static int getHash(String str) {
+ final int p = 16777619;
+ int hash = (int) 2166136261L;
+ for (int i = 0; i < str.length(); i++)
+ hash = (hash ^ str.charAt(i)) * p;
+ hash += hash << 13;
+ hash ^= hash >> 7;
+ hash += hash << 3;
+ hash ^= hash >> 17;
+ hash += hash << 5;
+ // 如果算出来的值为负数则取其绝对值
+ if (hash < 0)
+ hash = Math.abs(hash);
+ return hash;
+ }
+
+ @Override
+ public String balance(List addressList) {
+ String random= UUID.randomUUID().toString();
+ return getServer(random,addressList);
+ }
+
+}
diff --git a/version3/src/main/java/part2/Client/serviceCenter/balance/impl/RandomLoadBalance.java b/version3/src/main/java/part2/Client/serviceCenter/balance/impl/RandomLoadBalance.java
new file mode 100644
index 0000000..a45602a
--- /dev/null
+++ b/version3/src/main/java/part2/Client/serviceCenter/balance/impl/RandomLoadBalance.java
@@ -0,0 +1,24 @@
+package part2.Client.serviceCenter.balance.impl;
+
+import part2.Client.serviceCenter.balance.LoadBalance;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/19 21:20
+ * 随机 负载均衡
+ */
+public class RandomLoadBalance implements LoadBalance {
+ @Override
+ public String balance(List addressList) {
+ Random random=new Random();
+ int choose = random.nextInt(addressList.size());
+ System.out.println("负载均衡选择了"+choose+"服务器");
+ return null;
+ }
+ public void addNode(String node){} ;
+ public void delNode(String node){};
+}
diff --git a/version3/src/main/java/part2/Client/serviceCenter/balance/impl/RoundLoadBalance.java b/version3/src/main/java/part2/Client/serviceCenter/balance/impl/RoundLoadBalance.java
new file mode 100644
index 0000000..6748fb4
--- /dev/null
+++ b/version3/src/main/java/part2/Client/serviceCenter/balance/impl/RoundLoadBalance.java
@@ -0,0 +1,24 @@
+package part2.Client.serviceCenter.balance.impl;
+
+import part2.Client.serviceCenter.balance.LoadBalance;
+
+import java.util.List;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/19 21:21
+ * 轮询 负载均衡
+ */
+public class RoundLoadBalance implements LoadBalance {
+ private int choose=-1;
+ @Override
+ public String balance(List addressList) {
+ choose++;
+ choose=choose%addressList.size();
+ System.out.println("负载均衡选择了"+choose+"服务器");
+ return addressList.get(choose);
+ }
+ public void addNode(String node) {};
+ public void delNode(String node){};
+}
diff --git a/version3/src/main/java/part2/Server/TestServer.java b/version3/src/main/java/part2/Server/TestServer.java
new file mode 100644
index 0000000..54941a3
--- /dev/null
+++ b/version3/src/main/java/part2/Server/TestServer.java
@@ -0,0 +1,27 @@
+package part2.Server;
+
+
+import part2.Server.provider.ServiceProvider;
+import part2.Server.server.RpcServer;
+import part2.Server.server.impl.NettyRPCRPCServer;
+import part2.common.service.Impl.UserServiceImpl;
+import part2.common.service.UserService;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/11 19:39
+ */
+
+public class TestServer {
+ public static void main(String[] args) throws InterruptedException {
+ UserService userService=new UserServiceImpl();
+
+ ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999);
+
+ serviceProvider.provideServiceInterface(userService,true);
+
+ RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
+ rpcServer.start(9999);
+ }
+}
diff --git a/version3/src/main/java/part2/Server/netty/handler/NettyRPCServerHandler.java b/version3/src/main/java/part2/Server/netty/handler/NettyRPCServerHandler.java
new file mode 100644
index 0000000..adefac7
--- /dev/null
+++ b/version3/src/main/java/part2/Server/netty/handler/NettyRPCServerHandler.java
@@ -0,0 +1,51 @@
+package part2.Server.netty.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.AllArgsConstructor;
+import part2.Server.provider.ServiceProvider;
+import part2.common.Message.RpcRequest;
+import part2.common.Message.RpcResponse;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 16:40
+ * 因为是服务器端,我们知道接受到请求格式是RPCRequest
+ * Object类型也行,强制转型就行
+ */
+@AllArgsConstructor
+public class NettyRPCServerHandler extends SimpleChannelInboundHandler {
+ private ServiceProvider serviceProvider;
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
+ RpcResponse response = getResponse(request);
+ ctx.writeAndFlush(response);
+ ctx.close();
+ }
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+ private RpcResponse getResponse(RpcRequest rpcRequest){
+ //得到服务名
+ String interfaceName=rpcRequest.getInterfaceName();
+ //得到服务端相应服务实现类
+ Object service = serviceProvider.getService(interfaceName);
+ //反射调用方法
+ Method method=null;
+ try {
+ method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
+ Object invoke=method.invoke(service,rpcRequest.getParams());
+ return RpcResponse.sussess(invoke);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ e.printStackTrace();
+ System.out.println("方法执行错误");
+ return RpcResponse.fail();
+ }
+ }
+}
diff --git a/version3/src/main/java/part2/Server/netty/nettyInitializer/NettyServerInitializer.java b/version3/src/main/java/part2/Server/netty/nettyInitializer/NettyServerInitializer.java
new file mode 100644
index 0000000..4d174a1
--- /dev/null
+++ b/version3/src/main/java/part2/Server/netty/nettyInitializer/NettyServerInitializer.java
@@ -0,0 +1,30 @@
+package part2.Server.netty.nettyInitializer;
+
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import lombok.AllArgsConstructor;
+import part2.Server.netty.handler.NettyRPCServerHandler;
+import part2.Server.provider.ServiceProvider;
+import part2.common.serializer.myCode.MyDecoder;
+import part2.common.serializer.myCode.MyEncoder;
+import part2.common.serializer.mySerializer.JsonSerializer;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 16:15
+ */
+@AllArgsConstructor
+public class NettyServerInitializer extends ChannelInitializer {
+ private ServiceProvider serviceProvider;
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ //使用自定义的编/解码器
+ pipeline.addLast(new MyEncoder(new JsonSerializer()));
+ pipeline.addLast(new MyDecoder());
+ pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
+ }
+}
diff --git a/version3/src/main/java/part2/Server/provider/ServiceProvider.java b/version3/src/main/java/part2/Server/provider/ServiceProvider.java
new file mode 100644
index 0000000..80e4134
--- /dev/null
+++ b/version3/src/main/java/part2/Server/provider/ServiceProvider.java
@@ -0,0 +1,47 @@
+package part2.Server.provider;
+
+import part2.Server.serviceRegister.ServiceRegister;
+import part2.Server.serviceRegister.impl.ZKServiceRegister;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/16 17:35
+ */
+public class ServiceProvider {
+ private Map interfaceProvider;
+
+ private int port;
+ private String host;
+ //注册服务类
+ private ServiceRegister serviceRegister;
+
+ public ServiceProvider(String host,int port){
+ //需要传入服务端自身的网络地址
+ this.host=host;
+ this.port=port;
+ this.interfaceProvider=new HashMap<>();
+ this.serviceRegister=new ZKServiceRegister();
+ }
+
+ public void provideServiceInterface(Object service,boolean canRetry){
+ String serviceName=service.getClass().getName();
+ Class>[] interfaceName=service.getClass().getInterfaces();
+
+ for (Class> clazz:interfaceName){
+ //本机的映射表
+ interfaceProvider.put(clazz.getName(),service);
+ //在注册中心注册服务
+ serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port),canRetry);
+ }
+ }
+
+ public Object getService(String interfaceName){
+ return interfaceProvider.get(interfaceName);
+ }
+
+}
diff --git a/version3/src/main/java/part2/Server/server/RpcServer.java b/version3/src/main/java/part2/Server/server/RpcServer.java
new file mode 100644
index 0000000..6abbf2a
--- /dev/null
+++ b/version3/src/main/java/part2/Server/server/RpcServer.java
@@ -0,0 +1,11 @@
+package part2.Server.server;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/12 11:26
+ */
+public interface RpcServer {
+ void start(int port);
+ void stop();
+}
diff --git a/version3/src/main/java/part2/Server/server/impl/NettyRPCRPCServer.java b/version3/src/main/java/part2/Server/server/impl/NettyRPCRPCServer.java
new file mode 100644
index 0000000..7a4349a
--- /dev/null
+++ b/version3/src/main/java/part2/Server/server/impl/NettyRPCRPCServer.java
@@ -0,0 +1,48 @@
+package part2.Server.server.impl;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.AllArgsConstructor;
+import part2.Server.netty.nettyInitializer.NettyServerInitializer;
+import part2.Server.provider.ServiceProvider;
+import part2.Server.server.RpcServer;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 14:01
+ */
+@AllArgsConstructor
+public class NettyRPCRPCServer implements RpcServer {
+ private ServiceProvider serviceProvider;
+ @Override
+ public void start(int port) {
+ // netty 服务线程组boss负责建立连接, work负责具体的请求
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+ NioEventLoopGroup workGroup = new NioEventLoopGroup();
+ System.out.println("netty服务端启动了");
+ try {
+ //启动netty服务器
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ //初始化
+ serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
+ .childHandler(new NettyServerInitializer(serviceProvider));
+ //同步堵塞
+ ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
+ //死循环监听
+ channelFuture.channel().closeFuture().sync();
+ }catch (InterruptedException e){
+ e.printStackTrace();
+ }finally {
+ bossGroup.shutdownGracefully();
+ workGroup.shutdownGracefully();
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/version3/src/main/java/part2/Server/server/impl/SimpleRPCRPCServer.java b/version3/src/main/java/part2/Server/server/impl/SimpleRPCRPCServer.java
new file mode 100644
index 0000000..8033bab
--- /dev/null
+++ b/version3/src/main/java/part2/Server/server/impl/SimpleRPCRPCServer.java
@@ -0,0 +1,39 @@
+package part2.Server.server.impl;
+
+
+import lombok.AllArgsConstructor;
+import part2.Server.provider.ServiceProvider;
+import part2.Server.server.RpcServer;
+import part2.Server.server.work.WorkThread;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/12 11:37
+ */
+@AllArgsConstructor
+public class SimpleRPCRPCServer implements RpcServer {
+ private ServiceProvider serviceProvide;
+ @Override
+ public void start(int port) {
+ try {
+ ServerSocket serverSocket=new ServerSocket(port);
+ System.out.println("服务器启动了");
+ while (true) {
+ Socket socket = serverSocket.accept();
+ new Thread(new WorkThread(socket,serviceProvide)).start();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/version3/src/main/java/part2/Server/server/work/WorkThread.java b/version3/src/main/java/part2/Server/server/work/WorkThread.java
new file mode 100644
index 0000000..a3985bf
--- /dev/null
+++ b/version3/src/main/java/part2/Server/server/work/WorkThread.java
@@ -0,0 +1,58 @@
+package part2.Server.server.work;
+
+
+import lombok.AllArgsConstructor;
+import part2.Server.provider.ServiceProvider;
+import part2.common.Message.RpcRequest;
+import part2.common.Message.RpcResponse;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.Socket;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/14 17:39
+ */
+@AllArgsConstructor
+public class WorkThread implements Runnable{
+ private Socket socket;
+ private ServiceProvider serviceProvide;
+ @Override
+ public void run() {
+ try {
+ ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
+ ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
+ //读取客户端传过来的request
+ RpcRequest rpcRequest = (RpcRequest) ois.readObject();
+ //反射调用服务方法获取返回值
+ RpcResponse rpcResponse=getResponse(rpcRequest);
+ //向客户端写入response
+ oos.writeObject(rpcResponse);
+ oos.flush();
+ } catch (IOException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ private RpcResponse getResponse(RpcRequest rpcRequest){
+ //得到服务名
+ String interfaceName=rpcRequest.getInterfaceName();
+ //得到服务端相应服务实现类
+ Object service = serviceProvide.getService(interfaceName);
+ //反射调用方法
+ Method method=null;
+ try {
+ method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
+ Object invoke=method.invoke(service,rpcRequest.getParams());
+ return RpcResponse.sussess(invoke);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ e.printStackTrace();
+ System.out.println("方法执行错误");
+ return RpcResponse.fail();
+ }
+ }
+}
diff --git a/version3/src/main/java/part2/Server/serviceRegister/ServiceRegister.java b/version3/src/main/java/part2/Server/serviceRegister/ServiceRegister.java
new file mode 100644
index 0000000..da48e4d
--- /dev/null
+++ b/version3/src/main/java/part2/Server/serviceRegister/ServiceRegister.java
@@ -0,0 +1,15 @@
+package part2.Server.serviceRegister;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 16:58
+ */
+// 服务注册接口
+public interface ServiceRegister {
+ // 注册:保存服务与地址。
+ void register(String serviceName, InetSocketAddress serviceAddress,boolean canRetry);
+
+}
diff --git a/version3/src/main/java/part2/Server/serviceRegister/impl/ZKServiceRegister.java b/version3/src/main/java/part2/Server/serviceRegister/impl/ZKServiceRegister.java
new file mode 100644
index 0000000..04e9e7c
--- /dev/null
+++ b/version3/src/main/java/part2/Server/serviceRegister/impl/ZKServiceRegister.java
@@ -0,0 +1,68 @@
+package part2.Server.serviceRegister.impl;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.CreateMode;
+import part2.Server.serviceRegister.ServiceRegister;
+
+import java.net.InetSocketAddress;
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 17:28
+ */
+public class ZKServiceRegister implements ServiceRegister {
+ // curator 提供的zookeeper客户端
+ private CuratorFramework client;
+ //zookeeper根路径节点
+ private static final String ROOT_PATH = "MyRPC";
+ private static final String RETRY = "CanRetry";
+
+ //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接
+ public ZKServiceRegister(){
+ // 指数时间重试
+ RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
+ // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接
+ // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
+ // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
+ // 使用心跳监听状态
+ this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
+ .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
+ this.client.start();
+ System.out.println("zookeeper 连接成功");
+ }
+ //注册服务到注册中心
+ @Override
+ public void register(String serviceName, InetSocketAddress serviceAddress,boolean canRetry) {
+ try {
+ // serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址
+ if(client.checkExists().forPath("/" + serviceName) == null){
+ client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
+ }
+ // 路径地址,一个/代表一个节点
+ String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress);
+ // 临时节点,服务器下线就删除节点
+ client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
+ //如果这个服务是幂等性,就增加到节点中
+ if (canRetry){
+ path ="/"+RETRY+"/"+serviceName;
+ client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
+ }
+ } catch (Exception e) {
+ System.out.println("此服务已存在");
+ }
+ }
+ // 地址 -> XXX.XXX.XXX.XXX:port 字符串
+ private String getServiceAddress(InetSocketAddress serverAddress) {
+ return serverAddress.getHostName() +
+ ":" +
+ serverAddress.getPort();
+ }
+ // 字符串解析为地址
+ private InetSocketAddress parseAddress(String address) {
+ String[] result = address.split(":");
+ return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
+ }
+}
diff --git a/version3/src/main/java/part2/common/Message/MessageType.java b/version3/src/main/java/part2/common/Message/MessageType.java
new file mode 100644
index 0000000..0241fd6
--- /dev/null
+++ b/version3/src/main/java/part2/common/Message/MessageType.java
@@ -0,0 +1,17 @@
+package part2.common.Message;
+
+import lombok.AllArgsConstructor;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/2 22:29
+ */
+@AllArgsConstructor
+public enum MessageType {
+ REQUEST(0),RESPONSE(1);
+ private int code;
+ public int getCode(){
+ return code;
+ }
+}
\ No newline at end of file
diff --git a/version3/src/main/java/part2/common/Message/RpcRequest.java b/version3/src/main/java/part2/common/Message/RpcRequest.java
new file mode 100644
index 0000000..9ed8c3a
--- /dev/null
+++ b/version3/src/main/java/part2/common/Message/RpcRequest.java
@@ -0,0 +1,29 @@
+package part2.common.Message;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/1 18:30
+ * 定义发送的消息格式
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+@Builder
+public class RpcRequest implements Serializable {
+ //服务类名,客户端只知道接口
+ private String interfaceName;
+ //调用的方法名
+ private String methodName;
+ //参数列表
+ private Object[] params;
+ //参数类型
+ private Class>[] paramsType;
+}
diff --git a/version3/src/main/java/part2/common/Message/RpcResponse.java b/version3/src/main/java/part2/common/Message/RpcResponse.java
new file mode 100644
index 0000000..8df2e59
--- /dev/null
+++ b/version3/src/main/java/part2/common/Message/RpcResponse.java
@@ -0,0 +1,35 @@
+package part2.common.Message;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/1 19:18
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+@Builder
+public class RpcResponse implements Serializable {
+ //状态信息
+ private int code;
+ private String message;
+ //更新:加入传输数据的类型,以便在自定义序列化器中解析
+ private Class> dataType;
+ //具体数据
+ private Object data;
+
+ public static RpcResponse sussess(Object data){
+ return RpcResponse.builder().code(200).dataType(data.getClass()).data(data).build();
+ }
+ public static RpcResponse fail(){
+ return RpcResponse.builder().code(500).message("服务器发生错误").build();
+ }
+}
+
diff --git a/version3/src/main/java/part2/common/pojo/User.java b/version3/src/main/java/part2/common/pojo/User.java
new file mode 100644
index 0000000..bf999d7
--- /dev/null
+++ b/version3/src/main/java/part2/common/pojo/User.java
@@ -0,0 +1,25 @@
+package part2.common.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/1/28 17:50
+ */
+@Builder
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class User implements Serializable {
+ // 客户端和服务端共有的
+ private Integer id;
+ private String userName;
+ private Boolean sex;
+}
+
diff --git a/version3/src/main/java/part2/common/serializer/myCode/MyDecoder.java b/version3/src/main/java/part2/common/serializer/myCode/MyDecoder.java
new file mode 100644
index 0000000..7970141
--- /dev/null
+++ b/version3/src/main/java/part2/common/serializer/myCode/MyDecoder.java
@@ -0,0 +1,41 @@
+package part2.common.serializer.myCode;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import part2.common.Message.MessageType;
+import part2.common.serializer.mySerializer.Serializer;
+
+import java.util.List;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/2 22:24
+ * 按照自定义的消息格式解码数据
+ */
+public class MyDecoder extends ByteToMessageDecoder {
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List