This commit is contained in:
Wxx 2024-05-05 20:32:24 +08:00
parent e04334d529
commit 7b2589bc2e
61 changed files with 2162 additions and 0 deletions

38
version1/pom.xml Normal file
View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>version1</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
<scope>compile</scope>
</dependency>
<!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn但不影响正常使用-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,36 @@
package part1.Client;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/4 18:31
*/
public class IOClient {
//这里负责底层与服务端的通信发送request返回response
public static RpcResponse sendRequest(String host, int port, RpcRequest request){
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,25 @@
package part1.Client;
import part1.Client.proxy.ClientProxy;
import part1.common.service.UserService;
import part1.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 18:39
*/
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999);
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

View File

@ -0,0 +1,40 @@
package part1.Client.proxy;
import lombok.AllArgsConstructor;
import part1.Client.IOClient;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 16:49
*/
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private String host;
private int port;
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//IOClient.sendRequest 和服务端进行数据传输
RpcResponse response= IOClient.sendRequest(host,port,request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

View File

@ -0,0 +1,25 @@
package part1.Server;
import part1.Server.server.RpcServer;
import part1.common.service.Impl.UserServiceImpl;
import part1.common.service.UserService;
import part1.Server.server.impl.SimpleRPCRPCServer;
import part1.Server.provider.ServiceProvider;
/**
* @author wxx
* @version 1.0
* @create 2024/2/11 19:39
*/
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider();
serviceProvider.provideServiceInterface(userService);
RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

View File

@ -0,0 +1,34 @@
package part1.Server.provider;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/2/16 17:35
*/
//本地服务存放器
public class ServiceProvider {
//集合中存放服务的实例
private Map<String,Object> interfaceProvider;
public ServiceProvider(){
this.interfaceProvider=new HashMap<>();
}
//本地注册服务
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
interfaceProvider.put(clazz.getName(),service);
}
}
//获取服务实例
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}

View File

@ -0,0 +1,12 @@
package part1.Server.server;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:26
*/
public interface RpcServer {
//开启监听
void start(int port);
void stop();
}

View File

@ -0,0 +1,40 @@
package part1.Server.server.impl;
import lombok.AllArgsConstructor;
import part1.Server.server.RpcServer;
import part1.Server.server.work.WorkThread;
import part1.Server.provider.ServiceProvider;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:37
*/
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
//如果没有连接会堵塞在这里
Socket socket = serverSocket.accept();
//有连接创建一个新的线程执行处理
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part1.Server.server.impl;
import part1.Server.server.RpcServer;
import part1.Server.server.work.WorkThread;
import part1.Server.provider.ServiceProvider;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author wxx
* @version 1.0
* @create 2024/2/19 15:30
*/
public class ThreadPoolRPCRPCServer implements RpcServer {
private final ThreadPoolExecutor threadPool;
private ServiceProvider serviceProvider;
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider){
threadPool=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
1000,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
this.serviceProvider= serviceProvider;
}
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.serviceProvider = serviceProvider;
}
@Override
public void start(int port) {
System.out.println("服务端启动了");
try {
ServerSocket serverSocket=new ServerSocket();
while (true){
Socket socket= serverSocket.accept();
threadPool.execute(new WorkThread(socket,serviceProvider));
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part1.Server.server.work;
import lombok.AllArgsConstructor;
import part1.Server.provider.ServiceProvider;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/14 17:39
*/
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,25 @@
package part1.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 18:30
* 定义发送的消息格式
*/
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,30 @@
package part1.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 19:18
*/
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//具体数据
private Object data;
//构造成功信息
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
//构造失败信息
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

View File

@ -0,0 +1,25 @@
package part1.common.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 17:50
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

View File

@ -0,0 +1,32 @@
package part1.common.service.Impl;
import part1.common.pojo.User;
import part1.common.service.UserService;
import java.util.Random;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:28
*/
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

View File

@ -0,0 +1,16 @@
package part1.common.service;
import part1.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:27
*/
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}

View File

@ -0,0 +1,36 @@
package part2.Client;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/4 18:31
*/
public class IOClient {
//这里负责底层与服务端的通信发送request返回response
public static RpcResponse sendRequest(String host, int port, RpcRequest request){
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,26 @@
package part2.Client;
import part2.Client.proxy.ClientProxy;
import part2.common.pojo.User;
import part2.common.service.UserService;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 18:39
*/
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999,0);
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

View File

@ -0,0 +1,28 @@
package part2.Client.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import part2.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:29
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名让sendRequest里读取response
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
ctx.channel().attr(key).set(response);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常处理
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,43 @@
package part2.Client.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import part2.Client.netty.handler.NettyClientHandler;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:26
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//编码器
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//解码器
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyClientHandler());
}
}

View File

@ -0,0 +1,53 @@
package part2.Client.proxy;
import lombok.AllArgsConstructor;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import part2.Client.IOClient;
import part2.Client.rpcClient.RpcClient;
import part2.Client.rpcClient.impl.NettyRpcClient;
import part2.Client.rpcClient.impl.SimpleSocketRpcCilent;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 16:49
*/
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private RpcClient rpcClient;
public ClientProxy(String host,int port,int choose){
switch (choose){
case 0:
rpcClient=new NettyRpcClient(host,port);
break;
case 1:
rpcClient=new SimpleSocketRpcCilent(host,port);
}
}
public ClientProxy(String host,int port){
rpcClient=new NettyRpcClient(host,port);
}
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//数据传输
RpcResponse response= rpcClient.sendRequest(request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

View File

@ -0,0 +1,15 @@
package part2.Client.rpcClient;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:55
*/
public interface RpcClient {
//定义底层通信的方法
RpcResponse sendRequest(RpcRequest request);
}

View File

@ -0,0 +1,62 @@
package part2.Client.rpcClient.impl;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import part2.Client.netty.nettyInitializer.NettyClientInitializer;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import part2.Client.rpcClient.RpcClient;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 19:40
*/
public class NettyRpcClient implements RpcClient {
private String host;
private int port;
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
public NettyRpcClient(String host,int port){
this.host=host;
this.port=port;
}
//netty客户端初始化
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
//NettyClientInitializer这里 配置netty对消息的处理机制
.handler(new NettyClientInitializer());
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
try {
//创建一个channelFuture对象代表这一个操作事件sync方法表示堵塞直到connect完成
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//channel表示一个连接的单位类似socket
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果
channel.closeFuture().sync();
// 阻塞的获得结果通过给channel设计别名获取特定名字下的channel中的内容这个在hanlder中设置
// AttributeKey是线程隔离的不会由线程安全问题
// 当前场景下选择堵塞获取结果
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -0,0 +1,41 @@
package part2.Client.rpcClient.impl;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import part2.Client.rpcClient.RpcClient;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:58
*/
public class SimpleSocketRpcCilent implements RpcClient {
private String host;
private int port;
public SimpleSocketRpcCilent(String host,int port){
this.host=host;
this.port=port;
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,26 @@
package part2.Server;
import part2.Server.server.impl.NettyRPCRPCServer;
import part2.common.service.Impl.UserServiceImpl;
import part2.common.service.UserService;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
import part2.Server.server.impl.SimpleRPCRPCServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/11 19:39
*/
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider();
serviceProvider.provideServiceInterface(userService);
RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

View File

@ -0,0 +1,52 @@
package part2.Server.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import part2.Server.provider.ServiceProvider;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:40
* 因为是服务器端我们知道接受到请求格式是RPCRequest
* Object类型也行强制转型就行
*/
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private ServiceProvider serviceProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
//接收request读取并调用服务
RpcResponse response = getResponse(request);
ctx.writeAndFlush(response);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,46 @@
package part2.Server.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;
import part2.Server.netty.handler.NettyRPCServerHandler;
import part2.Server.provider.ServiceProvider;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:15
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
}
}

View File

@ -0,0 +1,32 @@
package part2.Server.provider;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/2/16 17:35
*/
public class ServiceProvider {
private Map<String,Object> interfaceProvider;
public ServiceProvider(){
this.interfaceProvider=new HashMap<>();
}
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
interfaceProvider.put(clazz.getName(),service);
}
}
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}

View File

@ -0,0 +1,11 @@
package part2.Server.server;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:26
*/
public interface RpcServer {
void start(int port);
void stop();
}

View File

@ -0,0 +1,49 @@
package part2.Server.server.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import part2.Server.netty.nettyInitializer.NettyServerInitializer;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 14:01
*/
@AllArgsConstructor
public class NettyRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty 服务线程组boss负责建立连接 work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("netty服务端启动了");
try {
//启动netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
//NettyClientInitializer这里 配置netty对消息的处理机制
.childHandler(new NettyServerInitializer(serviceProvider));
//同步堵塞
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
//死循环监听
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,39 @@
package part2.Server.server.impl;
import lombok.AllArgsConstructor;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
import part2.Server.server.work.WorkThread;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:37
*/
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
Socket socket = serverSocket.accept();
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part2.Server.server.impl;
import part2.Server.provider.ServiceProvider;
import part2.Server.server.RpcServer;
import part2.Server.server.work.WorkThread;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author wxx
* @version 1.0
* @create 2024/2/19 15:30
*/
public class ThreadPoolRPCRPCServer implements RpcServer {
private final ThreadPoolExecutor threadPool;
private ServiceProvider serviceProvider;
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider){
threadPool=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
1000,60, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
this.serviceProvider= serviceProvider;
}
public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.serviceProvider = serviceProvider;
}
@Override
public void start(int port) {
System.out.println("服务端启动了");
try {
ServerSocket serverSocket=new ServerSocket();
while (true){
Socket socket= serverSocket.accept();
threadPool.execute(new WorkThread(socket,serviceProvider));
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part2.Server.server.work;
import lombok.AllArgsConstructor;
import part2.Server.provider.ServiceProvider;
import part2.common.Message.RpcRequest;
import part2.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/14 17:39
*/
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,25 @@
package part2.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 18:30
* 定义发送的消息格式
*/
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,29 @@
package part2.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 19:18
*/
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//具体数据
private Object data;
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

View File

@ -0,0 +1,25 @@
package part2.common.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 17:50
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

View File

@ -0,0 +1,32 @@
package part2.common.service.Impl;
import part2.common.pojo.User;
import part2.common.service.UserService;
import java.util.Random;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:28
*/
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

View File

@ -0,0 +1,16 @@
package part2.common.service;
import part2.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:27
*/
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}

View File

@ -0,0 +1,27 @@
package part3.Client;
import part3.common.pojo.User;
import part3.common.service.UserService;
import part3.Client.proxy.ClientProxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 18:39
*/
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy();
//ClientProxy clientProxy=new part2.Client.proxy.ClientProxy("127.0.0.1",9999,0);
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

View File

@ -0,0 +1,27 @@
package part3.Client.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import part3.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:29
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名让sendRequest里读取response
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
ctx.channel().attr(key).set(response);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,42 @@
package part3.Client.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import part3.Client.netty.handler.NettyClientHandler;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:26
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyClientHandler());
}
}

View File

@ -0,0 +1,43 @@
package part3.Client.proxy;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Client.rpcClient.RpcClient;
import part3.Client.rpcClient.impl.NettyRpcClient;
import part3.Client.rpcClient.impl.SimpleSocketRpcCilent;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 16:49
*/
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private RpcClient rpcClient;
public ClientProxy(){
rpcClient=new NettyRpcClient();
}
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//数据传输
RpcResponse response= rpcClient.sendRequest(request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

View File

@ -0,0 +1,15 @@
package part3.Client.rpcClient;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:55
*/
public interface RpcClient {
//定义底层通信的方法
RpcResponse sendRequest(RpcRequest request);
}

View File

@ -0,0 +1,68 @@
package part3.Client.rpcClient.impl;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import part3.Client.serviceCenter.ServiceCenter;
import part3.Client.serviceCenter.ZKServiceCenter;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Client.netty.nettyInitializer.NettyClientInitializer;
import part3.Client.rpcClient.RpcClient;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 19:40
*/
public class NettyRpcClient implements RpcClient {
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
private ServiceCenter serviceCenter;
public NettyRpcClient(){
this.serviceCenter=new ZKServiceCenter();
}
//netty客户端初始化
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
//从注册中心获取host,post
InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
String host = address.getHostName();
int port = address.getPort();
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果
channel.closeFuture().sync();
// 阻塞的获得结果通过给channel设计别名获取特定名字下的channel中的内容这个在hanlder中设置
// AttributeKey是线程隔离的不会由线程安全问题
// 当前场景下选择堵塞获取结果
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -0,0 +1,41 @@
package part3.Client.rpcClient.impl;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Client.rpcClient.RpcClient;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:58
*/
public class SimpleSocketRpcCilent implements RpcClient {
private String host;
private int port;
public SimpleSocketRpcCilent(String host,int port){
this.host=host;
this.port=port;
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,14 @@
package part3.Client.serviceCenter;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 21:42
*/
//服务中心接口
public interface ServiceCenter {
// 查询根据服务名查找地址
InetSocketAddress serviceDiscovery(String serviceName);
}

View File

@ -0,0 +1,59 @@
package part3.Client.serviceCenter;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 21:41
*/
public class ZKServiceCenter implements ServiceCenter{
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceCenter(){
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
System.out.println("zookeeper 连接成功");
}
//根据服务名接口名返回地址
@Override
public InetSocketAddress serviceDiscovery(String serviceName) {
try {
List<String> strings = client.getChildren().forPath("/" + serviceName);
// 这里默认用的第一个后面加负载均衡
String string = strings.get(0);
return parseAddress(string);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() +
":" +
serverAddress.getPort();
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,26 @@
package part3.Server;
import part3.Server.server.impl.NettyRPCRPCServer;
import part3.common.service.Impl.UserServiceImpl;
import part3.common.service.UserService;
import part3.Server.provider.ServiceProvider;
import part3.Server.server.RpcServer;
import part3.Server.server.impl.SimpleRPCRPCServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/11 19:39
*/
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999);
serviceProvider.provideServiceInterface(userService);
RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

View File

@ -0,0 +1,51 @@
package part3.Server.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import part3.Server.provider.ServiceProvider;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:40
* 因为是服务器端我们知道接受到请求格式是RPCRequest
* Object类型也行强制转型就行
*/
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private ServiceProvider serviceProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
RpcResponse response = getResponse(request);
ctx.writeAndFlush(response);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,46 @@
package part3.Server.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;
import part3.Server.provider.ServiceProvider;
import part3.Server.netty.handler.NettyRPCServerHandler;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:15
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//消息格式 长度消息体解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));
//使用Java序列化方式netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//使用了Netty中的ObjectDecoder它用于将字节流解码为 Java 对象
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象用于解析类名并加载相应的类
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));
pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
}
}

View File

@ -0,0 +1,47 @@
package part3.Server.provider;
import part3.Server.serviceRegister.ServiceRegister;
import part3.Server.serviceRegister.impl.ZKServiceRegister;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/2/16 17:35
*/
public class ServiceProvider {
private Map<String,Object> interfaceProvider;
private int port;
private String host;
//注册服务类
private ServiceRegister serviceRegister;
public ServiceProvider(String host,int port){
//需要传入服务端自身的网络地址
this.host=host;
this.port=port;
this.interfaceProvider=new HashMap<>();
this.serviceRegister=new ZKServiceRegister();
}
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
//本机的映射表
interfaceProvider.put(clazz.getName(),service);
//在注册中心注册服务
serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port));
}
}
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}

View File

@ -0,0 +1,11 @@
package part3.Server.server;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:26
*/
public interface RpcServer {
void start(int port);
void stop();
}

View File

@ -0,0 +1,48 @@
package part3.Server.server.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import part3.Server.netty.nettyInitializer.NettyServerInitializer;
import part3.Server.provider.ServiceProvider;
import part3.Server.server.RpcServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 14:01
*/
@AllArgsConstructor
public class NettyRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty 服务线程组boss负责建立连接 work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("netty服务端启动了");
try {
//启动netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.childHandler(new NettyServerInitializer(serviceProvider));
//同步堵塞
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
//死循环监听
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,39 @@
package part3.Server.server.impl;
import lombok.AllArgsConstructor;
import part3.Server.provider.ServiceProvider;
import part3.Server.server.RpcServer;
import part3.Server.server.work.WorkThread;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:37
*/
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
Socket socket = serverSocket.accept();
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part3.Server.server.work;
import lombok.AllArgsConstructor;
import part3.common.Message.RpcRequest;
import part3.common.Message.RpcResponse;
import part3.Server.provider.ServiceProvider;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/14 17:39
*/
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,15 @@
package part3.Server.serviceRegister;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 16:58
*/
// 服务注册接口
public interface ServiceRegister {
// 注册保存服务与地址
void register(String serviceName, InetSocketAddress serviceAddress);
}

View File

@ -0,0 +1,62 @@
package part3.Server.serviceRegister.impl;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import part3.Server.serviceRegister.ServiceRegister;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 17:28
*/
public class ZKServiceRegister implements ServiceRegister {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceRegister(){
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
System.out.println("zookeeper 连接成功");
}
//注册服务到注册中心
@Override
public void register(String serviceName, InetSocketAddress serviceAddress) {
try {
// serviceName创建成永久节点服务提供者下线时不删服务名只删地址
if(client.checkExists().forPath("/" + serviceName) == null){
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
}
// 路径地址一个/代表一个节点
String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress);
// 临时节点服务器下线就删除节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (Exception e) {
System.out.println("此服务已存在");
}
}
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() +
":" +
serverAddress.getPort();
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,25 @@
package part3.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 18:30
* 定义发送的消息格式
*/
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,29 @@
package part3.common.Message;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 19:18
*/
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//具体数据
private Object data;
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

View File

@ -0,0 +1,25 @@
package part3.common.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 17:50
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

View File

@ -0,0 +1,32 @@
package part3.common.service.Impl;
import part3.common.pojo.User;
import part3.common.service.UserService;
import java.util.Random;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:28
*/
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

View File

@ -0,0 +1,16 @@
package part3.common.service;
import part3.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:27
*/
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}