Merge pull request #2 from GGBoooond/main

version1
This commit is contained in:
GG.Bond 2024-05-05 22:08:18 +08:00 committed by GitHub
commit 929050552f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
134 changed files with 2162 additions and 0 deletions

38
version1/pom.xml Normal file
View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>version1</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
<scope>compile</scope>
</dependency>
<!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn但不影响正常使用-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,36 @@
package part1.Client;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/4 18:31
*/
public class IOClient {
//这里负责底层与服务端的通信发送request返回response
public static RpcResponse sendRequest(String host, int port, RpcRequest request){
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,25 @@
package part1.Client;
import part1.Client.proxy.ClientProxy;
import part1.common.service.UserService;
import part1.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 18:39
*/
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999);
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

View File

@ -0,0 +1,40 @@
package part1.Client.proxy;
import lombok.AllArgsConstructor;
import part1.Client.IOClient;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 16:49
*/
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private String host;
private int port;
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//IOClient.sendRequest 和服务端进行数据传输
RpcResponse response= IOClient.sendRequest(host,port,request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

View File

@ -0,0 +1,25 @@
package part1.Server;
import part1.Server.server.RpcServer;
import part1.common.service.Impl.UserServiceImpl;
import part1.common.service.UserService;
import part1.Server.server.impl.SimpleRPCRPCServer;
import part1.Server.provider.ServiceProvider;
/**
* @author wxx
* @version 1.0
* @create 2024/2/11 19:39
*/
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider();
serviceProvider.provideServiceInterface(userService);
RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

View File

@ -0,0 +1,34 @@
package part1.Server.provider;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/2/16 17:35
*/
//本地服务存放器
public class ServiceProvider {
//集合中存放服务的实例
private Map<String,Object> interfaceProvider;
public ServiceProvider(){
this.interfaceProvider=new HashMap<>();
}
//本地注册服务
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
interfaceProvider.put(clazz.getName(),service);
}
}
//获取服务实例
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}

View File

@ -0,0 +1,12 @@
package part1.Server.server;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:26
*/
public interface RpcServer {
//开启监听
void start(int port);
void stop();
}

View File

@ -0,0 +1,40 @@
package part1.Server.server.impl;
import lombok.AllArgsConstructor;
import part1.Server.server.RpcServer;
import part1.Server.server.work.WorkThread;
import part1.Server.provider.ServiceProvider;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:37
*/
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
//如果没有连接会堵塞在这里
Socket socket = serverSocket.accept();
//有连接创建一个新的线程执行处理
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part1.Server.server.impl;
import part1.Server.server.RpcServer;
import part1.Server.server.work.WorkThread;
import part1.Server.provider.ServiceProvider;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author wxx
* @version 1.0
* @create 2024/2/19 15:30
*/
public class ThreadPoolRPCRPCServer implements RpcServer {
private final ThreadPoolExecutor threadPool;
private ServiceProvider serviceProvider;
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider){
threadPool=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
1000,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
this.serviceProvider= serviceProvider;
}
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.serviceProvider = serviceProvider;
}
@Override
public void start(int port) {
System.out.println("服务端启动了");
try {
ServerSocket serverSocket=new ServerSocket();
while (true){
Socket socket= serverSocket.accept();
threadPool.execute(new WorkThread(socket,serviceProvider));
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part1.Server.server.work;
import lombok.AllArgsConstructor;
import part1.Server.provider.ServiceProvider;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/14 17:39
*/
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,25 @@
package part1.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 18:30
* 定义发送的消息格式
*/
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,30 @@
package part1.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 19:18
*/
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//具体数据
private Object data;
//构造成功信息
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
//构造失败信息
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

View File

@ -0,0 +1,25 @@
package part1.common.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 17:50
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

View File

@ -0,0 +1,32 @@
package part1.common.service.Impl;
import part1.common.pojo.User;
import part1.common.service.UserService;
import java.util.Random;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:28
*/
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

View File

@ -0,0 +1,16 @@
package part1.common.service;
import part1.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:27
*/
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}

View File

@ -0,0 +1,36 @@
package part2.Client;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/4 18:31
*/
public class IOClient {
//这里负责底层与服务端的通信发送request返回response
public static RpcResponse sendRequest(String host, int port, RpcRequest request){
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,26 @@
package part2.Client;
import part2.Client.proxy.ClientProxy;
import part2.common.pojo.User;
import part2.common.service.UserService;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 18:39
*/
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999,0);
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

View File

@ -0,0 +1,28 @@
package part2.Client.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import part2.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:29
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名让sendRequest里读取response
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
ctx.channel().attr(key).set(response);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常处理
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,43 @@
package part2.Client.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import part2.Client.netty.handler.NettyClientHandler;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:26
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//编码器
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//解码器
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyClientHandler());
}
}

View File

@ -0,0 +1,53 @@
package part2.Client.proxy;
import lombok.AllArgsConstructor;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import part2.Client.IOClient;
import part2.Client.rpcClient.RpcClient;
import part2.Client.rpcClient.impl.NettyRpcClient;
import part2.Client.rpcClient.impl.SimpleSocketRpcCilent;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 16:49
*/
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private RpcClient rpcClient;
public ClientProxy(String host,int port,int choose){
switch (choose){
case 0:
rpcClient=new NettyRpcClient(host,port);
break;
case 1:
rpcClient=new SimpleSocketRpcCilent(host,port);
}
}
public ClientProxy(String host,int port){
rpcClient=new NettyRpcClient(host,port);
}
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//数据传输
RpcResponse response= rpcClient.sendRequest(request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

View File

@ -0,0 +1,15 @@
package part2.Client.rpcClient;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:55
*/
public interface RpcClient {
//定义底层通信的方法
RpcResponse sendRequest(RpcRequest request);
}

View File

@ -0,0 +1,62 @@
package part2.Client.rpcClient.impl;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import part2.Client.netty.nettyInitializer.NettyClientInitializer;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import part2.Client.rpcClient.RpcClient;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 19:40
*/
public class NettyRpcClient implements RpcClient {
private String host;
private int port;
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
public NettyRpcClient(String host,int port){
this.host=host;
this.port=port;
}
//netty客户端初始化
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
//NettyClientInitializer这里 配置netty对消息的处理机制
.handler(new NettyClientInitializer());
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
try {
//创建一个channelFuture对象代表这一个操作事件sync方法表示堵塞直到connect完成
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//channel表示一个连接的单位类似socket
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果
channel.closeFuture().sync();
// 阻塞的获得结果通过给channel设计别名获取特定名字下的channel中的内容这个在hanlder中设置
// AttributeKey是线程隔离的不会由线程安全问题
// 当前场景下选择堵塞获取结果
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -0,0 +1,41 @@
package part2.Client.rpcClient.impl;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import part2.Client.rpcClient.RpcClient;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:58
*/
public class SimpleSocketRpcCilent implements RpcClient {
private String host;
private int port;
public SimpleSocketRpcCilent(String host,int port){
this.host=host;
this.port=port;
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,26 @@
package part2.Server;
import part2.Server.server.impl.NettyRPCRPCServer;
import part2.common.service.Impl.UserServiceImpl;
import part2.common.service.UserService;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
import part2.Server.server.impl.SimpleRPCRPCServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/11 19:39
*/
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider();
serviceProvider.provideServiceInterface(userService);
RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

View File

@ -0,0 +1,52 @@
package part2.Server.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import part2.Server.provider.ServiceProvider;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:40
* 因为是服务器端我们知道接受到请求格式是RPCRequest
* Object类型也行强制转型就行
*/
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private ServiceProvider serviceProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
//接收request读取并调用服务
RpcResponse response = getResponse(request);
ctx.writeAndFlush(response);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,46 @@
package part2.Server.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;
import part2.Server.netty.handler.NettyRPCServerHandler;
import part2.Server.provider.ServiceProvider;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:15
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
}
}

View File

@ -0,0 +1,32 @@
package part2.Server.provider;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/2/16 17:35
*/
public class ServiceProvider {
private Map<String,Object> interfaceProvider;
public ServiceProvider(){
this.interfaceProvider=new HashMap<>();
}
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
interfaceProvider.put(clazz.getName(),service);
}
}
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}

View File

@ -0,0 +1,11 @@
package part2.Server.server;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:26
*/
public interface RpcServer {
void start(int port);
void stop();
}

View File

@ -0,0 +1,49 @@
package part2.Server.server.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import part2.Server.netty.nettyInitializer.NettyServerInitializer;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 14:01
*/
@AllArgsConstructor
public class NettyRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty 服务线程组boss负责建立连接 work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("netty服务端启动了");
try {
//启动netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
//NettyClientInitializer这里 配置netty对消息的处理机制
.childHandler(new NettyServerInitializer(serviceProvider));
//同步堵塞
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
//死循环监听
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,39 @@
package part2.Server.server.impl;
import lombok.AllArgsConstructor;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
import part2.Server.server.work.WorkThread;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:37
*/
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
Socket socket = serverSocket.accept();
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part2.Server.server.impl;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
import part2.Server.server.work.WorkThread;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author wxx
* @version 1.0
* @create 2024/2/19 15:30
*/
public class ThreadPoolRPCRPCServer implements RpcServer {
private final ThreadPoolExecutor threadPool;
private ServiceProvider serviceProvider;
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider){
threadPool=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
1000,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
this.serviceProvider= serviceProvider;
}
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.serviceProvider = serviceProvider;
}
@Override
public void start(int port) {
System.out.println("服务端启动了");
try {
ServerSocket serverSocket=new ServerSocket();
while (true){
Socket socket= serverSocket.accept();
threadPool.execute(new WorkThread(socket,serviceProvider));
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part2.Server.server.work;
import lombok.AllArgsConstructor;
import part2.Server.provider.ServiceProvider;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/14 17:39
*/
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,25 @@
package part2.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 18:30
* 定义发送的消息格式
*/
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,29 @@
package part2.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 19:18
*/
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//具体数据
private Object data;
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

View File

@ -0,0 +1,25 @@
package part2.common.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 17:50
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

View File

@ -0,0 +1,32 @@
package part2.common.service.Impl;
import part2.common.pojo.User;
import part2.common.service.UserService;
import java.util.Random;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:28
*/
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

View File

@ -0,0 +1,16 @@
package part2.common.service;
import part2.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:27
*/
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}

View File

@ -0,0 +1,27 @@
package part3.Client;
import part3.common.pojo.User;
import part3.common.service.UserService;
import part3.Client.proxy.ClientProxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 18:39
*/
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy();
//ClientProxy clientProxy=new part2.Client.proxy.ClientProxy("127.0.0.1",9999,0);
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

View File

@ -0,0 +1,27 @@
package part3.Client.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import part3.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:29
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名让sendRequest里读取response
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
ctx.channel().attr(key).set(response);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,42 @@
package part3.Client.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import part3.Client.netty.handler.NettyClientHandler;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:26
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyClientHandler());
}
}

View File

@ -0,0 +1,43 @@
package part3.Client.proxy;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Client.rpcClient.RpcClient;
import part3.Client.rpcClient.impl.NettyRpcClient;
import part3.Client.rpcClient.impl.SimpleSocketRpcCilent;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 16:49
*/
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private RpcClient rpcClient;
public ClientProxy(){
rpcClient=new NettyRpcClient();
}
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//数据传输
RpcResponse response= rpcClient.sendRequest(request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

View File

@ -0,0 +1,15 @@
package part3.Client.rpcClient;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:55
*/
public interface RpcClient {
//定义底层通信的方法
RpcResponse sendRequest(RpcRequest request);
}

View File

@ -0,0 +1,68 @@
package part3.Client.rpcClient.impl;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import part3.Client.serviceCenter.ServiceCenter;
import part3.Client.serviceCenter.ZKServiceCenter;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Client.netty.nettyInitializer.NettyClientInitializer;
import part3.Client.rpcClient.RpcClient;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 19:40
*/
public class NettyRpcClient implements RpcClient {
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
private ServiceCenter serviceCenter;
public NettyRpcClient(){
this.serviceCenter=new ZKServiceCenter();
}
//netty客户端初始化
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
//从注册中心获取host,post
InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
String host = address.getHostName();
int port = address.getPort();
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果
channel.closeFuture().sync();
// 阻塞的获得结果通过给channel设计别名获取特定名字下的channel中的内容这个在hanlder中设置
// AttributeKey是线程隔离的不会由线程安全问题
// 当前场景下选择堵塞获取结果
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -0,0 +1,41 @@
package part3.Client.rpcClient.impl;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Client.rpcClient.RpcClient;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:58
*/
public class SimpleSocketRpcCilent implements RpcClient {
private String host;
private int port;
public SimpleSocketRpcCilent(String host,int port){
this.host=host;
this.port=port;
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,14 @@
package part3.Client.serviceCenter;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 21:42
*/
//服务中心接口
public interface ServiceCenter {
// 查询根据服务名查找地址
InetSocketAddress serviceDiscovery(String serviceName);
}

View File

@ -0,0 +1,59 @@
package part3.Client.serviceCenter;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 21:41
*/
public class ZKServiceCenter implements ServiceCenter{
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceCenter(){
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
System.out.println("zookeeper 连接成功");
}
//根据服务名接口名返回地址
@Override
public InetSocketAddress serviceDiscovery(String serviceName) {
try {
List<String> strings = client.getChildren().forPath("/" + serviceName);
// 这里默认用的第一个后面加负载均衡
String string = strings.get(0);
return parseAddress(string);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() +
":" +
serverAddress.getPort();
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,26 @@
package part3.Server;
import part3.Server.server.impl.NettyRPCRPCServer;
import part3.common.service.Impl.UserServiceImpl;
import part3.common.service.UserService;
import part3.Server.provider.ServiceProvider;
import part3.Server.server.RpcServer;
import part3.Server.server.impl.SimpleRPCRPCServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/11 19:39
*/
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999);
serviceProvider.provideServiceInterface(userService);
RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

View File

@ -0,0 +1,51 @@
package part3.Server.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import part3.Server.provider.ServiceProvider;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:40
* 因为是服务器端我们知道接受到请求格式是RPCRequest
* Object类型也行强制转型就行
*/
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private ServiceProvider serviceProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
RpcResponse response = getResponse(request);
ctx.writeAndFlush(response);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,46 @@
package part3.Server.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;
import part3.Server.provider.ServiceProvider;
import part3.Server.netty.handler.NettyRPCServerHandler;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:15
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
}
}

View File

@ -0,0 +1,47 @@
package part3.Server.provider;
import part3.Server.serviceRegister.ServiceRegister;
import part3.Server.serviceRegister.impl.ZKServiceRegister;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/2/16 17:35
*/
public class ServiceProvider {
private Map<String,Object> interfaceProvider;
private int port;
private String host;
//注册服务类
private ServiceRegister serviceRegister;
public ServiceProvider(String host,int port){
//需要传入服务端自身的网络地址
this.host=host;
this.port=port;
this.interfaceProvider=new HashMap<>();
this.serviceRegister=new ZKServiceRegister();
}
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
//本机的映射表
interfaceProvider.put(clazz.getName(),service);
//在注册中心注册服务
serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port));
}
}
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}

View File

@ -0,0 +1,11 @@
package part3.Server.server;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:26
*/
public interface RpcServer {
void start(int port);
void stop();
}

View File

@ -0,0 +1,48 @@
package part3.Server.server.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import part3.Server.netty.nettyInitializer.NettyServerInitializer;
import part3.Server.provider.ServiceProvider;
import part3.Server.server.RpcServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 14:01
*/
@AllArgsConstructor
public class NettyRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty 服务线程组boss负责建立连接 work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("netty服务端启动了");
try {
//启动netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.childHandler(new NettyServerInitializer(serviceProvider));
//同步堵塞
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
//死循环监听
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,39 @@
package part3.Server.server.impl;
import lombok.AllArgsConstructor;
import part3.Server.provider.ServiceProvider;
import part3.Server.server.RpcServer;
import part3.Server.server.work.WorkThread;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:37
*/
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
Socket socket = serverSocket.accept();
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part3.Server.server.work;
import lombok.AllArgsConstructor;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Server.provider.ServiceProvider;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/14 17:39
*/
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,15 @@
package part3.Server.serviceRegister;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 16:58
*/
// 服务注册接口
public interface ServiceRegister {
// 注册保存服务与地址
void register(String serviceName, InetSocketAddress serviceAddress);
}

View File

@ -0,0 +1,62 @@
package part3.Server.serviceRegister.impl;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import part3.Server.serviceRegister.ServiceRegister;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 17:28
*/
public class ZKServiceRegister implements ServiceRegister {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceRegister(){
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
System.out.println("zookeeper 连接成功");
}
//注册服务到注册中心
@Override
public void register(String serviceName, InetSocketAddress serviceAddress) {
try {
// serviceName创建成永久节点服务提供者下线时不删服务名只删地址
if(client.checkExists().forPath("/" + serviceName) == null){
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
}
// 路径地址一个/代表一个节点
String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress);
// 临时节点服务器下线就删除节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (Exception e) {
System.out.println("此服务已存在");
}
}
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() +
":" +
serverAddress.getPort();
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,25 @@
package part3.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 18:30
* 定义发送的消息格式
*/
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,29 @@
package part3.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 19:18
*/
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//具体数据
private Object data;
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

View File

@ -0,0 +1,25 @@
package part3.common.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 17:50
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

View File

@ -0,0 +1,32 @@
package part3.common.service.Impl;
import part3.common.pojo.User;
import part3.common.service.UserService;
import java.util.Random;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:28
*/
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

View File

@ -0,0 +1,16 @@
package part3.common.service;
import part3.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:27
*/
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More