Merge pull request #27 from xiaobaozi-web/main

version5
This commit is contained in:
程序员Carl 2024-12-09 17:15:22 +08:00 committed by GitHub
commit 52f4e674bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
163 changed files with 4261 additions and 222 deletions

BIN
README.assets/image.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 60 KiB

View File

@ -203,15 +203,22 @@
![1720376759165](README.assets/1720376759165.png)
# 版本五
- SPI机制
- 配置顶
- 新增kryo、Hessian、protostuff等序列化方式
- 优化关闭方法
![](README.assets/image.png)
# TodoList
- [ ] 补充其它序列化方式如ProtoBufHessian
- [ ] 集成Spring
- [ ] 使用注解注册服务,消费服务
- [ ] 主动下线失败次数过多的节点

8
version5/.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile default="true" name="Default" enabled="true" />
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="krpc-core" />
<module name="krpc-common" />
<module name="krpc-provider" />
<module name="krpc-api" />
<module name="krpc-consumer" />
</profile>
</annotationProcessing>
<bytecodeTargetLevel>
<module name="krpc-spring-starter" target="17" />
</bytecodeTargetLevel>
</component>
</project>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/krpc-api/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-api/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-common/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-common/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-consumer/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-consumer/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-core/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-core/src/main/java/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-core/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-provider/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-provider/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-spring-starter/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/krpc-spring-starter/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

View File

@ -0,0 +1,8 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="JavadocDeclaration" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ADDITIONAL_TAGS" value="Version,LastChangeDate,Author,Description,InterfaceName,ClassName" />
</inspection_tool>
</profile>
</component>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="http://maven.aliyun.com/nexus/content/groups/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>

19
version5/.idea/misc.xml Normal file
View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
<option name="ignoredFiles">
<set>
<option value="$PROJECT_DIR$/krpc-spring-starter/pom.xml" />
</set>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" project-jdk-name="17 (2)" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>

4
version5/.idea/vcs.xml Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings" defaultProject="true" />
</project>

20
version5/krpc-api/pom.xml Normal file
View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.kama</groupId>
<artifactId>version5</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>krpc-api</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,26 @@
package com.kama.pojo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ClassName User
* @Description User对象
* @Author Tong
* @LastChangeDate 2024-12-05 0:53
* @Version v5.0
*/
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean gender;
}

View File

@ -0,0 +1,19 @@
package com.kama.service;
import com.kama.pojo.User;
/**
* @InterfaceName UserService
* @Description 接口
* @Author Tong
* @LastChangeDate 2024-12-05 0:52
* @Version v1.0
*/
public interface UserService {
// 查询
User getUserByUserId(Integer id);
// 新增
Integer insertUserId(User user);
}

Binary file not shown.

View File

@ -0,0 +1,5 @@
#Generated by Maven
#Thu Dec 05 15:18:25 CST 2024
groupId=com.kama
artifactId=krpc-api
version=1.0-SNAPSHOT

View File

@ -0,0 +1,3 @@
com\kama\pojo\User$UserBuilder.class
com\kama\pojo\User.class
com\kama\service\UserService.class

View File

@ -0,0 +1,2 @@
D:\java_stduy\version5\krpc-api\src\main\java\com\kama\pojo\User.java
D:\java_stduy\version5\krpc-api\src\main\java\com\kama\service\UserService.java

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.kama</groupId>
<artifactId>version5</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>krpc-common</artifactId>
<dependencies>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
<scope>compile</scope>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -0,0 +1,17 @@
package common.exception;
/**
* @ClassName SerializeException
* @Description ToDo
* @Author Tong
* @LastChangeDate 2024-12-02 19:18
* @Version v1.0
*/
public class SerializeException extends RuntimeException{
public SerializeException(String message) {
super(message);
}
public SerializeException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,13 @@
package common.message;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public enum MessageType {
REQUEST(0), RESPONSE(1);
private int code;
public int getCode() {
return code;
}
}

View File

@ -0,0 +1,31 @@
package common.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ClassName RpcRequest
* @Description 定义请求消息格式
* @Author Tong
* @LastChangeDate 2024-11-29 10:12
* @Version v5.0
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class RpcRequest implements Serializable {
//接口名方法名参数列表参数类型
private String interfaceName;
private String methodName;
private Object[] params;
private Class<?>[] paramsType;
}

View File

@ -0,0 +1,37 @@
package common.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @ClassName RpcResponse
* @Description 定义响应消息格式
* @Author Tong
* @LastChangeDate 2024-11-29 10:14
* @Version v5.0
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class RpcResponse implements Serializable {
//状态信息
private int code;
private String message;
//更新加入传输数据的类型以便在自定义序列化器中解析
private Class<?> dataType;
//具体数据
private Object data;
public static RpcResponse sussess(Object data) {
return RpcResponse.builder().code(200).dataType(data.getClass()).data(data).build();
}
public static RpcResponse fail(String msg) {
return RpcResponse.builder().code(500).message(msg).build();
}
}

View File

@ -0,0 +1,58 @@
package common.serializer.mycoder;
import common.exception.SerializeException;
import common.message.MessageType;
import common.serializer.myserializer.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
/**
* @ClassName MyDecoder
* @Description 解码器
* @Author Tong
* @LastChangeDate 2024-11-29 10:32
* @Version v5.0
*/
@Slf4j
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
//检查可读字节数
if (in.readableBytes() < 6) { // messageType + serializerType + length
return;
}
//1.读取消息类型
short messageType = in.readShort();
// 现在还只支持request与response请求
if (messageType != MessageType.REQUEST.getCode() &&
messageType != MessageType.RESPONSE.getCode()) {
log.warn("暂不支持此种数据, messageType: {}", messageType);
return;
}
//2.读取序列化的方式&类型
short serializerType = in.readShort();
Serializer serializer = Serializer.getSerializerByCode(serializerType);
if (serializer == null) {
log.error("不存在对应的序列化器, serializerType: {}", serializerType);
throw new SerializeException("不存在对应的序列化器, serializerType: " + serializerType);
}
//3.读取序列化数组长度
int length = in.readInt();
if (in.readableBytes() < length) {
return; // 数据不完整等待更多数据
}
//4.读取序列化数组
byte[] bytes = new byte[length];
in.readBytes(bytes);
log.debug("Received bytes: {}", Arrays.toString(bytes));
Object deserialize = serializer.deserialize(bytes, messageType);
out.add(deserialize);
}
}

View File

@ -0,0 +1,50 @@
package common.serializer.mycoder;
import common.message.MessageType;
import common.message.RpcRequest;
import common.message.RpcResponse;
import common.serializer.myserializer.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName MyEncoder
* @Description 编码器
* @Author Tong
* @LastChangeDate 2024-11-29 10:32
* @Version v5.0
*/
@Slf4j
@AllArgsConstructor
public class MyEncoder extends MessageToByteEncoder {
private Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
log.debug("Encoding message of type: {}", msg.getClass());
//1.写入消息类型
if (msg instanceof RpcRequest) {
out.writeShort(MessageType.REQUEST.getCode());
} else if (msg instanceof RpcResponse) {
out.writeShort(MessageType.RESPONSE.getCode());
} else {
log.error("Unknown message type: {}", msg.getClass());
throw new IllegalArgumentException("Unknown message type: " + msg.getClass());
}
//2.写入序列化方式
out.writeShort(serializer.getType());
//得到序列化数组
byte[] serializeBytes = serializer.serialize(msg);
if (serializeBytes == null || serializeBytes.length == 0) {
throw new IllegalArgumentException("Serialized message is empty");
}
//3.写入长度
out.writeInt(serializeBytes.length);
//4.写入序列化数组
out.writeBytes(serializeBytes);
}
}

View File

@ -0,0 +1,52 @@
package common.serializer.myserializer;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import common.exception.SerializeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* @ClassName HessianSerializer
* @Description Hessian序列化
* @Author Tong
* @LastChangeDate 2024-11-29 11:49
* @Version v5.0
*/
public class HessianSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
// 使用 ByteArrayOutputStream HessianOutput 来实现对象的序列化
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
hessianOutput.writeObject(obj); // 将对象写入输出流
return byteArrayOutputStream.toByteArray(); // 返回字节数组
} catch (IOException e) {
throw new SerializeException("Serialization failed");
}
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
// 使用 ByteArrayInputStream HessianInput 来实现反序列化
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {
HessianInput hessianInput = new HessianInput(byteArrayInputStream);
return hessianInput.readObject(); // 读取并返回对象
} catch (IOException e) {
throw new SerializeException("Deserialization failed");
}
}
@Override
public int getType() {
return 3;
}
@Override
public String toString() {
return "Hessian";
}
}

View File

@ -0,0 +1,78 @@
package common.serializer.myserializer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import common.message.RpcRequest;
import common.message.RpcResponse;
/**
* @ClassName JsonSerializer
* @Description json序列化
* @Author Tong
* @LastChangeDate 2024-11-29 10:33
* @Version v5.0
*/
public class JsonSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
byte[] bytes = JSONObject.toJSONBytes(obj);
return bytes;
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
// 传输的消息分为request与response
switch (messageType){
case 0:
RpcRequest request = JSON.parseObject(bytes, RpcRequest.class);
Object[] objects = new Object[request.getParams().length];
// 把json字串转化成对应的对象 fastjson可以读出基本数据类型不用转化
// 对转换后的request中的params属性逐个进行类型判断
for(int i = 0; i < objects.length; i++){
Class<?> paramsType = request.getParamsType()[i];
//判断每个对象类型是否和paramsTypes中的一致
if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())){
//如果不一致就行进行类型转换
objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i],request.getParamsType()[i]);
}else{
//如果一致就直接赋给objects[i]
objects[i] = request.getParams()[i];
}
}
request.setParams(objects);
obj = request;
break;
case 1:
RpcResponse response = JSON.parseObject(bytes, RpcResponse.class);
// 如果类型为空说明返回错误
if(response.getDataType()==null){
obj = RpcResponse.fail("类型为空");
break;
}
Class<?> dataType = response.getDataType();
//判断转化后的response对象中的data的类型是否正确
if(!dataType.isAssignableFrom(response.getData().getClass())){
response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
}
obj = response;
break;
default:
System.out.println("暂时不支持此种消息");
throw new RuntimeException();
}
return obj;
}
//1 代表json序列化方式
@Override
public int getType() {
return 1;
}
@Override
public String toString() {
return "Json";
}
}

View File

@ -0,0 +1,81 @@
package common.serializer.myserializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.kama.pojo.User;
import common.exception.SerializeException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* @ClassName KryoSerializer
* @Description kryo序列化
* @Author Tong
* @LastChangeDate 2024-11-29 11:29
* @Version v5.0
*/
public class KryoSerializer implements Serializer {
private Kryo kryo;
public KryoSerializer() {
this.kryo = new Kryo();
}
@Override
public byte[] serialize(Object obj) {
if (obj == null) {
throw new IllegalArgumentException("Cannot serialize null object");
}
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
kryo.writeObject(output, obj); // 使用 Kryo 写入对象
return output.toBytes(); // 返回字节数组
} catch (Exception e) {
throw new SerializeException("Serialization failed");
}
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("Cannot deserialize null or empty byte array");
}
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
// 根据 messageType 来反序列化不同的类
Class<?> clazz = getClassForMessageType(messageType);
return kryo.readObject(input, clazz); // 使用 Kryo 反序列化对象
} catch (Exception e) {
throw new SerializeException("Deserialization failed");
}
}
@Override
public int getType() {
return 2;
}
private Class<?> getClassForMessageType(int messageType) {
if (messageType == 1) {
return User.class; // 假设我们在此反序列化成 User
} else {
throw new SerializeException("Unknown message type: " + messageType);
}
}
@Override
public String toString() {
return "Kryo";
}
}

View File

@ -0,0 +1,60 @@
package common.serializer.myserializer;
import java.io.*;
/**
* @ClassName ObjectSerializer
* @Description JDK序列化方式
* @Author Tong
* @LastChangeDate 2024-11-29 10:34
* @Version v5.0
*/
public class ObjectSerializer implements Serializer {
//利用Java io 对象 -字节数组
@Override
public byte[] serialize(Object obj) {
byte[] bytes=null;
ByteArrayOutputStream bos=new ByteArrayOutputStream();
try {
//是一个对象输出流用于将 Java 对象序列化为字节流并将其连接到bos上
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
//刷新 ObjectOutputStream确保所有缓冲区中的数据都被写入到底层流中
oos.flush();
//将bos其内部缓冲区中的数据转换为字节数组
bytes = bos.toByteArray();
oos.close();
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
return bytes;
}
//字节数组 -对象
@Override
public Object deserialize(byte[] bytes, int messageType) {
Object obj = null;
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
try {
ObjectInputStream ois = new ObjectInputStream(bis);
obj = ois.readObject();
ois.close();
bis.close();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
return obj;
}
//0 代表Java 原生序列器
@Override
public int getType() {
return 0;
}
@Override
public String toString() {
return "JDK";
}
}

View File

@ -0,0 +1,83 @@
package common.serializer.myserializer;
import com.kama.pojo.User;
import common.exception.SerializeException;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
/**
* @ClassName ProtostuffSerializer
* @Description protostuff序列化
* @Author Tong
* @LastChangeDate 2024-11-29 11:55
* @Version v5.0
*/
public class ProtostuffSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
// 检查 null 对象
if (obj == null) {
throw new IllegalArgumentException("Cannot serialize null object");
}
// 获取对象的 schema
Schema schema = RuntimeSchema.getSchema(obj.getClass());
// 使用 LinkedBuffer 来创建缓冲区默认大小 1024
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
// 序列化对象为字节数组
byte[] bytes;
try {
bytes = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}
return bytes;
}
@Override
public Object deserialize(byte[] bytes, int messageType) {
if (bytes == null || bytes.length == 0) {
throw new IllegalArgumentException("Cannot deserialize null or empty byte array");
}
// 根据 messageType 来决定反序列化的类这里假设 `messageType` 是类的标识符
Class<?> clazz = getClassForMessageType(messageType);
// 获取对象的 schema
Schema schema = RuntimeSchema.getSchema(clazz);
// 创建一个空的对象实例
Object obj;
try {
obj = clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new SerializeException("Deserialization failed due to reflection issues");
}
// 反序列化字节数组为对象
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
@Override
public int getType() {
return 4;
}
// 用于根据 messageType 获取对应的类
private Class<?> getClassForMessageType(int messageType) {
if (messageType == 1) {
return User.class; // 假设我们在此反序列化成 User
} else {
throw new SerializeException("Unknown message type: " + messageType);
}
}
@Override
public String toString() {
return "Protostuff";
}
}

View File

@ -0,0 +1,35 @@
package common.serializer.myserializer;
import java.util.HashMap;
import java.util.Map;
/**
* @InterfaceName Serializer
* @Description 序列化接口
* @Author Tong
* @LastChangeDate 2024-11-29 10:33
* @Version v5.0
*/
public interface Serializer {
byte[] serialize(Object obj);
Object deserialize(byte[] bytes, int messageType);
int getType();
// 使用 Map 存储序列化器
static Serializer getSerializerByCode(int code) {
// 静态映射保证只初始化一次
Map<Integer, Serializer> serializerMap = new HashMap<>();
serializerMap.put(0, new ObjectSerializer());
serializerMap.put(1, new JsonSerializer());
serializerMap.put(2, new KryoSerializer());
serializerMap.put(3, new HessianSerializer());
serializerMap.put(4, new ProtostuffSerializer());
return serializerMap.get(code); // 如果不存在则返回 null
}
}

View File

@ -0,0 +1,111 @@
package common.spi;
import cn.hutool.core.io.resource.ResourceUtil;
import common.serializer.myserializer.Serializer;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName SpiLoader
* @Description spi实现
* @Author Tong
* @LastChangeDate 2024-12-05 14:53
* @Version v5.0
*/
@Slf4j
public class SpiLoader {
// 存储已加载的 SPI 实现类的映射
private static final Map<String, Map<String, Class<? extends Serializer>>> loadedSpiMap = new ConcurrentHashMap<>();
// 缓存实例避免重复实例化
private static final Map<String, Object> instanceCache = new ConcurrentHashMap<>();
// SPI 配置文件的路径
private static final String SPI_CONFIG_DIR = "META-INF/serializer/";
/**
* 加载指定接口的 SPI 实现类
*
* @param serviceInterface 接口类
*/
public static void loadSpi(Class<?> serviceInterface) {
String interfaceName = serviceInterface.getName();
// 如果已经加载过该接口的 SPI 实现直接返回
if (loadedSpiMap.containsKey(interfaceName)) {
return;
}
Map<String, Class<? extends Serializer>> keyClassMap = new HashMap<>();
// 读取配置文件获取所有实现类
List<URL> resources = ResourceUtil.getResources(SPI_CONFIG_DIR + serviceInterface.getName());
for (URL resource : resources) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.openStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if (!line.trim().isEmpty() && !line.startsWith("#")) {
String[] parts = line.split("=");
if (parts.length == 2) {
String key = parts[0].trim();
String className = parts[1].trim();
Class<?> implClass = Class.forName(className);
if (serviceInterface.isAssignableFrom(implClass)) {
keyClassMap.put(key, (Class<? extends Serializer>) implClass);
}
}
}
}
} catch (IOException | ClassNotFoundException e) {
log.error("Failed to load SPI resource: " + resource, e);
}
}
// 将该接口的 SPI 实现类存入缓存
loadedSpiMap.put(interfaceName, keyClassMap);
}
/**
* 根据接口和 key 获取 SPI 实现类实例
*
* @param serviceInterface 接口类
* @param key 序列化器的 key
* @param <T> 接口类型
* @return 对应的 SPI 实现类实例
*/
public static <T> T getInstance(Class<T> serviceInterface, String key) {
String interfaceName = serviceInterface.getName();
Map<String, Class<? extends Serializer>> keyClassMap = loadedSpiMap.get(interfaceName);
if (keyClassMap == null) {
throw new RuntimeException("SPI not loaded for " + interfaceName);
}
Class<? extends Serializer> implClass = keyClassMap.get(key);
if (implClass == null) {
throw new RuntimeException("No SPI implementation found for key " + key);
}
// 从缓存中获取实例如果不存在则创建
String implClassName = implClass.getName();
if (!instanceCache.containsKey(implClassName)) {
try {
instanceCache.put(implClassName, implClass.newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Failed to instantiate SPI implementation: " + implClassName, e);
}
}
return (T) instanceCache.get(implClassName);
}
}

View File

@ -0,0 +1,48 @@
package common.util;
import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName ConfigUtil
* @Description 工具
* @Author Tong
* @LastChangeDate 2024-12-05 11:12
* @Version v5.0
*/
@Slf4j
public class ConfigUtil {
// 加载配置文件使用默认环境
public static <T> T loadConfig(Class<T> targetClass, String prefix) {
return loadConfig(targetClass, prefix, "");
}
// 加载配置文件支持指定环境
public static <T> T loadConfig(Class<T> targetClass, String prefix, String environment) {
StringBuilder configFileNameBuilder = new StringBuilder("application");
if (StrUtil.isNotBlank(environment)) {
configFileNameBuilder.append("-").append(environment);
}
configFileNameBuilder.append(".properties");
// 加载配置文件
Props properties = new Props(configFileNameBuilder.toString());
if (properties.isEmpty()) {
log.warn("配置文件 '{}' 为空或加载失败!", configFileNameBuilder.toString());
} else {
log.info("加载配置文件: '{}'", configFileNameBuilder.toString());
}
// 返回转化后的配置对象
try {
return properties.toBean(targetClass, prefix);
} catch (Exception e) {
log.error("配置转换失败,目标类: {}", targetClass.getName(), e);
throw new RuntimeException("配置加载失败", e);
}
}
}

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.kama</groupId>
<artifactId>version5</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>krpc-consumer</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,66 @@
package com.kama.consumer;
import com.kama.client.proxy.ClientProxy;
import com.kama.pojo.User;
import com.kama.service.UserService;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @ClassName ConsumerExample
* @Description 客户端测试
* @Author Tong
* @LastChangeDate 2024-12-05 16:20
* @Version v5.0
*/
@Slf4j
public class ConsumerTest {
private static final int THREAD_POOL_SIZE = 20;
private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public static void main(String[] args) throws InterruptedException {
ClientProxy clientProxy = new ClientProxy();
UserService proxy = clientProxy.getProxy(UserService.class);
for (int i = 0; i < 120; i++) {
final Integer i1 = i;
if (i % 30 == 0) {
// Simulate delay for every 30 requests
Thread.sleep(10000);
}
// Submit tasks to executor service (thread pool)
executorService.submit(() -> {
try {
User user = proxy.getUserByUserId(i1);
if (user != null) {
log.info("从服务端得到的user={}", user);
} else {
log.warn("获取的 user 为 null, userId={}", i1);
}
Integer id = proxy.insertUserId(User.builder()
.id(i1)
.userName("User" + i1)
.gender(true)
.build());
if (id != null) {
log.info("向服务端插入user的id={}", id);
} else {
log.warn("插入失败返回的id为null, userId={}", i1);
}
} catch (Exception e) {
log.error("调用服务时发生异常userId={}", i1, e);
}
});
}
// Gracefully shutdown the executor service
executorService.shutdown();
}
}

View File

@ -0,0 +1,20 @@
package com.kama.consumer;
import com.kama.config.KRpcConfig;
import common.util.ConfigUtil;
/**
* @ClassName ConsumerTestConfig
* @Description 测试配置顶
* @Author Tong
* @LastChangeDate 2024-12-05 11:29
* @Version v1.0
*/
public class ConsumerTestConfig {
public static void main(String[] args) {
KRpcConfig rpc = ConfigUtil.loadConfig(KRpcConfig.class, "rpc");
System.out.println(rpc);
}
}

View File

@ -0,0 +1,7 @@
rpc.name=krpc
rpc.version=1.0.0
rpc.port=9999
rpc.serializer=Hessian
rpc.host=localhost
rpc.registry=zookeeper
rpc.loadBalance=ConsistencyHash

View File

@ -0,0 +1,7 @@
rpc.name=krpc
rpc.version=1.0.0
rpc.port=9999
rpc.serializer=Hessian
rpc.host=localhost
rpc.registry=zookeeper
rpc.loadBalance=ConsistencyHash

View File

@ -0,0 +1,5 @@
#Generated by Maven
#Thu Dec 05 15:19:00 CST 2024
groupId=com.kama
artifactId=krpc-consumer
version=1.0-SNAPSHOT

View File

@ -0,0 +1,2 @@
com\kama\ConsumerTestConfig.class
com\kama\ConsumerApplication.class

View File

@ -0,0 +1,2 @@
D:\java_stduy\version5\krpc-consumer\src\main\java\com\kama\ConsumerTestConfig.java
D:\java_stduy\version5\krpc-consumer\src\main\java\com\kama\ConsumerApplication.java

114
version5/krpc-core/pom.xml Normal file
View File

@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.kama</groupId>
<artifactId>version5</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>krpc-core</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
<scope>compile</scope>
</dependency>
<!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn但不影响正常使用-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.66</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>17.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.kama</groupId>
<artifactId>krpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.10</version>
</dependency>
<!-- JUnit 5 API 和 Engine -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.11.3</version>
<scope>test</scope>
</dependency>
<!-- JUnit Platform Launcher -->
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,48 @@
package com.kama;
import com.kama.config.KRpcConfig;
import com.kama.config.RpcConstant;
import common.util.ConfigUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName RpcApplication
* @Description 测试配置顶学习更多参考Dubbo
* @Author Tong
* @LastChangeDate 2024-12-05 11:22
* @Version v5.0
*/
@Slf4j
public class KRpcApplication {
private static volatile KRpcConfig rpcConfigInstance;
public static void initialize(KRpcConfig customRpcConfig) {
rpcConfigInstance = customRpcConfig;
log.info("RPC 框架初始化,配置 = {}", customRpcConfig);
}
public static void initialize() {
KRpcConfig customRpcConfig;
try {
customRpcConfig = ConfigUtil.loadConfig(KRpcConfig.class, RpcConstant.CONFIG_FILE_PREFIX);
log.info("成功加载配置文件,配置文件名称 = {}", RpcConstant.CONFIG_FILE_PREFIX); // 添加成功加载的日志
} catch (Exception e) {
// 配置加载失败使用默认配置
customRpcConfig = new KRpcConfig();
log.warn("配置加载失败,使用默认配置");
}
initialize(customRpcConfig);
}
public static KRpcConfig getRpcConfig() {
if (rpcConfigInstance == null) {
synchronized (KRpcApplication.class) {
if (rpcConfigInstance == null) {
initialize(); // 确保在第一次调用时初始化
}
}
}
return rpcConfigInstance;
}
}

View File

@ -0,0 +1,75 @@
package com.kama.client.cache;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName serviceCache
* @Description 建立本地缓存
* @Author Tong
* @LastChangeDate 2024-12-02 10:34
* @Version v5.0
*/
@Slf4j
public class ServiceCache {
//key: serviceName 服务名
//value addressList 服务提供者列表
private static Map<String, List<String>> cache = new ConcurrentHashMap<>();
//添加服务
public void addServiceToCache(String serviceName, String address) {
if (cache.containsKey(serviceName)) {
List<String> addressList = cache.get(serviceName);
addressList.add(address);
log.info("有服务名情况将name为{}和地址为{}的服务添加到本地缓存中", serviceName, address);
} else {
List<String> addressList = new ArrayList<>();
addressList.add(address);
cache.put(serviceName, addressList);
log.info("无服务名情况将name为{}和地址为{}的服务添加到本地缓存中", serviceName, address);
}
}
//修改服务地址
public void replaceServiceAddress(String serviceName, String oldAddress, String newAddress) {
if (cache.containsKey(serviceName)) {
List<String> addressList = cache.get(serviceName);
addressList.remove(oldAddress);
addressList.add(newAddress);
log.info("将服务{}的地址{}替换为{}", serviceName, oldAddress, newAddress);
} else {
log.error("旧地址{}不在服务{}的地址列表中", oldAddress, serviceName);
}
}
//从缓存中取服务地址列表
public List<String> getServiceListFromCache(String serviceName) {
if (!cache.containsKey(serviceName)) {
log.warn("服务{}未找到", serviceName);
//返回个不可修改的空列表避免调用的时候出现空指针异常
return Collections.emptyList();
}
return cache.get(serviceName);
}
//从缓存中删除服务地址
public void delete(String serviceName, String address) {
List<String> addressList = cache.get(serviceName);
if (addressList != null && addressList.contains(address)) {
addressList.remove(address);
log.info("将name为{}和地址为{}的服务从本地缓存中删除", serviceName, address);
if (addressList.isEmpty()) {
cache.remove(serviceName); // 移除该服务的缓存条目
log.info("服务{}的地址列表为空,已从缓存中清除", serviceName);
}
} else {
log.warn("删除失败,地址{}不在服务{}的地址列表中", address, serviceName);
}
}
}

View File

@ -0,0 +1,109 @@
package com.kama.client.circuitbreaker;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName CircuitBreaker
* @Description 熔断器的状态
* @Author Tong
* @LastChangeDate 2024-12-02 10:45
* @Version v5.0
*/
@Slf4j
public class CircuitBreaker {
//当前状态
private CircuitBreakerState state = CircuitBreakerState.CLOSED;
private AtomicInteger failureCount = new AtomicInteger(0);
private AtomicInteger successCount = new AtomicInteger(0);
private AtomicInteger requestCount = new AtomicInteger(0);
//失败次数阈值
private final int failureThreshold;
//半开启-关闭状态的成功次数比例
private final double halfOpenSuccessRate;
//恢复时间
private final long retryTimePeriod;
//上一次失败时间
private long lastFailureTime = 0;
public CircuitBreaker(int failureThreshold, double halfOpenSuccessRate, long retryTimePeriod) {
this.failureThreshold = failureThreshold;
this.halfOpenSuccessRate = halfOpenSuccessRate;
this.retryTimePeriod = retryTimePeriod;
}
//查看当前熔断器是否允许请求通过
public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
log.info("熔断前检查, 当前失败次数:{}", failureCount);
switch (state) {
case OPEN:
if (currentTime - lastFailureTime > retryTimePeriod) {
state = CircuitBreakerState.HALF_OPEN;
resetCounts();
log.info("熔断已解除,进入半开启状态,允许请求通过");
return true;
}
log.warn("熔断生效中,拒绝请求!");
return false;
case HALF_OPEN:
requestCount.incrementAndGet();
log.info("当前为半开启状态,计数请求");
return true;
case CLOSED:
default:
log.info("当前为正常状态,允许请求通过");
return true;
}
}
//记录成功
public synchronized void recordSuccess() {
if (state == CircuitBreakerState.HALF_OPEN) {
successCount.incrementAndGet();
if (successCount.get() >= halfOpenSuccessRate * requestCount.get()) {
state = CircuitBreakerState.CLOSED;
resetCounts();
log.info("成功次数已达到阈值,熔断器切换至关闭状态");
}
} else {
resetCounts();
log.info("熔断器处于关闭状态,重置计数器");
}
}
//记录失败
public synchronized void recordFailure() {
failureCount.incrementAndGet();
log.error("记录失败,当前失败次数:{}", failureCount);
lastFailureTime = System.currentTimeMillis();
if (state == CircuitBreakerState.HALF_OPEN) {
state = CircuitBreakerState.OPEN;
lastFailureTime = System.currentTimeMillis();
log.warn("半开启状态下发生失败,熔断器切换至开启状态");
} else if (failureCount.get() >= failureThreshold) {
state = CircuitBreakerState.OPEN;
log.error("失败次数已超过阈值,熔断器切换至开启状态");
}
}
//重置次数
private void resetCounts() {
failureCount.set(0);
successCount.set(0);
requestCount.set(0);
}
public CircuitBreakerState getState() {
return state;
}
}
enum CircuitBreakerState {
//关闭开启半开启
CLOSED, OPEN, HALF_OPEN
}

View File

@ -0,0 +1,28 @@
package com.kama.client.circuitbreaker;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName CircuitBreakerState
* @Description 提供熔断器
* @Author Tong
* @LastChangeDate 2024-12-02 10:47
* @Version v5.0
*/
@Slf4j
public class CircuitBreakerProvider {
// 使用线程安全的 ConcurrentHashMap
private Map<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();
public synchronized CircuitBreaker getCircuitBreaker(String serviceName) {
// 使用 computeIfAbsent避免手动同步
return circuitBreakerMap.computeIfAbsent(serviceName, key -> {
log.info("服务 [{}] 不存在熔断器,创建新的熔断器实例", serviceName);
// 创建并返回新熔断器
return new CircuitBreaker(1, 0.5, 10000);
});
}
}

View File

@ -0,0 +1,34 @@
package com.kama.client.netty;
import common.message.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName NettyClientHandler
* @Description 客户端处理器
* @Author Tong
* @LastChangeDate 2024-12-02 10:15
* @Version v5.0
*/
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名让sendRequest里读取response
AttributeKey<RpcResponse> RESPONSE_KEY = AttributeKey.valueOf("RPCResponse");
// 将响应存入 Channel 属性
ctx.channel().attr(RESPONSE_KEY).set(response);
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Channel exception occurred", cause);
ctx.close();
}
}

View File

@ -0,0 +1,40 @@
package com.kama.client.netty;
import common.serializer.mycoder.MyDecoder;
import common.serializer.mycoder.MyEncoder;
import common.serializer.myserializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName NettyClientInitializer
* @Description 配置自定义的编码器以及Handler
* @Author Tong
* @LastChangeDate 2024-12-02 10:16
* @Version v5.0
*/
@Slf4j
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 使用自定义的编码器和解码器
try {
// 根据传入的序列化器类型初始化编码器
pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3)));
pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyClientHandler());
log.info("Netty client pipeline initialized with serializer type: {}",Serializer.getSerializerByCode(3).toString());
} catch (Exception e) {
log.error("Error initializing Netty client pipeline", e);
throw e; // 重新抛出异常确保管道初始化失败时处理正确
}
}
}

View File

@ -0,0 +1,92 @@
package com.kama.client.proxy;
import com.kama.client.circuitbreaker.CircuitBreaker;
import com.kama.client.circuitbreaker.CircuitBreakerProvider;
import com.kama.client.retry.GuavaRetry;
import com.kama.client.rpcclient.RpcClient;
import com.kama.client.rpcclient.impl.NettyRpcClient;
import com.kama.client.servicecenter.ServiceCenter;
import com.kama.client.servicecenter.ZKServiceCenter;
import common.message.RpcRequest;
import common.message.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* @ClassName ClientProxy
* @Description 动态代理
* @Author Tong
* @LastChangeDate 2024-12-02 10:14
* @Version v5.0
*/
@Slf4j
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象反射封装成一个request
private RpcClient rpcClient;
private ServiceCenter serviceCenter;
private CircuitBreakerProvider circuitBreakerProvider;
public ClientProxy() throws InterruptedException {
serviceCenter = new ZKServiceCenter();
rpcClient = new NettyRpcClient(serviceCenter);
circuitBreakerProvider = new CircuitBreakerProvider();
}
//jdk动态代理每一次代理对象调用方法都会经过此方法增强反射获取request对象socket发送到服务端
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//获取熔断器
CircuitBreaker circuitBreaker = circuitBreakerProvider.getCircuitBreaker(method.getName());
//判断熔断器是否允许请求经过
if (!circuitBreaker.allowRequest()) {
log.warn("熔断器开启,请求被拒绝: {}", request);
//这里可以针对熔断做特殊处理返回特殊值
return null;
}
//数据传输
RpcResponse response;
//后续添加逻辑为保持幂等性只对白名单上的服务进行重试
// 如果启用重试机制先检查是否需要重试
if (serviceCenter.checkRetry(request.getInterfaceName())) {
//调用retry框架进行重试操作
try {
log.info("尝试重试调用服务: {}", request.getInterfaceName());
response = new GuavaRetry().sendServiceWithRetry(request, rpcClient);
} catch (Exception e) {
log.error("重试调用失败: {}", request.getInterfaceName(), e);
circuitBreaker.recordFailure();
throw e; // 将异常抛给调用者
}
} else {
//只调用一次
response = rpcClient.sendRequest(request);
}
//记录response的状态上报给熔断器
if (response != null) {
if (response.getCode() == 200) {
circuitBreaker.recordSuccess();
} else if (response.getCode() == 500) {
circuitBreaker.recordFailure();
}
log.info("收到响应: {} 状态码: {}", request.getInterfaceName(), response.getCode());
}
return response != null ? response.getData() : null;
}
public <T> T getProxy(Class<T> clazz) {
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T) o;
}
}

View File

@ -0,0 +1,48 @@
package com.kama.client.retry;
import com.kama.client.rpcclient.RpcClient;
import com.github.rholder.retry.*;
import common.message.RpcRequest;
import common.message.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @ClassName guavaRetry
* @Description 重试策略
* @Author Tong
* @LastChangeDate 2024-12-02 10:44
* @Version v5.0
*/
@Slf4j
public class GuavaRetry {
public RpcResponse sendServiceWithRetry(RpcRequest request, RpcClient rpcClient) {
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
//无论出现什么异常都进行重试
.retryIfException()
//返回结果为 error时进行重试
.retryIfResult(response -> Objects.equals(response.getCode(), 500))
//重试等待策略等待 2s 后再进行重试
.withWaitStrategy(WaitStrategies.fixedWait(2, TimeUnit.SECONDS))
//重试停止策略重试达到 3
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("重试第 {} 次", attempt.getAttemptNumber());
}
})
.build();
try {
return retryer.call(() -> rpcClient.sendRequest(request));
} catch (Exception e) {
log.error("重试失败: 请求 {} 执行时遇到异常", request.getMethodName(), e);
}
return RpcResponse.fail("重试失败,所有重试尝试已结束");
}
}

View File

@ -0,0 +1,17 @@
package com.kama.client.rpcclient;
import common.message.RpcRequest;
import common.message.RpcResponse;
/**
* @InterfaceName RpcClient
* @Description 定义底层通信方法
* @Author Tong
* @LastChangeDate 2024-12-02 10:11
* @Version v5.0
*/
public interface RpcClient {
RpcResponse sendRequest(RpcRequest request);
}

View File

@ -0,0 +1,103 @@
package com.kama.client.rpcclient.impl;
import com.kama.client.netty.NettyClientInitializer;
import com.kama.client.rpcclient.RpcClient;
import com.kama.client.servicecenter.ServiceCenter;
import common.message.RpcRequest;
import common.message.RpcResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @ClassName NettyRpcClient
* @Description Netty客户端
* @Author Tong
* @LastChangeDate 2024-12-02 11:03
* @Version v5.0
*/
@Slf4j
public class NettyRpcClient implements RpcClient {
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
private ServiceCenter serviceCenter;
public NettyRpcClient(ServiceCenter serviceCenter) throws InterruptedException {
this.serviceCenter = serviceCenter;
}
//netty客户端初始化
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
//从注册中心获取host,post
InetSocketAddress address = serviceCenter.serviceDiscovery(request.getInterfaceName());
if (address == null) {
log.error("服务发现失败,返回的地址为 null");
return RpcResponse.fail("服务发现失败,地址为 null");
}
String host = address.getHostName();
int port = address.getPort();
try {
// 连接到远程服务
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// 发送数据
channel.writeAndFlush(request);
//sync()堵塞获取结果
channel.closeFuture().sync();
// 阻塞的获得结果通过给channel设计别名获取特定名字下的channel中的内容这个在hanlder中设置
// AttributeKey是线程隔离的不会由线程安全问题
// 当前场景下选择堵塞获取结果
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get();
if (response == null) {
log.error("服务响应为空,可能是请求失败或超时");
return RpcResponse.fail("服务响应为空");
}
log.info("收到响应: {}", response);
return response;
} catch (InterruptedException e) {
log.error("请求被中断,发送请求失败: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("发送请求时发生异常: {}", e.getMessage(), e);
} finally {
// 连接断开后优雅地关闭 Netty 资源
shutdown();
}
return RpcResponse.fail("请求失败");
}
// 优雅关闭 Netty 资源
private void shutdown() {
try {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().sync();
}
} catch (InterruptedException e) {
log.error("关闭 Netty 资源时发生异常: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
}

View File

@ -0,0 +1,57 @@
package com.kama.client.rpcclient.impl;
import com.kama.client.rpcclient.RpcClient;
import common.message.RpcRequest;
import common.message.RpcResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/**
* @ClassName SimpleSocketRpcClient
* @Description 实现简单客户都
* @Author Tong
* @LastChangeDate 2024-12-02 10:12
* @Version v5.0
*/
public class SimpleSocketRpcClient implements RpcClient {
private String host;
private int port;
public SimpleSocketRpcClient(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public RpcResponse sendRequest(RpcRequest request) {
// 定义响应对象
RpcResponse response = null;
// 创建 Socket 和流对象
try (Socket socket = new Socket(host, port);
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream())) {
// 发送请求对象
oos.writeObject(request);
oos.flush();
// 接收响应对象
response = (RpcResponse) ois.readObject();
} catch (UnknownHostException e) {
System.err.println("未知的主机: " + host);
} catch (IOException e) {
System.err.println("I/O 错误: " + e.getMessage());
} catch (ClassNotFoundException e) {
System.err.println("无法识别的类: " + e.getMessage());
}
return response;
}
}

View File

@ -0,0 +1,20 @@
package com.kama.client.servicecenter;
import java.net.InetSocketAddress;
/**
* @InterfaceName ServiceCenter
* @Description 服务中心接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:31
* @Version v5.0
*/
public interface ServiceCenter {
// 查询根据服务名查找地址
InetSocketAddress serviceDiscovery(String serviceName);
//判断是否可重试
boolean checkRetry(String serviceName);
}

View File

@ -0,0 +1,113 @@
package com.kama.client.servicecenter;
import com.kama.client.cache.ServiceCache;
import com.kama.client.servicecenter.ZKWatcher.watchZK;
import com.kama.client.servicecenter.balance.LoadBalance;
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
import com.kama.client.servicecenter.balance.impl.RandomLoadBalance;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @ClassName ZKServiceCenter
* @Description 从服务中心获取服务地址
* @Author Tong
* @LastChangeDate 2024-12-02 10:33
* @Version v5.0
*/
@Slf4j
public class ZKServiceCenter implements ServiceCenter {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//zookeeper根路径节点
private static final String ROOT_PATH = "MyRPC";
private static final String RETRY = "CanRetry";
//serviceCache
private ServiceCache cache;
private final LoadBalance loadBalance = new ConsistencyHashBalance();
//负责zookeeper客户端的初始化并与zookeeper服务端进行连接
public ZKServiceCenter() throws InterruptedException {
// 指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定不管是服务提供者还是消费者都要与之建立连接
// sessionTimeoutMs zoo.cfg中的tickTime 有关系
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
this.client.start();
log.info("Zookeeper 连接成功");
//初始化本地缓存
cache = new ServiceCache();
//加入zookeeper事件监听器
watchZK watcher = new watchZK(client, cache);
//监听启动
watcher.watchToUpdate(ROOT_PATH);
}
//根据服务名接口名返回地址
@Override
public InetSocketAddress serviceDiscovery(String serviceName) {
try {
//先从本地缓存中找
List<String> addressList = cache.getServiceListFromCache(serviceName);
//如果找不到再去zookeeper中找
//这种i情况基本不会发生或者说只会出现在初始化阶段
if (addressList == null) {
addressList = client.getChildren().forPath("/" + serviceName);
// 如果本地缓存中没有该服务名的地址列表则添加
List<String> cachedAddresses = cache.getServiceListFromCache(serviceName);
if (cachedAddresses == null || cachedAddresses.isEmpty()) {
// 假设 addServiceToCache 方法可以处理单个地址
for (String address : addressList) {
cache.addServiceToCache(serviceName, address);
}
}
}
if (addressList.isEmpty()) {
log.warn("未找到服务:{}", serviceName);
return null;
}
// 负载均衡得到地址
String address = loadBalance.balance(addressList);
return parseAddress(address);
} catch (Exception e) {
log.error("服务发现失败,服务名:{}", serviceName, e);
}
return null;
}
//保证线程安全使用CopyOnWriteArraySet
private Set<String> retryServiceCache = new CopyOnWriteArraySet<>();
//写一个白名单缓存优化性能
public boolean checkRetry(String serviceName) {
// 如果缓存为空则从 Zookeeper 中加载白名单
if (retryServiceCache.isEmpty()) {
try {
// 获取 Zookeeper 上的 /RETRY 路径下的所有子节点服务名称
List<String> serviceList = client.getChildren().forPath("/" + RETRY);
// 将从 Zookeeper 获取到的服务名称列表添加到缓存中
retryServiceCache.addAll(serviceList);
} catch (Exception e) {
log.error("检查重试失败,服务名:{}", serviceName, e);
}
}
// 判断服务是否在缓存的白名单中
return retryServiceCache.contains(serviceName);
}
// 字符串解析为地址
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}

View File

@ -0,0 +1,98 @@
package com.kama.client.servicecenter.ZKWatcher;
import com.kama.client.cache.ServiceCache;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
/**
* @ClassName watchZK
* @Description 节点监听
* @Author Tong
* @LastChangeDate 2024-12-02 10:37
* @Version v5.0
*/
@Slf4j
public class watchZK {
// curator 提供的zookeeper客户端
private CuratorFramework client;
//本地缓存
ServiceCache cache;
public watchZK(CuratorFramework client, ServiceCache cache) {
this.client = client;
this.cache = cache;
}
/**
* 监听当前节点和子节点的 更新创建删除
*
* @param path
*/
public void watchToUpdate(String path) throws InterruptedException {
CuratorCache curatorCache = CuratorCache.build(client, "/");
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData childData, ChildData childData1) {
// 第一个参数事件类型枚举
// 第二个参数节点更新前的状态数据
// 第三个参数节点更新后的状态数据
// 创建节点时节点刚被创建不存在 更新前节点 所以第二个参数为 null
// 删除节点时节点被删除不存在 更新后节点 所以第三个参数为 null
// 节点创建时没有赋予值 create /curator/app1 只创建节点在这种情况下更新前节点的 data null获取不到更新前节点的数据
switch (type.name()) {
case "NODE_CREATED": // 监听器第一次执行时节点存在也会触发次事件
String[] pathList = pasrePath(childData1);
if (pathList.length <= 2) break;
else {
String serviceName = pathList[1];
String address = pathList[2];
//将新注册的服务加入到本地缓存中
cache.addServiceToCache(serviceName, address);
log.info("节点创建:服务名称 {} 地址 {}", serviceName, address);
}
break;
case "NODE_CHANGED": // 节点更新
if (childData.getData() != null) {
log.debug("修改前的数据: {}", new String(childData.getData()));
} else {
log.debug("节点第一次赋值!");
}
String[] oldPathList = pasrePath(childData);
String[] newPathList = pasrePath(childData1);
cache.replaceServiceAddress(oldPathList[1], oldPathList[2], newPathList[2]);
log.info("节点更新:服务名称 {} 地址从 {} 更新为 {}", oldPathList[1], oldPathList[2], newPathList[2]);
break;
case "NODE_DELETED": // 节点删除
String[] pathList_d = pasrePath(childData);
if (pathList_d.length <= 2) break;
else {
String serviceName = pathList_d[1];
String address = pathList_d[2];
//将新注册的服务加入到本地缓存中
cache.delete(serviceName, address);
log.info("节点删除:服务名称 {} 地址 {}", serviceName, address);
}
break;
default:
break;
}
}
});
//开启监听
curatorCache.start();
}
//解析节点对应地址
public String[] pasrePath(ChildData childData) {
//获取更新的节点的路径
String path = new String(childData.getPath());
log.info("节点路径:{}",path);
//按照格式 读取
return path.split("/");
}
}

View File

@ -0,0 +1,20 @@
package com.kama.client.servicecenter.balance;
import java.util.List;
/**
* @InterfaceName LoadBalance
* @Description 负载均衡接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:40
* @Version v5.0
*/
public interface LoadBalance {
String balance(List<String> addressList);
void addNode(String node);
void delNode(String node);
}

View File

@ -0,0 +1,152 @@
package com.kama.client.servicecenter.balance.impl;
import com.kama.client.servicecenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @ClassName ConsistencyHashBalance
* @Description 一致性哈希算法负载均衡
* @Author Tong
* @LastChangeDate 2024-12-02 10:42
* @Version v5.0
*/
@Slf4j
public class ConsistencyHashBalance implements LoadBalance {
// 虚拟节点的个数
private static final int VIRTUAL_NUM = 5;
// 虚拟节点分配key是hash值value是虚拟节点服务器名称
private SortedMap<Integer, String> shards = new TreeMap<Integer,String>();
// 真实节点列表
private List<String> realNodes = new LinkedList<>();
// 获取虚拟节点的个数
public static int getVirtualNum() {
return VIRTUAL_NUM;
}
// 初始化虚拟节点
public void init(List<String> serviceList) {
for (String server : serviceList) {
realNodes.add(server);
log.info("真实节点[{}] 被添加", server);
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = server + "&&VN" + i;
int hash = getHash(virtualNode);
shards.put(hash, virtualNode);
log.info("虚拟节点[{}] hash:{},被添加", virtualNode, hash);
}
}
}
/**
* 获取被分配的节点名
*
* @param node 请求的节点通常是请求的唯一标识符
* @return 负责该请求的真实节点名称
*/
public String getServer(String node, List<String> serviceList) {
if (shards.isEmpty()) {
init(serviceList); // 初始化如果shards为空
}
int hash = getHash(node);
Integer key = null;
SortedMap<Integer, String> subMap = shards.tailMap(hash);
if (subMap.isEmpty()) {
key = shards.firstKey(); // 如果没有大于该hash的节点则返回最小的hash值
} else {
key = subMap.firstKey();
}
String virtualNode = shards.get(key);
return virtualNode.substring(0, virtualNode.indexOf("&&"));
}
/**
* 添加节点
*
* @param node 新加入的节点
*/
public void addNode(String node) {
if (!realNodes.contains(node)) {
realNodes.add(node);
log.info("真实节点[{}] 上线添加", node);
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = node + "&&VN" + i;
int hash = getHash(virtualNode);
shards.put(hash, virtualNode);
log.info("虚拟节点[{}] hash:{},被添加", virtualNode, hash);
}
}
}
/**
* 删除节点
*
* @param node 被移除的节点
*/
public void delNode(String node) {
if (realNodes.contains(node)) {
realNodes.remove(node);
log.info("真实节点[{}] 下线移除", node);
for (int i = 0; i < VIRTUAL_NUM; i++) {
String virtualNode = node + "&&VN" + i;
int hash = getHash(virtualNode);
shards.remove(hash);
log.info("虚拟节点[{}] hash:{},被移除", virtualNode, hash);
}
}
}
/**
* FNV1_32_HASH算法
*/
private static int getHash(String str) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 如果算出来的值为负数则取其绝对值
if (hash < 0)
hash = Math.abs(hash);
return hash;
}
@Override
public String balance(List<String> addressList) {
// 如果 addressList 为空或 null抛出 IllegalArgumentException
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Address list cannot be null or empty");
}
// 使用UUID作为请求的唯一标识符来进行一致性哈希
String random = UUID.randomUUID().toString();
return getServer(random, addressList);
}
public SortedMap<Integer, String> getShards() {
return shards;
}
public List<String> getRealNodes() {
return realNodes;
}
@Override
public String toString() {
return "ConsistencyHash";
}
}

View File

@ -0,0 +1,48 @@
package com.kama.client.servicecenter.balance.impl;
import com.kama.client.servicecenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @ClassName RandomLoadBalance
* @Description 随机法
* @Author Tong
* @LastChangeDate 2024-12-02 10:40
* @Version v5.0
*/
@Slf4j
public class RandomLoadBalance implements LoadBalance {
// 将Random声明为类级别的字段
private final Random random = new Random();
private final List<String> addressList = new CopyOnWriteArrayList<>();
@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Address list cannot be null or empty");
}
int choose = random.nextInt(addressList.size());
log.info("负载均衡选择了第 {} 号服务器,地址是:{}", choose, addressList.get(choose));
return addressList.get(choose); // 返回选择的服务器地址
}
@Override
public void addNode(String node) {
// 如果是动态添加节点可以将节点加入到addressList中
addressList.add(node);
log.info("节点 {} 已加入负载均衡", node);
}
@Override
public void delNode(String node) {
// 如果是动态删除节点可以将节点从addressList中移除
addressList.remove(node);
log.info("节点 {} 已从负载均衡中移除", node);
}
}

View File

@ -0,0 +1,52 @@
package com.kama.client.servicecenter.balance.impl;
import com.kama.client.servicecenter.balance.LoadBalance;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName RoundLoadBalance
* @Description 轮询法
* @Author Tong
* @LastChangeDate 2024-12-02 10:41
* @Version v5.0
*/
@Slf4j
public class RoundLoadBalance implements LoadBalance {
// 使用 AtomicInteger 保证线程安全
private AtomicInteger choose = new AtomicInteger(0);
private List<String> addressList = new CopyOnWriteArrayList<>();
@Override
public String balance(List<String> addressList) {
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Address list cannot be null or empty");
}
// 获取当前索引并更新为下一个
int currentChoose = choose.getAndUpdate(i -> (i + 1) % addressList.size());
String selectedServer = addressList.get(currentChoose);
log.info("负载均衡选择了服务器: {}", selectedServer);
return selectedServer; // 返回被选择的服务器地址
}
@Override
public void addNode(String node) {
// 如果是动态添加节点可以将节点加入到 addressList
addressList.add(node);
log.info("节点 {} 已加入负载均衡", node);
}
@Override
public void delNode(String node) {
// 如果是动态删除节点可以将节点从 addressList 中移除
addressList.remove(node);
log.info("节点 {} 已从负载均衡中移除", node);
}
}

View File

@ -0,0 +1,37 @@
package com.kama.config;
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
import com.kama.server.serviceRegister.impl.ZKServiceRegister;
import common.serializer.myserializer.Serializer;
import lombok.*;
/**
* @ClassName KRpcConfig
* @Description 配置文件
* @Author Tong
* @LastChangeDate 2024-12-05 11:02
* @Version v5.0
*/
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class KRpcConfig {
//名称
private String name = "krpc";
//端口
private Integer port = 9999;
//主机名
private String host = "localhost";
//版本号
private String version = "1.0.0";
//注册中心
private String registry = new ZKServiceRegister().toString();
//序列化器
private String serializer = Serializer.getSerializerByCode(3).toString();
//负载均衡
private String loadBalance = new ConsistencyHashBalance().toString();
}

View File

@ -0,0 +1,20 @@
package com.kama.config;
/**
* @InterfaceName RpcConstants
* @Description
* @Author Tong
* @LastChangeDate 2024-12-05 11:17
* @Version v5.0
*/
public interface RpcConstant {
//默认的配置文件前缀
String CONFIG_FILE_PREFIX = "rpc";
//默认的服务版本号
String DEFAULT_VERSION_DEFAULT = "1.0.0";
}

View File

@ -0,0 +1,71 @@
package com.kama.server.netty;
import common.message.RpcRequest;
import common.message.RpcResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import com.kama.server.provider.ServiceProvider;
import com.kama.server.ratelimit.RateLimit;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @ClassName NettyRpcServerHandler
* @Description 服务端处理器
* @Author Tong
* @LastChangeDate 2024-12-02 10:26
* @Version v5.0
*/
@AllArgsConstructor // 使用 Lombok 自动生成构造器
@Slf4j
public class NettyRpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private final ServiceProvider serviceProvider; // 确保通过构造器注入 ServiceProvider
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
if (request == null) {
log.error("接收到非法请求RpcRequest 为空");
return;
}
RpcResponse response = getResponse(request);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("处理请求时发生异常: ", cause);
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest) {
//得到服务名
String interfaceName = rpcRequest.getInterfaceName();
//接口限流降级
RateLimit rateLimit = serviceProvider.getRateLimitProvider().getRateLimit(interfaceName);
if (!rateLimit.getToken()) {
//如果获取令牌失败进行限流降级快速返回结果
log.warn("服务限流,接口: {}", interfaceName);
return RpcResponse.fail("服务限流,接口 " + interfaceName + " 当前无法处理请求。请稍后再试。");
}
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method;
try {
method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke = method.invoke(service, rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
log.error("方法执行错误,接口: {}, 方法: {}", interfaceName, rpcRequest.getMethodName(), e);
return RpcResponse.fail("方法执行错误");
}
}
}

View File

@ -0,0 +1,33 @@
package com.kama.server.netty;
import common.serializer.mycoder.MyDecoder;
import common.serializer.mycoder.MyEncoder;
import common.serializer.myserializer.Serializer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import lombok.AllArgsConstructor;
import com.kama.server.provider.ServiceProvider;
/**
* @ClassName NettyServerInitializer
* @Description 服务端初始化器
* @Author Tong
* @LastChangeDate 2024-12-02 10:55
* @Version v5.0
*/
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
private ServiceProvider serviceProvider;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//使用自定义的编/解码器
pipeline.addLast(new MyEncoder(Serializer.getSerializerByCode(3)));
pipeline.addLast(new MyDecoder());
pipeline.addLast(new NettyRpcServerHandler(serviceProvider));
}
}

View File

@ -0,0 +1,60 @@
package com.kama.server.provider;
import com.kama.server.ratelimit.provider.RateLimitProvider;
import com.kama.server.serviceRegister.ServiceRegister;
import com.kama.server.serviceRegister.impl.ZKServiceRegister;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName ServiceProvider
* @Description 本地注册中心
* @Author Tong
* @LastChangeDate 2024-12-02 10:21
* @Version v5.0
*/
public class ServiceProvider {
private Map<String, Object> interfaceProvider;
private int port;
private String host;
//注册服务类
private ServiceRegister serviceRegister;
//限流器
private RateLimitProvider rateLimitProvider;
public ServiceProvider(String host, int port) {
//需要传入服务端自身的网络地址
this.host = host;
this.port = port;
this.interfaceProvider = new HashMap<>();
this.serviceRegister = new ZKServiceRegister();
this.rateLimitProvider = new RateLimitProvider();
}
public void provideServiceInterface(Object service, boolean canRetry) {
String serviceName = service.getClass().getName();
Class<?>[] interfaceName = service.getClass().getInterfaces();
for (Class<?> clazz : interfaceName) {
//本机的映射表
interfaceProvider.put(clazz.getName(), service);
//在注册中心注册服务
serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port), canRetry);
}
}
public Object getService(String interfaceName) {
return interfaceProvider.get(interfaceName);
}
public RateLimitProvider getRateLimitProvider() {
return rateLimitProvider;
}
}

View File

@ -0,0 +1,15 @@
package com.kama.server.ratelimit;
/**
* @InterfaceName RateLimit
* @Description 限流接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:50
* @Version v5.0
*/
public interface RateLimit {
//获取访问许可
boolean getToken();
}

View File

@ -0,0 +1,60 @@
package com.kama.server.ratelimit.impl;
import com.kama.server.ratelimit.RateLimit;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName TokenBucketRateLimitImpl
* @Description 全局限流
* @Author Tong
* @LastChangeDate 2024-12-02 10:53
* @Version v5.0
*/
@Slf4j
public class TokenBucketRateLimitImpl implements RateLimit {
// 令牌产生速率单位ms
private final int rate;
// 桶容量
private final int capacity;
// 当前桶容量
private volatile int curCapacity;
// 上次请求时间戳
private volatile long lastTimestamp;
public TokenBucketRateLimitImpl(int rate, int capacity) {
this.rate = rate;
this.capacity = capacity;
this.curCapacity = capacity;
this.lastTimestamp = System.currentTimeMillis();
}
@Override
public boolean getToken() {
// 优化同步仅限于关键部分减少锁竞争
synchronized (this) {
// 如果当前桶还有剩余就直接返回
if (curCapacity > 0) {
curCapacity--;
return true;
}
long currentTimestamp = System.currentTimeMillis();
// 如果距离上一次请求的时间大于 RATE 的时间间隔
if (currentTimestamp - lastTimestamp >= rate) {
// 计算这段时间内生成的令牌数量
int generatedTokens = (int) ((currentTimestamp - lastTimestamp) / rate);
if (generatedTokens > 1) {
// 只添加剩余令牌确保不会超过桶的容量
curCapacity = Math.min(capacity, curCapacity + generatedTokens - 1);
}
// 更新时间戳
lastTimestamp = currentTimestamp;
return true;
}
return false; // 如果无法获取令牌返回 false
}
}
}

View File

@ -0,0 +1,34 @@
package com.kama.server.ratelimit.provider;
import com.kama.server.ratelimit.RateLimit;
import com.kama.server.ratelimit.impl.TokenBucketRateLimitImpl;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName RateLimitProvider
* @Description 提供限流器
* @Author Tong
* @LastChangeDate 2024-12-02 10:54
* @Version v5.0
*/
@Slf4j
public class RateLimitProvider {
private final Map<String, RateLimit> rateLimitMap = new ConcurrentHashMap<>();
// 默认的限流桶容量和令牌生成速率
private static final int DEFAULT_CAPACITY = 100;
private static final int DEFAULT_RATE = 10;
// 提供限流实例
public RateLimit getRateLimit(String interfaceName) {
return rateLimitMap.computeIfAbsent(interfaceName, key -> {
RateLimit rateLimit = new TokenBucketRateLimitImpl(DEFAULT_CAPACITY, DEFAULT_RATE);
log.info("为接口 [{}] 创建了新的限流策略: {}", interfaceName, rateLimit);
return rateLimit;
});
}
}

View File

@ -0,0 +1,16 @@
package com.kama.server.server;
/**
* @InterfaceName RpcServer
* @Description 服务端接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:21
* @Version v1.0
*/
public interface RpcServer {
void start(int port);
void stop();
}

View File

@ -0,0 +1,81 @@
package com.kama.server.server.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.kama.server.netty.NettyServerInitializer;
import com.kama.server.provider.ServiceProvider;
import com.kama.server.server.RpcServer;
/**
* @ClassName NettyRpcServer
* @Description Netty服务端
* @Author Tong
* @LastChangeDate 2024-12-02 10:25
* @Version v5.0
*/
@Slf4j
@AllArgsConstructor
public class NettyRpcServer implements RpcServer {
private final ServiceProvider serviceProvider; // 只需要 ServiceProvider
private ChannelFuture channelFuture; // ChannelFuture start 方法内初始化
public NettyRpcServer(ServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider;
}
@Override
public void start(int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
log.info("Netty服务端启动了");
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new NettyServerInitializer(serviceProvider));
// 同步阻塞绑定端口启动服务
channelFuture = serverBootstrap.bind(port).sync();
log.info("Netty服务端已绑定端口{}", port);
// 阻塞等待服务关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Netty服务端启动中断{}", e.getMessage(), e);
} finally {
shutdown(bossGroup, workGroup); // 集中管理线程组资源
log.info("Netty服务端关闭了");
}
}
@Override
public void stop() {
if (channelFuture != null) {
try {
channelFuture.channel().close().sync();
log.info("Netty服务端主通道已关闭");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("关闭Netty服务端主通道时中断{}", e.getMessage(), e);
}
} else {
log.warn("Netty服务端主通道尚未启动无法关闭");
}
}
private void shutdown(NioEventLoopGroup bossGroup, NioEventLoopGroup workGroup) {
if (bossGroup != null) {
bossGroup.shutdownGracefully().syncUninterruptibly();
}
if (workGroup != null) {
workGroup.shutdownGracefully().syncUninterruptibly();
}
}
}

View File

@ -0,0 +1,70 @@
package com.kama.server.server.impl;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.kama.server.provider.ServiceProvider;
import com.kama.server.server.RpcServer;
import com.kama.server.server.work.WorkThread;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @ClassName SimpleRpcServer
* @Description 简单服务端
* @Author Tong
* @LastChangeDate 2024-12-02 10:23
* @Version v5.0
*/
@AllArgsConstructor
@Slf4j
public class SimpleRpcServer implements RpcServer {
private ServiceProvider serviceProvider;
// 控制服务器运行状态
private AtomicBoolean running = new AtomicBoolean(true);
private ServerSocket serverSocket;
@Override
public void start(int port) {
try {
serverSocket = new ServerSocket(port);
log.info("服务器启动了,监听端口:{}", port);
while (running.get()) {
try {
Socket socket = serverSocket.accept();
new Thread(new WorkThread(socket, serviceProvider)).start();
} catch (IOException e) {
if (running.get()) { // 如果不是因为服务器被停止导致的异常
log.error("接受连接时发生异常:{}", e.getMessage(), e);
}
}
}
} catch (IOException e) {
log.error("服务器启动失败:{}", e.getMessage(), e);
} finally {
stop();
}
}
@Override
public void stop() {
if (!running.get()) return; // 防止重复停止
running.set(false);
log.info("服务器正在关闭...");
// 关闭 ServerSocket
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
log.info("服务器已关闭");
} catch (IOException e) {
log.error("关闭服务器时发生异常:{}", e.getMessage(), e);
}
}
}
}

View File

@ -0,0 +1,61 @@
package com.kama.server.server.work;
import common.message.RpcRequest;
import common.message.RpcResponse;
import lombok.AllArgsConstructor;
import com.kama.server.provider.ServiceProvider;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
/**
* @ClassName WorkThread
* @Description
* @Author Tong
* @LastChangeDate 2024-12-02 10:22
* @Version v5.0
*/
@AllArgsConstructor
public class WorkThread implements Runnable {
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse = getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest) {
//得到服务名
String interfaceName = rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method;
try {
method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke = method.invoke(service, rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
return RpcResponse.fail("方法执行错误");
}
}
}

View File

@ -0,0 +1,16 @@
package com.kama.server.serviceRegister;
import java.net.InetSocketAddress;
/**
* @InterfaceName ServiceRegister
* @Description 服务注册接口
* @Author Tong
* @LastChangeDate 2024-12-02 10:27
* @Version v5.0
*/
public interface ServiceRegister {
void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry);
}

View File

@ -0,0 +1,74 @@
package com.kama.server.serviceRegister.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.kama.server.serviceRegister.ServiceRegister;
import java.net.InetSocketAddress;
/**
* @ClassName ZKServiceRegister
* @Description zk服务注册中心
* @Author Tong
* @LastChangeDate 2024-12-02 10:28
* @Version v5.0
*/
@Slf4j
public class ZKServiceRegister implements ServiceRegister {
private CuratorFramework client;
private static final String ROOT_PATH = "MyRPC";
private static final String RETRY = "CanRetry";
public ZKServiceRegister() {
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
this.client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000)
.retryPolicy(policy)
.namespace(ROOT_PATH)
.build();
this.client.start();
log.info("Zookeeper 连接成功");
}
@Override
public void register(String serviceName, InetSocketAddress serviceAddress, boolean canRetry) {
try {
if (client.checkExists().forPath("/" + serviceName) == null) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + serviceName);
log.info("服务节点 {} 创建成功", "/" + serviceName);
}
String path = "/" + serviceName + "/" + getServiceAddress(serviceAddress);
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
log.info("服务地址 {} 注册成功", path);
} else {
log.info("服务地址 {} 已经存在,跳过注册", path);
}
if (canRetry) {
path = "/" + RETRY + "/" + serviceName;
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
log.info("重试标识 {} 注册成功", path);
}
} catch (Exception e) {
log.error("服务注册失败,服务名:{},错误信息:{}", serviceName, e.getMessage(), e);
}
}
@Override
public String toString() {
return "zookeeper";
}
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() + ":" + serverAddress.getPort();
}
}

View File

@ -0,0 +1,100 @@
package com.kama.test.balance;
import com.kama.client.servicecenter.balance.impl.ConsistencyHashBalance;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* @ClassName ConsistencyHashBalanceTest
* @Description 一致性哈希测试类
* @Author Tong
* @LastChangeDate 2024-12-05 15:39
* @Version v5.0
*/
public class ConsistencyHashBalanceTest {
private ConsistencyHashBalance balance;
@Before
public void setUp() {
balance = new ConsistencyHashBalance();
}
@Test
public void testInit() {
// 模拟真实节点
List<String> nodes = Arrays.asList("server1", "server2", "server3");
balance.init(nodes);
// 验证虚拟节点的初始化是否正确
assertTrue("shards should not be empty", balance.getShards().size() > 0);
assertTrue("realNodes should contain all nodes", balance.getRealNodes().containsAll(nodes));
}
@Test
public void testGetServer() {
// 模拟真实节点
List<String> nodes = Arrays.asList("server1", "server2", "server3");
balance.init(nodes);
// 使用 UUID 作为请求的唯一标识符进行负载均衡
String server = balance.getServer("request-1", nodes);
assertNotNull("Server should not be null", server);
assertTrue("Server should be one of the real nodes", nodes.contains(server));
// 确保多个请求的分配在不同节点上可根据测试的多次运行结果观察
String server2 = balance.getServer("request-2", nodes);
assertNotEquals("Server should be different from the previous request", server, server2);
}
@Test
public void testAddNode() {
// 模拟真实节点
List<String> nodes = Arrays.asList("server1", "server2");
balance.init(nodes);
// 新加入一个节点
balance.addNode("server3");
// 验证新节点是否被加入
assertTrue("server3 should be added", balance.getRealNodes().contains("server3"));
assertTrue("shards should contain virtual nodes for server3", balance.getShards().size() > 0);
}
@Test
public void testDelNode() {
// 模拟真实节点
List<String> nodes = Arrays.asList("server1", "server2");
balance.init(nodes);
// 删除一个节点
balance.delNode("server1");
// 验证该节点是否被移除
assertFalse("server1 should be removed", balance.getRealNodes().contains("server1"));
assertFalse("shards should not contain virtual nodes for server1", balance.getShards().values().stream().anyMatch(vn -> vn.startsWith("server1")));
}
@Test(expected = IllegalArgumentException.class)
public void testBalanceWithEmptyList() {
// 测试地址列表为空时抛出 IllegalArgumentException
balance.balance(Arrays.asList());
}
@Test(expected = IllegalArgumentException.class)
public void testBalanceWithNullList() {
// 测试地址列表为 null 抛出 IllegalArgumentException
balance.balance(null);
}
@Test
public void testGetVirtualNum() {
// 测试虚拟节点的数量
assertEquals("Virtual nodes count should be 5", 5, ConsistencyHashBalance.getVirtualNum());
}
}

View File

@ -0,0 +1,76 @@
package com.kama.test.balance;
import com.kama.client.servicecenter.balance.impl.RandomLoadBalance;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* @ClassName RandomLoadBalanceTest
* @Description 随机负载均衡器测试
* @Author Tong
* @LastChangeDate 2024-12-05 15:43
* @Version v5.0
*/
public class RandomLoadBalanceTest {
private RandomLoadBalance loadBalance;
@Before
public void setUp() {
// 在每个测试前初始化负载均衡器
loadBalance = new RandomLoadBalance();
}
@Test
public void testBalance_WithNonEmptyList() {
// 准备一个非空的地址列表
List<String> addressList = Arrays.asList("server1", "server2", "server3");
// 使用 balance 方法选择一个服务器
String selectedServer = loadBalance.balance(addressList);
// 确保选择的服务器在列表中
assertTrue(addressList.contains(selectedServer));
}
@Test(expected = IllegalArgumentException.class)
public void testBalance_WithEmptyList() {
// 测试空的节点列表应该抛出 IllegalArgumentException 异常
loadBalance.balance(Arrays.asList());
}
@Test(expected = IllegalArgumentException.class)
public void testBalance_WithNullList() {
// 测试 null 的节点列表应该抛出 IllegalArgumentException 异常
loadBalance.balance(null);
}
@Test
public void testAddNode() {
// 测试添加节点到负载均衡器
loadBalance.addNode("server4");
// 确保新添加的节点在负载均衡器中
List<String> addressList = Arrays.asList("server1", "server2", "server3", "server4");
String selectedServer = loadBalance.balance(addressList);
assertTrue(addressList.contains(selectedServer));
}
@Test
public void testDelNode() {
// 测试从负载均衡器中移除节点
loadBalance.addNode("server4");
loadBalance.delNode("server4");
// 确保删除后的节点不再在负载均衡器中
List<String> addressList = Arrays.asList("server1", "server2", "server3");
String selectedServer = loadBalance.balance(addressList);
assertFalse(addressList.contains("server4"));
}
}

View File

@ -0,0 +1,93 @@
package com.kama.test.balance;
import com.kama.client.servicecenter.balance.impl.RoundLoadBalance;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
/**
* @ClassName RoundLoadBalanceTest
* @Description 轮询测试类
* @Author Tong
* @LastChangeDate 2024-12-05 15:46
* @Version v5.0
*/
public class RoundLoadBalanceTest {
private RoundLoadBalance loadBalance;
@Before
public void setUp() {
// 在每个测试前初始化负载均衡器
loadBalance = new RoundLoadBalance();
}
@Test
public void testBalance_WithNonEmptyList() {
// 准备一个非空的地址列表
List<String> addressList = Arrays.asList("server1", "server2", "server3");
// 执行 balance 方法并获取返回的服务器
String selectedServer = loadBalance.balance(addressList);
// 确保选择的服务器在列表中
assertTrue(addressList.contains(selectedServer));
}
@Test(expected = IllegalArgumentException.class)
public void testBalance_WithEmptyList() {
// 测试空的节点列表应该抛出 IllegalArgumentException 异常
loadBalance.balance(Arrays.asList());
}
@Test(expected = IllegalArgumentException.class)
public void testBalance_WithNullList() {
// 测试 null 的节点列表应该抛出 IllegalArgumentException 异常
loadBalance.balance(null);
}
@Test
public void testAddNode() {
// 测试添加节点到负载均衡器
loadBalance.addNode("server4");
// 确保新添加的节点在负载均衡器中
List<String> addressList = Arrays.asList("server1", "server2", "server3", "server4");
String selectedServer = loadBalance.balance(addressList);
assertTrue(addressList.contains(selectedServer));
}
@Test
public void testDelNode() {
// 测试从负载均衡器中移除节点
loadBalance.addNode("server4");
loadBalance.delNode("server4");
// 确保删除后的节点不再在负载均衡器中
List<String> addressList = Arrays.asList("server1", "server2", "server3");
String selectedServer = loadBalance.balance(addressList);
assertFalse(addressList.contains("server4"));
}
@Test
public void testBalance_RoundRobin() {
// 测试负载均衡是否按轮询顺序选择服务器
List<String> addressList = Arrays.asList("server1", "server2", "server3");
// 轮询选择服务器
String firstSelection = loadBalance.balance(addressList);
String secondSelection = loadBalance.balance(addressList);
String thirdSelection = loadBalance.balance(addressList);
String fourthSelection = loadBalance.balance(addressList); // Should loop back to first
// 确保选择的服务器是轮询顺序的
assertNotEquals(firstSelection, secondSelection);
assertNotEquals(secondSelection, thirdSelection);
assertNotEquals(thirdSelection, fourthSelection);
assertEquals(firstSelection, fourthSelection); // Should be back to the first
}
}

View File

@ -0,0 +1,52 @@
package com.kama.test.serializer;
import common.exception.SerializeException;
import common.serializer.myserializer.HessianSerializer;
import org.junit.Test;
/**
* @ClassName HessianSerializerTest
* @Description Hessian测试类
* @Author Tong
* @LastChangeDate 2024-12-05 15:21
* @Version v5.0
*/
import org.junit.Test;
import static org.junit.Assert.*;
public class HessianSerializerTest {
private HessianSerializer serializer = new HessianSerializer();
@Test
public void testSerializeAndDeserialize() {
// 创建一个测试对象
String original = "Hello, Hessian!";
// 序列化
byte[] serialized = serializer.serialize(original);
assertNotNull("序列化结果不应为 null", serialized);
// 反序列化
Object deserialized = serializer.deserialize(serialized, 3);
assertNotNull("反序列化结果不应为 null", deserialized);
// 校验反序列化的结果
assertEquals("反序列化的对象应该与原对象相同", original, deserialized);
}
@Test
public void testDeserializeWithInvalidData() {
byte[] invalidData = new byte[]{1, 2, 3}; // 假数据
// 测试无效数据反序列化
try {
serializer.deserialize(invalidData, 3);
fail("反序列化时应抛出异常");
} catch (SerializeException e) {
assertEquals("Deserialization failed", e.getMessage());
}
}
}

View File

@ -0,0 +1,89 @@
package com.kama.test.serializer;
import com.kama.pojo.User;
import common.exception.SerializeException;
import common.serializer.myserializer.KryoSerializer;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* @ClassName KryoSerializer
* @Description kryo测试类
* @Author Tong
* @LastChangeDate 2024-12-05 15:31
* @Version v5.0
*/
public class KryoSerializerTest {
private KryoSerializer serializer = new KryoSerializer();
@Test
public void testSerializeAndDeserialize() {
// 创建一个 User 对象
User originalUser = User.builder()
.id(1)
.userName("TestUser")
.gender(true)
.build();
// 序列化
byte[] serialized = serializer.serialize(originalUser);
assertNotNull("序列化结果不应为 null", serialized);
// 反序列化
Object deserialized = serializer.deserialize(serialized, 1);
assertNotNull("反序列化结果不应为 null", deserialized);
// 校验反序列化的对象是否与原对象相同
assertTrue("反序列化的对象应该是 User 类型", deserialized instanceof User);
User deserializedUser = (User) deserialized;
assertEquals("反序列化的 User 应该与原 User 相同", originalUser, deserializedUser);
}
@Test
public void testSerializeNullObject() {
// 测试序列化 null 对象
try {
serializer.serialize(null);
fail("序列化 null 对象时应抛出 IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("Cannot serialize null object", e.getMessage());
}
}
@Test
public void testDeserializeNullBytes() {
// 测试反序列化 null 或空字节数组
try {
serializer.deserialize(null, 1);
fail("反序列化 null 字节数组时应抛出 IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("Cannot deserialize null or empty byte array", e.getMessage());
}
}
@Test
public void testDeserializeEmptyBytes() {
// 测试反序列化空字节数组
try {
serializer.deserialize(new byte[0], 1);
fail("反序列化空字节数组时应抛出 IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("Cannot deserialize null or empty byte array", e.getMessage());
}
}
@Test
public void testDeserializeInvalidMessageType() {
// 测试反序列化未知的 messageType
byte[] serialized = serializer.serialize(new User(1, "TestUser", true));
try {
serializer.deserialize(serialized, 99); // 使用无效的 messageType
fail("反序列化时应抛出 SerializeException");
} catch (SerializeException e) {
assertEquals("Deserialization failed", e.getMessage());
}
}
}

Some files were not shown because too many files have changed in this diff Show More