Merge pull request #8 from GGBoooond/main

version4-限流熔断,熔断
This commit is contained in:
GG.Bond 2024-07-03 02:23:33 +08:00 committed by GitHub
commit 885b214760
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 1778 additions and 0 deletions

View File

@ -13,5 +13,52 @@
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
<scope>compile</scope>
</dependency>
<!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn但不影响正常使用-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,27 @@
package part1.Client;
import part1.Client.proxy.ClientProxy;
import part1.common.pojo.User;
import part1.common.service.UserService;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 18:39
*/
public class TestClient {
public static void main(String[] args) throws InterruptedException {
ClientProxy clientProxy=new ClientProxy();
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

View File

@ -0,0 +1,44 @@
package part1.Client.cache;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/6/4 0:57
*/
public class serviceCache {
//key: serviceName 服务名
//value addressList 服务提供者列表
private static Map<String, List<String>> cache=new HashMap<>();
//添加服务
public void addServcieToCache(String serviceName,String address){
if(cache.containsKey(serviceName)){
List<String> addressList = cache.get(serviceName);
addressList.add(address);
System.out.println("将name为"+serviceName+"和地址为"+address+"的服务添加到本地缓存中");
}else {
List<String> addressList=new ArrayList<>();
addressList.add(address);
cache.put(serviceName,addressList);
}
}
//从缓存中取服务地址列表
public List<String> getServiceListFromCache(String serviceName){
if(!cache.containsKey(serviceName)) {
return null;
}
return cache.get(serviceName);
}
//从缓存中删除服务地址
public void delete(String serviceName,String address){
List<String> addressList = cache.get(serviceName);
addressList.remove(address);
System.out.println("将name为"+serviceName+"和地址为"+address+"的服务从本地缓存中删除");
}
}

View File

@ -0,0 +1,87 @@
package part1.Client.circuitBreaker;
/**
* @author wxx
* @version 1.0
* @create 2024/7/2 21:49
*/
import java.util.concurrent.atomic.AtomicInteger;
public class CircuitBreaker {
//当前状态
private CircuitBreakerState state = CircuitBreakerState.CLOSED;
private AtomicInteger failureCount = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger requestCount = new AtomicInteger(0);
//失败次数阈值
private final int failureThreshold;
//半开启-关闭状态的成功次数比例
private final double halfOpenSuccessRate;
//恢复时间
private final long retryTimePeriod;
//上一次失败时间
private long lastFailureTime = 0;
public CircuitBreaker(int failureThreshold, double halfOpenSuccessRate,long retryTimePeriod) {
this.failureThreshold = failureThreshold;
this.halfOpenSuccessRate = halfOpenSuccessRate;
this.retryTimePeriod = retryTimePeriod;
}
//查看当前熔断器是否允许请求通过
public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
switch (state) {
case OPEN:
if (currentTime - lastFailureTime > retryTimePeriod) {
state = CircuitBreakerState.HALF_OPEN;
resetCounts();
return true;
}
return false;
case HALF_OPEN:
requestCount.incrementAndGet();
return true;
case CLOSED:
default:
return true;
}
}
//记录成功
public synchronized void recordSuccess() {
if (state == CircuitBreakerState.HALF_OPEN) {
successCount.incrementAndGet();
if (successCount.get() >= halfOpenSuccessRate * requestCount.get()) {
state = CircuitBreakerState.CLOSED;
resetCounts();
}
} else {
resetCounts();
}
}
//记录失败
public synchronized void recordFailure() {
failureCount.incrementAndGet();
lastFailureTime = System.currentTimeMillis();
if (state == CircuitBreakerState.HALF_OPEN) {
state = CircuitBreakerState.OPEN;
lastFailureTime = System.currentTimeMillis();
} else if (failureCount.get() >= failureThreshold) {
state = CircuitBreakerState.OPEN;
}
}
//重置次数
private void resetCounts() {
failureCount.set(0);
successCount.set(0);
requestCount.set(0);
}
public CircuitBreakerState getState() {
return state;
}
}
enum CircuitBreakerState {
//关闭开启半开启
CLOSED, OPEN, HALF_OPEN
}

View File

@ -0,0 +1,23 @@
package part1.Client.circuitBreaker;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/7/3 0:22
*/
public class CircuitBreakerProvider {
private Map<String,CircuitBreaker> circuitBreakerMap=new HashMap<>();
public CircuitBreaker getCircuitBreaker(String serviceName){
CircuitBreaker circuitBreaker;
if(circuitBreakerMap.containsKey(serviceName)){
circuitBreaker=circuitBreakerMap.get(serviceName);
}else {
circuitBreaker=new CircuitBreaker(3,0.5,10000);
}
return circuitBreaker;
}
}

View File

@ -0,0 +1,27 @@
package part1.Client.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import part1.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:29
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名让sendRequest里读取response
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
ctx.channel().attr(key).set(response);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,26 @@
package part1.Client.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import part1.Client.netty.handler.NettyClientHandler;
import part1.common.serializer.myCode.MyDecoder;
import part1.common.serializer.myCode.MyEncoder;
import part1.common.serializer.mySerializer.JsonSerializer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 17:26
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//使用自定义的编/解码器
pipeline.addLast(new MyEncoder(new JsonSerializer()));
pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyClientHandler());
}
}

View File

@ -0,0 +1,73 @@
package part1.Client.proxy;
import part1.Client.circuitBreaker.CircuitBreaker;
import part1.Client.circuitBreaker.CircuitBreakerProvider;
import part1.Client.retry.guavaRetry;
import part1.Client.rpcClient.RpcClient;
import part1.Client.rpcClient.impl.NettyRpcClient;
import part1.Client.serviceCenter.ServiceCenter;
import part1.Client.serviceCenter.ZKServiceCenter;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @author wxx
* @version 1.0
* @create 2024/2/6 16:49
*/
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private RpcClient rpcClient;
private ServiceCenter serviceCenter;
private CircuitBreakerProvider circuitBreakerProvider;
public ClientProxy() throws InterruptedException {
serviceCenter=new ZKServiceCenter();
rpcClient=new NettyRpcClient(serviceCenter);
circuitBreakerProvider=new CircuitBreakerProvider();
}
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//获取熔断器
CircuitBreaker circuitBreaker=circuitBreakerProvider.getCircuitBreaker(method.getName());
//判断熔断器是否允许请求经过
if (!circuitBreaker.allowRequest()){
//这里可以针对熔断做特殊处理返回特殊值
return null;
}
//数据传输
RpcResponse response;
//后续添加逻辑为保持幂等性只对白名单上的服务进行重试
if (serviceCenter.checkRetry(request.getInterfaceName())){
//调用retry框架进行重试操作
response=new guavaRetry().sendServiceWithRetry(request,rpcClient);
}else {
//只调用一次
response= rpcClient.sendRequest(request);
}
//记录response的状态上报给熔断器
if (response.getCode() ==200){
circuitBreaker.recordSuccess();
}
if (response.getCode()==500){
circuitBreaker.recordFailure();
}
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

View File

@ -0,0 +1,43 @@
package part1.Client.retry;
import com.github.rholder.retry.*;
import part1.Client.rpcClient.RpcClient;
import part1.common.Message.RpcResponse;
import part1.common.Message.RpcRequest;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @author wxx
* @version 1.0
* @create 2024/6/20 19:28
*/
public class guavaRetry {
private RpcClient rpcClient;
public RpcResponse sendServiceWithRetry(RpcRequest request, RpcClient rpcClient) {
this.rpcClient=rpcClient;
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
//无论出现什么异常都进行重试
.retryIfException()
//返回结果为 error时进行重试
.retryIfResult(response -> Objects.equals(response.getCode(), 500))
//重试等待策略等待 2s 后再进行重试
.withWaitStrategy(WaitStrategies.fixedWait(2, TimeUnit.SECONDS))
//重试停止策略重试达到 3
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
System.out.println("RetryListener: 第" + attempt.getAttemptNumber() + "次调用");
}
})
.build();
try {
return retryer.call(() -> rpcClient.sendRequest(request));
} catch (Exception e) {
e.printStackTrace();
}
return RpcResponse.fail();
}
}

View File

@ -0,0 +1,15 @@
package part1.Client.rpcClient;
import part1.common.Message.RpcResponse;
import part1.common.Message.RpcRequest;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:55
*/
public interface RpcClient {
//定义底层通信的方法
RpcResponse sendRequest(RpcRequest request);
}

View File

@ -0,0 +1,67 @@
package part1.Client.rpcClient.impl;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import part1.Client.netty.nettyInitializer.NettyClientInitializer;
import part1.Client.rpcClient.RpcClient;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import part1.Client.serviceCenter.ServiceCenter;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 19:40
*/
public class NettyRpcClient implements RpcClient {
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
private ServiceCenter serviceCenter;
public NettyRpcClient(ServiceCenter serviceCenter) throws InterruptedException {
this.serviceCenter=serviceCenter;
}
//netty客户端初始化
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
//从注册中心获取host,post
InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
String host = address.getHostName();
int port = address.getPort();
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果
channel.closeFuture().sync();
// 阻塞的获得结果通过给channel设计别名获取特定名字下的channel中的内容这个在hanlder中设置
// AttributeKey是线程隔离的不会由线程安全问题
// 当前场景下选择堵塞获取结果
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}

View File

@ -0,0 +1,41 @@
package part1.Client.rpcClient.impl;
import part1.Client.rpcClient.RpcClient;
import part1.common.Message.RpcResponse;
import part1.common.Message.RpcRequest;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/5/2 18:58
*/
public class SimpleSocketRpcCilent implements RpcClient {
private String host;
private int port;
public SimpleSocketRpcCilent(String host,int port){
this.host=host;
this.port=port;
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,16 @@
package part1.Client.serviceCenter;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 21:42
*/
//服务中心接口
public interface ServiceCenter {
// 查询根据服务名查找地址
InetSocketAddress serviceDiscovery(String serviceName);
//判断是否可重试
boolean checkRetry(String serviceName) ;
}

View File

@ -0,0 +1,93 @@
package part1.Client.serviceCenter;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import part1.Client.serviceCenter.ZkWatcher.watchZK;
import part1.Client.cache.serviceCache;
import part1.Client.serviceCenter.balance.impl.ConsistencyHashBalance;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 21:41
*/
public class ZKServiceCenter implements ServiceCenter{
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
private static final String RETRY = "CanRetry";
//serviceCache
private serviceCache cache;
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceCenter() throws InterruptedException {
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
System.out.println("zookeeper 连接成功");
//初始化本地缓存
cache=new serviceCache();
//加入zookeeper事件监听器
watchZK watcher=new watchZK(client,cache);
//监听启动
watcher.watchToUpdate(ROOT_PATH);
}
//根据服务名接口名返回地址
@Override
public InetSocketAddress serviceDiscovery(String serviceName) {
try {
//先从本地缓存中找
List<String> addressList=cache.getServiceListFromCache(serviceName);
//如果找不到再去zookeeper中找
//这种i情况基本不会发生或者说只会出现在初始化阶段
if(addressList==null) {
addressList=client.getChildren().forPath("/" + serviceName);
}
// 负载均衡得到地址
String address = new ConsistencyHashBalance().balance(addressList);
return parseAddress(address);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//
public boolean checkRetry(String serviceName) {
boolean canRetry =false;
try {
List<String> serviceList = client.getChildren().forPath("/" + RETRY);
for(String s:serviceList){
if(s.equals(serviceName)){
System.out.println("服务"+serviceName+"在白名单上,可进行重试");
canRetry=true;
}
}
}catch (Exception e) {
e.printStackTrace();
}
return canRetry;
}
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() +
":" +
serverAddress.getPort();
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,82 @@
package part1.Client.serviceCenter.ZkWatcher;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import part1.Client.cache.serviceCache;
/**
* @author wxx
* @version 1.0
* @create 2024/6/4 1:00
*/
public class watchZK {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//本地缓存
serviceCache cache;
public watchZK(CuratorFramework client,serviceCache cache){
this.client=client;
this.cache=cache;
}
/**
* 监听当前节点和子节点的 更新创建删除
* @param path
*/
public void watchToUpdate(String path) throws InterruptedException {
CuratorCache curatorCache = CuratorCache.build(client, "/");
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData childData, ChildData childData1) {
// 第一个参数事件类型枚举
// 第二个参数节点更新前的状态数据
// 第三个参数节点更新后的状态数据
// 创建节点时节点刚被创建不存在 更新前节点 所以第二个参数为 null
// 删除节点时节点被删除不存在 更新后节点 所以第三个参数为 null
// 节点创建时没有赋予值 create /curator/app1 只创建节点在这种情况下更新前节点的 data null获取不到更新前节点的数据
switch (type.name()) {
case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
//获取更新的节点的路径
String path=new String(childData1.getPath());
//按照格式 读取
String[] pathList= path.split("/");
if(pathList.length<=2) break;
else {
String serviceName=pathList[1];
String address=pathList[2];
//将新注册的服务加入到本地缓存中
cache.addServcieToCache(serviceName,address);
}
break;
case "NODE_CHANGED": // 节点更新
if (childData.getData() != null) {
System.out.println("修改前的数据: " + new String(childData.getData()));
} else {
System.out.println("节点第一次赋值!");
}
System.out.println("修改后的数据: " + new String(childData1.getData()));
break;
case "NODE_DELETED": // 节点删除
String path_d=new String(childData.getPath());
//按照格式 读取
String[] pathList_d= path_d.split("/");
if(pathList_d.length<=2) break;
else {
String serviceName=pathList_d[1];
String address=pathList_d[2];
//将新注册的服务加入到本地缓存中
cache.delete(serviceName,address);
}
break;
default:
break;
}
}
});
//开启监听
curatorCache.start();
}
}

View File

@ -0,0 +1,15 @@
package part1.Client.serviceCenter.balance;
import java.util.List;
/**
* @author wxx
* @version 1.0
* @create 2024/6/19 21:00
* 给服务地址列表根据不同的负载均衡策略选择一个
*/
public interface LoadBalance {
String balance(List<String> addressList);
void addNode(String node) ;
void delNode(String node);
}

View File

@ -0,0 +1,119 @@
package part1.Client.serviceCenter.balance.impl;
import part1.Client.serviceCenter.balance.LoadBalance;
import java.util.*;
/**
* @author wxx
* @version 1.0
* @create 2024/6/19 21:16
* 一致性哈希算法 负载均衡
*/
public class ConsistencyHashBalance implements LoadBalance {
// 虚拟节点的个数
private static final int VIRTUAL_NUM = 5;
// 虚拟节点分配key是hash值value是虚拟节点服务器名称
private static SortedMap<Integer, String> shards = new TreeMap<Integer, String>();
// 真实节点列表
private static List<String> realNodes = new LinkedList<String>();
//模拟初始服务器
private static String[] servers =null;
private static void init(List<String> serviceList) {
for (String server :serviceList) {
realNodes.add(server);
System.out.println("真实节点[" + server + "] 被添加");
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = server + "&&VN" + i;
int hash = getHash(virtualNode);
shards.put(hash, virtualNode);
System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加");
}
}
}
/**
* 获取被分配的节点名
*
* @param node
* @return
*/
public static String getServer(String node,List<String> serviceList) {
init(serviceList);
int hash = getHash(node);
Integer key = null;
SortedMap<Integer, String> subMap = shards.tailMap(hash);
if (subMap.isEmpty()) {
key = shards.lastKey();
} else {
key = subMap.firstKey();
}
String virtualNode = shards.get(key);
return virtualNode.substring(0, virtualNode.indexOf("&&"));
}
/**
* 添加节点
*
* @param node
*/
public void addNode(String node) {
if (!realNodes.contains(node)) {
realNodes.add(node);
System.out.println("真实节点[" + node + "] 上线添加");
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = node + "&&VN" + i;
int hash = getHash(virtualNode);
shards.put(hash, virtualNode);
System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被添加");
}
}
}
/**
* 删除节点
*
* @param node
*/
public void delNode(String node) {
if (realNodes.contains(node)) {
realNodes.remove(node);
System.out.println("真实节点[" + node + "] 下线移除");
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = node + "&&VN" + i;
int hash = getHash(virtualNode);
shards.remove(hash);
System.out.println("虚拟节点[" + virtualNode + "] hash:" + hash + ",被移除");
}
}
}
/**
* FNV1_32_HASH算法
*/
private static int getHash(String str) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 如果算出来的值为负数则取其绝对值
if (hash < 0)
hash = Math.abs(hash);
return hash;
}
@Override
public String balance(List<String> addressList) {
String random= UUID.randomUUID().toString();
return getServer(random,addressList);
}
}

View File

@ -0,0 +1,24 @@
package part1.Client.serviceCenter.balance.impl;
import part1.Client.serviceCenter.balance.LoadBalance;
import java.util.List;
import java.util.Random;
/**
* @author wxx
* @version 1.0
* @create 2024/6/19 21:20
* 随机 负载均衡
*/
public class RandomLoadBalance implements LoadBalance {
@Override
public String balance(List<String> addressList) {
Random random=new Random();
int choose = random.nextInt(addressList.size());
System.out.println("负载均衡选择了"+choose+"服务器");
return null;
}
public void addNode(String node){} ;
public void delNode(String node){};
}

View File

@ -0,0 +1,24 @@
package part1.Client.serviceCenter.balance.impl;
import part1.Client.serviceCenter.balance.LoadBalance;
import java.util.List;
/**
* @author wxx
* @version 1.0
* @create 2024/6/19 21:21
* 轮询 负载均衡
*/
public class RoundLoadBalance implements LoadBalance {
private int choose=-1;
@Override
public String balance(List<String> addressList) {
choose++;
choose=choose%addressList.size();
System.out.println("负载均衡选择了"+choose+"服务器");
return addressList.get(choose);
}
public void addNode(String node) {};
public void delNode(String node){};
}

View File

@ -0,0 +1,27 @@
package part1.Server;
import part1.Server.server.impl.NettyRPCRPCServer;
import part1.Server.provider.ServiceProvider;
import part1.Server.server.RpcServer;
import part1.common.service.Impl.UserServiceImpl;
import part1.common.service.UserService;
/**
* @author wxx
* @version 1.0
* @create 2024/2/11 19:39
*/
public class TestServer {
public static void main(String[] args) throws InterruptedException {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider("127.0.0.1",9999);
serviceProvider.provideServiceInterface(userService,true);
RpcServer rpcServer=new NettyRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

View File

@ -0,0 +1,59 @@
package part1.Server.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import part1.Server.provider.ServiceProvider;
import part1.Server.ratelimit.RateLimit;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:40
* 因为是服务器端我们知道接受到请求格式是RPCRequest
* Object类型也行强制转型就行
*/
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private ServiceProvider serviceProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
RpcResponse response = getResponse(request);
ctx.writeAndFlush(response);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//接口限流降级
RateLimit rateLimit=serviceProvider.getRateLimitProvider().getRateLimit(interfaceName);
if(!rateLimit.getToken()){
//如果获取令牌失败进行限流降级快速返回结果
return RpcResponse.fail();
}
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,30 @@
package part1.Server.netty.nettyInitializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import lombok.AllArgsConstructor;
import part1.Server.netty.handler.NettyRPCServerHandler;
import part1.Server.provider.ServiceProvider;
import part1.common.serializer.myCode.MyDecoder;
import part1.common.serializer.myCode.MyEncoder;
import part1.common.serializer.mySerializer.JsonSerializer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 16:15
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//使用自定义的编/解码器
pipeline.addLast(new MyEncoder(new JsonSerializer()));
pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
}
}

View File

@ -0,0 +1,53 @@
package part1.Server.provider;
import part1.Server.ratelimit.provider.RateLimitProvider;
import part1.Server.serviceRegister.impl.ZKServiceRegister;
import part1.Server.serviceRegister.ServiceRegister;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/2/16 17:35
*/
public class ServiceProvider {
private Map<String,Object> interfaceProvider;
private int port;
private String host;
//注册服务类
private ServiceRegister serviceRegister;
//限流器
private RateLimitProvider rateLimitProvider;
public ServiceProvider(String host,int port){
//需要传入服务端自身的网络地址
this.host=host;
this.port=port;
this.interfaceProvider=new HashMap<>();
this.serviceRegister=new ZKServiceRegister();
this.rateLimitProvider=new RateLimitProvider();
}
public void provideServiceInterface(Object service,boolean canRetry){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
//本机的映射表
interfaceProvider.put(clazz.getName(),service);
//在注册中心注册服务
serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port),canRetry);
}
}
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
public RateLimitProvider getRateLimitProvider(){
return rateLimitProvider;
}
}

View File

@ -0,0 +1,11 @@
package part1.Server.ratelimit;
/**
* @author wxx
* @version 1.0
* @create 2024/7/2 1:43
*/
public interface RateLimit {
//获取访问许可
boolean getToken();
}

View File

@ -0,0 +1,49 @@
package part1.Server.ratelimit.impl;
import part1.Server.ratelimit.RateLimit;
/**
* @author wxx
* @version 1.0
* @create 2024/7/2 1:44
*/
public class TokenBucketRateLimitImpl implements RateLimit {
//令牌产生速率单位为ms
private static int RATE;
//桶容量
private static int CAPACITY;
//当前桶容量
private volatile int curCapcity;
//时间戳
private volatile long timeStamp=System.currentTimeMillis();
public TokenBucketRateLimitImpl(int rate,int capacity){
RATE=rate;
CAPACITY=capacity;
curCapcity=capacity;
}
@Override
public synchronized boolean getToken() {
//如果当前桶还有剩余就直接返回
if(curCapcity>0){
curCapcity--;
return true;
}
//如果桶无剩余
long current=System.currentTimeMillis();
//如果距离上一次的请求的时间大于RATE的时间
if(current-timeStamp>=RATE){
//计算这段时间间隔中生成的令牌如果>2,桶容量加上计算的令牌-1
//如果==1就不做操作因为这一次操作要消耗一个令牌
if((current-timeStamp)/RATE>=2){
curCapcity+=(int)(current-timeStamp)/RATE-1;
}
//保持桶内令牌容量<=10
if(curCapcity>CAPACITY) curCapcity=CAPACITY;
//刷新时间戳为本次请求
timeStamp=current;
return true;
}
//获得不到返回false
return false;
}
}

View File

@ -0,0 +1,25 @@
package part1.Server.ratelimit.provider;
import part1.Server.ratelimit.RateLimit;
import part1.Server.ratelimit.impl.TokenBucketRateLimitImpl;
import java.util.HashMap;
import java.util.Map;
/**
* @author wxx
* @version 1.0
* @create 2024/7/2 1:45
*/
public class RateLimitProvider {
private Map<String, RateLimit> rateLimitMap=new HashMap<>();
public RateLimit getRateLimit(String interfaceName){
if(!rateLimitMap.containsKey(interfaceName)){
RateLimit rateLimit=new TokenBucketRateLimitImpl(100,10);
rateLimitMap.put(interfaceName,rateLimit);
return rateLimit;
}
return rateLimitMap.get(interfaceName);
}
}

View File

@ -0,0 +1,11 @@
package part1.Server.server;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:26
*/
public interface RpcServer {
void start(int port);
void stop();
}

View File

@ -0,0 +1,48 @@
package part1.Server.server.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import part1.Server.netty.nettyInitializer.NettyServerInitializer;
import part1.Server.provider.ServiceProvider;
import part1.Server.server.RpcServer;
/**
* @author wxx
* @version 1.0
* @create 2024/2/26 14:01
*/
@AllArgsConstructor
public class NettyRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty 服务线程组boss负责建立连接 work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("netty服务端启动了");
try {
//启动netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.childHandler(new NettyServerInitializer(serviceProvider));
//同步堵塞
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
//死循环监听
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,39 @@
package part1.Server.server.impl;
import lombok.AllArgsConstructor;
import part1.Server.server.RpcServer;
import part1.Server.server.work.WorkThread;
import part1.Server.provider.ServiceProvider;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/12 11:37
*/
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
Socket socket = serverSocket.accept();
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}

View File

@ -0,0 +1,58 @@
package part1.Server.server.work;
import lombok.AllArgsConstructor;
import part1.Server.provider.ServiceProvider;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @author wxx
* @version 1.0
* @create 2024/2/14 17:39
*/
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

View File

@ -0,0 +1,15 @@
package part1.Server.serviceRegister;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 16:58
*/
// 服务注册接口
public interface ServiceRegister {
// 注册保存服务与地址
void register(String serviceName, InetSocketAddress serviceAddress,boolean canRetry);
}

View File

@ -0,0 +1,68 @@
package part1.Server.serviceRegister.impl;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import part1.Server.serviceRegister.ServiceRegister;
import java.net.InetSocketAddress;
/**
* @author wxx
* @version 1.0
* @create 2024/5/3 17:28
*/
public class ZKServiceRegister implements ServiceRegister {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
private static final String RETRY = "CanRetry";
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceRegister(){
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
System.out.println("zookeeper 连接成功");
}
//注册服务到注册中心
@Override
public void register(String serviceName, InetSocketAddress serviceAddress,boolean canRetry) {
try {
// serviceName创建成永久节点服务提供者下线时不删服务名只删地址
if(client.checkExists().forPath("/" + serviceName) == null){
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
}
// 路径地址一个/代表一个节点
String path = "/" + serviceName +"/"+ getServiceAddress(serviceAddress);
// 临时节点服务器下线就删除节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
//如果这个服务是幂等性就增加到节点中
if (canRetry){
path ="/"+RETRY+"/"+serviceName;
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
}
} catch (Exception e) {
System.out.println("此服务已存在");
}
}
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() +
":" +
serverAddress.getPort();
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,17 @@
package part1.common.Message;
import lombok.AllArgsConstructor;
/**
* @author wxx
* @version 1.0
* @create 2024/6/2 22:29
*/
@AllArgsConstructor
public enum MessageType {
REQUEST(0),RESPONSE(1);
private int code;
public int getCode(){
return code;
}
}

View File

@ -0,0 +1,29 @@
package part1.common.Message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 18:30
* 定义发送的消息格式
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,35 @@
package part1.common.Message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/2/1 19:18
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//更新加入传输数据的类型以便在自定义序列化器中解析
private Class<?> dataType;
//具体数据
private Object data;
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).dataType(data.getClass()).data(data).build();
}
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

View File

@ -0,0 +1,25 @@
package part1.common.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 17:50
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

View File

@ -0,0 +1,41 @@
package part1.common.serializer.myCode;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import part1.common.Message.MessageType;
import part1.common.serializer.mySerializer.Serializer;
import java.util.List;
/**
* @author wxx
* @version 1.0
* @create 2024/6/2 22:24
* 按照自定义的消息格式解码数据
*/
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
//1.读取消息类型
short messageType = in.readShort();
// 现在还只支持request与response请求
if(messageType != MessageType.REQUEST.getCode() &&
messageType != MessageType.RESPONSE.getCode()){
System.out.println("暂不支持此种数据");
return;
}
//2.读取序列化的方式&类型
short serializerType = in.readShort();
Serializer serializer = Serializer.getSerializerByCode(serializerType);
if(serializer == null)
throw new RuntimeException("不存在对应的序列化器");
//3.读取序列化数组长度
int length = in.readInt();
//4.读取序列化数组
byte[] bytes=new byte[length];
in.readBytes(bytes);
Object deserialize= serializer.deserialize(bytes, messageType);
out.add(deserialize);
}
}

View File

@ -0,0 +1,41 @@
package part1.common.serializer.myCode;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;
import part1.common.Message.MessageType;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
import part1.common.serializer.mySerializer.Serializer;
/**
* @author wxx
* @version 1.0
* @create 2024/6/2 22:24
* 依次按照自定义的消息格式写入传入的数据为request或者response
* 需要持有一个serialize器负责将传入的对象序列化成字节数组
*/
@AllArgsConstructor
public class MyEncoder extends MessageToByteEncoder {
private Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
System.out.println(msg.getClass());
//1.写入消息类型
if(msg instanceof RpcRequest){
out.writeShort(MessageType.REQUEST.getCode());
}
else if(msg instanceof RpcResponse){
out.writeShort(MessageType.RESPONSE.getCode());
}
//2.写入序列化方式
out.writeShort(serializer.getType());
//得到序列化数组
byte[] serializeBytes = serializer.serialize(msg);
//3.写入长度
out.writeInt(serializeBytes.length);
//4.写入序列化数组
out.writeBytes(serializeBytes);
}
}

View File

@ -0,0 +1,65 @@
package part1.common.serializer.mySerializer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import part1.common.Message.RpcRequest;
import part1.common.Message.RpcResponse;
/**
* @author wxx
* @version 1.0
* @create 2024/6/2 22:31
*/
public class JsonSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
byte[] bytes = JSONObject.toJSONBytes(obj);
return bytes;
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
// 传输的消息分为request与response
switch (messageType){
case 0:
RpcRequest request = JSON.parseObject(bytes, RpcRequest.class);
Object[] objects = new Object[request.getParams().length];
// 把json字串转化成对应的对象 fastjson可以读出基本数据类型不用转化
// 对转换后的request中的params属性逐个进行类型判断
for(int i = 0; i < objects.length; i++){
Class<?> paramsType = request.getParamsType()[i];
//判断每个对象类型是否和paramsTypes中的一致
if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())){
//如果不一致就行进行类型转换
objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i],request.getParamsType()[i]);
}else{
//如果一致就直接赋给objects[i]
objects[i] = request.getParams()[i];
}
}
request.setParams(objects);
obj = request;
break;
case 1:
RpcResponse response = JSON.parseObject(bytes, RpcResponse.class);
Class<?> dataType = response.getDataType();
//判断转化后的response对象中的data的类型是否正确
if(! dataType.isAssignableFrom(response.getData().getClass())){
response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
}
obj = response;
break;
default:
System.out.println("暂时不支持此种消息");
throw new RuntimeException();
}
return obj;
}
//1 代表json序列化方式
@Override
public int getType() {
return 1;
}
}

View File

@ -0,0 +1,54 @@
package part1.common.serializer.mySerializer;
import java.io.*;
/**
* @author wxx
* @version 1.0
* @create 2024/6/2 22:36
*/
public class ObjectSerializer implements Serializer {
//利用Java io 对象 -字节数组
@Override
public byte[] serialize(Object obj) {
byte[] bytes=null;
ByteArrayOutputStream bos=new ByteArrayOutputStream();
try {
//是一个对象输出流用于将 Java 对象序列化为字节流并将其连接到bos上
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
//刷新 ObjectOutputStream确保所有缓冲区中的数据都被写入到底层流中
oos.flush();
//将bos其内部缓冲区中的数据转换为字节数组
bytes = bos.toByteArray();
oos.close();
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
return bytes;
}
//字节数组 -对象
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
try {
ObjectInputStream ois = new ObjectInputStream(bis);
obj = ois.readObject();
ois.close();
bis.close();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
return obj;
}
//0 代表Java 原生序列器
@Override
public int getType() {
return 0;
}
}

View File

@ -0,0 +1,29 @@
package part1.common.serializer.mySerializer;
/**
* @author wxx
* @version 1.0
* @create 2024/6/2 22:31
*/
public interface Serializer {
// 把对象序列化成字节数组
byte[] serialize(Object obj);
// 从字节数组反序列化成消息, 使用java自带序列化方式不用messageType也能得到相应的对象序列化字节数组里包含类信息
// 其它方式需指定消息格式再根据message转化成相应的对象
Object deserialize(byte[] bytes, int messageType);
// 返回使用的序列器是哪个
// 0java自带序列化方式, 1: json序列化方式
int getType();
// 根据序号取出序列化器暂时有两种实现方式需要其它方式实现这个接口即可
static Serializer getSerializerByCode(int code){
switch (code){
case 0:
return new ObjectSerializer();
case 1:
return new JsonSerializer();
default:
return null;
}
}
}

View File

@ -0,0 +1,32 @@
package part1.common.service.Impl;
import part1.common.pojo.User;
import part1.common.service.UserService;
import java.util.Random;
import java.util.UUID;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:28
*/
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

View File

@ -0,0 +1,16 @@
package part1.common.service;
import part1.common.pojo.User;
/**
* @author wxx
* @version 1.0
* @create 2024/1/28 16:27
*/
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}

View File

@ -0,0 +1,8 @@
log4j.rootLogger=ERROR, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n