From 0eadc2e68426af77fd5a47e0c107db08332820d4 Mon Sep 17 00:00:00 2001
From: Wxx <2563806166@qq.com>
Date: Mon, 3 Jun 2024 01:11:44 +0800
Subject: [PATCH] version2
---
version2/pom.xml | 54 +++++++++++++++
.../main/java/part1/Client/TestClient.java | 28 ++++++++
.../netty/handler/NettyClientHandler.java | 27 ++++++++
.../NettyClientInitializer.java | 31 +++++++++
.../java/part1/Client/proxy/ClientProxy.java | 42 ++++++++++++
.../part1/Client/rpcClient/RpcClient.java | 15 ++++
.../Client/rpcClient/impl/NettyRpcClient.java | 68 +++++++++++++++++++
.../rpcClient/impl/SimpleSocketRpcCilent.java | 41 +++++++++++
.../Client/serviceCenter/ServiceCenter.java | 14 ++++
.../Client/serviceCenter/ZKServiceCenter.java | 59 ++++++++++++++++
.../main/java/part1/Server/TestServer.java | 26 +++++++
.../netty/handler/NettyRPCServerHandler.java | 51 ++++++++++++++
.../NettyServerInitializer.java | 35 ++++++++++
.../Server/provider/ServiceProvider.java | 47 +++++++++++++
.../java/part1/Server/server/RpcServer.java | 11 +++
.../Server/server/impl/NettyRPCRPCServer.java | 48 +++++++++++++
.../server/impl/SimpleRPCRPCServer.java | 39 +++++++++++
.../part1/Server/server/work/WorkThread.java | 58 ++++++++++++++++
.../serviceRegister/ServiceRegister.java | 15 ++++
.../impl/ZKServiceRegister.java | 62 +++++++++++++++++
.../part1/common/Message/MessageType.java | 17 +++++
.../java/part1/common/Message/RpcRequest.java | 29 ++++++++
.../part1/common/Message/RpcResponse.java | 35 ++++++++++
.../src/main/java/part1/common/pojo/User.java | 25 +++++++
.../common/serializer/myCode/MyDecoder.java | 42 ++++++++++++
.../common/serializer/myCode/MyEncoder.java | 41 +++++++++++
.../mySerializer/JsonSerializer.java | 65 ++++++++++++++++++
.../mySerializer/ObjectSerializer.java | 54 +++++++++++++++
.../serializer/mySerializer/Serializer.java | 29 ++++++++
.../common/service/Impl/UserServiceImpl.java | 32 +++++++++
.../part1/common/service/UserService.java | 16 +++++
version2/src/main/resources/log4j.properties | 8 +++
32 files changed, 1164 insertions(+)
create mode 100644 version2/pom.xml
create mode 100644 version2/src/main/java/part1/Client/TestClient.java
create mode 100644 version2/src/main/java/part1/Client/netty/handler/NettyClientHandler.java
create mode 100644 version2/src/main/java/part1/Client/netty/nettyInitializer/NettyClientInitializer.java
create mode 100644 version2/src/main/java/part1/Client/proxy/ClientProxy.java
create mode 100644 version2/src/main/java/part1/Client/rpcClient/RpcClient.java
create mode 100644 version2/src/main/java/part1/Client/rpcClient/impl/NettyRpcClient.java
create mode 100644 version2/src/main/java/part1/Client/rpcClient/impl/SimpleSocketRpcCilent.java
create mode 100644 version2/src/main/java/part1/Client/serviceCenter/ServiceCenter.java
create mode 100644 version2/src/main/java/part1/Client/serviceCenter/ZKServiceCenter.java
create mode 100644 version2/src/main/java/part1/Server/TestServer.java
create mode 100644 version2/src/main/java/part1/Server/netty/handler/NettyRPCServerHandler.java
create mode 100644 version2/src/main/java/part1/Server/netty/nettyInitializer/NettyServerInitializer.java
create mode 100644 version2/src/main/java/part1/Server/provider/ServiceProvider.java
create mode 100644 version2/src/main/java/part1/Server/server/RpcServer.java
create mode 100644 version2/src/main/java/part1/Server/server/impl/NettyRPCRPCServer.java
create mode 100644 version2/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java
create mode 100644 version2/src/main/java/part1/Server/server/work/WorkThread.java
create mode 100644 version2/src/main/java/part1/Server/serviceRegister/ServiceRegister.java
create mode 100644 version2/src/main/java/part1/Server/serviceRegister/impl/ZKServiceRegister.java
create mode 100644 version2/src/main/java/part1/common/Message/MessageType.java
create mode 100644 version2/src/main/java/part1/common/Message/RpcRequest.java
create mode 100644 version2/src/main/java/part1/common/Message/RpcResponse.java
create mode 100644 version2/src/main/java/part1/common/pojo/User.java
create mode 100644 version2/src/main/java/part1/common/serializer/myCode/MyDecoder.java
create mode 100644 version2/src/main/java/part1/common/serializer/myCode/MyEncoder.java
create mode 100644 version2/src/main/java/part1/common/serializer/mySerializer/JsonSerializer.java
create mode 100644 version2/src/main/java/part1/common/serializer/mySerializer/ObjectSerializer.java
create mode 100644 version2/src/main/java/part1/common/serializer/mySerializer/Serializer.java
create mode 100644 version2/src/main/java/part1/common/service/Impl/UserServiceImpl.java
create mode 100644 version2/src/main/java/part1/common/service/UserService.java
create mode 100644 version2/src/main/resources/log4j.properties
diff --git a/version2/pom.xml b/version2/pom.xml
new file mode 100644
index 0000000..4626ed3
--- /dev/null
+++ b/version2/pom.xml
@@ -0,0 +1,54 @@
+
+
+ 4.0.0
+
+ org.example
+ version2
+ 1.0-SNAPSHOT
+
+
+ 8
+ 8
+ UTF-8
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.30
+ compile
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.25
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+ 2.8.2
+
+
+
+ io.netty
+ netty-all
+ 4.1.51.Final
+ compile
+
+
+
+ org.apache.curator
+ curator-recipes
+ 5.1.0
+
+
+ com.alibaba
+ fastjson
+ 1.2.83
+
+
+
+
+
\ No newline at end of file
diff --git a/version2/src/main/java/part1/Client/TestClient.java b/version2/src/main/java/part1/Client/TestClient.java
new file mode 100644
index 0000000..6e6715c
--- /dev/null
+++ b/version2/src/main/java/part1/Client/TestClient.java
@@ -0,0 +1,28 @@
+package part1.Client;
+
+
+import part1.Client.proxy.ClientProxy;
+import part1.common.service.UserService;
+import part1.common.pojo.User;
+
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/6 18:39
+ */
+
+public class TestClient {
+ public static void main(String[] args) {
+ ClientProxy clientProxy=new ClientProxy();
+ //ClientProxy clientProxy=new part2.Client.proxy.ClientProxy("127.0.0.1",9999,0);
+ UserService proxy=clientProxy.getProxy(UserService.class);
+
+ User user = proxy.getUserByUserId(1);
+ System.out.println("从服务端得到的user="+user.toString());
+
+ User u=User.builder().id(100).userName("wxx").sex(true).build();
+ Integer id = proxy.insertUserId(u);
+ System.out.println("向服务端插入user的id"+id);
+ }
+}
diff --git a/version2/src/main/java/part1/Client/netty/handler/NettyClientHandler.java b/version2/src/main/java/part1/Client/netty/handler/NettyClientHandler.java
new file mode 100644
index 0000000..31858c6
--- /dev/null
+++ b/version2/src/main/java/part1/Client/netty/handler/NettyClientHandler.java
@@ -0,0 +1,27 @@
+package part1.Client.netty.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeKey;
+import part1.common.Message.RpcResponse;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 17:29
+ */
+public class NettyClientHandler extends SimpleChannelInboundHandler {
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
+ // 接收到response, 给channel设计别名,让sendRequest里读取response
+ AttributeKey key = AttributeKey.valueOf("RPCResponse");
+ ctx.channel().attr(key).set(response);
+ ctx.channel().close();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+}
diff --git a/version2/src/main/java/part1/Client/netty/nettyInitializer/NettyClientInitializer.java b/version2/src/main/java/part1/Client/netty/nettyInitializer/NettyClientInitializer.java
new file mode 100644
index 0000000..d9705c1
--- /dev/null
+++ b/version2/src/main/java/part1/Client/netty/nettyInitializer/NettyClientInitializer.java
@@ -0,0 +1,31 @@
+package part1.Client.netty.nettyInitializer;
+
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.serialization.ClassResolver;
+import io.netty.handler.codec.serialization.ObjectDecoder;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import part1.Client.netty.handler.NettyClientHandler;
+import part1.common.serializer.myCode.MyDecoder;
+import part1.common.serializer.myCode.MyEncoder;
+import part1.common.serializer.mySerializer.JsonSerializer;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 17:26
+ */
+public class NettyClientInitializer extends ChannelInitializer {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ //使用自定义的编/解码器
+ pipeline.addLast(new MyEncoder(new JsonSerializer()));
+ pipeline.addLast(new MyDecoder());
+ pipeline.addLast(new NettyClientHandler());
+ }
+}
diff --git a/version2/src/main/java/part1/Client/proxy/ClientProxy.java b/version2/src/main/java/part1/Client/proxy/ClientProxy.java
new file mode 100644
index 0000000..c4ff57b
--- /dev/null
+++ b/version2/src/main/java/part1/Client/proxy/ClientProxy.java
@@ -0,0 +1,42 @@
+package part1.Client.proxy;
+
+
+import part1.Client.rpcClient.RpcClient;
+import part1.Client.rpcClient.impl.NettyRpcClient;
+import part1.common.Message.RpcRequest;
+import part1.common.Message.RpcResponse;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/6 16:49
+ */
+public class ClientProxy implements InvocationHandler {
+ //传入参数service接口的class对象,反射封装成一个request
+
+ private RpcClient rpcClient;
+ public ClientProxy(){
+ rpcClient=new NettyRpcClient();
+ }
+
+ //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ //构建request
+ RpcRequest request=RpcRequest.builder()
+ .interfaceName(method.getDeclaringClass().getName())
+ .methodName(method.getName())
+ .params(args).paramsType(method.getParameterTypes()).build();
+ //数据传输
+ RpcResponse response= rpcClient.sendRequest(request);
+ return response.getData();
+ }
+ public T getProxy(Class clazz){
+ Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
+ return (T)o;
+ }
+}
diff --git a/version2/src/main/java/part1/Client/rpcClient/RpcClient.java b/version2/src/main/java/part1/Client/rpcClient/RpcClient.java
new file mode 100644
index 0000000..c665530
--- /dev/null
+++ b/version2/src/main/java/part1/Client/rpcClient/RpcClient.java
@@ -0,0 +1,15 @@
+package part1.Client.rpcClient;
+
+import part1.common.Message.RpcRequest;
+import part1.common.Message.RpcResponse;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/2 18:55
+ */
+public interface RpcClient {
+
+ //定义底层通信的方法
+ RpcResponse sendRequest(RpcRequest request);
+}
diff --git a/version2/src/main/java/part1/Client/rpcClient/impl/NettyRpcClient.java b/version2/src/main/java/part1/Client/rpcClient/impl/NettyRpcClient.java
new file mode 100644
index 0000000..4cc425c
--- /dev/null
+++ b/version2/src/main/java/part1/Client/rpcClient/impl/NettyRpcClient.java
@@ -0,0 +1,68 @@
+package part1.Client.rpcClient.impl;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.AttributeKey;
+import part1.Client.rpcClient.RpcClient;
+import part1.common.Message.RpcRequest;
+import part1.Client.netty.nettyInitializer.NettyClientInitializer;
+import part1.Client.serviceCenter.ServiceCenter;
+import part1.Client.serviceCenter.ZKServiceCenter;
+import part1.common.Message.RpcResponse;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/2 19:40
+ */
+public class NettyRpcClient implements RpcClient {
+
+ private static final Bootstrap bootstrap;
+ private static final EventLoopGroup eventLoopGroup;
+
+ private ServiceCenter serviceCenter;
+ public NettyRpcClient(){
+ this.serviceCenter=new ZKServiceCenter();
+ }
+
+ //netty客户端初始化
+ static {
+ eventLoopGroup = new NioEventLoopGroup();
+ bootstrap = new Bootstrap();
+ bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
+ .handler(new NettyClientInitializer());
+ }
+ @Override
+ public RpcResponse sendRequest(RpcRequest request) {
+ //从注册中心获取host,post
+ InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
+ String host = address.getHostName();
+ int port = address.getPort();
+ try {
+ ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
+ Channel channel = channelFuture.channel();
+ // 发送数据
+ channel.writeAndFlush(request);
+ //sync()堵塞获取结果
+ channel.closeFuture().sync();
+ // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
+ // AttributeKey是,线程隔离的,不会由线程安全问题。
+ // 当前场景下选择堵塞获取结果
+ // 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
+ AttributeKey key = AttributeKey.valueOf("RPCResponse");
+ RpcResponse response = channel.attr(key).get();
+
+ System.out.println(response);
+ return response;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+}
diff --git a/version2/src/main/java/part1/Client/rpcClient/impl/SimpleSocketRpcCilent.java b/version2/src/main/java/part1/Client/rpcClient/impl/SimpleSocketRpcCilent.java
new file mode 100644
index 0000000..d74af41
--- /dev/null
+++ b/version2/src/main/java/part1/Client/rpcClient/impl/SimpleSocketRpcCilent.java
@@ -0,0 +1,41 @@
+package part1.Client.rpcClient.impl;
+
+import part1.Client.rpcClient.RpcClient;
+import part1.common.Message.RpcRequest;
+import part1.common.Message.RpcResponse;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/2 18:58
+ */
+public class SimpleSocketRpcCilent implements RpcClient {
+ private String host;
+ private int port;
+ public SimpleSocketRpcCilent(String host,int port){
+ this.host=host;
+ this.port=port;
+ }
+ @Override
+ public RpcResponse sendRequest(RpcRequest request) {
+ try {
+ Socket socket=new Socket(host, port);
+ ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
+ ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
+
+ oos.writeObject(request);
+ oos.flush();
+
+ RpcResponse response=(RpcResponse) ois.readObject();
+ return response;
+ } catch (IOException | ClassNotFoundException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+}
diff --git a/version2/src/main/java/part1/Client/serviceCenter/ServiceCenter.java b/version2/src/main/java/part1/Client/serviceCenter/ServiceCenter.java
new file mode 100644
index 0000000..1842b94
--- /dev/null
+++ b/version2/src/main/java/part1/Client/serviceCenter/ServiceCenter.java
@@ -0,0 +1,14 @@
+package part1.Client.serviceCenter;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 21:42
+ */
+//服务中心接口
+public interface ServiceCenter {
+ // 查询:根据服务名查找地址
+ InetSocketAddress serviceDiscovery(String serviceName);
+}
diff --git a/version2/src/main/java/part1/Client/serviceCenter/ZKServiceCenter.java b/version2/src/main/java/part1/Client/serviceCenter/ZKServiceCenter.java
new file mode 100644
index 0000000..b62de1f
--- /dev/null
+++ b/version2/src/main/java/part1/Client/serviceCenter/ZKServiceCenter.java
@@ -0,0 +1,59 @@
+package part1.Client.serviceCenter;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 21:41
+ */
+public class ZKServiceCenter implements ServiceCenter{
+ // curator 提供的zookeeper客户端
+ private CuratorFramework client;
+ //zookeeper根路径节点
+ private static final String ROOT_PATH = "MyRPC";
+
+ //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接
+ public ZKServiceCenter(){
+ // 指数时间重试
+ RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
+ // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接
+ // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
+ // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
+ // 使用心跳监听状态
+ this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
+ .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
+ this.client.start();
+ System.out.println("zookeeper 连接成功");
+ }
+ //根据服务名(接口名)返回地址
+ @Override
+ public InetSocketAddress serviceDiscovery(String serviceName) {
+ try {
+ List strings = client.getChildren().forPath("/" + serviceName);
+ // 这里默认用的第一个,后面加负载均衡
+ String string = strings.get(0);
+ return parseAddress(string);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ // 地址 -> XXX.XXX.XXX.XXX:port 字符串
+ private String getServiceAddress(InetSocketAddress serverAddress) {
+ return serverAddress.getHostName() +
+ ":" +
+ serverAddress.getPort();
+ }
+ // 字符串解析为地址
+ private InetSocketAddress parseAddress(String address) {
+ String[] result = address.split(":");
+ return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
+ }
+}
diff --git a/version2/src/main/java/part1/Server/TestServer.java b/version2/src/main/java/part1/Server/TestServer.java
new file mode 100644
index 0000000..baaf058
--- /dev/null
+++ b/version2/src/main/java/part1/Server/TestServer.java
@@ -0,0 +1,26 @@
+package part1.Server;
+
+
+import part1.Server.provider.ServiceProvider;
+import part1.Server.server.impl.NettyRPCRPCServer;
+import part1.Server.server.RpcServer;
+import part1.common.service.Impl.UserServiceImpl;
+import part1.common.service.UserService;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/11 19:39
+ */
+
+public class TestServer {
+ public static void main(String[] args) {
+ UserService userService=new UserServiceImpl();
+
+ ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999);
+ serviceProvider.provideServiceInterface(userService);
+
+ RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
+ rpcServer.start(9999);
+ }
+}
diff --git a/version2/src/main/java/part1/Server/netty/handler/NettyRPCServerHandler.java b/version2/src/main/java/part1/Server/netty/handler/NettyRPCServerHandler.java
new file mode 100644
index 0000000..5b01246
--- /dev/null
+++ b/version2/src/main/java/part1/Server/netty/handler/NettyRPCServerHandler.java
@@ -0,0 +1,51 @@
+package part1.Server.netty.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.AllArgsConstructor;
+import part1.Server.provider.ServiceProvider;
+import part1.common.Message.RpcRequest;
+import part1.common.Message.RpcResponse;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 16:40
+ * 因为是服务器端,我们知道接受到请求格式是RPCRequest
+ * Object类型也行,强制转型就行
+ */
+@AllArgsConstructor
+public class NettyRPCServerHandler extends SimpleChannelInboundHandler {
+ private ServiceProvider serviceProvider;
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
+ RpcResponse response = getResponse(request);
+ ctx.writeAndFlush(response);
+ ctx.close();
+ }
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+ private RpcResponse getResponse(RpcRequest rpcRequest){
+ //得到服务名
+ String interfaceName=rpcRequest.getInterfaceName();
+ //得到服务端相应服务实现类
+ Object service = serviceProvider.getService(interfaceName);
+ //反射调用方法
+ Method method=null;
+ try {
+ method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
+ Object invoke=method.invoke(service,rpcRequest.getParams());
+ return RpcResponse.sussess(invoke);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ e.printStackTrace();
+ System.out.println("方法执行错误");
+ return RpcResponse.fail();
+ }
+ }
+}
diff --git a/version2/src/main/java/part1/Server/netty/nettyInitializer/NettyServerInitializer.java b/version2/src/main/java/part1/Server/netty/nettyInitializer/NettyServerInitializer.java
new file mode 100644
index 0000000..bb7a4a1
--- /dev/null
+++ b/version2/src/main/java/part1/Server/netty/nettyInitializer/NettyServerInitializer.java
@@ -0,0 +1,35 @@
+package part1.Server.netty.nettyInitializer;
+
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.serialization.ClassResolver;
+import io.netty.handler.codec.serialization.ObjectDecoder;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import lombok.AllArgsConstructor;
+import part1.Server.netty.handler.NettyRPCServerHandler;
+import part1.Server.provider.ServiceProvider;
+import part1.common.serializer.myCode.MyDecoder;
+import part1.common.serializer.myCode.MyEncoder;
+import part1.common.serializer.mySerializer.JsonSerializer;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 16:15
+ */
+@AllArgsConstructor
+public class NettyServerInitializer extends ChannelInitializer {
+ private ServiceProvider serviceProvider;
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ //使用自定义的编/解码器
+ pipeline.addLast(new MyEncoder(new JsonSerializer()));
+ pipeline.addLast(new MyDecoder());
+ pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
+ }
+}
diff --git a/version2/src/main/java/part1/Server/provider/ServiceProvider.java b/version2/src/main/java/part1/Server/provider/ServiceProvider.java
new file mode 100644
index 0000000..f7355dc
--- /dev/null
+++ b/version2/src/main/java/part1/Server/provider/ServiceProvider.java
@@ -0,0 +1,47 @@
+package part1.Server.provider;
+
+import part1.Server.serviceRegister.ServiceRegister;
+import part1.Server.serviceRegister.impl.ZKServiceRegister;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/16 17:35
+ */
+public class ServiceProvider {
+ private Map interfaceProvider;
+
+ private int port;
+ private String host;
+ //注册服务类
+ private ServiceRegister serviceRegister;
+
+ public ServiceProvider(String host,int port){
+ //需要传入服务端自身的网络地址
+ this.host=host;
+ this.port=port;
+ this.interfaceProvider=new HashMap<>();
+ this.serviceRegister=new ZKServiceRegister();
+ }
+
+ public void provideServiceInterface(Object service){
+ String serviceName=service.getClass().getName();
+ Class>[] interfaceName=service.getClass().getInterfaces();
+
+ for (Class> clazz:interfaceName){
+ //本机的映射表
+ interfaceProvider.put(clazz.getName(),service);
+ //在注册中心注册服务
+ serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port));
+ }
+ }
+
+ public Object getService(String interfaceName){
+ return interfaceProvider.get(interfaceName);
+ }
+
+}
diff --git a/version2/src/main/java/part1/Server/server/RpcServer.java b/version2/src/main/java/part1/Server/server/RpcServer.java
new file mode 100644
index 0000000..7ed9584
--- /dev/null
+++ b/version2/src/main/java/part1/Server/server/RpcServer.java
@@ -0,0 +1,11 @@
+package part1.Server.server;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/12 11:26
+ */
+public interface RpcServer {
+ void start(int port);
+ void stop();
+}
diff --git a/version2/src/main/java/part1/Server/server/impl/NettyRPCRPCServer.java b/version2/src/main/java/part1/Server/server/impl/NettyRPCRPCServer.java
new file mode 100644
index 0000000..e3022e8
--- /dev/null
+++ b/version2/src/main/java/part1/Server/server/impl/NettyRPCRPCServer.java
@@ -0,0 +1,48 @@
+package part1.Server.server.impl;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.AllArgsConstructor;
+import part1.Server.provider.ServiceProvider;
+import part1.Server.netty.nettyInitializer.NettyServerInitializer;
+import part1.Server.server.RpcServer;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/26 14:01
+ */
+@AllArgsConstructor
+public class NettyRPCRPCServer implements RpcServer {
+ private ServiceProvider serviceProvider;
+ @Override
+ public void start(int port) {
+ // netty 服务线程组boss负责建立连接, work负责具体的请求
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+ NioEventLoopGroup workGroup = new NioEventLoopGroup();
+ System.out.println("netty服务端启动了");
+ try {
+ //启动netty服务器
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ //初始化
+ serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
+ .childHandler(new NettyServerInitializer(serviceProvider));
+ //同步堵塞
+ ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
+ //死循环监听
+ channelFuture.channel().closeFuture().sync();
+ }catch (InterruptedException e){
+ e.printStackTrace();
+ }finally {
+ bossGroup.shutdownGracefully();
+ workGroup.shutdownGracefully();
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/version2/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java b/version2/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java
new file mode 100644
index 0000000..277926e
--- /dev/null
+++ b/version2/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java
@@ -0,0 +1,39 @@
+package part1.Server.server.impl;
+
+
+import lombok.AllArgsConstructor;
+import part1.Server.provider.ServiceProvider;
+import part1.Server.server.work.WorkThread;
+import part1.Server.server.RpcServer;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/12 11:37
+ */
+@AllArgsConstructor
+public class SimpleRPCRPCServer implements RpcServer {
+ private ServiceProvider serviceProvide;
+ @Override
+ public void start(int port) {
+ try {
+ ServerSocket serverSocket=new ServerSocket(port);
+ System.out.println("服务器启动了");
+ while (true) {
+ Socket socket = serverSocket.accept();
+ new Thread(new WorkThread(socket,serviceProvide)).start();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/version2/src/main/java/part1/Server/server/work/WorkThread.java b/version2/src/main/java/part1/Server/server/work/WorkThread.java
new file mode 100644
index 0000000..064b9c2
--- /dev/null
+++ b/version2/src/main/java/part1/Server/server/work/WorkThread.java
@@ -0,0 +1,58 @@
+package part1.Server.server.work;
+
+
+import lombok.AllArgsConstructor;
+import part1.Server.provider.ServiceProvider;
+import part1.common.Message.RpcRequest;
+import part1.common.Message.RpcResponse;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.Socket;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/14 17:39
+ */
+@AllArgsConstructor
+public class WorkThread implements Runnable{
+ private Socket socket;
+ private ServiceProvider serviceProvide;
+ @Override
+ public void run() {
+ try {
+ ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
+ ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
+ //读取客户端传过来的request
+ RpcRequest rpcRequest = (RpcRequest) ois.readObject();
+ //反射调用服务方法获取返回值
+ RpcResponse rpcResponse=getResponse(rpcRequest);
+ //向客户端写入response
+ oos.writeObject(rpcResponse);
+ oos.flush();
+ } catch (IOException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+ private RpcResponse getResponse(RpcRequest rpcRequest){
+ //得到服务名
+ String interfaceName=rpcRequest.getInterfaceName();
+ //得到服务端相应服务实现类
+ Object service = serviceProvide.getService(interfaceName);
+ //反射调用方法
+ Method method=null;
+ try {
+ method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
+ Object invoke=method.invoke(service,rpcRequest.getParams());
+ return RpcResponse.sussess(invoke);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ e.printStackTrace();
+ System.out.println("方法执行错误");
+ return RpcResponse.fail();
+ }
+ }
+}
diff --git a/version2/src/main/java/part1/Server/serviceRegister/ServiceRegister.java b/version2/src/main/java/part1/Server/serviceRegister/ServiceRegister.java
new file mode 100644
index 0000000..54c52b1
--- /dev/null
+++ b/version2/src/main/java/part1/Server/serviceRegister/ServiceRegister.java
@@ -0,0 +1,15 @@
+package part1.Server.serviceRegister;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 16:58
+ */
+// 服务注册接口
+public interface ServiceRegister {
+ // 注册:保存服务与地址。
+ void register(String serviceName, InetSocketAddress serviceAddress);
+
+}
diff --git a/version2/src/main/java/part1/Server/serviceRegister/impl/ZKServiceRegister.java b/version2/src/main/java/part1/Server/serviceRegister/impl/ZKServiceRegister.java
new file mode 100644
index 0000000..d569338
--- /dev/null
+++ b/version2/src/main/java/part1/Server/serviceRegister/impl/ZKServiceRegister.java
@@ -0,0 +1,62 @@
+package part1.Server.serviceRegister.impl;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.CreateMode;
+import part1.Server.serviceRegister.ServiceRegister;
+
+import java.net.InetSocketAddress;
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/5/3 17:28
+ */
+public class ZKServiceRegister implements ServiceRegister {
+ // curator 提供的zookeeper客户端
+ private CuratorFramework client;
+ //zookeeper根路径节点
+ private static final String ROOT_PATH = "MyRPC";
+
+ //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接
+ public ZKServiceRegister(){
+ // 指数时间重试
+ RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
+ // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接
+ // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
+ // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
+ // 使用心跳监听状态
+ this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
+ .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
+ this.client.start();
+ System.out.println("zookeeper 连接成功");
+ }
+ //注册服务到注册中心
+ @Override
+ public void register(String serviceName, InetSocketAddress serviceAddress) {
+ try {
+ // serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址
+ if(client.checkExists().forPath("/" + serviceName) == null){
+ client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
+ }
+ // 路径地址,一个/代表一个节点
+ String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress);
+ // 临时节点,服务器下线就删除节点
+ client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
+ } catch (Exception e) {
+ System.out.println("此服务已存在");
+ }
+ }
+ // 地址 -> XXX.XXX.XXX.XXX:port 字符串
+ private String getServiceAddress(InetSocketAddress serverAddress) {
+ return serverAddress.getHostName() +
+ ":" +
+ serverAddress.getPort();
+ }
+ // 字符串解析为地址
+ private InetSocketAddress parseAddress(String address) {
+ String[] result = address.split(":");
+ return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
+ }
+}
diff --git a/version2/src/main/java/part1/common/Message/MessageType.java b/version2/src/main/java/part1/common/Message/MessageType.java
new file mode 100644
index 0000000..ebbcc83
--- /dev/null
+++ b/version2/src/main/java/part1/common/Message/MessageType.java
@@ -0,0 +1,17 @@
+package part1.common.Message;
+
+import lombok.AllArgsConstructor;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/2 22:29
+ */
+@AllArgsConstructor
+public enum MessageType {
+ REQUEST(0),RESPONSE(1);
+ private int code;
+ public int getCode(){
+ return code;
+ }
+}
\ No newline at end of file
diff --git a/version2/src/main/java/part1/common/Message/RpcRequest.java b/version2/src/main/java/part1/common/Message/RpcRequest.java
new file mode 100644
index 0000000..2642f38
--- /dev/null
+++ b/version2/src/main/java/part1/common/Message/RpcRequest.java
@@ -0,0 +1,29 @@
+package part1.common.Message;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/1 18:30
+ * 定义发送的消息格式
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+@Builder
+public class RpcRequest implements Serializable {
+ //服务类名,客户端只知道接口
+ private String interfaceName;
+ //调用的方法名
+ private String methodName;
+ //参数列表
+ private Object[] params;
+ //参数类型
+ private Class>[] paramsType;
+}
diff --git a/version2/src/main/java/part1/common/Message/RpcResponse.java b/version2/src/main/java/part1/common/Message/RpcResponse.java
new file mode 100644
index 0000000..7a44b45
--- /dev/null
+++ b/version2/src/main/java/part1/common/Message/RpcResponse.java
@@ -0,0 +1,35 @@
+package part1.common.Message;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/2/1 19:18
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+@Builder
+public class RpcResponse implements Serializable {
+ //状态信息
+ private int code;
+ private String message;
+ //更新:加入传输数据的类型,以便在自定义序列化器中解析
+ private Class> dataType;
+ //具体数据
+ private Object data;
+
+ public static RpcResponse sussess(Object data){
+ return RpcResponse.builder().code(200).dataType(data.getClass()).data(data).build();
+ }
+ public static RpcResponse fail(){
+ return RpcResponse.builder().code(500).message("服务器发生错误").build();
+ }
+}
+
diff --git a/version2/src/main/java/part1/common/pojo/User.java b/version2/src/main/java/part1/common/pojo/User.java
new file mode 100644
index 0000000..d7cef8c
--- /dev/null
+++ b/version2/src/main/java/part1/common/pojo/User.java
@@ -0,0 +1,25 @@
+package part1.common.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/1/28 17:50
+ */
+@Builder
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class User implements Serializable {
+ // 客户端和服务端共有的
+ private Integer id;
+ private String userName;
+ private Boolean sex;
+}
+
diff --git a/version2/src/main/java/part1/common/serializer/myCode/MyDecoder.java b/version2/src/main/java/part1/common/serializer/myCode/MyDecoder.java
new file mode 100644
index 0000000..8788c0e
--- /dev/null
+++ b/version2/src/main/java/part1/common/serializer/myCode/MyDecoder.java
@@ -0,0 +1,42 @@
+package part1.common.serializer.myCode;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import part1.common.Message.MessageType;
+import part1.common.serializer.mySerializer.Serializer;
+
+import java.awt.*;
+import java.util.List;
+
+/**
+ * @author wxx
+ * @version 1.0
+ * @create 2024/6/2 22:24
+ * 按照自定义的消息格式解码数据
+ */
+public class MyDecoder extends ByteToMessageDecoder {
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List