version3
This commit is contained in:
parent
c173a3e284
commit
d0118b24ac
@ -13,5 +13,40 @@
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.30</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<version>1.7.25</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-1.2-api</artifactId>
|
||||
<version>2.8.2</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
<version>4.1.51.Final</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn,但不影响正常使用-->
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-recipes</artifactId>
|
||||
<version>5.1.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.83</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
28
version3/src/main/java/part1/Client/TestClient.java
Normal file
28
version3/src/main/java/part1/Client/TestClient.java
Normal file
@ -0,0 +1,28 @@
|
||||
package part1.Client;
|
||||
|
||||
|
||||
import part1.Client.proxy.ClientProxy;
|
||||
import part1.common.pojo.User;
|
||||
import part1.common.service.UserService;
|
||||
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/6 18:39
|
||||
*/
|
||||
|
||||
public class TestClient {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
ClientProxy clientProxy=new ClientProxy();
|
||||
//ClientProxy clientProxy=new proxy.Client.part1.ClientProxy("127.0.0.1",9999,0);
|
||||
UserService proxy=clientProxy.getProxy(UserService.class);
|
||||
|
||||
User user = proxy.getUserByUserId(1);
|
||||
System.out.println("从服务端得到的user="+user.toString());
|
||||
|
||||
User u=User.builder().id(100).userName("wxx").sex(true).build();
|
||||
Integer id = proxy.insertUserId(u);
|
||||
System.out.println("向服务端插入user的id"+id);
|
||||
}
|
||||
}
|
||||
47
version3/src/main/java/part1/Client/cache/serviceCache.java
vendored
Normal file
47
version3/src/main/java/part1/Client/cache/serviceCache.java
vendored
Normal file
@ -0,0 +1,47 @@
|
||||
package part1.Client.cache;
|
||||
|
||||
import part1.Client.serviceCenter.balance.LoadBalance;
|
||||
import part1.Client.serviceCenter.balance.impl.ConsistencyHashBalance;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/4 0:57
|
||||
*/
|
||||
public class serviceCache {
|
||||
//key: serviceName 服务名
|
||||
//value: addressList 服务提供者列表
|
||||
private static Map<String, List<String>> cache=new HashMap<>();
|
||||
|
||||
//添加服务
|
||||
public void addServcieToCache(String serviceName,String address){
|
||||
if(cache.containsKey(serviceName)){
|
||||
List<String> addressList = cache.get(serviceName);
|
||||
addressList.add(address);
|
||||
System.out.println("将name为"+serviceName+"和地址为"+address+"的服务添加到本地缓存中");
|
||||
}else {
|
||||
List<String> addressList=new ArrayList<>();
|
||||
addressList.add(address);
|
||||
cache.put(serviceName,addressList);
|
||||
}
|
||||
}
|
||||
//从缓存中取服务地址列表
|
||||
public List<String> getServiceListFromCache(String serviceName){
|
||||
if(!cache.containsKey(serviceName)) {
|
||||
return null;
|
||||
}
|
||||
return cache.get(serviceName);
|
||||
}
|
||||
|
||||
//从缓存中删除服务地址
|
||||
public void delete(String serviceName,String address){
|
||||
List<String> addressList = cache.get(serviceName);
|
||||
addressList.remove(address);
|
||||
System.out.println("将name为"+serviceName+"和地址为"+address+"的服务从本地缓存中删除");
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,27 @@
|
||||
package part1.Client.netty.handler;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.util.AttributeKey;
|
||||
import part1.common.Message.RpcResponse;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/26 17:29
|
||||
*/
|
||||
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
|
||||
// 接收到response, 给channel设计别名,让sendRequest里读取response
|
||||
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
|
||||
ctx.channel().attr(key).set(response);
|
||||
ctx.channel().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,26 @@
|
||||
package part1.Client.netty.nettyInitializer;
|
||||
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import part1.Client.netty.handler.NettyClientHandler;
|
||||
import part1.common.serializer.myCode.MyDecoder;
|
||||
import part1.common.serializer.myCode.MyEncoder;
|
||||
import part1.common.serializer.mySerializer.JsonSerializer;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/26 17:26
|
||||
*/
|
||||
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
//使用自定义的编/解码器
|
||||
pipeline.addLast(new MyEncoder(new JsonSerializer()));
|
||||
pipeline.addLast(new MyDecoder());
|
||||
pipeline.addLast(new NettyClientHandler());
|
||||
}
|
||||
}
|
||||
42
version3/src/main/java/part1/Client/proxy/ClientProxy.java
Normal file
42
version3/src/main/java/part1/Client/proxy/ClientProxy.java
Normal file
@ -0,0 +1,42 @@
|
||||
package part1.Client.proxy;
|
||||
|
||||
|
||||
import part1.Client.rpcClient.RpcClient;
|
||||
import part1.common.Message.RpcRequest;
|
||||
import part1.common.Message.RpcResponse;
|
||||
import part1.Client.rpcClient.impl.NettyRpcClient;
|
||||
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/6 16:49
|
||||
*/
|
||||
public class ClientProxy implements InvocationHandler {
|
||||
//传入参数service接口的class对象,反射封装成一个request
|
||||
|
||||
private RpcClient rpcClient;
|
||||
public ClientProxy() throws InterruptedException {
|
||||
rpcClient=new NettyRpcClient();
|
||||
}
|
||||
|
||||
//jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)
|
||||
@Override
|
||||
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||
//构建request
|
||||
RpcRequest request=RpcRequest.builder()
|
||||
.interfaceName(method.getDeclaringClass().getName())
|
||||
.methodName(method.getName())
|
||||
.params(args).paramsType(method.getParameterTypes()).build();
|
||||
//数据传输
|
||||
RpcResponse response= rpcClient.sendRequest(request);
|
||||
return response.getData();
|
||||
}
|
||||
public <T>T getProxy(Class<T> clazz){
|
||||
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
|
||||
return (T)o;
|
||||
}
|
||||
}
|
||||
15
version3/src/main/java/part1/Client/rpcClient/RpcClient.java
Normal file
15
version3/src/main/java/part1/Client/rpcClient/RpcClient.java
Normal file
@ -0,0 +1,15 @@
|
||||
package part1.Client.rpcClient;
|
||||
|
||||
import part1.common.Message.RpcRequest;
|
||||
import part1.common.Message.RpcResponse;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/5/2 18:55
|
||||
*/
|
||||
public interface RpcClient {
|
||||
|
||||
//定义底层通信的方法
|
||||
RpcResponse sendRequest(RpcRequest request);
|
||||
}
|
||||
@ -0,0 +1,68 @@
|
||||
package part1.Client.rpcClient.impl;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.AttributeKey;
|
||||
import part1.Client.rpcClient.RpcClient;
|
||||
import part1.Client.serviceCenter.ServiceCenter;
|
||||
import part1.Client.serviceCenter.ZKServiceCenter;
|
||||
import part1.common.Message.RpcRequest;
|
||||
import part1.common.Message.RpcResponse;
|
||||
import part1.Client.netty.nettyInitializer.NettyClientInitializer;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/5/2 19:40
|
||||
*/
|
||||
public class NettyRpcClient implements RpcClient {
|
||||
|
||||
private static final Bootstrap bootstrap;
|
||||
private static final EventLoopGroup eventLoopGroup;
|
||||
|
||||
private ServiceCenter serviceCenter;
|
||||
public NettyRpcClient() throws InterruptedException {
|
||||
this.serviceCenter=new ZKServiceCenter();
|
||||
}
|
||||
|
||||
//netty客户端初始化
|
||||
static {
|
||||
eventLoopGroup = new NioEventLoopGroup();
|
||||
bootstrap = new Bootstrap();
|
||||
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
|
||||
.handler(new NettyClientInitializer());
|
||||
}
|
||||
@Override
|
||||
public RpcResponse sendRequest(RpcRequest request) {
|
||||
//从注册中心获取host,post
|
||||
InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
|
||||
String host = address.getHostName();
|
||||
int port = address.getPort();
|
||||
try {
|
||||
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
|
||||
Channel channel = channelFuture.channel();
|
||||
// 发送数据
|
||||
channel.writeAndFlush(request);
|
||||
//sync()堵塞获取结果
|
||||
channel.closeFuture().sync();
|
||||
// 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
|
||||
// AttributeKey是,线程隔离的,不会由线程安全问题。
|
||||
// 当前场景下选择堵塞获取结果
|
||||
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
|
||||
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
|
||||
RpcResponse response = channel.attr(key).get();
|
||||
|
||||
System.out.println(response);
|
||||
return response;
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,41 @@
|
||||
package part1.Client.rpcClient.impl;
|
||||
|
||||
import part1.Client.rpcClient.RpcClient;
|
||||
import part1.common.Message.RpcRequest;
|
||||
import part1.common.Message.RpcResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.net.Socket;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/5/2 18:58
|
||||
*/
|
||||
public class SimpleSocketRpcCilent implements RpcClient {
|
||||
private String host;
|
||||
private int port;
|
||||
public SimpleSocketRpcCilent(String host,int port){
|
||||
this.host=host;
|
||||
this.port=port;
|
||||
}
|
||||
@Override
|
||||
public RpcResponse sendRequest(RpcRequest request) {
|
||||
try {
|
||||
Socket socket=new Socket(host, port);
|
||||
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
|
||||
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
|
||||
|
||||
oos.writeObject(request);
|
||||
oos.flush();
|
||||
|
||||
RpcResponse response=(RpcResponse) ois.readObject();
|
||||
return response;
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,14 @@
|
||||
package part1.Client.serviceCenter;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/5/3 21:42
|
||||
*/
|
||||
//服务中心接口
|
||||
public interface ServiceCenter {
|
||||
// 查询:根据服务名查找地址
|
||||
InetSocketAddress serviceDiscovery(String serviceName);
|
||||
}
|
||||
@ -0,0 +1,76 @@
|
||||
package part1.Client.serviceCenter;
|
||||
|
||||
import org.apache.curator.RetryPolicy;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import part1.Client.cache.serviceCache;
|
||||
import part1.Client.serviceCenter.ZkWatcher.watchZK;
|
||||
import part1.Client.serviceCenter.balance.impl.ConsistencyHashBalance;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/5/3 21:41
|
||||
*/
|
||||
public class ZKServiceCenter implements ServiceCenter{
|
||||
// curator 提供的zookeeper客户端
|
||||
private CuratorFramework client;
|
||||
//zookeeper根路径节点
|
||||
private static final String ROOT_PATH = "MyRPC";
|
||||
//serviceCache
|
||||
private serviceCache cache;
|
||||
|
||||
//负责zookeeper客户端的初始化,并与zookeeper服务端进行连接
|
||||
public ZKServiceCenter() throws InterruptedException {
|
||||
// 指数时间重试
|
||||
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
|
||||
// zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接
|
||||
// sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
|
||||
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
|
||||
// 使用心跳监听状态
|
||||
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
|
||||
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
|
||||
this.client.start();
|
||||
System.out.println("zookeeper 连接成功");
|
||||
//初始化本地缓存
|
||||
cache=new serviceCache();
|
||||
//加入zookeeper事件监听器
|
||||
watchZK watcher=new watchZK(client,cache);
|
||||
//监听启动
|
||||
watcher.watchToUpdate(ROOT_PATH);
|
||||
}
|
||||
//根据服务名(接口名)返回地址
|
||||
@Override
|
||||
public InetSocketAddress serviceDiscovery(String serviceName) {
|
||||
try {
|
||||
//先从本地缓存中找
|
||||
List<String> serviceList=cache.getServiceListFromCache(serviceName);
|
||||
//如果找不到,再去zookeeper中找
|
||||
//这种i情况基本不会发生,或者说只会出现在初始化阶段
|
||||
if(serviceList==null) {
|
||||
serviceList=client.getChildren().forPath("/" + serviceName);
|
||||
}
|
||||
// 负载均衡得到地址
|
||||
String address = new ConsistencyHashBalance().balance(serviceList);
|
||||
return parseAddress(address);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
|
||||
private String getServiceAddress(InetSocketAddress serverAddress) {
|
||||
return serverAddress.getHostName() +
|
||||
":" +
|
||||
serverAddress.getPort();
|
||||
}
|
||||
// 字符串解析为地址
|
||||
private InetSocketAddress parseAddress(String address) {
|
||||
String[] result = address.split(":");
|
||||
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,82 @@
|
||||
package part1.Client.serviceCenter.ZkWatcher;
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||
import org.apache.curator.framework.recipes.cache.CuratorCache;
|
||||
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
|
||||
import part1.Client.cache.serviceCache;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/4 1:00
|
||||
*/
|
||||
public class watchZK {
|
||||
// curator 提供的zookeeper客户端
|
||||
private CuratorFramework client;
|
||||
//本地缓存
|
||||
serviceCache cache;
|
||||
|
||||
public watchZK(CuratorFramework client,serviceCache cache){
|
||||
this.client=client;
|
||||
this.cache=cache;
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听当前节点和子节点的 更新,创建,删除
|
||||
* @param path
|
||||
*/
|
||||
public void watchToUpdate(String path) throws InterruptedException {
|
||||
CuratorCache curatorCache = CuratorCache.build(client, "/");
|
||||
curatorCache.listenable().addListener(new CuratorCacheListener() {
|
||||
@Override
|
||||
public void event(Type type, ChildData childData, ChildData childData1) {
|
||||
// 第一个参数:事件类型(枚举)
|
||||
// 第二个参数:节点更新前的状态、数据
|
||||
// 第三个参数:节点更新后的状态、数据
|
||||
// 创建节点时:节点刚被创建,不存在 更新前节点 ,所以第二个参数为 null
|
||||
// 删除节点时:节点被删除,不存在 更新后节点 ,所以第三个参数为 null
|
||||
// 节点创建时没有赋予值 create /curator/app1 只创建节点,在这种情况下,更新前节点的 data 为 null,获取不到更新前节点的数据
|
||||
switch (type.name()) {
|
||||
case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
|
||||
//获取更新的节点的路径
|
||||
String path=new String(childData1.getPath());
|
||||
//按照格式 ,读取
|
||||
String[] pathList= path.split("/");
|
||||
if(pathList.length<=2) break;
|
||||
else {
|
||||
String serviceName=pathList[1];
|
||||
String address=pathList[2];
|
||||
//将新注册的服务加入到本地缓存中
|
||||
cache.addServcieToCache(serviceName,address);
|
||||
}
|
||||
break;
|
||||
case "NODE_CHANGED": // 节点更新
|
||||
if (childData.getData() != null) {
|
||||
System.out.println("修改前的数据: " + new String(childData.getData()));
|
||||
} else {
|
||||
System.out.println("节点第一次赋值!");
|
||||
}
|
||||
System.out.println("修改后的数据: " + new String(childData1.getData()));
|
||||
break;
|
||||
case "NODE_DELETED": // 节点删除
|
||||
String path_d=new String(childData.getPath());
|
||||
//按照格式 ,读取
|
||||
String[] pathList_d= path_d.split("/");
|
||||
if(pathList_d.length<=2) break;
|
||||
else {
|
||||
String serviceName=pathList_d[1];
|
||||
String address=pathList_d[2];
|
||||
//将新注册的服务加入到本地缓存中
|
||||
cache.delete(serviceName,address);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
//开启监听
|
||||
curatorCache.start();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
package part1.Client.serviceCenter.balance;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/19 21:00
|
||||
* 给服务地址列表,根据不同的负载均衡策略选择一个
|
||||
*/
|
||||
public interface LoadBalance {
|
||||
String balance(List<String> addressList);
|
||||
void addNode(String node) ;
|
||||
void delNode(String node);
|
||||
}
|
||||
@ -0,0 +1,119 @@
|
||||
package part1.Client.serviceCenter.balance.impl;
|
||||
|
||||
import part1.Client.serviceCenter.balance.LoadBalance;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/19 21:16
|
||||
* 一致性哈希算法 负载均衡
|
||||
*/
|
||||
public class ConsistencyHashBalance implements LoadBalance {
|
||||
// 虚拟节点的个数
|
||||
private static final int VIRTUAL_NUM = 5;
|
||||
|
||||
// 虚拟节点分配,key是hash值,value是虚拟节点服务器名称
|
||||
private static SortedMap<Integer, String> shards = new TreeMap<Integer, String>();
|
||||
|
||||
// 真实节点列表
|
||||
private static List<String> realNodes = new LinkedList<String>();
|
||||
|
||||
//模拟初始服务器
|
||||
private static String[] servers =null;
|
||||
|
||||
private static void init(List<String> serviceList) {
|
||||
for (String server :serviceList) {
|
||||
realNodes.add(server);
|
||||
System.out.println("真实节点[" + server + "] 被添加");
|
||||
for (int i = 0; i < VIRTUAL_NUM; i++) {
|
||||
String virtualNode = server + "&&VN" + i;
|
||||
int hash = getHash(virtualNode);
|
||||
shards.put(hash, virtualNode);
|
||||
System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加");
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 获取被分配的节点名
|
||||
*
|
||||
* @param node
|
||||
* @return
|
||||
*/
|
||||
public static String getServer(String node,List<String> serviceList) {
|
||||
init(serviceList);
|
||||
int hash = getHash(node);
|
||||
Integer key = null;
|
||||
SortedMap<Integer, String> subMap = shards.tailMap(hash);
|
||||
if (subMap.isEmpty()) {
|
||||
key = shards.lastKey();
|
||||
} else {
|
||||
key = subMap.firstKey();
|
||||
}
|
||||
String virtualNode = shards.get(key);
|
||||
return virtualNode.substring(0, virtualNode.indexOf("&&"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加节点
|
||||
*
|
||||
* @param node
|
||||
*/
|
||||
public void addNode(String node) {
|
||||
if (!realNodes.contains(node)) {
|
||||
realNodes.add(node);
|
||||
System.out.println("真实节点[" + node + "] 上线添加");
|
||||
for (int i = 0; i < VIRTUAL_NUM; i++) {
|
||||
String virtualNode = node + "&&VN" + i;
|
||||
int hash = getHash(virtualNode);
|
||||
shards.put(hash, virtualNode);
|
||||
System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除节点
|
||||
*
|
||||
* @param node
|
||||
*/
|
||||
public void delNode(String node) {
|
||||
if (realNodes.contains(node)) {
|
||||
realNodes.remove(node);
|
||||
System.out.println("真实节点[" + node + "] 下线移除");
|
||||
for (int i = 0; i < VIRTUAL_NUM; i++) {
|
||||
String virtualNode = node + "&&VN" + i;
|
||||
int hash = getHash(virtualNode);
|
||||
shards.remove(hash);
|
||||
System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被移除");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* FNV1_32_HASH算法
|
||||
*/
|
||||
private static int getHash(String str) {
|
||||
final int p = 16777619;
|
||||
int hash = (int) 2166136261L;
|
||||
for (int i = 0; i < str.length(); i++)
|
||||
hash = (hash ^ str.charAt(i)) * p;
|
||||
hash += hash << 13;
|
||||
hash ^= hash >> 7;
|
||||
hash += hash << 3;
|
||||
hash ^= hash >> 17;
|
||||
hash += hash << 5;
|
||||
// 如果算出来的值为负数则取其绝对值
|
||||
if (hash < 0)
|
||||
hash = Math.abs(hash);
|
||||
return hash;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String balance(List<String> addressList) {
|
||||
String random= UUID.randomUUID().toString();
|
||||
return getServer(random,addressList);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package part1.Client.serviceCenter.balance.impl;
|
||||
|
||||
import part1.Client.serviceCenter.balance.LoadBalance;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/19 21:20
|
||||
* 随机 负载均衡
|
||||
*/
|
||||
public class RandomLoadBalance implements LoadBalance {
|
||||
@Override
|
||||
public String balance(List<String> addressList) {
|
||||
Random random=new Random();
|
||||
int choose = random.nextInt(addressList.size());
|
||||
System.out.println("负载均衡选择了"+choose+"服务器");
|
||||
return null;
|
||||
}
|
||||
public void addNode(String node){} ;
|
||||
public void delNode(String node){};
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package part1.Client.serviceCenter.balance.impl;
|
||||
|
||||
import part1.Client.serviceCenter.balance.LoadBalance;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/19 21:21
|
||||
* 轮询 负载均衡
|
||||
*/
|
||||
public class RoundLoadBalance implements LoadBalance {
|
||||
private int choose=-1;
|
||||
@Override
|
||||
public String balance(List<String> addressList) {
|
||||
choose++;
|
||||
choose=choose%addressList.size();
|
||||
System.out.println("负载均衡选择了"+choose+"服务器");
|
||||
return addressList.get(choose);
|
||||
}
|
||||
public void addNode(String node) {};
|
||||
public void delNode(String node){};
|
||||
}
|
||||
27
version3/src/main/java/part1/Server/TestServer.java
Normal file
27
version3/src/main/java/part1/Server/TestServer.java
Normal file
@ -0,0 +1,27 @@
|
||||
package part1.Server;
|
||||
|
||||
|
||||
import part1.Server.server.impl.NettyRPCRPCServer;
|
||||
import part1.common.service.Impl.UserServiceImpl;
|
||||
import part1.Server.provider.ServiceProvider;
|
||||
import part1.Server.server.RpcServer;
|
||||
import part1.common.service.UserService;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/11 19:39
|
||||
*/
|
||||
|
||||
public class TestServer {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
UserService userService=new UserServiceImpl();
|
||||
|
||||
ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999);
|
||||
|
||||
serviceProvider.provideServiceInterface(userService);
|
||||
|
||||
RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
|
||||
rpcServer.start(9999);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,51 @@
|
||||
package part1.Server.netty.handler;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import lombok.AllArgsConstructor;
|
||||
import part1.common.Message.RpcRequest;
|
||||
import part1.common.Message.RpcResponse;
|
||||
import part1.Server.provider.ServiceProvider;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/26 16:40
|
||||
* 因为是服务器端,我们知道接受到请求格式是RPCRequest
|
||||
* Object类型也行,强制转型就行
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
|
||||
private ServiceProvider serviceProvider;
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
|
||||
RpcResponse response = getResponse(request);
|
||||
ctx.writeAndFlush(response);
|
||||
ctx.close();
|
||||
}
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
private RpcResponse getResponse(RpcRequest rpcRequest){
|
||||
//得到服务名
|
||||
String interfaceName=rpcRequest.getInterfaceName();
|
||||
//得到服务端相应服务实现类
|
||||
Object service = serviceProvider.getService(interfaceName);
|
||||
//反射调用方法
|
||||
Method method=null;
|
||||
try {
|
||||
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
|
||||
Object invoke=method.invoke(service,rpcRequest.getParams());
|
||||
return RpcResponse.sussess(invoke);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
|
||||
e.printStackTrace();
|
||||
System.out.println("方法执行错误");
|
||||
return RpcResponse.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,30 @@
|
||||
package part1.Server.netty.nettyInitializer;
|
||||
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import part1.common.serializer.myCode.MyEncoder;
|
||||
import part1.common.serializer.mySerializer.JsonSerializer;
|
||||
import part1.Server.netty.handler.NettyRPCServerHandler;
|
||||
import part1.Server.provider.ServiceProvider;
|
||||
import part1.common.serializer.myCode.MyDecoder;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/26 16:15
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
|
||||
private ServiceProvider serviceProvider;
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
//使用自定义的编/解码器
|
||||
pipeline.addLast(new MyEncoder(new JsonSerializer()));
|
||||
pipeline.addLast(new MyDecoder());
|
||||
pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
package part1.Server.provider;
|
||||
|
||||
import part1.Server.serviceRegister.ServiceRegister;
|
||||
import part1.Server.serviceRegister.impl.ZKServiceRegister;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/16 17:35
|
||||
*/
|
||||
public class ServiceProvider {
|
||||
private Map<String,Object> interfaceProvider;
|
||||
|
||||
private int port;
|
||||
private String host;
|
||||
//注册服务类
|
||||
private ServiceRegister serviceRegister;
|
||||
|
||||
public ServiceProvider(String host,int port){
|
||||
//需要传入服务端自身的网络地址
|
||||
this.host=host;
|
||||
this.port=port;
|
||||
this.interfaceProvider=new HashMap<>();
|
||||
this.serviceRegister=new ZKServiceRegister();
|
||||
}
|
||||
|
||||
public void provideServiceInterface(Object service){
|
||||
String serviceName=service.getClass().getName();
|
||||
Class<?>[] interfaceName=service.getClass().getInterfaces();
|
||||
|
||||
for (Class<?> clazz:interfaceName){
|
||||
//本机的映射表
|
||||
interfaceProvider.put(clazz.getName(),service);
|
||||
//在注册中心注册服务
|
||||
serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port));
|
||||
}
|
||||
}
|
||||
|
||||
public Object getService(String interfaceName){
|
||||
return interfaceProvider.get(interfaceName);
|
||||
}
|
||||
|
||||
}
|
||||
11
version3/src/main/java/part1/Server/server/RpcServer.java
Normal file
11
version3/src/main/java/part1/Server/server/RpcServer.java
Normal file
@ -0,0 +1,11 @@
|
||||
package part1.Server.server;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/12 11:26
|
||||
*/
|
||||
public interface RpcServer {
|
||||
void start(int port);
|
||||
void stop();
|
||||
}
|
||||
@ -0,0 +1,48 @@
|
||||
package part1.Server.server.impl;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import part1.Server.netty.nettyInitializer.NettyServerInitializer;
|
||||
import part1.Server.provider.ServiceProvider;
|
||||
import part1.Server.server.RpcServer;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/26 14:01
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class NettyRPCRPCServer implements RpcServer {
|
||||
private ServiceProvider serviceProvider;
|
||||
@Override
|
||||
public void start(int port) {
|
||||
// netty 服务线程组boss负责建立连接, work负责具体的请求
|
||||
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
|
||||
NioEventLoopGroup workGroup = new NioEventLoopGroup();
|
||||
System.out.println("netty服务端启动了");
|
||||
try {
|
||||
//启动netty服务器
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap();
|
||||
//初始化
|
||||
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
|
||||
.childHandler(new NettyServerInitializer(serviceProvider));
|
||||
//同步堵塞
|
||||
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
|
||||
//死循环监听
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
}catch (InterruptedException e){
|
||||
e.printStackTrace();
|
||||
}finally {
|
||||
bossGroup.shutdownGracefully();
|
||||
workGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,39 @@
|
||||
package part1.Server.server.impl;
|
||||
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import part1.Server.provider.ServiceProvider;
|
||||
import part1.Server.server.RpcServer;
|
||||
import part1.Server.server.work.WorkThread;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/12 11:37
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class SimpleRPCRPCServer implements RpcServer {
|
||||
private ServiceProvider serviceProvide;
|
||||
@Override
|
||||
public void start(int port) {
|
||||
try {
|
||||
ServerSocket serverSocket=new ServerSocket(port);
|
||||
System.out.println("服务器启动了");
|
||||
while (true) {
|
||||
Socket socket = serverSocket.accept();
|
||||
new Thread(new WorkThread(socket,serviceProvide)).start();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,58 @@
|
||||
package part1.Server.server.work;
|
||||
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import part1.common.Message.RpcResponse;
|
||||
import part1.Server.provider.ServiceProvider;
|
||||
import part1.common.Message.RpcRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.Socket;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/14 17:39
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class WorkThread implements Runnable{
|
||||
private Socket socket;
|
||||
private ServiceProvider serviceProvide;
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
|
||||
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
|
||||
//读取客户端传过来的request
|
||||
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
|
||||
//反射调用服务方法获取返回值
|
||||
RpcResponse rpcResponse=getResponse(rpcRequest);
|
||||
//向客户端写入response
|
||||
oos.writeObject(rpcResponse);
|
||||
oos.flush();
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
private RpcResponse getResponse(RpcRequest rpcRequest){
|
||||
//得到服务名
|
||||
String interfaceName=rpcRequest.getInterfaceName();
|
||||
//得到服务端相应服务实现类
|
||||
Object service = serviceProvide.getService(interfaceName);
|
||||
//反射调用方法
|
||||
Method method=null;
|
||||
try {
|
||||
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
|
||||
Object invoke=method.invoke(service,rpcRequest.getParams());
|
||||
return RpcResponse.sussess(invoke);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
|
||||
e.printStackTrace();
|
||||
System.out.println("方法执行错误");
|
||||
return RpcResponse.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
package part1.Server.serviceRegister;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/5/3 16:58
|
||||
*/
|
||||
// 服务注册接口
|
||||
public interface ServiceRegister {
|
||||
// 注册:保存服务与地址。
|
||||
void register(String serviceName, InetSocketAddress serviceAddress);
|
||||
|
||||
}
|
||||
@ -0,0 +1,62 @@
|
||||
package part1.Server.serviceRegister.impl;
|
||||
|
||||
import org.apache.curator.RetryPolicy;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import part1.Server.serviceRegister.ServiceRegister;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/5/3 17:28
|
||||
*/
|
||||
public class ZKServiceRegister implements ServiceRegister {
|
||||
// curator 提供的zookeeper客户端
|
||||
private CuratorFramework client;
|
||||
//zookeeper根路径节点
|
||||
private static final String ROOT_PATH = "MyRPC";
|
||||
|
||||
//负责zookeeper客户端的初始化,并与zookeeper服务端进行连接
|
||||
public ZKServiceRegister(){
|
||||
// 指数时间重试
|
||||
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
|
||||
// zookeeper的地址固定,不管是服务提供者还是,消费者都要与之建立连接
|
||||
// sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
|
||||
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
|
||||
// 使用心跳监听状态
|
||||
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
|
||||
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
|
||||
this.client.start();
|
||||
System.out.println("zookeeper 连接成功");
|
||||
}
|
||||
//注册服务到注册中心
|
||||
@Override
|
||||
public void register(String serviceName, InetSocketAddress serviceAddress) {
|
||||
try {
|
||||
// serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址
|
||||
if(client.checkExists().forPath("/" + serviceName) == null){
|
||||
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
|
||||
}
|
||||
// 路径地址,一个/代表一个节点
|
||||
String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress);
|
||||
// 临时节点,服务器下线就删除节点
|
||||
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
|
||||
} catch (Exception e) {
|
||||
System.out.println("此服务已存在");
|
||||
}
|
||||
}
|
||||
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
|
||||
private String getServiceAddress(InetSocketAddress serverAddress) {
|
||||
return serverAddress.getHostName() +
|
||||
":" +
|
||||
serverAddress.getPort();
|
||||
}
|
||||
// 字符串解析为地址
|
||||
private InetSocketAddress parseAddress(String address) {
|
||||
String[] result = address.split(":");
|
||||
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
|
||||
}
|
||||
}
|
||||
17
version3/src/main/java/part1/common/Message/MessageType.java
Normal file
17
version3/src/main/java/part1/common/Message/MessageType.java
Normal file
@ -0,0 +1,17 @@
|
||||
package part1.common.Message;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/2 22:29
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public enum MessageType {
|
||||
REQUEST(0),RESPONSE(1);
|
||||
private int code;
|
||||
public int getCode(){
|
||||
return code;
|
||||
}
|
||||
}
|
||||
29
version3/src/main/java/part1/common/Message/RpcRequest.java
Normal file
29
version3/src/main/java/part1/common/Message/RpcRequest.java
Normal file
@ -0,0 +1,29 @@
|
||||
package part1.common.Message;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/1 18:30
|
||||
* 定义发送的消息格式
|
||||
*/
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Data
|
||||
@Builder
|
||||
public class RpcRequest implements Serializable {
|
||||
//服务类名,客户端只知道接口
|
||||
private String interfaceName;
|
||||
//调用的方法名
|
||||
private String methodName;
|
||||
//参数列表
|
||||
private Object[] params;
|
||||
//参数类型
|
||||
private Class<?>[] paramsType;
|
||||
}
|
||||
35
version3/src/main/java/part1/common/Message/RpcResponse.java
Normal file
35
version3/src/main/java/part1/common/Message/RpcResponse.java
Normal file
@ -0,0 +1,35 @@
|
||||
package part1.common.Message;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/2/1 19:18
|
||||
*/
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Data
|
||||
@Builder
|
||||
public class RpcResponse implements Serializable {
|
||||
//状态信息
|
||||
private int code;
|
||||
private String message;
|
||||
//更新:加入传输数据的类型,以便在自定义序列化器中解析
|
||||
private Class<?> dataType;
|
||||
//具体数据
|
||||
private Object data;
|
||||
|
||||
public static RpcResponse sussess(Object data){
|
||||
return RpcResponse.builder().code(200).dataType(data.getClass()).data(data).build();
|
||||
}
|
||||
public static RpcResponse fail(){
|
||||
return RpcResponse.builder().code(500).message("服务器发生错误").build();
|
||||
}
|
||||
}
|
||||
|
||||
25
version3/src/main/java/part1/common/pojo/User.java
Normal file
25
version3/src/main/java/part1/common/pojo/User.java
Normal file
@ -0,0 +1,25 @@
|
||||
package part1.common.pojo;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/1/28 17:50
|
||||
*/
|
||||
@Builder
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class User implements Serializable {
|
||||
// 客户端和服务端共有的
|
||||
private Integer id;
|
||||
private String userName;
|
||||
private Boolean sex;
|
||||
}
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
package part1.common.serializer.myCode;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import part1.common.Message.MessageType;
|
||||
import part1.common.serializer.mySerializer.Serializer;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/2 22:24
|
||||
* 按照自定义的消息格式解码数据
|
||||
*/
|
||||
public class MyDecoder extends ByteToMessageDecoder {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
|
||||
//1.读取消息类型
|
||||
short messageType = in.readShort();
|
||||
// 现在还只支持request与response请求
|
||||
if(messageType != MessageType.REQUEST.getCode() &&
|
||||
messageType != MessageType.RESPONSE.getCode()){
|
||||
System.out.println("暂不支持此种数据");
|
||||
return;
|
||||
}
|
||||
//2.读取序列化的方式&类型
|
||||
short serializerType = in.readShort();
|
||||
Serializer serializer = Serializer.getSerializerByCode(serializerType);
|
||||
if(serializer == null)
|
||||
throw new RuntimeException("不存在对应的序列化器");
|
||||
//3.读取序列化数组长度
|
||||
int length = in.readInt();
|
||||
//4.读取序列化数组
|
||||
byte[] bytes=new byte[length];
|
||||
in.readBytes(bytes);
|
||||
Object deserialize= serializer.deserialize(bytes, messageType);
|
||||
out.add(deserialize);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,41 @@
|
||||
package part1.common.serializer.myCode;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToByteEncoder;
|
||||
import lombok.AllArgsConstructor;
|
||||
import part1.common.Message.MessageType;
|
||||
import part1.common.Message.RpcRequest;
|
||||
import part1.common.Message.RpcResponse;
|
||||
import part1.common.serializer.mySerializer.Serializer;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/2 22:24
|
||||
* 依次按照自定义的消息格式写入,传入的数据为request或者response
|
||||
* 需要持有一个serialize器,负责将传入的对象序列化成字节数组
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class MyEncoder extends MessageToByteEncoder {
|
||||
private Serializer serializer;
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
|
||||
System.out.println(msg.getClass());
|
||||
//1.写入消息类型
|
||||
if(msg instanceof RpcRequest){
|
||||
out.writeShort(MessageType.REQUEST.getCode());
|
||||
}
|
||||
else if(msg instanceof RpcResponse){
|
||||
out.writeShort(MessageType.RESPONSE.getCode());
|
||||
}
|
||||
//2.写入序列化方式
|
||||
out.writeShort(serializer.getType());
|
||||
//得到序列化数组
|
||||
byte[] serializeBytes = serializer.serialize(msg);
|
||||
//3.写入长度
|
||||
out.writeInt(serializeBytes.length);
|
||||
//4.写入序列化数组
|
||||
out.writeBytes(serializeBytes);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,65 @@
|
||||
package part1.common.serializer.mySerializer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import part1.common.Message.RpcResponse;
|
||||
import part1.common.Message.RpcRequest;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/2 22:31
|
||||
*/
|
||||
public class JsonSerializer implements Serializer {
|
||||
@Override
|
||||
public byte[] serialize(Object obj) {
|
||||
byte[] bytes = JSONObject.toJSONBytes(obj);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(byte[] bytes, int messageType) {
|
||||
Object obj = null;
|
||||
// 传输的消息分为request与response
|
||||
switch (messageType){
|
||||
case 0:
|
||||
RpcRequest request = JSON.parseObject(bytes, RpcRequest.class);
|
||||
Object[] objects = new Object[request.getParams().length];
|
||||
// 把json字串转化成对应的对象, fastjson可以读出基本数据类型,不用转化
|
||||
// 对转换后的request中的params属性逐个进行类型判断
|
||||
for(int i = 0; i < objects.length; i++){
|
||||
Class<?> paramsType = request.getParamsType()[i];
|
||||
//判断每个对象类型是否和paramsTypes中的一致
|
||||
if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())){
|
||||
//如果不一致,就行进行类型转换
|
||||
objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i],request.getParamsType()[i]);
|
||||
}else{
|
||||
//如果一致就直接赋给objects[i]
|
||||
objects[i] = request.getParams()[i];
|
||||
}
|
||||
}
|
||||
request.setParams(objects);
|
||||
obj = request;
|
||||
break;
|
||||
case 1:
|
||||
RpcResponse response = JSON.parseObject(bytes, RpcResponse.class);
|
||||
Class<?> dataType = response.getDataType();
|
||||
//判断转化后的response对象中的data的类型是否正确
|
||||
if(! dataType.isAssignableFrom(response.getData().getClass())){
|
||||
response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
|
||||
}
|
||||
obj = response;
|
||||
break;
|
||||
default:
|
||||
System.out.println("暂时不支持此种消息");
|
||||
throw new RuntimeException();
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
|
||||
//1 代表json序列化方式
|
||||
@Override
|
||||
public int getType() {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,54 @@
|
||||
package part1.common.serializer.mySerializer;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/2 22:36
|
||||
*/
|
||||
public class ObjectSerializer implements Serializer {
|
||||
//利用Java io 对象 -》字节数组
|
||||
@Override
|
||||
public byte[] serialize(Object obj) {
|
||||
byte[] bytes=null;
|
||||
ByteArrayOutputStream bos=new ByteArrayOutputStream();
|
||||
try {
|
||||
//是一个对象输出流,用于将 Java 对象序列化为字节流,并将其连接到bos上
|
||||
ObjectOutputStream oos = new ObjectOutputStream(bos);
|
||||
oos.writeObject(obj);
|
||||
//刷新 ObjectOutputStream,确保所有缓冲区中的数据都被写入到底层流中。
|
||||
oos.flush();
|
||||
//将bos其内部缓冲区中的数据转换为字节数组
|
||||
bytes = bos.toByteArray();
|
||||
oos.close();
|
||||
bos.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
//字节数组 -》对象
|
||||
@Override
|
||||
public Object deserialize(byte[] bytes, int messageType) {
|
||||
Object obj = null;
|
||||
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
|
||||
try {
|
||||
ObjectInputStream ois = new ObjectInputStream(bis);
|
||||
obj = ois.readObject();
|
||||
ois.close();
|
||||
bis.close();
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
|
||||
//0 代表Java 原生序列器
|
||||
@Override
|
||||
public int getType() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
package part1.common.serializer.mySerializer;
|
||||
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/6/2 22:31
|
||||
*/
|
||||
public interface Serializer {
|
||||
// 把对象序列化成字节数组
|
||||
byte[] serialize(Object obj);
|
||||
// 从字节数组反序列化成消息, 使用java自带序列化方式不用messageType也能得到相应的对象(序列化字节数组里包含类信息)
|
||||
// 其它方式需指定消息格式,再根据message转化成相应的对象
|
||||
Object deserialize(byte[] bytes, int messageType);
|
||||
// 返回使用的序列器,是哪个
|
||||
// 0:java自带序列化方式, 1: json序列化方式
|
||||
int getType();
|
||||
// 根据序号取出序列化器,暂时有两种实现方式,需要其它方式,实现这个接口即可
|
||||
static Serializer getSerializerByCode(int code){
|
||||
switch (code){
|
||||
case 0:
|
||||
return new ObjectSerializer();
|
||||
case 1:
|
||||
return new JsonSerializer();
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
package part1.common.service.Impl;
|
||||
|
||||
|
||||
import part1.common.pojo.User;
|
||||
import part1.common.service.UserService;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/1/28 16:28
|
||||
*/
|
||||
public class UserServiceImpl implements UserService {
|
||||
@Override
|
||||
public User getUserByUserId(Integer id) {
|
||||
System.out.println("客户端查询了"+id+"的用户");
|
||||
// 模拟从数据库中取用户的行为
|
||||
Random random = new Random();
|
||||
User user = User.builder().userName(UUID.randomUUID().toString())
|
||||
.id(id)
|
||||
.sex(random.nextBoolean()).build();
|
||||
return user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer insertUserId(User user) {
|
||||
System.out.println("插入数据成功"+user.getUserName());
|
||||
return user.getId();
|
||||
}
|
||||
}
|
||||
16
version3/src/main/java/part1/common/service/UserService.java
Normal file
16
version3/src/main/java/part1/common/service/UserService.java
Normal file
@ -0,0 +1,16 @@
|
||||
package part1.common.service;
|
||||
|
||||
|
||||
import part1.common.pojo.User;
|
||||
|
||||
/**
|
||||
* @author wxx
|
||||
* @version 1.0
|
||||
* @create 2024/1/28 16:27
|
||||
*/
|
||||
public interface UserService {
|
||||
// 客户端通过这个接口调用服务端的实现类
|
||||
User getUserByUserId(Integer id);
|
||||
//新增一个功能
|
||||
Integer insertUserId(User user);
|
||||
}
|
||||
8
version3/src/main/resources/log4j.properties
Normal file
8
version3/src/main/resources/log4j.properties
Normal file
@ -0,0 +1,8 @@
|
||||
log4j.rootLogger=ERROR, stdout
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
|
||||
log4j.appender.logfile=org.apache.log4j.FileAppender
|
||||
log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
|
||||
Loading…
Reference in New Issue
Block a user