diff --git a/version1/pom.xml b/version1/pom.xml new file mode 100644 index 0000000..4ae066c --- /dev/null +++ b/version1/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + org.example + version1 + 1.0-SNAPSHOT + + 8 + 8 + UTF-8 + + + + org.projectlombok + lombok + 1.18.30 + compile + + + io.netty + netty-all + 4.1.51.Final + compile + + + + org.apache.curator + curator-recipes + 5.1.0 + + + + + + \ No newline at end of file diff --git a/version1/src/main/java/part1/Client/IOClient.java b/version1/src/main/java/part1/Client/IOClient.java new file mode 100644 index 0000000..8c7f61f --- /dev/null +++ b/version1/src/main/java/part1/Client/IOClient.java @@ -0,0 +1,36 @@ +package part1.Client; + + + +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/4 18:31 + */ +public class IOClient { + //这里负责底层与服务端的通信,发送request,返回response + public static RpcResponse sendRequest(String host, int port, RpcRequest request){ + try { + Socket socket=new Socket(host, port); + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + + oos.writeObject(request); + oos.flush(); + + RpcResponse response=(RpcResponse) ois.readObject(); + return response; + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/version1/src/main/java/part1/Client/TestClient.java b/version1/src/main/java/part1/Client/TestClient.java new file mode 100644 index 0000000..f9b7bab --- /dev/null +++ b/version1/src/main/java/part1/Client/TestClient.java @@ -0,0 +1,25 @@ +package part1.Client; + + +import part1.Client.proxy.ClientProxy; +import part1.common.service.UserService; +import part1.common.pojo.User; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 18:39 + */ +public class TestClient { + public static void main(String[] args) { + ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999); + UserService proxy=clientProxy.getProxy(UserService.class); + + User user = proxy.getUserByUserId(1); + System.out.println("从服务端得到的user="+user.toString()); + + User u=User.builder().id(100).userName("wxx").sex(true).build(); + Integer id = proxy.insertUserId(u); + System.out.println("向服务端插入user的id"+id); + } +} diff --git a/version1/src/main/java/part1/Client/proxy/ClientProxy.java b/version1/src/main/java/part1/Client/proxy/ClientProxy.java new file mode 100644 index 0000000..3701bdf --- /dev/null +++ b/version1/src/main/java/part1/Client/proxy/ClientProxy.java @@ -0,0 +1,40 @@ +package part1.Client.proxy; + + +import lombok.AllArgsConstructor; +import part1.Client.IOClient; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 16:49 + */ +@AllArgsConstructor +public class ClientProxy implements InvocationHandler { + //传入参数service接口的class对象,反射封装成一个request + private String host; + private int port; + + //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端) + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + //构建request + RpcRequest request=RpcRequest.builder() + .interfaceName(method.getDeclaringClass().getName()) + .methodName(method.getName()) + .params(args).paramsType(method.getParameterTypes()).build(); + //IOClient.sendRequest 和服务端进行数据传输 + RpcResponse response= IOClient.sendRequest(host,port,request); + return response.getData(); + } + public T getProxy(Class clazz){ + Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); + return (T)o; + } +} diff --git a/version1/src/main/java/part1/Server/TestServer.java b/version1/src/main/java/part1/Server/TestServer.java new file mode 100644 index 0000000..682b912 --- /dev/null +++ b/version1/src/main/java/part1/Server/TestServer.java @@ -0,0 +1,25 @@ +package part1.Server; + + +import part1.Server.server.RpcServer; +import part1.common.service.Impl.UserServiceImpl; +import part1.common.service.UserService; +import part1.Server.server.impl.SimpleRPCRPCServer; +import part1.Server.provider.ServiceProvider; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/11 19:39 + */ +public class TestServer { + public static void main(String[] args) { + UserService userService=new UserServiceImpl(); + + ServiceProvider serviceProvider=new ServiceProvider(); + serviceProvider.provideServiceInterface(userService); + + RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider); + rpcServer.start(9999); + } +} diff --git a/version1/src/main/java/part1/Server/provider/ServiceProvider.java b/version1/src/main/java/part1/Server/provider/ServiceProvider.java new file mode 100644 index 0000000..8864448 --- /dev/null +++ b/version1/src/main/java/part1/Server/provider/ServiceProvider.java @@ -0,0 +1,34 @@ +package part1.Server.provider; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/16 17:35 + */ +//本地服务存放器 +public class ServiceProvider { + //集合中存放服务的实例 + private Map interfaceProvider; + + public ServiceProvider(){ + this.interfaceProvider=new HashMap<>(); + } + //本地注册服务 + + public void provideServiceInterface(Object service){ + String serviceName=service.getClass().getName(); + Class[] interfaceName=service.getClass().getInterfaces(); + + for (Class clazz:interfaceName){ + interfaceProvider.put(clazz.getName(),service); + } + + } + //获取服务实例 + public Object getService(String interfaceName){ + return interfaceProvider.get(interfaceName); + } +} diff --git a/version1/src/main/java/part1/Server/server/RpcServer.java b/version1/src/main/java/part1/Server/server/RpcServer.java new file mode 100644 index 0000000..76df981 --- /dev/null +++ b/version1/src/main/java/part1/Server/server/RpcServer.java @@ -0,0 +1,12 @@ +package part1.Server.server; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:26 + */ +public interface RpcServer { + //开启监听 + void start(int port); + void stop(); +} diff --git a/version1/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java b/version1/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java new file mode 100644 index 0000000..09afdff --- /dev/null +++ b/version1/src/main/java/part1/Server/server/impl/SimpleRPCRPCServer.java @@ -0,0 +1,40 @@ +package part1.Server.server.impl; + + +import lombok.AllArgsConstructor; +import part1.Server.server.RpcServer; +import part1.Server.server.work.WorkThread; +import part1.Server.provider.ServiceProvider; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:37 + */ +@AllArgsConstructor +public class SimpleRPCRPCServer implements RpcServer { + private ServiceProvider serviceProvide; + @Override + public void start(int port) { + try { + ServerSocket serverSocket=new ServerSocket(port); + System.out.println("服务器启动了"); + while (true) { + //如果没有连接,会堵塞在这里 + Socket socket = serverSocket.accept(); + //有连接,创建一个新的线程执行处理 + new Thread(new WorkThread(socket,serviceProvide)).start(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + } +} diff --git a/version1/src/main/java/part1/Server/server/impl/ThreadPoolRPCRPCServer.java b/version1/src/main/java/part1/Server/server/impl/ThreadPoolRPCRPCServer.java new file mode 100644 index 0000000..0d32e1f --- /dev/null +++ b/version1/src/main/java/part1/Server/server/impl/ThreadPoolRPCRPCServer.java @@ -0,0 +1,58 @@ +package part1.Server.server.impl; + + +import part1.Server.server.RpcServer; +import part1.Server.server.work.WorkThread; +import part1.Server.provider.ServiceProvider; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/19 15:30 + */ +public class ThreadPoolRPCRPCServer implements RpcServer { + private final ThreadPoolExecutor threadPool; + private ServiceProvider serviceProvider; + + public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider){ + threadPool=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), + 1000,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100)); + this.serviceProvider= serviceProvider; + } + public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue){ + + threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + this.serviceProvider = serviceProvider; + } + + @Override + public void start(int port) { + System.out.println("服务端启动了"); + try { + ServerSocket serverSocket=new ServerSocket(); + while (true){ + Socket socket= serverSocket.accept(); + threadPool.execute(new WorkThread(socket,serviceProvider)); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version1/src/main/java/part1/Server/server/work/WorkThread.java b/version1/src/main/java/part1/Server/server/work/WorkThread.java new file mode 100644 index 0000000..064b9c2 --- /dev/null +++ b/version1/src/main/java/part1/Server/server/work/WorkThread.java @@ -0,0 +1,58 @@ +package part1.Server.server.work; + + +import lombok.AllArgsConstructor; +import part1.Server.provider.ServiceProvider; +import part1.common.Message.RpcRequest; +import part1.common.Message.RpcResponse; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/14 17:39 + */ +@AllArgsConstructor +public class WorkThread implements Runnable{ + private Socket socket; + private ServiceProvider serviceProvide; + @Override + public void run() { + try { + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + //读取客户端传过来的request + RpcRequest rpcRequest = (RpcRequest) ois.readObject(); + //反射调用服务方法获取返回值 + RpcResponse rpcResponse=getResponse(rpcRequest); + //向客户端写入response + oos.writeObject(rpcResponse); + oos.flush(); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + private RpcResponse getResponse(RpcRequest rpcRequest){ + //得到服务名 + String interfaceName=rpcRequest.getInterfaceName(); + //得到服务端相应服务实现类 + Object service = serviceProvide.getService(interfaceName); + //反射调用方法 + Method method=null; + try { + method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType()); + Object invoke=method.invoke(service,rpcRequest.getParams()); + return RpcResponse.sussess(invoke); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + System.out.println("方法执行错误"); + return RpcResponse.fail(); + } + } +} diff --git a/version1/src/main/java/part1/common/Message/RpcRequest.java b/version1/src/main/java/part1/common/Message/RpcRequest.java new file mode 100644 index 0000000..0d8f171 --- /dev/null +++ b/version1/src/main/java/part1/common/Message/RpcRequest.java @@ -0,0 +1,25 @@ +package part1.common.Message; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 18:30 + * 定义发送的消息格式 + */ +@Data +@Builder +public class RpcRequest implements Serializable { + //服务类名,客户端只知道接口 + private String interfaceName; + //调用的方法名 + private String methodName; + //参数列表 + private Object[] params; + //参数类型 + private Class[] paramsType; +} diff --git a/version1/src/main/java/part1/common/Message/RpcResponse.java b/version1/src/main/java/part1/common/Message/RpcResponse.java new file mode 100644 index 0000000..906ee1c --- /dev/null +++ b/version1/src/main/java/part1/common/Message/RpcResponse.java @@ -0,0 +1,30 @@ +package part1.common.Message; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 19:18 + */ +@Data +@Builder +public class RpcResponse implements Serializable { + //状态信息 + private int code; + private String message; + //具体数据 + private Object data; + //构造成功信息 + public static RpcResponse sussess(Object data){ + return RpcResponse.builder().code(200).data(data).build(); + } + //构造失败信息 + public static RpcResponse fail(){ + return RpcResponse.builder().code(500).message("服务器发生错误").build(); + } +} + diff --git a/version1/src/main/java/part1/common/pojo/User.java b/version1/src/main/java/part1/common/pojo/User.java new file mode 100644 index 0000000..d7cef8c --- /dev/null +++ b/version1/src/main/java/part1/common/pojo/User.java @@ -0,0 +1,25 @@ +package part1.common.pojo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 17:50 + */ +@Builder +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User implements Serializable { + // 客户端和服务端共有的 + private Integer id; + private String userName; + private Boolean sex; +} + diff --git a/version1/src/main/java/part1/common/service/Impl/UserServiceImpl.java b/version1/src/main/java/part1/common/service/Impl/UserServiceImpl.java new file mode 100644 index 0000000..373243b --- /dev/null +++ b/version1/src/main/java/part1/common/service/Impl/UserServiceImpl.java @@ -0,0 +1,32 @@ +package part1.common.service.Impl; + + +import part1.common.pojo.User; +import part1.common.service.UserService; + +import java.util.Random; +import java.util.UUID; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:28 + */ +public class UserServiceImpl implements UserService { + @Override + public User getUserByUserId(Integer id) { + System.out.println("客户端查询了"+id+"的用户"); + // 模拟从数据库中取用户的行为 + Random random = new Random(); + User user = User.builder().userName(UUID.randomUUID().toString()) + .id(id) + .sex(random.nextBoolean()).build(); + return user; + } + + @Override + public Integer insertUserId(User user) { + System.out.println("插入数据成功"+user.getUserName()); + return user.getId(); + } +} \ No newline at end of file diff --git a/version1/src/main/java/part1/common/service/UserService.java b/version1/src/main/java/part1/common/service/UserService.java new file mode 100644 index 0000000..0e539bd --- /dev/null +++ b/version1/src/main/java/part1/common/service/UserService.java @@ -0,0 +1,16 @@ +package part1.common.service; + + +import part1.common.pojo.User; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:27 + */ +public interface UserService { + // 客户端通过这个接口调用服务端的实现类 + User getUserByUserId(Integer id); + //新增一个功能 + Integer insertUserId(User user); +} diff --git a/version1/src/main/java/part2/Client/IOClient.java b/version1/src/main/java/part2/Client/IOClient.java new file mode 100644 index 0000000..066736a --- /dev/null +++ b/version1/src/main/java/part2/Client/IOClient.java @@ -0,0 +1,36 @@ +package part2.Client; + + + +import part2.common.Message.RpcRequest; +import part2.common.Message.RpcResponse; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/4 18:31 + */ +public class IOClient { + //这里负责底层与服务端的通信,发送request,返回response + public static RpcResponse sendRequest(String host, int port, RpcRequest request){ + try { + Socket socket=new Socket(host, port); + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + + oos.writeObject(request); + oos.flush(); + + RpcResponse response=(RpcResponse) ois.readObject(); + return response; + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/version1/src/main/java/part2/Client/TestClient.java b/version1/src/main/java/part2/Client/TestClient.java new file mode 100644 index 0000000..3ce5d38 --- /dev/null +++ b/version1/src/main/java/part2/Client/TestClient.java @@ -0,0 +1,26 @@ +package part2.Client; + + +import part2.Client.proxy.ClientProxy; +import part2.common.pojo.User; +import part2.common.service.UserService; + + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 18:39 + */ +public class TestClient { + public static void main(String[] args) { + ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999,0); + UserService proxy=clientProxy.getProxy(UserService.class); + + User user = proxy.getUserByUserId(1); + System.out.println("从服务端得到的user="+user.toString()); + + User u=User.builder().id(100).userName("wxx").sex(true).build(); + Integer id = proxy.insertUserId(u); + System.out.println("向服务端插入user的id"+id); + } +} diff --git a/version1/src/main/java/part2/Client/netty/handler/NettyClientHandler.java b/version1/src/main/java/part2/Client/netty/handler/NettyClientHandler.java new file mode 100644 index 0000000..12d5693 --- /dev/null +++ b/version1/src/main/java/part2/Client/netty/handler/NettyClientHandler.java @@ -0,0 +1,28 @@ +package part2.Client.netty.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.AttributeKey; +import part2.common.Message.RpcResponse; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 17:29 + */ +public class NettyClientHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { + // 接收到response, 给channel设计别名,让sendRequest里读取response + AttributeKey key = AttributeKey.valueOf("RPCResponse"); + ctx.channel().attr(key).set(response); + ctx.channel().close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + //异常处理 + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/version1/src/main/java/part2/Client/netty/nettyInitializer/NettyClientInitializer.java b/version1/src/main/java/part2/Client/netty/nettyInitializer/NettyClientInitializer.java new file mode 100644 index 0000000..5bec33d --- /dev/null +++ b/version1/src/main/java/part2/Client/netty/nettyInitializer/NettyClientInitializer.java @@ -0,0 +1,43 @@ +package part2.Client.netty.nettyInitializer; + + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.serialization.ClassResolver; +import io.netty.handler.codec.serialization.ObjectDecoder; +import io.netty.handler.codec.serialization.ObjectEncoder; +import part2.Client.netty.handler.NettyClientHandler; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 17:26 + */ +public class NettyClientInitializer extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + //消息格式 【长度】【消息体】,解决沾包问题 + pipeline.addLast( + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); + //计算当前待发送消息的长度,写入到前4个字节中 + pipeline.addLast(new LengthFieldPrepender(4)); + //编码器 + //使用Java序列化方式,netty的自带的解码编码支持传输这种结构 + pipeline.addLast(new ObjectEncoder()); + //解码器 + //使用了Netty中的ObjectDecoder,它用于将字节流解码为 Java 对象。 + //在ObjectDecoder的构造函数中传入了一个ClassResolver 对象,用于解析类名并加载相应的类。 + pipeline.addLast(new ObjectDecoder(new ClassResolver() { + @Override + public Class resolve(String className) throws ClassNotFoundException { + return Class.forName(className); + } + })); + + pipeline.addLast(new NettyClientHandler()); + } +} diff --git a/version1/src/main/java/part2/Client/proxy/ClientProxy.java b/version1/src/main/java/part2/Client/proxy/ClientProxy.java new file mode 100644 index 0000000..a6b5a4e --- /dev/null +++ b/version1/src/main/java/part2/Client/proxy/ClientProxy.java @@ -0,0 +1,53 @@ +package part2.Client.proxy; + + +import lombok.AllArgsConstructor; +import part2.common.Message.RpcRequest; +import part2.common.Message.RpcResponse; +import part2.Client.IOClient; +import part2.Client.rpcClient.RpcClient; +import part2.Client.rpcClient.impl.NettyRpcClient; +import part2.Client.rpcClient.impl.SimpleSocketRpcCilent; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 16:49 + */ +public class ClientProxy implements InvocationHandler { + //传入参数service接口的class对象,反射封装成一个request + + private RpcClient rpcClient; + public ClientProxy(String host,int port,int choose){ + switch (choose){ + case 0: + rpcClient=new NettyRpcClient(host,port); + break; + case 1: + rpcClient=new SimpleSocketRpcCilent(host,port); + } + } + public ClientProxy(String host,int port){ + rpcClient=new NettyRpcClient(host,port); + } + //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端) + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + //构建request + RpcRequest request=RpcRequest.builder() + .interfaceName(method.getDeclaringClass().getName()) + .methodName(method.getName()) + .params(args).paramsType(method.getParameterTypes()).build(); + //数据传输 + RpcResponse response= rpcClient.sendRequest(request); + return response.getData(); + } + public T getProxy(Class clazz){ + Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); + return (T)o; + } +} diff --git a/version1/src/main/java/part2/Client/rpcClient/RpcClient.java b/version1/src/main/java/part2/Client/rpcClient/RpcClient.java new file mode 100644 index 0000000..21690b1 --- /dev/null +++ b/version1/src/main/java/part2/Client/rpcClient/RpcClient.java @@ -0,0 +1,15 @@ +package part2.Client.rpcClient; + +import part2.common.Message.RpcRequest; +import part2.common.Message.RpcResponse; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 18:55 + */ +public interface RpcClient { + + //定义底层通信的方法 + RpcResponse sendRequest(RpcRequest request); +} diff --git a/version1/src/main/java/part2/Client/rpcClient/impl/NettyRpcClient.java b/version1/src/main/java/part2/Client/rpcClient/impl/NettyRpcClient.java new file mode 100644 index 0000000..b3dab27 --- /dev/null +++ b/version1/src/main/java/part2/Client/rpcClient/impl/NettyRpcClient.java @@ -0,0 +1,62 @@ +package part2.Client.rpcClient.impl; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.AttributeKey; +import part2.Client.netty.nettyInitializer.NettyClientInitializer; +import part2.common.Message.RpcRequest; +import part2.common.Message.RpcResponse; +import part2.Client.rpcClient.RpcClient; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 19:40 + */ +public class NettyRpcClient implements RpcClient { + private String host; + private int port; + private static final Bootstrap bootstrap; + private static final EventLoopGroup eventLoopGroup; + public NettyRpcClient(String host,int port){ + this.host=host; + this.port=port; + } + //netty客户端初始化 + static { + eventLoopGroup = new NioEventLoopGroup(); + bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) + //NettyClientInitializer这里 配置netty对消息的处理机制 + .handler(new NettyClientInitializer()); + } + @Override + public RpcResponse sendRequest(RpcRequest request) { + try { + //创建一个channelFuture对象,代表这一个操作事件,sync方法表示堵塞直到connect完成 + ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); + //channel表示一个连接的单位,类似socket + Channel channel = channelFuture.channel(); + // 发送数据 + channel.writeAndFlush(request); + //sync()堵塞获取结果 + channel.closeFuture().sync(); + // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置) + // AttributeKey是,线程隔离的,不会由线程安全问题。 + // 当前场景下选择堵塞获取结果 + // 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener... + AttributeKey key = AttributeKey.valueOf("RPCResponse"); + RpcResponse response = channel.attr(key).get(); + + System.out.println(response); + return response; + } catch (InterruptedException e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/version1/src/main/java/part2/Client/rpcClient/impl/SimpleSocketRpcCilent.java b/version1/src/main/java/part2/Client/rpcClient/impl/SimpleSocketRpcCilent.java new file mode 100644 index 0000000..7e9f09c --- /dev/null +++ b/version1/src/main/java/part2/Client/rpcClient/impl/SimpleSocketRpcCilent.java @@ -0,0 +1,41 @@ +package part2.Client.rpcClient.impl; + +import part2.common.Message.RpcRequest; +import part2.common.Message.RpcResponse; +import part2.Client.rpcClient.RpcClient; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 18:58 + */ +public class SimpleSocketRpcCilent implements RpcClient { + private String host; + private int port; + public SimpleSocketRpcCilent(String host,int port){ + this.host=host; + this.port=port; + } + @Override + public RpcResponse sendRequest(RpcRequest request) { + try { + Socket socket=new Socket(host, port); + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + + oos.writeObject(request); + oos.flush(); + + RpcResponse response=(RpcResponse) ois.readObject(); + return response; + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/version1/src/main/java/part2/Server/TestServer.java b/version1/src/main/java/part2/Server/TestServer.java new file mode 100644 index 0000000..a055f3e --- /dev/null +++ b/version1/src/main/java/part2/Server/TestServer.java @@ -0,0 +1,26 @@ +package part2.Server; + + +import part2.Server.server.impl.NettyRPCRPCServer; +import part2.common.service.Impl.UserServiceImpl; +import part2.common.service.UserService; +import part2.Server.provider.ServiceProvider; +import part2.Server.server.RpcServer; +import part2.Server.server.impl.SimpleRPCRPCServer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/11 19:39 + */ +public class TestServer { + public static void main(String[] args) { + UserService userService=new UserServiceImpl(); + + ServiceProvider serviceProvider=new ServiceProvider(); + serviceProvider.provideServiceInterface(userService); + + RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider); + rpcServer.start(9999); + } +} diff --git a/version1/src/main/java/part2/Server/netty/handler/NettyRPCServerHandler.java b/version1/src/main/java/part2/Server/netty/handler/NettyRPCServerHandler.java new file mode 100644 index 0000000..14948cc --- /dev/null +++ b/version1/src/main/java/part2/Server/netty/handler/NettyRPCServerHandler.java @@ -0,0 +1,52 @@ +package part2.Server.netty.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.AllArgsConstructor; +import part2.Server.provider.ServiceProvider; +import part2.common.Message.RpcRequest; +import part2.common.Message.RpcResponse; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 16:40 + * 因为是服务器端,我们知道接受到请求格式是RPCRequest + * Object类型也行,强制转型就行 + */ +@AllArgsConstructor +public class NettyRPCServerHandler extends SimpleChannelInboundHandler { + private ServiceProvider serviceProvider; + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { + //接收request,读取并调用服务 + RpcResponse response = getResponse(request); + ctx.writeAndFlush(response); + ctx.close(); + } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + private RpcResponse getResponse(RpcRequest rpcRequest){ + //得到服务名 + String interfaceName=rpcRequest.getInterfaceName(); + //得到服务端相应服务实现类 + Object service = serviceProvider.getService(interfaceName); + //反射调用方法 + Method method=null; + try { + method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType()); + Object invoke=method.invoke(service,rpcRequest.getParams()); + return RpcResponse.sussess(invoke); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + System.out.println("方法执行错误"); + return RpcResponse.fail(); + } + } +} diff --git a/version1/src/main/java/part2/Server/netty/nettyInitializer/NettyServerInitializer.java b/version1/src/main/java/part2/Server/netty/nettyInitializer/NettyServerInitializer.java new file mode 100644 index 0000000..254b04c --- /dev/null +++ b/version1/src/main/java/part2/Server/netty/nettyInitializer/NettyServerInitializer.java @@ -0,0 +1,46 @@ +package part2.Server.netty.nettyInitializer; + + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.serialization.ClassResolver; +import io.netty.handler.codec.serialization.ObjectDecoder; +import io.netty.handler.codec.serialization.ObjectEncoder; +import lombok.AllArgsConstructor; +import part2.Server.netty.handler.NettyRPCServerHandler; +import part2.Server.provider.ServiceProvider; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 16:15 + */ +@AllArgsConstructor +public class NettyServerInitializer extends ChannelInitializer { + private ServiceProvider serviceProvider; + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + //消息格式 【长度】【消息体】,解决沾包问题 + pipeline.addLast( + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); + //计算当前待发送消息的长度,写入到前4个字节中 + pipeline.addLast(new LengthFieldPrepender(4)); + + //使用Java序列化方式,netty的自带的解码编码支持传输这种结构 + pipeline.addLast(new ObjectEncoder()); + //使用了Netty中的ObjectDecoder,它用于将字节流解码为 Java 对象。 + //在ObjectDecoder的构造函数中传入了一个ClassResolver 对象,用于解析类名并加载相应的类。 + pipeline.addLast(new ObjectDecoder(new ClassResolver() { + @Override + public Class resolve(String className) throws ClassNotFoundException { + return Class.forName(className); + } + })); + + pipeline.addLast(new NettyRPCServerHandler(serviceProvider)); + } +} diff --git a/version1/src/main/java/part2/Server/provider/ServiceProvider.java b/version1/src/main/java/part2/Server/provider/ServiceProvider.java new file mode 100644 index 0000000..5cee41f --- /dev/null +++ b/version1/src/main/java/part2/Server/provider/ServiceProvider.java @@ -0,0 +1,32 @@ +package part2.Server.provider; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/16 17:35 + */ +public class ServiceProvider { + private Map interfaceProvider; + + public ServiceProvider(){ + this.interfaceProvider=new HashMap<>(); + } + + public void provideServiceInterface(Object service){ + String serviceName=service.getClass().getName(); + Class[] interfaceName=service.getClass().getInterfaces(); + + for (Class clazz:interfaceName){ + interfaceProvider.put(clazz.getName(),service); + } + + } + + public Object getService(String interfaceName){ + return interfaceProvider.get(interfaceName); + } + +} diff --git a/version1/src/main/java/part2/Server/server/RpcServer.java b/version1/src/main/java/part2/Server/server/RpcServer.java new file mode 100644 index 0000000..6abbf2a --- /dev/null +++ b/version1/src/main/java/part2/Server/server/RpcServer.java @@ -0,0 +1,11 @@ +package part2.Server.server; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:26 + */ +public interface RpcServer { + void start(int port); + void stop(); +} diff --git a/version1/src/main/java/part2/Server/server/impl/NettyRPCRPCServer.java b/version1/src/main/java/part2/Server/server/impl/NettyRPCRPCServer.java new file mode 100644 index 0000000..97632a6 --- /dev/null +++ b/version1/src/main/java/part2/Server/server/impl/NettyRPCRPCServer.java @@ -0,0 +1,49 @@ +package part2.Server.server.impl; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.AllArgsConstructor; +import part2.Server.netty.nettyInitializer.NettyServerInitializer; +import part2.Server.provider.ServiceProvider; +import part2.Server.server.RpcServer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 14:01 + */ +@AllArgsConstructor +public class NettyRPCRPCServer implements RpcServer { + private ServiceProvider serviceProvider; + @Override + public void start(int port) { + // netty 服务线程组boss负责建立连接, work负责具体的请求 + NioEventLoopGroup bossGroup = new NioEventLoopGroup(); + NioEventLoopGroup workGroup = new NioEventLoopGroup(); + System.out.println("netty服务端启动了"); + try { + //启动netty服务器 + ServerBootstrap serverBootstrap = new ServerBootstrap(); + //初始化 + serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) + //NettyClientInitializer这里 配置netty对消息的处理机制 + .childHandler(new NettyServerInitializer(serviceProvider)); + //同步堵塞 + ChannelFuture channelFuture=serverBootstrap.bind(port).sync(); + //死循环监听 + channelFuture.channel().closeFuture().sync(); + }catch (InterruptedException e){ + e.printStackTrace(); + }finally { + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version1/src/main/java/part2/Server/server/impl/SimpleRPCRPCServer.java b/version1/src/main/java/part2/Server/server/impl/SimpleRPCRPCServer.java new file mode 100644 index 0000000..8033bab --- /dev/null +++ b/version1/src/main/java/part2/Server/server/impl/SimpleRPCRPCServer.java @@ -0,0 +1,39 @@ +package part2.Server.server.impl; + + +import lombok.AllArgsConstructor; +import part2.Server.provider.ServiceProvider; +import part2.Server.server.RpcServer; +import part2.Server.server.work.WorkThread; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:37 + */ +@AllArgsConstructor +public class SimpleRPCRPCServer implements RpcServer { + private ServiceProvider serviceProvide; + @Override + public void start(int port) { + try { + ServerSocket serverSocket=new ServerSocket(port); + System.out.println("服务器启动了"); + while (true) { + Socket socket = serverSocket.accept(); + new Thread(new WorkThread(socket,serviceProvide)).start(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version1/src/main/java/part2/Server/server/impl/ThreadPoolRPCRPCServer.java b/version1/src/main/java/part2/Server/server/impl/ThreadPoolRPCRPCServer.java new file mode 100644 index 0000000..834c9a5 --- /dev/null +++ b/version1/src/main/java/part2/Server/server/impl/ThreadPoolRPCRPCServer.java @@ -0,0 +1,58 @@ +package part2.Server.server.impl; + + +import part2.Server.provider.ServiceProvider; +import part2.Server.server.RpcServer; +import part2.Server.server.work.WorkThread; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/19 15:30 + */ +public class ThreadPoolRPCRPCServer implements RpcServer { + private final ThreadPoolExecutor threadPool; + private ServiceProvider serviceProvider; + + public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider){ + threadPool=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), + 1000,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100)); + this.serviceProvider= serviceProvider; + } + public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue){ + + threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + this.serviceProvider = serviceProvider; + } + + @Override + public void start(int port) { + System.out.println("服务端启动了"); + try { + ServerSocket serverSocket=new ServerSocket(); + while (true){ + Socket socket= serverSocket.accept(); + threadPool.execute(new WorkThread(socket,serviceProvider)); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version1/src/main/java/part2/Server/server/work/WorkThread.java b/version1/src/main/java/part2/Server/server/work/WorkThread.java new file mode 100644 index 0000000..a3985bf --- /dev/null +++ b/version1/src/main/java/part2/Server/server/work/WorkThread.java @@ -0,0 +1,58 @@ +package part2.Server.server.work; + + +import lombok.AllArgsConstructor; +import part2.Server.provider.ServiceProvider; +import part2.common.Message.RpcRequest; +import part2.common.Message.RpcResponse; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/14 17:39 + */ +@AllArgsConstructor +public class WorkThread implements Runnable{ + private Socket socket; + private ServiceProvider serviceProvide; + @Override + public void run() { + try { + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + //读取客户端传过来的request + RpcRequest rpcRequest = (RpcRequest) ois.readObject(); + //反射调用服务方法获取返回值 + RpcResponse rpcResponse=getResponse(rpcRequest); + //向客户端写入response + oos.writeObject(rpcResponse); + oos.flush(); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + private RpcResponse getResponse(RpcRequest rpcRequest){ + //得到服务名 + String interfaceName=rpcRequest.getInterfaceName(); + //得到服务端相应服务实现类 + Object service = serviceProvide.getService(interfaceName); + //反射调用方法 + Method method=null; + try { + method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType()); + Object invoke=method.invoke(service,rpcRequest.getParams()); + return RpcResponse.sussess(invoke); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + System.out.println("方法执行错误"); + return RpcResponse.fail(); + } + } +} diff --git a/version1/src/main/java/part2/common/Message/RpcRequest.java b/version1/src/main/java/part2/common/Message/RpcRequest.java new file mode 100644 index 0000000..f6f398d --- /dev/null +++ b/version1/src/main/java/part2/common/Message/RpcRequest.java @@ -0,0 +1,25 @@ +package part2.common.Message; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 18:30 + * 定义发送的消息格式 + */ +@Data +@Builder +public class RpcRequest implements Serializable { + //服务类名,客户端只知道接口 + private String interfaceName; + //调用的方法名 + private String methodName; + //参数列表 + private Object[] params; + //参数类型 + private Class[] paramsType; +} diff --git a/version1/src/main/java/part2/common/Message/RpcResponse.java b/version1/src/main/java/part2/common/Message/RpcResponse.java new file mode 100644 index 0000000..a681df8 --- /dev/null +++ b/version1/src/main/java/part2/common/Message/RpcResponse.java @@ -0,0 +1,29 @@ +package part2.common.Message; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 19:18 + */ +@Data +@Builder +public class RpcResponse implements Serializable { + //状态信息 + private int code; + private String message; + //具体数据 + private Object data; + + public static RpcResponse sussess(Object data){ + return RpcResponse.builder().code(200).data(data).build(); + } + public static RpcResponse fail(){ + return RpcResponse.builder().code(500).message("服务器发生错误").build(); + } +} + diff --git a/version1/src/main/java/part2/common/pojo/User.java b/version1/src/main/java/part2/common/pojo/User.java new file mode 100644 index 0000000..bf999d7 --- /dev/null +++ b/version1/src/main/java/part2/common/pojo/User.java @@ -0,0 +1,25 @@ +package part2.common.pojo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 17:50 + */ +@Builder +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User implements Serializable { + // 客户端和服务端共有的 + private Integer id; + private String userName; + private Boolean sex; +} + diff --git a/version1/src/main/java/part2/common/service/Impl/UserServiceImpl.java b/version1/src/main/java/part2/common/service/Impl/UserServiceImpl.java new file mode 100644 index 0000000..37e6262 --- /dev/null +++ b/version1/src/main/java/part2/common/service/Impl/UserServiceImpl.java @@ -0,0 +1,32 @@ +package part2.common.service.Impl; + + +import part2.common.pojo.User; +import part2.common.service.UserService; + +import java.util.Random; +import java.util.UUID; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:28 + */ +public class UserServiceImpl implements UserService { + @Override + public User getUserByUserId(Integer id) { + System.out.println("客户端查询了"+id+"的用户"); + // 模拟从数据库中取用户的行为 + Random random = new Random(); + User user = User.builder().userName(UUID.randomUUID().toString()) + .id(id) + .sex(random.nextBoolean()).build(); + return user; + } + + @Override + public Integer insertUserId(User user) { + System.out.println("插入数据成功"+user.getUserName()); + return user.getId(); + } +} \ No newline at end of file diff --git a/version1/src/main/java/part2/common/service/UserService.java b/version1/src/main/java/part2/common/service/UserService.java new file mode 100644 index 0000000..b6817fe --- /dev/null +++ b/version1/src/main/java/part2/common/service/UserService.java @@ -0,0 +1,16 @@ +package part2.common.service; + + +import part2.common.pojo.User; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:27 + */ +public interface UserService { + // 客户端通过这个接口调用服务端的实现类 + User getUserByUserId(Integer id); + //新增一个功能 + Integer insertUserId(User user); +} diff --git a/version1/src/main/java/part3/Client/TestClient.java b/version1/src/main/java/part3/Client/TestClient.java new file mode 100644 index 0000000..112bcc3 --- /dev/null +++ b/version1/src/main/java/part3/Client/TestClient.java @@ -0,0 +1,27 @@ +package part3.Client; + + +import part3.common.pojo.User; +import part3.common.service.UserService; +import part3.Client.proxy.ClientProxy; + + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 18:39 + */ +public class TestClient { + public static void main(String[] args) { + ClientProxy clientProxy=new ClientProxy(); + //ClientProxy clientProxy=new part2.Client.proxy.ClientProxy("127.0.0.1",9999,0); + UserService proxy=clientProxy.getProxy(UserService.class); + + User user = proxy.getUserByUserId(1); + System.out.println("从服务端得到的user="+user.toString()); + + User u=User.builder().id(100).userName("wxx").sex(true).build(); + Integer id = proxy.insertUserId(u); + System.out.println("向服务端插入user的id"+id); + } +} diff --git a/version1/src/main/java/part3/Client/netty/handler/NettyClientHandler.java b/version1/src/main/java/part3/Client/netty/handler/NettyClientHandler.java new file mode 100644 index 0000000..28f173e --- /dev/null +++ b/version1/src/main/java/part3/Client/netty/handler/NettyClientHandler.java @@ -0,0 +1,27 @@ +package part3.Client.netty.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.AttributeKey; +import part3.common.Message.RpcResponse; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 17:29 + */ +public class NettyClientHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { + // 接收到response, 给channel设计别名,让sendRequest里读取response + AttributeKey key = AttributeKey.valueOf("RPCResponse"); + ctx.channel().attr(key).set(response); + ctx.channel().close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/version1/src/main/java/part3/Client/netty/nettyInitializer/NettyClientInitializer.java b/version1/src/main/java/part3/Client/netty/nettyInitializer/NettyClientInitializer.java new file mode 100644 index 0000000..bca4a4c --- /dev/null +++ b/version1/src/main/java/part3/Client/netty/nettyInitializer/NettyClientInitializer.java @@ -0,0 +1,42 @@ +package part3.Client.netty.nettyInitializer; + + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.serialization.ClassResolver; +import io.netty.handler.codec.serialization.ObjectDecoder; +import io.netty.handler.codec.serialization.ObjectEncoder; +import part3.Client.netty.handler.NettyClientHandler; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 17:26 + */ +public class NettyClientInitializer extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + //消息格式 【长度】【消息体】,解决沾包问题 + pipeline.addLast( + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); + //计算当前待发送消息的长度,写入到前4个字节中 + pipeline.addLast(new LengthFieldPrepender(4)); + + //使用Java序列化方式,netty的自带的解码编码支持传输这种结构 + pipeline.addLast(new ObjectEncoder()); + //使用了Netty中的ObjectDecoder,它用于将字节流解码为 Java 对象。 + //在ObjectDecoder的构造函数中传入了一个ClassResolver 对象,用于解析类名并加载相应的类。 + pipeline.addLast(new ObjectDecoder(new ClassResolver() { + @Override + public Class resolve(String className) throws ClassNotFoundException { + return Class.forName(className); + } + })); + + pipeline.addLast(new NettyClientHandler()); + } +} diff --git a/version1/src/main/java/part3/Client/proxy/ClientProxy.java b/version1/src/main/java/part3/Client/proxy/ClientProxy.java new file mode 100644 index 0000000..8cb957e --- /dev/null +++ b/version1/src/main/java/part3/Client/proxy/ClientProxy.java @@ -0,0 +1,43 @@ +package part3.Client.proxy; + + +import part3.common.Message.RpcRequest; +import part3.common.Message.RpcResponse; +import part3.Client.rpcClient.RpcClient; +import part3.Client.rpcClient.impl.NettyRpcClient; +import part3.Client.rpcClient.impl.SimpleSocketRpcCilent; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/6 16:49 + */ +public class ClientProxy implements InvocationHandler { + //传入参数service接口的class对象,反射封装成一个request + + private RpcClient rpcClient; + public ClientProxy(){ + rpcClient=new NettyRpcClient(); + } + + //jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端) + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + //构建request + RpcRequest request=RpcRequest.builder() + .interfaceName(method.getDeclaringClass().getName()) + .methodName(method.getName()) + .params(args).paramsType(method.getParameterTypes()).build(); + //数据传输 + RpcResponse response= rpcClient.sendRequest(request); + return response.getData(); + } + public T getProxy(Class clazz){ + Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); + return (T)o; + } +} diff --git a/version1/src/main/java/part3/Client/rpcClient/RpcClient.java b/version1/src/main/java/part3/Client/rpcClient/RpcClient.java new file mode 100644 index 0000000..44b702b --- /dev/null +++ b/version1/src/main/java/part3/Client/rpcClient/RpcClient.java @@ -0,0 +1,15 @@ +package part3.Client.rpcClient; + +import part3.common.Message.RpcRequest; +import part3.common.Message.RpcResponse; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 18:55 + */ +public interface RpcClient { + + //定义底层通信的方法 + RpcResponse sendRequest(RpcRequest request); +} diff --git a/version1/src/main/java/part3/Client/rpcClient/impl/NettyRpcClient.java b/version1/src/main/java/part3/Client/rpcClient/impl/NettyRpcClient.java new file mode 100644 index 0000000..a4fc493 --- /dev/null +++ b/version1/src/main/java/part3/Client/rpcClient/impl/NettyRpcClient.java @@ -0,0 +1,68 @@ +package part3.Client.rpcClient.impl; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.AttributeKey; +import part3.Client.serviceCenter.ServiceCenter; +import part3.Client.serviceCenter.ZKServiceCenter; +import part3.common.Message.RpcRequest; +import part3.common.Message.RpcResponse; +import part3.Client.netty.nettyInitializer.NettyClientInitializer; +import part3.Client.rpcClient.RpcClient; + +import java.net.InetSocketAddress; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 19:40 + */ +public class NettyRpcClient implements RpcClient { + + private static final Bootstrap bootstrap; + private static final EventLoopGroup eventLoopGroup; + + private ServiceCenter serviceCenter; + public NettyRpcClient(){ + this.serviceCenter=new ZKServiceCenter(); + } + + //netty客户端初始化 + static { + eventLoopGroup = new NioEventLoopGroup(); + bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) + .handler(new NettyClientInitializer()); + } + @Override + public RpcResponse sendRequest(RpcRequest request) { + //从注册中心获取host,post + InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName()); + String host = address.getHostName(); + int port = address.getPort(); + try { + ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); + Channel channel = channelFuture.channel(); + // 发送数据 + channel.writeAndFlush(request); + //sync()堵塞获取结果 + channel.closeFuture().sync(); + // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置) + // AttributeKey是,线程隔离的,不会由线程安全问题。 + // 当前场景下选择堵塞获取结果 + // 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener... + AttributeKey key = AttributeKey.valueOf("RPCResponse"); + RpcResponse response = channel.attr(key).get(); + + System.out.println(response); + return response; + } catch (InterruptedException e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/version1/src/main/java/part3/Client/rpcClient/impl/SimpleSocketRpcCilent.java b/version1/src/main/java/part3/Client/rpcClient/impl/SimpleSocketRpcCilent.java new file mode 100644 index 0000000..b2f6af3 --- /dev/null +++ b/version1/src/main/java/part3/Client/rpcClient/impl/SimpleSocketRpcCilent.java @@ -0,0 +1,41 @@ +package part3.Client.rpcClient.impl; + +import part3.common.Message.RpcRequest; +import part3.common.Message.RpcResponse; +import part3.Client.rpcClient.RpcClient; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/2 18:58 + */ +public class SimpleSocketRpcCilent implements RpcClient { + private String host; + private int port; + public SimpleSocketRpcCilent(String host,int port){ + this.host=host; + this.port=port; + } + @Override + public RpcResponse sendRequest(RpcRequest request) { + try { + Socket socket=new Socket(host, port); + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + + oos.writeObject(request); + oos.flush(); + + RpcResponse response=(RpcResponse) ois.readObject(); + return response; + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/version1/src/main/java/part3/Client/serviceCenter/ServiceCenter.java b/version1/src/main/java/part3/Client/serviceCenter/ServiceCenter.java new file mode 100644 index 0000000..1b2c02e --- /dev/null +++ b/version1/src/main/java/part3/Client/serviceCenter/ServiceCenter.java @@ -0,0 +1,14 @@ +package part3.Client.serviceCenter; + +import java.net.InetSocketAddress; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 21:42 + */ +//服务中心接口 +public interface ServiceCenter { + // 查询:根据服务名查找地址 + InetSocketAddress serviceDiscovery(String serviceName); +} diff --git a/version1/src/main/java/part3/Client/serviceCenter/ZKServiceCenter.java b/version1/src/main/java/part3/Client/serviceCenter/ZKServiceCenter.java new file mode 100644 index 0000000..a23ff1c --- /dev/null +++ b/version1/src/main/java/part3/Client/serviceCenter/ZKServiceCenter.java @@ -0,0 +1,59 @@ +package part3.Client.serviceCenter; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.net.InetSocketAddress; +import java.util.List; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 21:41 + */ +public class ZKServiceCenter implements ServiceCenter{ + // curator 提供的zookeeper客户端 + private CuratorFramework client; + //zookeeper根路径节点 + private static final String ROOT_PATH = "MyRPC"; + + //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接 + public ZKServiceCenter(){ + // 指数时间重试 + RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); + // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接 + // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系, + // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍 + // 使用心跳监听状态 + this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") + .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build(); + this.client.start(); + System.out.println("zookeeper 连接成功"); + } + //根据服务名(接口名)返回地址 + @Override + public InetSocketAddress serviceDiscovery(String serviceName) { + try { + List strings = client.getChildren().forPath("/" + serviceName); + // 这里默认用的第一个,后面加负载均衡 + String string = strings.get(0); + return parseAddress(string); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + // 地址 -> XXX.XXX.XXX.XXX:port 字符串 + private String getServiceAddress(InetSocketAddress serverAddress) { + return serverAddress.getHostName() + + ":" + + serverAddress.getPort(); + } + // 字符串解析为地址 + private InetSocketAddress parseAddress(String address) { + String[] result = address.split(":"); + return new InetSocketAddress(result[0], Integer.parseInt(result[1])); + } +} diff --git a/version1/src/main/java/part3/Server/TestServer.java b/version1/src/main/java/part3/Server/TestServer.java new file mode 100644 index 0000000..63a8ef4 --- /dev/null +++ b/version1/src/main/java/part3/Server/TestServer.java @@ -0,0 +1,26 @@ +package part3.Server; + + +import part3.Server.server.impl.NettyRPCRPCServer; +import part3.common.service.Impl.UserServiceImpl; +import part3.common.service.UserService; +import part3.Server.provider.ServiceProvider; +import part3.Server.server.RpcServer; +import part3.Server.server.impl.SimpleRPCRPCServer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/11 19:39 + */ +public class TestServer { + public static void main(String[] args) { + UserService userService=new UserServiceImpl(); + + ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999); + serviceProvider.provideServiceInterface(userService); + + RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider); + rpcServer.start(9999); + } +} diff --git a/version1/src/main/java/part3/Server/netty/handler/NettyRPCServerHandler.java b/version1/src/main/java/part3/Server/netty/handler/NettyRPCServerHandler.java new file mode 100644 index 0000000..aea757f --- /dev/null +++ b/version1/src/main/java/part3/Server/netty/handler/NettyRPCServerHandler.java @@ -0,0 +1,51 @@ +package part3.Server.netty.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.AllArgsConstructor; +import part3.Server.provider.ServiceProvider; +import part3.common.Message.RpcRequest; +import part3.common.Message.RpcResponse; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 16:40 + * 因为是服务器端,我们知道接受到请求格式是RPCRequest + * Object类型也行,强制转型就行 + */ +@AllArgsConstructor +public class NettyRPCServerHandler extends SimpleChannelInboundHandler { + private ServiceProvider serviceProvider; + @Override + protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { + RpcResponse response = getResponse(request); + ctx.writeAndFlush(response); + ctx.close(); + } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + private RpcResponse getResponse(RpcRequest rpcRequest){ + //得到服务名 + String interfaceName=rpcRequest.getInterfaceName(); + //得到服务端相应服务实现类 + Object service = serviceProvider.getService(interfaceName); + //反射调用方法 + Method method=null; + try { + method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType()); + Object invoke=method.invoke(service,rpcRequest.getParams()); + return RpcResponse.sussess(invoke); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + System.out.println("方法执行错误"); + return RpcResponse.fail(); + } + } +} diff --git a/version1/src/main/java/part3/Server/netty/nettyInitializer/NettyServerInitializer.java b/version1/src/main/java/part3/Server/netty/nettyInitializer/NettyServerInitializer.java new file mode 100644 index 0000000..aff1fdb --- /dev/null +++ b/version1/src/main/java/part3/Server/netty/nettyInitializer/NettyServerInitializer.java @@ -0,0 +1,46 @@ +package part3.Server.netty.nettyInitializer; + + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.serialization.ClassResolver; +import io.netty.handler.codec.serialization.ObjectDecoder; +import io.netty.handler.codec.serialization.ObjectEncoder; +import lombok.AllArgsConstructor; +import part3.Server.provider.ServiceProvider; +import part3.Server.netty.handler.NettyRPCServerHandler; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 16:15 + */ +@AllArgsConstructor +public class NettyServerInitializer extends ChannelInitializer { + private ServiceProvider serviceProvider; + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + //消息格式 【长度】【消息体】,解决沾包问题 + pipeline.addLast( + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); + //计算当前待发送消息的长度,写入到前4个字节中 + pipeline.addLast(new LengthFieldPrepender(4)); + + //使用Java序列化方式,netty的自带的解码编码支持传输这种结构 + pipeline.addLast(new ObjectEncoder()); + //使用了Netty中的ObjectDecoder,它用于将字节流解码为 Java 对象。 + //在ObjectDecoder的构造函数中传入了一个ClassResolver 对象,用于解析类名并加载相应的类。 + pipeline.addLast(new ObjectDecoder(new ClassResolver() { + @Override + public Class resolve(String className) throws ClassNotFoundException { + return Class.forName(className); + } + })); + + pipeline.addLast(new NettyRPCServerHandler(serviceProvider)); + } +} diff --git a/version1/src/main/java/part3/Server/provider/ServiceProvider.java b/version1/src/main/java/part3/Server/provider/ServiceProvider.java new file mode 100644 index 0000000..fa39e0a --- /dev/null +++ b/version1/src/main/java/part3/Server/provider/ServiceProvider.java @@ -0,0 +1,47 @@ +package part3.Server.provider; + +import part3.Server.serviceRegister.ServiceRegister; +import part3.Server.serviceRegister.impl.ZKServiceRegister; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/16 17:35 + */ +public class ServiceProvider { + private Map interfaceProvider; + + private int port; + private String host; + //注册服务类 + private ServiceRegister serviceRegister; + + public ServiceProvider(String host,int port){ + //需要传入服务端自身的网络地址 + this.host=host; + this.port=port; + this.interfaceProvider=new HashMap<>(); + this.serviceRegister=new ZKServiceRegister(); + } + + public void provideServiceInterface(Object service){ + String serviceName=service.getClass().getName(); + Class[] interfaceName=service.getClass().getInterfaces(); + + for (Class clazz:interfaceName){ + //本机的映射表 + interfaceProvider.put(clazz.getName(),service); + //在注册中心注册服务 + serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port)); + } + } + + public Object getService(String interfaceName){ + return interfaceProvider.get(interfaceName); + } + +} diff --git a/version1/src/main/java/part3/Server/server/RpcServer.java b/version1/src/main/java/part3/Server/server/RpcServer.java new file mode 100644 index 0000000..d0a0fa5 --- /dev/null +++ b/version1/src/main/java/part3/Server/server/RpcServer.java @@ -0,0 +1,11 @@ +package part3.Server.server; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:26 + */ +public interface RpcServer { + void start(int port); + void stop(); +} diff --git a/version1/src/main/java/part3/Server/server/impl/NettyRPCRPCServer.java b/version1/src/main/java/part3/Server/server/impl/NettyRPCRPCServer.java new file mode 100644 index 0000000..2e4a34b --- /dev/null +++ b/version1/src/main/java/part3/Server/server/impl/NettyRPCRPCServer.java @@ -0,0 +1,48 @@ +package part3.Server.server.impl; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.AllArgsConstructor; +import part3.Server.netty.nettyInitializer.NettyServerInitializer; +import part3.Server.provider.ServiceProvider; +import part3.Server.server.RpcServer; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/26 14:01 + */ +@AllArgsConstructor +public class NettyRPCRPCServer implements RpcServer { + private ServiceProvider serviceProvider; + @Override + public void start(int port) { + // netty 服务线程组boss负责建立连接, work负责具体的请求 + NioEventLoopGroup bossGroup = new NioEventLoopGroup(); + NioEventLoopGroup workGroup = new NioEventLoopGroup(); + System.out.println("netty服务端启动了"); + try { + //启动netty服务器 + ServerBootstrap serverBootstrap = new ServerBootstrap(); + //初始化 + serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) + .childHandler(new NettyServerInitializer(serviceProvider)); + //同步堵塞 + ChannelFuture channelFuture=serverBootstrap.bind(port).sync(); + //死循环监听 + channelFuture.channel().closeFuture().sync(); + }catch (InterruptedException e){ + e.printStackTrace(); + }finally { + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version1/src/main/java/part3/Server/server/impl/SimpleRPCRPCServer.java b/version1/src/main/java/part3/Server/server/impl/SimpleRPCRPCServer.java new file mode 100644 index 0000000..f77262a --- /dev/null +++ b/version1/src/main/java/part3/Server/server/impl/SimpleRPCRPCServer.java @@ -0,0 +1,39 @@ +package part3.Server.server.impl; + + +import lombok.AllArgsConstructor; +import part3.Server.provider.ServiceProvider; +import part3.Server.server.RpcServer; +import part3.Server.server.work.WorkThread; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/12 11:37 + */ +@AllArgsConstructor +public class SimpleRPCRPCServer implements RpcServer { + private ServiceProvider serviceProvide; + @Override + public void start(int port) { + try { + ServerSocket serverSocket=new ServerSocket(port); + System.out.println("服务器启动了"); + while (true) { + Socket socket = serverSocket.accept(); + new Thread(new WorkThread(socket,serviceProvide)).start(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + + } +} diff --git a/version1/src/main/java/part3/Server/server/work/WorkThread.java b/version1/src/main/java/part3/Server/server/work/WorkThread.java new file mode 100644 index 0000000..32eb732 --- /dev/null +++ b/version1/src/main/java/part3/Server/server/work/WorkThread.java @@ -0,0 +1,58 @@ +package part3.Server.server.work; + + +import lombok.AllArgsConstructor; +import part3.common.Message.RpcRequest; +import part3.common.Message.RpcResponse; +import part3.Server.provider.ServiceProvider; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.Socket; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/14 17:39 + */ +@AllArgsConstructor +public class WorkThread implements Runnable{ + private Socket socket; + private ServiceProvider serviceProvide; + @Override + public void run() { + try { + ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream()); + ObjectInputStream ois=new ObjectInputStream(socket.getInputStream()); + //读取客户端传过来的request + RpcRequest rpcRequest = (RpcRequest) ois.readObject(); + //反射调用服务方法获取返回值 + RpcResponse rpcResponse=getResponse(rpcRequest); + //向客户端写入response + oos.writeObject(rpcResponse); + oos.flush(); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + } + } + private RpcResponse getResponse(RpcRequest rpcRequest){ + //得到服务名 + String interfaceName=rpcRequest.getInterfaceName(); + //得到服务端相应服务实现类 + Object service = serviceProvide.getService(interfaceName); + //反射调用方法 + Method method=null; + try { + method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType()); + Object invoke=method.invoke(service,rpcRequest.getParams()); + return RpcResponse.sussess(invoke); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + System.out.println("方法执行错误"); + return RpcResponse.fail(); + } + } +} diff --git a/version1/src/main/java/part3/Server/serviceRegister/ServiceRegister.java b/version1/src/main/java/part3/Server/serviceRegister/ServiceRegister.java new file mode 100644 index 0000000..65c5be3 --- /dev/null +++ b/version1/src/main/java/part3/Server/serviceRegister/ServiceRegister.java @@ -0,0 +1,15 @@ +package part3.Server.serviceRegister; + +import java.net.InetSocketAddress; + +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 16:58 + */ +// 服务注册接口 +public interface ServiceRegister { + // 注册:保存服务与地址。 + void register(String serviceName, InetSocketAddress serviceAddress); + +} diff --git a/version1/src/main/java/part3/Server/serviceRegister/impl/ZKServiceRegister.java b/version1/src/main/java/part3/Server/serviceRegister/impl/ZKServiceRegister.java new file mode 100644 index 0000000..9fe332a --- /dev/null +++ b/version1/src/main/java/part3/Server/serviceRegister/impl/ZKServiceRegister.java @@ -0,0 +1,62 @@ +package part3.Server.serviceRegister.impl; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; +import part3.Server.serviceRegister.ServiceRegister; + +import java.net.InetSocketAddress; +/** + * @author wxx + * @version 1.0 + * @create 2024/5/3 17:28 + */ +public class ZKServiceRegister implements ServiceRegister { + // curator 提供的zookeeper客户端 + private CuratorFramework client; + //zookeeper根路径节点 + private static final String ROOT_PATH = "MyRPC"; + + //负责zookeeper客户端的初始化,并与zookeeper服务端进行连接 + public ZKServiceRegister(){ + // 指数时间重试 + RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); + // zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接 + // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系, + // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍 + // 使用心跳监听状态 + this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") + .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build(); + this.client.start(); + System.out.println("zookeeper 连接成功"); + } + //注册服务到注册中心 + @Override + public void register(String serviceName, InetSocketAddress serviceAddress) { + try { + // serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址 + if(client.checkExists().forPath("/" + serviceName) == null){ + client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName); + } + // 路径地址,一个/代表一个节点 + String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress); + // 临时节点,服务器下线就删除节点 + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); + } catch (Exception e) { + System.out.println("此服务已存在"); + } + } + // 地址 -> XXX.XXX.XXX.XXX:port 字符串 + private String getServiceAddress(InetSocketAddress serverAddress) { + return serverAddress.getHostName() + + ":" + + serverAddress.getPort(); + } + // 字符串解析为地址 + private InetSocketAddress parseAddress(String address) { + String[] result = address.split(":"); + return new InetSocketAddress(result[0], Integer.parseInt(result[1])); + } +} diff --git a/version1/src/main/java/part3/common/Message/RpcRequest.java b/version1/src/main/java/part3/common/Message/RpcRequest.java new file mode 100644 index 0000000..63ba31f --- /dev/null +++ b/version1/src/main/java/part3/common/Message/RpcRequest.java @@ -0,0 +1,25 @@ +package part3.common.Message; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 18:30 + * 定义发送的消息格式 + */ +@Data +@Builder +public class RpcRequest implements Serializable { + //服务类名,客户端只知道接口 + private String interfaceName; + //调用的方法名 + private String methodName; + //参数列表 + private Object[] params; + //参数类型 + private Class[] paramsType; +} diff --git a/version1/src/main/java/part3/common/Message/RpcResponse.java b/version1/src/main/java/part3/common/Message/RpcResponse.java new file mode 100644 index 0000000..537b630 --- /dev/null +++ b/version1/src/main/java/part3/common/Message/RpcResponse.java @@ -0,0 +1,29 @@ +package part3.common.Message; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/2/1 19:18 + */ +@Data +@Builder +public class RpcResponse implements Serializable { + //状态信息 + private int code; + private String message; + //具体数据 + private Object data; + + public static RpcResponse sussess(Object data){ + return RpcResponse.builder().code(200).data(data).build(); + } + public static RpcResponse fail(){ + return RpcResponse.builder().code(500).message("服务器发生错误").build(); + } +} + diff --git a/version1/src/main/java/part3/common/pojo/User.java b/version1/src/main/java/part3/common/pojo/User.java new file mode 100644 index 0000000..46a0a87 --- /dev/null +++ b/version1/src/main/java/part3/common/pojo/User.java @@ -0,0 +1,25 @@ +package part3.common.pojo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 17:50 + */ +@Builder +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User implements Serializable { + // 客户端和服务端共有的 + private Integer id; + private String userName; + private Boolean sex; +} + diff --git a/version1/src/main/java/part3/common/service/Impl/UserServiceImpl.java b/version1/src/main/java/part3/common/service/Impl/UserServiceImpl.java new file mode 100644 index 0000000..6fc44b2 --- /dev/null +++ b/version1/src/main/java/part3/common/service/Impl/UserServiceImpl.java @@ -0,0 +1,32 @@ +package part3.common.service.Impl; + + +import part3.common.pojo.User; +import part3.common.service.UserService; + +import java.util.Random; +import java.util.UUID; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:28 + */ +public class UserServiceImpl implements UserService { + @Override + public User getUserByUserId(Integer id) { + System.out.println("客户端查询了"+id+"的用户"); + // 模拟从数据库中取用户的行为 + Random random = new Random(); + User user = User.builder().userName(UUID.randomUUID().toString()) + .id(id) + .sex(random.nextBoolean()).build(); + return user; + } + + @Override + public Integer insertUserId(User user) { + System.out.println("插入数据成功"+user.getUserName()); + return user.getId(); + } +} \ No newline at end of file diff --git a/version1/src/main/java/part3/common/service/UserService.java b/version1/src/main/java/part3/common/service/UserService.java new file mode 100644 index 0000000..07efce0 --- /dev/null +++ b/version1/src/main/java/part3/common/service/UserService.java @@ -0,0 +1,16 @@ +package part3.common.service; + + +import part3.common.pojo.User; + +/** + * @author wxx + * @version 1.0 + * @create 2024/1/28 16:27 + */ +public interface UserService { + // 客户端通过这个接口调用服务端的实现类 + User getUserByUserId(Integer id); + //新增一个功能 + Integer insertUserId(User user); +}