version6:step1-心跳检测

This commit is contained in:
Wxx 2025-02-13 16:39:06 +08:00
parent f4f0a28179
commit 83b49f0365
167 changed files with 4091 additions and 1 deletions

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="version1" />
</profile>
</annotationProcessing>
</component>
</project>

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://maven.aliyun.com/repository/public" />
</remote-repository>
</component>
</project>

12
version1/.idea/misc.xml Normal file
View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="openjdk-17" project-jdk-type="JavaSDK" />
</project>

6
version1/.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

View File

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="9965864c-cbac-4894-9fcb-93103359c7c9" name="Changes" comment="" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
</component>
<component name="MavenImportPreferences">
<option name="generalSettings">
<MavenGeneralSettings>
<option name="useMavenConfig" value="true" />
</MavenGeneralSettings>
</option>
</component>
<component name="ProjectId" id="2rZLqmF1kJPC1rsizHNhnfx35iF" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="ProjectViewState">
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"WebServerToolWindowFactoryState": "false",
"last_opened_file_path": "E:/JAVA_coding/Rpc-New/RPC-Java/version1"
}
}]]></component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="9965864c-cbac-4894-9fcb-93103359c7c9" name="Changes" comment="" />
<created>1736761843999</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1736761843999</updated>
<workItem from="1736761845507" duration="6000" />
</task>
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
</project>

8
version4/.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="version4" />
</profile>
</annotationProcessing>
</component>
</project>

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://maven.aliyun.com/repository/public" />
</remote-repository>
</component>
</project>

12
version4/.idea/misc.xml Normal file
View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="openjdk-17" project-jdk-type="JavaSDK" />
</project>

6
version4/.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

View File

@ -16,5 +16,10 @@
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://maven.aliyun.com/repository/public" />
</remote-repository>
</component>
</project>

View File

@ -1,4 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings" defaultProject="true" />
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

8
version6/.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="krpc-core" />
<module name="krpc-common" />
<module name="krpc-provider" />
<module name="krpc-api" />
<module name="krpc-consumer" />
</profile>
</annotationProcessing>
</component>
</project>

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/krpc-api/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-api/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-common/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-common/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-consumer/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-consumer/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-core/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-core/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-provider/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-provider/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://maven.aliyun.com/repository/public" />
</remote-repository>
</component>
</project>

14
version6/.idea/misc.xml Normal file
View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" project-jdk-name="17 (2)" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

6
version6/.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

20
version6/krpc-api/pom.xml Normal file
View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>version6</artifactId>
<groupId>com.kama</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>krpc-api</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,13 @@
package com.kama.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Retryable {
}

View File

@ -0,0 +1,26 @@
package com.kama.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ClassName User
* @Description User对象
* @Author Tong
* @LastChangeDate 2024-12-05 0:53
* @Version v5.0
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean gender;
}

View File

@ -0,0 +1,24 @@
package com.kama.service;
import com.kama.annotation.Retryable;
import com.kama.pojo.User;
/**
* @InterfaceName UserService
* @Description 接口
* @Author Tong
* @LastChangeDate 2024-12-05 0:52
* @Version v1.0
*/
public interface UserService {
// 查询
@Retryable
User getUserByUserId(Integer id);
// 新增
@Retryable
Integer insertUserId(User user);
}

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>version6</artifactId>
<groupId>com.kama</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>krpc-common</artifactId>
<dependencies>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
<scope>compile</scope>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,17 @@
package common.exception;
/**
* @ClassName SerializeException
* @Description ToDo
* @Author Tong
* @LastChangeDate 2024-12-02 19:18
* @Version v1.0
*/
public class SerializeException extends RuntimeException{
public SerializeException(String message) {
super(message);
}
public SerializeException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,13 @@
package common.message;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public enum MessageType {
REQUEST(0), RESPONSE(1);
private int code;
public int getCode() {
return code;
}
}

View File

@ -0,0 +1,31 @@
package common.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ClassName RpcRequest
* @Description 定义请求消息格式
* @Author Tong
* @LastChangeDate 2024-11-29 10:12
* @Version v5.0
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class RpcRequest implements Serializable {
//接口名方法名参数列表参数类型
private String interfaceName;
private String methodName;
private Object[] params;
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,37 @@
package common.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ClassName RpcResponse
* @Description 定义响应消息格式
* @Author Tong
* @LastChangeDate 2024-11-29 10:14
* @Version v5.0
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//更新加入传输数据的类型以便在自定义序列化器中解析
private Class<?> dataType;
//具体数据
private Object data;
public static RpcResponse sussess(Object data) {
return RpcResponse.builder().code(200).dataType(data.getClass()).data(data).build();
}
public static RpcResponse fail(String msg) {
return RpcResponse.builder().code(500).message(msg).build();
}
}

View File

@ -0,0 +1,58 @@
package common.serializer.mycoder;
import common.exception.SerializeException;
import common.message.MessageType;
import common.serializer.myserializer.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
/**
* @ClassName MyDecoder
* @Description 解码器
* @Author Tong
* @LastChangeDate 2024-11-29 10:32
* @Version v5.0
*/
@Slf4j
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
//检查可读字节数
if (in.readableBytes() < 6) { // messageType + serializerType + length
return;
}
//1.读取消息类型
short messageType = in.readShort();
// 现在还只支持request与response请求
if (messageType != MessageType.REQUEST.getCode() &&
messageType != MessageType.RESPONSE.getCode()) {
log.warn("暂不支持此种数据, messageType: {}", messageType);
return;
}
//2.读取序列化的方式&类型
short serializerType = in.readShort();
Serializer serializer = Serializer.getSerializerByCode(serializerType);
if (serializer == null) {
log.error("不存在对应的序列化器, serializerType: {}", serializerType);
throw new SerializeException("不存在对应的序列化器, serializerType: " + serializerType);
}
//3.读取序列化数组长度
int length = in.readInt();
if (in.readableBytes() < length) {
return; // 数据不完整等待更多数据
}
//4.读取序列化数组
byte[] bytes = new byte[length];
in.readBytes(bytes);
log.debug("Received bytes: {}", Arrays.toString(bytes));
Object deserialize = serializer.deserialize(bytes, messageType);
out.add(deserialize);
}
}

View File

@ -0,0 +1,50 @@
package common.serializer.mycoder;
import common.message.MessageType;
import common.message.RpcRequest;
import common.message.RpcResponse;
import common.serializer.myserializer.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName MyEncoder
* @Description 编码器
* @Author Tong
* @LastChangeDate 2024-11-29 10:32
* @Version v5.0
*/
@Slf4j
@AllArgsConstructor
public class MyEncoder extends MessageToByteEncoder {
private Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
log.debug("Encoding message of type: {}", msg.getClass());
//1.写入消息类型
if (msg instanceof RpcRequest) {
out.writeShort(MessageType.REQUEST.getCode());
} else if (msg instanceof RpcResponse) {
out.writeShort(MessageType.RESPONSE.getCode());
} else {
log.error("Unknown message type: {}", msg.getClass());
throw new IllegalArgumentException("Unknown message type: " + msg.getClass());
}
//2.写入序列化方式
out.writeShort(serializer.getType());
//得到序列化数组
byte[] serializeBytes = serializer.serialize(msg);
if (serializeBytes == null || serializeBytes.length == 0) {
throw new IllegalArgumentException("Serialized message is empty");
}
//3.写入长度
out.writeInt(serializeBytes.length);
//4.写入序列化数组
out.writeBytes(serializeBytes);
}
}

View File

@ -0,0 +1,51 @@
package common.serializer.myserializer;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import common.exception.SerializeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* @ClassName HessianSerializer
* @Description Hessian序列化
* @Author Tong
* @LastChangeDate 2024-11-29 11:49
* @Version v5.0
*/
public class HessianSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
// 使用 ByteArrayOutputStream HessianOutput 来实现对象的序列化
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
hessianOutput.writeObject(obj); // 将对象写入输出流
return byteArrayOutputStream.toByteArray(); // 返回字节数组
} catch (IOException e) {
throw new SerializeException("Serialization failed");
}
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
// 使用 ByteArrayInputStream HessianInput 来实现反序列化
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {
HessianInput hessianInput = new HessianInput(byteArrayInputStream);
return hessianInput.readObject(); // 读取并返回对象
} catch (IOException e) {
throw new SerializeException("Deserialization failed");
}
}
@Override
public int getType() {
return 3;
}
@Override
public String toString() {
return "Hessian";
}
}

View File

@ -0,0 +1,78 @@
package common.serializer.myserializer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import common.message.RpcRequest;
import common.message.RpcResponse;
/**
* @ClassName JsonSerializer
* @Description json序列化
* @Author Tong
* @LastChangeDate 2024-11-29 10:33
* @Version v5.0
*/
public class JsonSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
byte[] bytes = JSONObject.toJSONBytes(obj);
return bytes;
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
// 传输的消息分为request与response
switch (messageType){
case 0:
RpcRequest request = JSON.parseObject(bytes, RpcRequest.class);
Object[] objects = new Object[request.getParams().length];
// 把json字串转化成对应的对象 fastjson可以读出基本数据类型不用转化
// 对转换后的request中的params属性逐个进行类型判断
for(int i = 0; i < objects.length; i++){
Class<?> paramsType = request.getParamsType()[i];
//判断每个对象类型是否和paramsTypes中的一致
if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())){
//如果不一致就行进行类型转换
objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i],request.getParamsType()[i]);
}else{
//如果一致就直接赋给objects[i]
objects[i] = request.getParams()[i];
}
}
request.setParams(objects);
obj = request;
break;
case 1:
RpcResponse response = JSON.parseObject(bytes, RpcResponse.class);
// 如果类型为空说明返回错误
if(response.getDataType()==null){
obj = RpcResponse.fail("类型为空");
break;
}
Class<?> dataType = response.getDataType();
//判断转化后的response对象中的data的类型是否正确
if(response.getData() != null && !dataType.isAssignableFrom(response.getData().getClass())){
response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
}
obj = response;
break;
default:
System.out.println("暂时不支持此种消息");
throw new RuntimeException();
}
return obj;
}
//1 代表json序列化方式
@Override
public int getType() {
return 1;
}
@Override
public String toString() {
return "Json";
}
}

View File

@ -0,0 +1,80 @@
package common.serializer.myserializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.kama.pojo.User;
import common.exception.SerializeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* @ClassName KryoSerializer
* @Description kryo序列化
* @Author Tong
* @LastChangeDate 2024-11-29 11:29
* @Version v5.0
*/
public class KryoSerializer implements Serializer {
private Kryo kryo;
public KryoSerializer() {
this.kryo = new Kryo();
}
@Override
public byte[] serialize(Object obj) {
if (obj == null) {
throw new IllegalArgumentException("Cannot serialize null object");
}
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
kryo.writeObject(output, obj); // 使用 Kryo 写入对象
return output.toBytes(); // 返回字节数组
} catch (Exception e) {
throw new SerializeException("Serialization failed");
}
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("Cannot deserialize null or empty byte array");
}
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
// 根据 messageType 来反序列化不同的类
Class<?> clazz = getClassForMessageType(messageType);
return kryo.readObject(input, clazz); // 使用 Kryo 反序列化对象
} catch (Exception e) {
throw new SerializeException("Deserialization failed");
}
}
@Override
public int getType() {
return 2;
}
private Class<?> getClassForMessageType(int messageType) {
if (messageType == 1) {
return User.class; // 假设我们在此反序列化成 User
} else {
throw new SerializeException("Unknown message type: " + messageType);
}
}
@Override
public String toString() {
return "Kryo";
}
}

View File

@ -0,0 +1,60 @@
package common.serializer.myserializer;
import java.io.*;
/**
* @ClassName ObjectSerializer
* @Description JDK序列化方式
* @Author Tong
* @LastChangeDate 2024-11-29 10:34
* @Version v5.0
*/
public class ObjectSerializer implements Serializer {
//利用Java io 对象 -字节数组
@Override
public byte[] serialize(Object obj) {
byte[] bytes=null;
ByteArrayOutputStream bos=new ByteArrayOutputStream();
try {
//是一个对象输出流用于将 Java 对象序列化为字节流并将其连接到bos上
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
//刷新 ObjectOutputStream确保所有缓冲区中的数据都被写入到底层流中
oos.flush();
//将bos其内部缓冲区中的数据转换为字节数组
bytes = bos.toByteArray();
oos.close();
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
return bytes;
}
//字节数组 -对象
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
try {
ObjectInputStream ois = new ObjectInputStream(bis);
obj = ois.readObject();
ois.close();
bis.close();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
return obj;
}
//0 代表Java 原生序列器
@Override
public int getType() {
return 0;
}
@Override
public String toString() {
return "JDK";
}
}

View File

@ -0,0 +1,83 @@
package common.serializer.myserializer;
import com.kama.pojo.User;
import common.exception.SerializeException;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
/**
* @ClassName ProtostuffSerializer
* @Description protostuff序列化
* @Author Tong
* @LastChangeDate 2024-11-29 11:55
* @Version v5.0
*/
public class ProtostuffSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
// 检查 null 对象
if (obj == null) {
throw new IllegalArgumentException("Cannot serialize null object");
}
// 获取对象的 schema
Schema schema = RuntimeSchema.getSchema(obj.getClass());
// 使用 LinkedBuffer 来创建缓冲区默认大小 1024
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
// 序列化对象为字节数组
byte[] bytes;
try {
bytes = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}
return bytes;
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("Cannot deserialize null or empty byte array");
}
// 根据 messageType 来决定反序列化的类这里假设 `messageType` 是类的标识符
Class<?> clazz = getClassForMessageType(messageType);
// 获取对象的 schema
Schema schema = RuntimeSchema.getSchema(clazz);
// 创建一个空的对象实例
Object obj;
try {
obj = clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new SerializeException("Deserialization failed due to reflection issues");
}
// 反序列化字节数组为对象
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
@Override
public int getType() {
return 4;
}
// 用于根据 messageType 获取对应的类
private Class<?> getClassForMessageType(int messageType) {
if (messageType == 1) {
return User.class; // 假设我们在此反序列化成 User
} else {
throw new SerializeException("Unknown message type: " + messageType);
}
}
@Override
public String toString() {
return "Protostuff";
}
}

View File

@ -0,0 +1,38 @@
package common.serializer.myserializer;
import java.util.HashMap;
import java.util.Map;
/**
* @InterfaceName Serializer
* @Description 序列化接口
* @Author Tong
* @LastChangeDate 2024-11-29 10:33
* @Version v5.0
*/
public interface Serializer {
byte[] serialize(Object obj);
Object deserialize(byte[] bytes, int messageType);
int getType();
// 定义静态常量 serializerMap
static final Map<Integer, Serializer> serializerMap = new HashMap<>();
// 使用 Map 存储序列化器
static Serializer getSerializerByCode(int code) {
// 静态映射保证只初始化一次
if(serializerMap.isEmpty()) {
serializerMap.put(0, new ObjectSerializer());
serializerMap.put(1, new JsonSerializer());
serializerMap.put(2, new KryoSerializer());
serializerMap.put(3, new HessianSerializer());
serializerMap.put(4, new ProtostuffSerializer());
}
return serializerMap.get(code); // 如果不存在则返回 null
}
}

View File

@ -0,0 +1,111 @@
package common.spi;
import cn.hutool.core.io.resource.ResourceUtil;
import common.serializer.myserializer.Serializer;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName SpiLoader
* @Description spi实现
* @Author Tong
* @LastChangeDate 2024-12-05 14:53
* @Version v5.0
*/
@Slf4j
public class SpiLoader {
// 存储已加载的 SPI 实现类的映射
private static final Map<String, Map<String, Class<? extends Serializer>>> loadedSpiMap = new ConcurrentHashMap<>();
// 缓存实例避免重复实例化
private static final Map<String, Object> instanceCache = new ConcurrentHashMap<>();
// SPI 配置文件的路径
private static final String SPI_CONFIG_DIR = "META-INF/serializer/";
/**
* 加载指定接口的 SPI 实现类
*
* @param serviceInterface 接口类
*/
public static void loadSpi(Class<?> serviceInterface) {
String interfaceName = serviceInterface.getName();
// 如果已经加载过该接口的 SPI 实现直接返回
if (loadedSpiMap.containsKey(interfaceName)) {
return;
}
Map<String, Class<? extends Serializer>> keyClassMap = new HashMap<>();
// 读取配置文件获取所有实现类
List<URL> resources = ResourceUtil.getResources(SPI_CONFIG_DIR + serviceInterface.getName());
for (URL resource : resources) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.openStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty() && !line.startsWith("#")) {
String[] parts = line.split("=");
if (parts.length == 2) {
String key = parts[0].trim();
String className = parts[1].trim();
Class<?> implClass = Class.forName(className);
if (serviceInterface.isAssignableFrom(implClass)) {
keyClassMap.put(key, (Class<? extends Serializer>) implClass);
}
}
}
}
} catch (IOException | ClassNotFoundException e) {
log.error("Failed to load SPI resource: " + resource, e);
}
}
// 将该接口的 SPI 实现类存入缓存
loadedSpiMap.put(interfaceName, keyClassMap);
}
/**
* 根据接口和 key 获取 SPI 实现类实例
*
* @param serviceInterface 接口类
* @param key 序列化器的 key
* @param <T> 接口类型
* @return 对应的 SPI 实现类实例
*/
public static <T> T getInstance(Class<T> serviceInterface, String key) {
String interfaceName = serviceInterface.getName();
Map<String, Class<? extends Serializer>> keyClassMap = loadedSpiMap.get(interfaceName);
if (keyClassMap == null) {
throw new RuntimeException("SPI not loaded for " + interfaceName);
}
Class<? extends Serializer> implClass = keyClassMap.get(key);
if (implClass == null) {
throw new RuntimeException("No SPI implementation found for key " + key);
}
// 从缓存中获取实例如果不存在则创建
String implClassName = implClass.getName();
if (!instanceCache.containsKey(implClassName)) {
try {
instanceCache.put(implClassName, implClass.newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Failed to instantiate SPI implementation: " + implClassName, e);
}
}
return (T) instanceCache.get(implClassName);
}
}

View File

@ -0,0 +1,48 @@
package common.util;
import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName ConfigUtil
* @Description 工具
* @Author Tong
* @LastChangeDate 2024-12-05 11:12
* @Version v5.0
*/
@Slf4j
public class ConfigUtil {
// 加载配置文件使用默认环境
public static <T> T loadConfig(Class<T> targetClass, String prefix) {
return loadConfig(targetClass, prefix, "");
}
// 加载配置文件支持指定环境
public static <T> T loadConfig(Class<T> targetClass, String prefix, String environment) {
StringBuilder configFileNameBuilder = new StringBuilder("application");
if (StrUtil.isNotBlank(environment)) {
configFileNameBuilder.append("-").append(environment);
}
configFileNameBuilder.append(".properties");
// 加载配置文件
Props properties = new Props(configFileNameBuilder.toString());
if (properties.isEmpty()) {
log.warn("配置文件 '{}' 为空或加载失败!", configFileNameBuilder.toString());
} else {
log.info("加载配置文件: '{}'", configFileNameBuilder.toString());
}
// 返回转化后的配置对象
try {
return properties.toBean(targetClass, prefix);
} catch (Exception e) {
log.error("配置转换失败,目标类: {}", targetClass.getName(), e);
throw new RuntimeException("配置加载失败", e);
}
}
}

View File

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>version6</artifactId>
<groupId>com.kama</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>krpc-consumer</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,66 @@
package com.kama.consumer;
import com.kama.client.proxy.ClientProxy;
import com.kama.pojo.User;
import com.kama.service.UserService;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @ClassName ConsumerExample
* @Description 客户端测试
* @Author Tong
* @LastChangeDate 2024-12-05 16:20
* @Version v5.0
*/
@Slf4j
public class ConsumerTest {
private static final int THREAD_POOL_SIZE = 20;
private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public static void main(String[] args) throws InterruptedException {
ClientProxy clientProxy = new ClientProxy();
UserService proxy = clientProxy.getProxy(UserService.class);
for (int i = 0; i < 120; i++) {
final Integer i1 = i;
if (i % 30 == 0) {
// Simulate delay for every 30 requests
Thread.sleep(10000);
}
// Submit tasks to executor service (thread pool)
executorService.submit(() -> {
try {
User user = proxy.getUserByUserId(i1);
if (user != null) {
log.info("从服务端得到的user={}", user);
} else {
log.warn("获取的 user 为 null, userId={}", i1);
}
Integer id = proxy.insertUserId(User.builder()
.id(i1)
.userName("User" + i1)
.gender(true)
.build());
if (id != null) {
log.info("向服务端插入user的id={}", id);
} else {
log.warn("插入失败返回的id为null, userId={}", i1);
}
} catch (Exception e) {
log.error("调用服务时发生异常userId={}", i1, e);
}
});
}
// Gracefully shutdown the executor service
executorService.shutdown();
clientProxy.close();
}
}

View File

@ -0,0 +1,20 @@
package com.kama.consumer;
import com.kama.config.KRpcConfig;
import common.util.ConfigUtil;
/**
* @ClassName ConsumerTestConfig
* @Description 测试配置顶
* @Author Tong
* @LastChangeDate 2024-12-05 11:29
* @Version v1.0
*/
public class ConsumerTestConfig {
public static void main(String[] args) {
KRpcConfig rpc = ConfigUtil.loadConfig(KRpcConfig.class, "rpc");
System.out.println(rpc);
}
}

View File

@ -0,0 +1,7 @@
rpc.name=krpc
rpc.version=1.0.0
rpc.port=9999
rpc.serializer=Hessian
rpc.host=localhost
rpc.registry=zookeeper
rpc.loadBalance=ConsistencyHash

View File

@ -0,0 +1,7 @@
rpc.name=krpc
rpc.version=1.0.0
rpc.port=9999
rpc.serializer=Hessian
rpc.host=localhost
rpc.registry=zookeeper
rpc.loadBalance=ConsistencyHash

115
version6/krpc-core/pom.xml Normal file
View File

@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>version6</artifactId>
<groupId>com.kama</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>krpc-core</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
<scope>compile</scope>
</dependency>
<!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn但不影响正常使用-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.66</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>17.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
</dependency>
<!-- JUnit 5 API 和 Engine -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.11.3</version>
<scope>test</scope>
</dependency>
<!-- JUnit Platform Launcher -->
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,48 @@
package com.kama;
import com.kama.config.KRpcConfig;
import com.kama.config.RpcConstant;
import common.util.ConfigUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName RpcApplication
* @Description 测试配置顶学习更多参考Dubbo
* @Author Tong
* @LastChangeDate 2024-12-05 11:22
* @Version v5.0
*/
@Slf4j
public class KRpcApplication {
private static volatile KRpcConfig rpcConfigInstance;
public static void initialize(KRpcConfig customRpcConfig) {
rpcConfigInstance = customRpcConfig;
log.info("RPC 框架初始化,配置 = {}", customRpcConfig);
}
public static void initialize() {
KRpcConfig customRpcConfig;
try {
customRpcConfig = ConfigUtil.loadConfig(KRpcConfig.class, RpcConstant.CONFIG_FILE_PREFIX);
log.info("成功加载配置文件,配置文件名称 = {}", RpcConstant.CONFIG_FILE_PREFIX); // 添加成功加载的日志
} catch (Exception e) {
// 配置加载失败使用默认配置
customRpcConfig = new KRpcConfig();
log.warn("配置加载失败,使用默认配置");
}
initialize(customRpcConfig);
}
public static KRpcConfig getRpcConfig() {
if (rpcConfigInstance == null) {
synchronized (KRpcApplication.class) {
if (rpcConfigInstance == null) {
initialize(); // 确保在第一次调用时初始化
}
}
}
return rpcConfigInstance;
}
}

View File

@ -0,0 +1,74 @@
package com.kama.client.cache;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName serviceCache
* @Description 建立本地缓存
* @Author Tong
* @LastChangeDate 2024-12-02 10:34
* @Version v5.0
*/
@Slf4j
public class ServiceCache {
//key: serviceName 服务名
//value addressList 服务提供者列表
private static Map<String, List<String>> cache = new ConcurrentHashMap<>();
//添加服务
public void addServiceToCache(String serviceName, String address) {
if (cache.containsKey(serviceName)) {
List<String> addressList = cache.get(serviceName);
addressList.add(address);
log.info("有服务名情况将name为{}和地址为{}的服务添加到本地缓存中", serviceName, address);
} else {
List<String> addressList = new ArrayList<>();
addressList.add(address);
cache.put(serviceName, addressList);
log.info("无服务名情况将name为{}和地址为{}的服务添加到本地缓存中", serviceName, address);
}
}
//修改服务地址
public void replaceServiceAddress(String serviceName, String oldAddress, String newAddress) {
if (cache.containsKey(serviceName)) {
List<String> addressList = cache.get(serviceName);
addressList.remove(oldAddress);
addressList.add(newAddress);
log.info("将服务{}的地址{}替换为{}", serviceName, oldAddress, newAddress);
} else {
log.error("旧地址{}不在服务{}的地址列表中", oldAddress, serviceName);
}
}
//从缓存中取服务地址列表
public List<String> getServiceListFromCache(String serviceName) {
if (!cache.containsKey(serviceName)) {
log.warn("服务{}未找到", serviceName);
//返回个不可修改的空列表避免调用的时候出现空指针异常
return Collections.emptyList();
}
return cache.get(serviceName);
}
//从缓存中删除服务地址
public void delete(String serviceName, String address) {
List<String> addressList = cache.get(serviceName);
if (addressList != null && addressList.contains(address)) {
addressList.remove(address);
log.info("将name为{}和地址为{}的服务从本地缓存中删除", serviceName, address);
if (addressList.isEmpty()) {
cache.remove(serviceName); // 移除该服务的缓存条目
log.info("服务{}的地址列表为空,已从缓存中清除", serviceName);
}
} else {
log.warn("删除失败,地址{}不在服务{}的地址列表中", address, serviceName);
}
}
}

View File

@ -0,0 +1,107 @@
package com.kama.client.circuitbreaker;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName CircuitBreaker
* @Description 熔断器的状态
* @Author Tong
* @LastChangeDate 2024-12-02 10:45
* @Version v5.0
*/
@Slf4j
public class CircuitBreaker {
//当前状态
private CircuitBreakerState state = CircuitBreakerState.CLOSED;
private AtomicInteger failureCount = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger requestCount = new AtomicInteger(0);
//失败次数阈值
private final int failureThreshold;
//半开启-关闭状态的成功次数比例
private final double halfOpenSuccessRate;
//恢复时间
private final long retryTimePeriod;
//上一次失败时间
private long lastFailureTime = 0;
public CircuitBreaker(int failureThreshold, double halfOpenSuccessRate, long retryTimePeriod) {
this.failureThreshold = failureThreshold;
this.halfOpenSuccessRate = halfOpenSuccessRate;
this.retryTimePeriod = retryTimePeriod;
}
//查看当前熔断器是否允许请求通过
public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
log.info("熔断前检查, 当前失败次数:{}", failureCount);
switch (state) {
case OPEN:
if (currentTime - lastFailureTime > retryTimePeriod) {
state = CircuitBreakerState.HALF_OPEN;
resetCounts();
log.info("熔断已解除,进入半开启状态,允许请求通过");
return true;
}
log.warn("熔断生效中,拒绝请求!");
return false;
case HALF_OPEN:
requestCount.incrementAndGet();
log.info("当前为半开启状态,计数请求");
return true;
case CLOSED:
default:
log.info("当前为正常状态,允许请求通过");
return true;
}
}
//记录成功
public synchronized void recordSuccess() {
if (state == CircuitBreakerState.HALF_OPEN) {
successCount.incrementAndGet();
if (successCount.get() >= halfOpenSuccessRate * requestCount.get()) {
state = CircuitBreakerState.CLOSED;
resetCounts();
log.info("成功次数已达到阈值,熔断器切换至关闭状态");
}
} else {
resetCounts();
log.info("熔断器处于关闭状态,重置计数器");
}
}
//记录失败
public synchronized void recordFailure() {
failureCount.incrementAndGet();
log.error("记录失败,当前失败次数:{}", failureCount);
lastFailureTime = System.currentTimeMillis();
if (state == CircuitBreakerState.HALF_OPEN) {
state = CircuitBreakerState.OPEN;
lastFailureTime = System.currentTimeMillis();
log.warn("半开启状态下发生失败,熔断器切换至开启状态");
} else if (failureCount.get() >= failureThreshold) {
state = CircuitBreakerState.OPEN;
log.error("失败次数已超过阈值,熔断器切换至开启状态");
}
}
//重置次数
private void resetCounts() {
failureCount.set(0);
successCount.set(0);
requestCount.set(0);
}
public CircuitBreakerState getState() {
return state;
}
}
enum CircuitBreakerState {
//关闭开启半开启
CLOSED, OPEN, HALF_OPEN
}

View File

@ -0,0 +1,28 @@
package com.kama.client.circuitbreaker;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName CircuitBreakerState
* @Description 提供熔断器
* @Author Tong
* @LastChangeDate 2024-12-02 10:47
* @Version v5.0
*/
@Slf4j
public class CircuitBreakerProvider {
// 使用线程安全的 ConcurrentHashMap
private Map<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();
public synchronized CircuitBreaker getCircuitBreaker(String serviceName) {
// 使用 computeIfAbsent避免手动同步
return circuitBreakerMap.computeIfAbsent(serviceName, key -> {
log.info("服务 [{}] 不存在熔断器,创建新的熔断器实例", serviceName);
// 创建并返回新熔断器
return new CircuitBreaker(1, 0.5, 10000);
});
}
}

View File

@ -0,0 +1,29 @@
package com.kama.client.netty;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* @author wxx
* @version 1.0
* @create 2025/2/13 15:01
*/
public class HeartbeatHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
IdleState idleState = idleStateEvent.state();
if(idleState == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("两秒没有写数据,发送心跳包\n");
System.out.println("超过两秒没有写数据,发送心跳包");
}
}
}
}

View File

@ -0,0 +1,34 @@
package com.kama.client.netty;
import common.message.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName NettyClientHandler
* @Description 客户端处理器
* @Author Tong
* @LastChangeDate 2024-12-02 10:15
* @Version v5.0
*/
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名让sendRequest里读取response
AttributeKey<RpcResponse> RESPONSE_KEY = AttributeKey.valueOf("RPCResponse");
// 将响应存入 Channel 属性
ctx.channel().attr(RESPONSE_KEY).set(response);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Channel exception occurred", cause);
ctx.close();
}
}

View File

@ -0,0 +1,45 @@
package com.kama.client.netty;
import common.serializer.mycoder.MyDecoder;
import common.serializer.mycoder.MyEncoder;
import common.serializer.myserializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @ClassName NettyClientInitializer
* @Description 配置自定义的编码器以及Handler
* @Author Tong
* @LastChangeDate 2024-12-02 10:16
* @Version v5.0
*/
@Slf4j
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 使用自定义的编码器和解码器
try {
// 根据传入的序列化器类型初始化编码器
pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3)));
pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyClientHandler());
// 客户端只关注写事件如果超过2秒没有发送数据则发送心跳包
pipeline.addLast(new IdleStateHandler(0, 2, 0, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).toString());
} catch (Exception e) {
log.error("Error initializing Netty client pipeline", e);
throw e; // 重新抛出异常确保管道初始化失败时处理正确
}
}
}

View File

@ -0,0 +1,117 @@
package com.kama.client.proxy;
import com.kama.client.circuitbreaker.CircuitBreaker;
import com.kama.client.circuitbreaker.CircuitBreakerProvider;
import com.kama.client.retry.GuavaRetry;
import com.kama.client.rpcclient.RpcClient;
import com.kama.client.rpcclient.impl.NettyRpcClient;
import com.kama.client.servicecenter.ServiceCenter;
import com.kama.client.servicecenter.ZKServiceCenter;
import common.message.RpcRequest;
import common.message.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
/**
* @ClassName ClientProxy
* @Description 动态代理
* @Author Tong
* @LastChangeDate 2024-12-02 10:14
* @Version v5.0
*/
@Slf4j
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private RpcClient rpcClient;
private ServiceCenter serviceCenter;
private CircuitBreakerProvider circuitBreakerProvider;
public ClientProxy() throws InterruptedException {
serviceCenter = new ZKServiceCenter();
circuitBreakerProvider = new CircuitBreakerProvider();
}
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//获取熔断器
CircuitBreaker circuitBreaker = circuitBreakerProvider.getCircuitBreaker(method.getName());
//判断熔断器是否允许请求经过
if (!circuitBreaker.allowRequest()) {
log.warn("熔断器开启,请求被拒绝: {}", request);
//这里可以针对熔断做特殊处理返回特殊值
return null;
}
//数据传输
RpcResponse response;
//后续添加逻辑为保持幂等性只对白名单上的服务进行重试
// 如果启用重试机制先检查是否需要重试
String methodSignature = getMethodSignature(request.getInterfaceName(), method);
log.info("方法签名: " + methodSignature);
InetSocketAddress serviceAddress = serviceCenter.serviceDiscovery(request);
rpcClient = new NettyRpcClient(serviceAddress);
if (serviceCenter.checkRetry(serviceAddress, methodSignature)) {
//调用retry框架进行重试操作
try {
log.info("尝试重试调用服务: {}", methodSignature);
response = new GuavaRetry().sendServiceWithRetry(request, rpcClient);
} catch (Exception e) {
log.error("重试调用失败: {}", methodSignature, e);
circuitBreaker.recordFailure();
throw e; // 将异常抛给调用者
}
} else {
//只调用一次
response = rpcClient.sendRequest(request);
}
//记录response的状态上报给熔断器
if (response != null) {
if (response.getCode() == 200) {
circuitBreaker.recordSuccess();
} else if (response.getCode() == 500) {
circuitBreaker.recordFailure();
}
log.info("收到响应: {} 状态码: {}", request.getInterfaceName(), response.getCode());
}
return response != null ? response.getData() : null;
}
public <T> T getProxy(Class<T> clazz) {
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T) o;
}
// 根据接口名字和方法获取方法签名
private String getMethodSignature(String interfaceName, Method method) {
StringBuilder sb = new StringBuilder();
sb.append(interfaceName).append("#").append(method.getName()).append("(");
Class<?>[] parameterTypes = method.getParameterTypes();
for (int i = 0; i < parameterTypes.length; i++) {
sb.append(parameterTypes[i].getName());
if (i < parameterTypes.length - 1) {
sb.append(",");
} else{
sb.append(")");
}
}
return sb.toString();
}
//关闭创建的资源
//如果在需要C-S保持长连接的场景下无需调用close方法
public void close(){
rpcClient.close();
serviceCenter.close();
}
}

View File

@ -0,0 +1,46 @@
package com.kama.client.retry;
import com.github.rholder.retry.*;
import com.kama.client.rpcclient.RpcClient;
import common.message.RpcRequest;
import common.message.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @ClassName guavaRetry
* @Description 重试策略
* @Author Tong
* @LastChangeDate 2024-12-02 10:44
* @Version v5.0
*/
@Slf4j
public class GuavaRetry {
public RpcResponse sendServiceWithRetry(RpcRequest request, RpcClient rpcClient) {
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
//无论出现什么异常都进行重试
.retryIfException()
//返回结果为 error时进行重试
.retryIfResult(response -> Objects.equals(response.getCode(), 500))
//重试等待策略等待 2s 后再进行重试
.withWaitStrategy(WaitStrategies.fixedWait(2, TimeUnit.SECONDS))
//重试停止策略重试达到 3
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("重试第 {} 次", attempt.getAttemptNumber());
}
})
.build();
try {
return retryer.call(() -> rpcClient.sendRequest(request));
} catch (Exception e) {
log.error("重试失败: 请求 {} 执行时遇到异常", request.getMethodName(), e);
}
return RpcResponse.fail("重试失败,所有重试尝试已结束");
}
}

View File

@ -0,0 +1,18 @@
package com.kama.client.rpcclient;
import common.message.RpcRequest;
import common.message.RpcResponse;
/**
* @InterfaceName RpcClient
* @Description 定义底层通信方法
* @Author Tong
* @LastChangeDate 2024-12-02 10:11
* @Version v5.0
*/
public interface RpcClient {
RpcResponse sendRequest(RpcRequest request);
void close();
}

View File

@ -0,0 +1,97 @@
package com.kama.client.rpcclient.impl;
import com.kama.client.netty.NettyClientInitializer;
import com.kama.client.rpcclient.RpcClient;
import common.message.RpcRequest;
import common.message.RpcResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @ClassName NettyRpcClient
* @Description Netty客户端
* @Author Tong
* @LastChangeDate 2024-12-02 11:03
* @Version v5.0
*/
@Slf4j
public class NettyRpcClient implements RpcClient {
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
private final InetSocketAddress address;
public NettyRpcClient(InetSocketAddress serviceAddress) {
this.address = serviceAddress;
}
//netty客户端初始化
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
//从注册中心获取host,post
if (address == null) {
log.error("服务发现失败,返回的地址为 null");
return RpcResponse.fail("服务发现失败,地址为 null");
}
String host = address.getHostName();
int port = address.getPort();
try {
// 连接到远程服务
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果
channel.closeFuture().sync();
// 阻塞的获得结果通过给channel设计别名获取特定名字下的channel中的内容这个在hanlder中设置
// AttributeKey是线程隔离的不会由线程安全问题
// 当前场景下选择堵塞获取结果
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get();
if (response == null) {
log.error("服务响应为空,可能是请求失败或超时");
return RpcResponse.fail("服务响应为空");
}
log.info("收到响应: {}", response);
return response;
} catch (InterruptedException e) {
log.error("请求被中断,发送请求失败: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("发送请求时发生异常: {}", e.getMessage(), e);
} finally {
//
}
return RpcResponse.fail("请求失败");
}
// 优雅关闭 Netty 资源
public void close() {
try {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().sync();
}
} catch (InterruptedException e) {
log.error("关闭 Netty 资源时发生异常: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
}

View File

@ -0,0 +1,61 @@
package com.kama.client.rpcclient.impl;
import com.kama.client.rpcclient.RpcClient;
import common.message.RpcRequest;
import common.message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/**
* @ClassName SimpleSocketRpcClient
* @Description 实现简单客户都
* @Author Tong
* @LastChangeDate 2024-12-02 10:12
* @Version v5.0
*/
public class SimpleSocketRpcClient implements RpcClient {
private String host;
private int port;
public SimpleSocketRpcClient(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
// 定义响应对象
RpcResponse response = null;
// 创建 Socket 和流对象
try (Socket socket = new Socket(host, port);
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream())) {
// 发送请求对象
oos.writeObject(request);
oos.flush();
// 接收响应对象
response = (RpcResponse) ois.readObject();
} catch (UnknownHostException e) {
System.err.println("未知的主机: " + host);
} catch (IOException e) {
System.err.println("I/O 错误: " + e.getMessage());
} catch (ClassNotFoundException e) {
System.err.println("无法识别的类: " + e.getMessage());
}
return response;
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,25 @@
package com.kama.client.servicecenter;
import common.message.RpcRequest;
import java.net.InetSocketAddress;
/**
* @InterfaceName ServiceCenter
* @Description 服务中心接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:31
* @Version v5.0
*/
public interface ServiceCenter {
// 查询根据服务名查找地址
InetSocketAddress serviceDiscovery(RpcRequest request);
//判断是否可重试
boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature);
//关闭客户端
void close();
}

View File

@ -0,0 +1,122 @@
package com.kama.client.servicecenter;
import com.kama.client.cache.ServiceCache;
import com.kama.client.servicecenter.ZKWatcher.watchZK;
import com.kama.client.servicecenter.balance.LoadBalance;
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
import common.message.RpcRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @ClassName ZKServiceCenter
* @Description 从服务中心获取服务地址
* @Author Tong
* @LastChangeDate 2024-12-02 10:33
* @Version v5.0
*/
@Slf4j
public class ZKServiceCenter implements ServiceCenter {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
private static final String RETRY = "CanRetry";
//serviceCache
private ServiceCache cache;
private final LoadBalance loadBalance = new ConsistencyHashBalance();
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceCenter() throws InterruptedException {
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
log.info("Zookeeper 连接成功");
//初始化本地缓存
cache = new ServiceCache();
//加入zookeeper事件监听器
watchZK watcher = new watchZK(client, cache);
//监听启动
watcher.watchToUpdate(ROOT_PATH);
}
//根据服务名接口名返回地址
@Override
public InetSocketAddress serviceDiscovery(RpcRequest request) {
String serviceName = request.getInterfaceName();
try {
//先从本地缓存中找
List<String> addressList = cache.getServiceListFromCache(serviceName);
//如果找不到再去zookeeper中找
//这种i情况基本不会发生或者说只会出现在初始化阶段
if (addressList == null) {
addressList = client.getChildren().forPath("/" + serviceName);
// 如果本地缓存中没有该服务名的地址列表则添加
List<String> cachedAddresses = cache.getServiceListFromCache(serviceName);
if (cachedAddresses == null || cachedAddresses.isEmpty()) {
// 假设 addServiceToCache 方法可以处理单个地址
for (String address : addressList) {
cache.addServiceToCache(serviceName, address);
}
}
}
if (addressList.isEmpty()) {
log.warn("未找到服务:{}", serviceName);
return null;
}
// 负载均衡得到地址
String address = loadBalance.balance(addressList);
return parseAddress(address);
} catch (Exception e) {
log.error("服务发现失败,服务名:{}", serviceName, e);
}
return null;
}
//保证线程安全使用CopyOnWriteArraySet
private Set<String> retryServiceCache = new CopyOnWriteArraySet<>();
//写一个白名单缓存优化性能
@Override
public boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature) {
if (retryServiceCache.isEmpty()) {
try {
CuratorFramework rootClient = client.usingNamespace(RETRY);
List<String> retryableMethods = rootClient.getChildren().forPath("/" + getServiceAddress(serviceAddress));
retryServiceCache.addAll(retryableMethods);
} catch (Exception e) {
log.error("检查重试失败,方法签名:{}", methodSignature, e);
}
}
return retryServiceCache.contains(methodSignature);
}
@Override
public void close() {
client.close();
}
// 将InetSocketAddress解析为格式为ip:port的字符串
private String getServiceAddress(InetSocketAddress serverAddress){
return serverAddress.getHostName() + ":" + serverAddress.getPort();
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,98 @@
package com.kama.client.servicecenter.ZKWatcher;
import com.kama.client.cache.ServiceCache;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
/**
* @ClassName watchZK
* @Description 节点监听
* @Author Tong
* @LastChangeDate 2024-12-02 10:37
* @Version v5.0
*/
@Slf4j
public class watchZK {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//本地缓存
ServiceCache cache;
public watchZK(CuratorFramework client, ServiceCache cache) {
this.client = client;
this.cache = cache;
}
/**
* 监听当前节点和子节点的 更新创建删除
*
* @param path
*/
public void watchToUpdate(String path) throws InterruptedException {
CuratorCache curatorCache = CuratorCache.build(client, "/");
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData childData, ChildData childData1) {
// 第一个参数事件类型枚举
// 第二个参数节点更新前的状态数据
// 第三个参数节点更新后的状态数据
// 创建节点时节点刚被创建不存在 更新前节点 所以第二个参数为 null
// 删除节点时节点被删除不存在 更新后节点 所以第三个参数为 null
// 节点创建时没有赋予值 create /curator/app1 只创建节点在这种情况下更新前节点的 data null获取不到更新前节点的数据
switch (type.name()) {
case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
String[] pathList = pasrePath(childData1);
if (pathList.length <= 2) break;
else {
String serviceName = pathList[1];
String address = pathList[2];
//将新注册的服务加入到本地缓存中
cache.addServiceToCache(serviceName, address);
log.info("节点创建:服务名称 {} 地址 {}", serviceName, address);
}
break;
case "NODE_CHANGED": // 节点更新
if (childData.getData() != null) {
log.debug("修改前的数据: {}", new String(childData.getData()));
} else {
log.debug("节点第一次赋值!");
}
String[] oldPathList = pasrePath(childData);
String[] newPathList = pasrePath(childData1);
cache.replaceServiceAddress(oldPathList[1], oldPathList[2], newPathList[2]);
log.info("节点更新:服务名称 {} 地址从 {} 更新为 {}", oldPathList[1], oldPathList[2], newPathList[2]);
break;
case "NODE_DELETED": // 节点删除
String[] pathList_d = pasrePath(childData);
if (pathList_d.length <= 2) break;
else {
String serviceName = pathList_d[1];
String address = pathList_d[2];
//将新注册的服务加入到本地缓存中
cache.delete(serviceName, address);
log.info("节点删除:服务名称 {} 地址 {}", serviceName, address);
}
break;
default:
break;
}
}
});
//开启监听
curatorCache.start();
}
//解析节点对应地址
public String[] pasrePath(ChildData childData) {
//获取更新的节点的路径
String path = new String(childData.getPath());
log.info("节点路径:{}",path);
//按照格式 读取
return path.split("/");
}
}

View File

@ -0,0 +1,20 @@
package com.kama.client.servicecenter.balance;
import java.util.List;
/**
* @InterfaceName LoadBalance
* @Description 负载均衡接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:40
* @Version v5.0
*/
public interface LoadBalance {
String balance(List<String> addressList);
void addNode(String node);
void delNode(String node);
}

View File

@ -0,0 +1,149 @@
package com.kama.client.servicecenter.balance.impl;
import com.kama.client.servicecenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
/**
* @ClassName ConsistencyHashBalance
* @Description 一致性哈希算法负载均衡
* @Author Tong
* @LastChangeDate 2024-12-02 10:42
* @Version v5.0
*/
@Slf4j
public class ConsistencyHashBalance implements LoadBalance {
// 虚拟节点的个数
private static final int VIRTUAL_NUM = 5;
// 虚拟节点分配key是hash值value是虚拟节点服务器名称
private SortedMap<Integer, String> shards = new TreeMap<Integer,String>();
// 真实节点列表
private List<String> realNodes = new LinkedList<>();
// 获取虚拟节点的个数
public static int getVirtualNum() {
return VIRTUAL_NUM;
}
// 初始化虚拟节点
public void init(List<String> serviceList) {
for (String server : serviceList) {
realNodes.add(server);
log.info("真实节点[{}] 被添加", server);
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = server + "&&VN" + i;
int hash = getHash(virtualNode);
shards.put(hash, virtualNode);
log.info("虚拟节点[{}] hash:{},被添加", virtualNode, hash);
}
}
}
/**
* 获取被分配的节点名
*
* @param node 请求的节点通常是请求的唯一标识符
* @return 负责该请求的真实节点名称
*/
public String getServer(String node, List<String> serviceList) {
if (shards.isEmpty()) {
init(serviceList); // 初始化如果shards为空
}
int hash = getHash(node);
Integer key = null;
SortedMap<Integer, String> subMap = shards.tailMap(hash);
if (subMap.isEmpty()) {
key = shards.firstKey(); // 如果没有大于该hash的节点则返回最小的hash值
} else {
key = subMap.firstKey();
}
String virtualNode = shards.get(key);
return virtualNode.substring(0, virtualNode.indexOf("&&"));
}
/**
* 添加节点
*
* @param node 新加入的节点
*/
public void addNode(String node) {
if (!realNodes.contains(node)) {
realNodes.add(node);
log.info("真实节点[{}] 上线添加", node);
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = node + "&&VN" + i;
int hash = getHash(virtualNode);
shards.put(hash, virtualNode);
log.info("虚拟节点[{}] hash:{},被添加", virtualNode, hash);
}
}
}
/**
* 删除节点
*
* @param node 被移除的节点
*/
public void delNode(String node) {
if (realNodes.contains(node)) {
realNodes.remove(node);
log.info("真实节点[{}] 下线移除", node);
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = node + "&&VN" + i;
int hash = getHash(virtualNode);
shards.remove(hash);
log.info("虚拟节点[{}] hash:{},被移除", virtualNode, hash);
}
}
}
/**
* FNV1_32_HASH算法
*/
private static int getHash(String str) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 如果算出来的值为负数则取其绝对值
if (hash < 0)
hash = Math.abs(hash);
return hash;
}
@Override
public String balance(List<String> addressList) {
// 如果 addressList 为空或 null抛出 IllegalArgumentException
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Address list cannot be null or empty");
}
// 使用UUID作为请求的唯一标识符来进行一致性哈希
String random = UUID.randomUUID().toString();
return getServer(random, addressList);
}
public SortedMap<Integer, String> getShards() {
return shards;
}
public List<String> getRealNodes() {
return realNodes;
}
@Override
public String toString() {
return "ConsistencyHash";
}
}

View File

@ -0,0 +1,48 @@
package com.kama.client.servicecenter.balance.impl;
import com.kama.client.servicecenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @ClassName RandomLoadBalance
* @Description 随机法
* @Author Tong
* @LastChangeDate 2024-12-02 10:40
* @Version v5.0
*/
@Slf4j
public class RandomLoadBalance implements LoadBalance {
// 将Random声明为类级别的字段
private final Random random = new Random();
private final List<String> addressList = new CopyOnWriteArrayList<>();
@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Address list cannot be null or empty");
}
int choose = random.nextInt(addressList.size());
log.info("负载均衡选择了第 {} 号服务器,地址是:{}", choose, addressList.get(choose));
return addressList.get(choose); // 返回选择的服务器地址
}
@Override
public void addNode(String node) {
// 如果是动态添加节点可以将节点加入到addressList中
addressList.add(node);
log.info("节点 {} 已加入负载均衡", node);
}
@Override
public void delNode(String node) {
// 如果是动态删除节点可以将节点从addressList中移除
addressList.remove(node);
log.info("节点 {} 已从负载均衡中移除", node);
}
}

View File

@ -0,0 +1,52 @@
package com.kama.client.servicecenter.balance.impl;
import com.kama.client.servicecenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName RoundLoadBalance
* @Description 轮询法
* @Author Tong
* @LastChangeDate 2024-12-02 10:41
* @Version v5.0
*/
@Slf4j
public class RoundLoadBalance implements LoadBalance {
// 使用 AtomicInteger 保证线程安全
private AtomicInteger choose = new AtomicInteger(0);
private List<String> addressList = new CopyOnWriteArrayList<>();
@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Address list cannot be null or empty");
}
// 获取当前索引并更新为下一个
int currentChoose = choose.getAndUpdate(i -> (i + 1) % addressList.size());
String selectedServer = addressList.get(currentChoose);
log.info("负载均衡选择了服务器: {}", selectedServer);
return selectedServer; // 返回被选择的服务器地址
}
@Override
public void addNode(String node) {
// 如果是动态添加节点可以将节点加入到 addressList
addressList.add(node);
log.info("节点 {} 已加入负载均衡", node);
}
@Override
public void delNode(String node) {
// 如果是动态删除节点可以将节点从 addressList 中移除
addressList.remove(node);
log.info("节点 {} 已从负载均衡中移除", node);
}
}

View File

@ -0,0 +1,37 @@
package com.kama.config;
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
import com.kama.server.serviceRegister.impl.ZKServiceRegister;
import common.serializer.myserializer.Serializer;
import lombok.*;
/**
* @ClassName KRpcConfig
* @Description 配置文件
* @Author Tong
* @LastChangeDate 2024-12-05 11:02
* @Version v5.0
*/
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class KRpcConfig {
//名称
private String name = "krpc";
//端口
private Integer port = 9999;
//主机名
private String host = "localhost";
//版本号
private String version = "1.0.0";
//注册中心
private String registry = new ZKServiceRegister().toString();
//序列化器
private String serializer = Serializer.getSerializerByCode(3).toString();
//负载均衡
private String loadBalance = new ConsistencyHashBalance().toString();
}

View File

@ -0,0 +1,20 @@
package com.kama.config;
/**
* @InterfaceName RpcConstants
* @Description
* @Author Tong
* @LastChangeDate 2024-12-05 11:17
* @Version v5.0
*/
public interface RpcConstant {
//默认的配置文件前缀
String CONFIG_FILE_PREFIX = "rpc";
//默认的服务版本号
String DEFAULT_VERSION_DEFAULT = "1.0.0";
}

View File

@ -0,0 +1,38 @@
package com.kama.server.netty;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* @author wxx
* @version 1.0
* @create 2025/2/13 15:27
*/
public class HeartbeatHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 处理IdleState.READER_IDLE时间
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
IdleState idleState = ((IdleStateEvent) evt).state();
// 如果是触发的是读空闲时间说明已经超过n秒没有收到客户端心跳包
if(idleState == IdleState.READER_IDLE) {
System.out.println("超过n秒没有收到客户端心跳 channel: " + ctx.channel());
// 关闭channel避免造成更多资源占用
ctx.close();
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收到客户端数据, channel: " + ctx.channel() + ", 数据: " + msg.toString());
}
}

View File

@ -0,0 +1,72 @@
package com.kama.server.netty;
import com.kama.server.provider.ServiceProvider;
import com.kama.server.ratelimit.RateLimit;
import common.message.RpcRequest;
import common.message.RpcResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @ClassName NettyRpcServerHandler
* @Description 服务端处理器
* @Author Tong
* @LastChangeDate 2024-12-02 10:26
* @Version v5.0
*/
@AllArgsConstructor // 使用 Lombok 自动生成构造器
@Slf4j
public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private final ServiceProvider serviceProvider; // 确保通过构造器注入 ServiceProvider
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
if (request == null) {
log.error("接收到非法请求RpcRequest 为空");
return;
}
RpcResponse response = getResponse(request);
//ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("处理请求时发生异常: ", cause);
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest) {
//得到服务名
String interfaceName = rpcRequest.getInterfaceName();
//接口限流降级
RateLimit rateLimit = serviceProvider.getRateLimitProvider().getRateLimit(interfaceName);
if (!rateLimit.getToken()) {
//如果获取令牌失败进行限流降级快速返回结果
log.warn("服务限流,接口: {}", interfaceName);
return RpcResponse.fail("服务限流,接口 " + interfaceName + " 当前无法处理请求。请稍后再试。");
}
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method;
try {
method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke = method.invoke(service, rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
log.error("方法执行错误,接口: {}, 方法: {}", interfaceName, rpcRequest.getMethodName(), e);
return RpcResponse.fail("方法执行错误");
}
}
}

View File

@ -0,0 +1,40 @@
package com.kama.server.netty;
import com.kama.server.provider.ServiceProvider;
import common.serializer.mycoder.MyDecoder;
import common.serializer.mycoder.MyEncoder;
import common.serializer.myserializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.AllArgsConstructor;
import java.util.concurrent.TimeUnit;
/**
* @ClassName NettyServerInitializer
* @Description 服务端初始化器
* @Author Tong
* @LastChangeDate 2024-12-02 10:55
* @Version v5.0
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 服务端只关注读事件如果3秒内没有收到客户端的消息将会触发IdleState.READER_IDLE事件将由HeartbeatHandler进行处理
pipeline.addLast(new IdleStateHandler(3, 0, 0 ,TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
//使用自定义的编/解码器
pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3)));
pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyRpcServerHandler(serviceProvider));
}
}

View File

@ -0,0 +1,58 @@
package com.kama.server.provider;
import com.kama.server.ratelimit.provider.RateLimitProvider;
import com.kama.server.serviceRegister.ServiceRegister;
import com.kama.server.serviceRegister.impl.ZKServiceRegister;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName ServiceProvider
* @Description 本地注册中心
* @Author Tong
* @LastChangeDate 2024-12-02 10:21
* @Version v5.0
*/
public class ServiceProvider {
private Map<String, Object> interfaceProvider;
private int port;
private String host;
//注册服务类
private ServiceRegister serviceRegister;
//限流器
private RateLimitProvider rateLimitProvider;
public ServiceProvider(String host, int port) {
//需要传入服务端自身的网络地址
this.host = host;
this.port = port;
this.interfaceProvider = new HashMap<>();
this.serviceRegister = new ZKServiceRegister();
this.rateLimitProvider = new RateLimitProvider();
}
public void provideServiceInterface(Object service) {
String serviceName = service.getClass().getName();
Class<?>[] interfaceName = service.getClass().getInterfaces();
for (Class<?> clazz : interfaceName) {
//本机的映射表
interfaceProvider.put(clazz.getName(), service);
//在注册中心注册服务
serviceRegister.register(clazz, new InetSocketAddress(host, port));
}
}
public Object getService(String interfaceName) {
return interfaceProvider.get(interfaceName);
}
public RateLimitProvider getRateLimitProvider() {
return rateLimitProvider;
}
}

View File

@ -0,0 +1,15 @@
package com.kama.server.ratelimit;
/**
* @InterfaceName RateLimit
* @Description 限流接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:50
* @Version v5.0
*/
public interface RateLimit {
//获取访问许可
boolean getToken();
}

View File

@ -0,0 +1,60 @@
package com.kama.server.ratelimit.impl;
import com.kama.server.ratelimit.RateLimit;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName TokenBucketRateLimitImpl
* @Description 全局限流
* @Author Tong
* @LastChangeDate 2024-12-02 10:53
* @Version v5.0
*/
@Slf4j
public class TokenBucketRateLimitImpl implements RateLimit {
// 令牌产生速率单位ms
private final int rate;
// 桶容量
private final int capacity;
// 当前桶容量
private volatile int curCapacity;
// 上次请求时间戳
private volatile long lastTimestamp;
public TokenBucketRateLimitImpl(int rate, int capacity) {
this.rate = rate;
this.capacity = capacity;
this.curCapacity = capacity;
this.lastTimestamp = System.currentTimeMillis();
}
@Override
public boolean getToken() {
// 优化同步仅限于关键部分减少锁竞争
synchronized (this) {
// 如果当前桶还有剩余就直接返回
if (curCapacity > 0) {
curCapacity--;
return true;
}
long currentTimestamp = System.currentTimeMillis();
// 如果距离上一次请求的时间大于 RATE 的时间间隔
if (currentTimestamp - lastTimestamp >= rate) {
// 计算这段时间内生成的令牌数量
int generatedTokens = (int) ((currentTimestamp - lastTimestamp) / rate);
if (generatedTokens > 1) {
// 只添加剩余令牌确保不会超过桶的容量
curCapacity = Math.min(capacity, curCapacity + generatedTokens - 1);
}
// 更新时间戳
lastTimestamp = currentTimestamp;
return true;
}
return false; // 如果无法获取令牌返回 false
}
}
}

View File

@ -0,0 +1,33 @@
package com.kama.server.ratelimit.provider;
import com.kama.server.ratelimit.RateLimit;
import com.kama.server.ratelimit.impl.TokenBucketRateLimitImpl;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName RateLimitProvider
* @Description 提供限流器
* @Author Tong
* @LastChangeDate 2024-12-02 10:54
* @Version v5.0
*/
@Slf4j
public class RateLimitProvider {
private final Map<String, RateLimit> rateLimitMap = new ConcurrentHashMap<>();
// 默认的限流桶容量和令牌生成速率
private static final int DEFAULT_CAPACITY = 100;
private static final int DEFAULT_RATE = 10;
// 提供限流实例
public RateLimit getRateLimit(String interfaceName) {
return rateLimitMap.computeIfAbsent(interfaceName, key -> {
RateLimit rateLimit = new TokenBucketRateLimitImpl(DEFAULT_CAPACITY, DEFAULT_RATE);
log.info("为接口 [{}] 创建了新的限流策略: {}", interfaceName, rateLimit);
return rateLimit;
});
}
}

Some files were not shown because too many files have changed in this diff Show More