fix:优雅关闭;单例序列化器

This commit is contained in:
Wxx 2025-02-12 16:37:47 +08:00
parent 70f076867c
commit f4f0a28179
19 changed files with 34 additions and 11 deletions

View File

@ -20,16 +20,19 @@ public interface Serializer {
int getType(); int getType();
// 定义静态常量 serializerMap
static final Map<Integer, Serializer> serializerMap = new HashMap<>();
// 使用 Map 存储序列化器 // 使用 Map 存储序列化器
static Serializer getSerializerByCode(int code) { static Serializer getSerializerByCode(int code) {
// 静态映射保证只初始化一次 // 静态映射保证只初始化一次
Map<Integer, Serializer> serializerMap = new HashMap<>(); if(serializerMap.isEmpty()) {
serializerMap.put(0, new ObjectSerializer()); serializerMap.put(0, new ObjectSerializer());
serializerMap.put(1, new JsonSerializer()); serializerMap.put(1, new JsonSerializer());
serializerMap.put(2, new KryoSerializer()); serializerMap.put(2, new KryoSerializer());
serializerMap.put(3, new HessianSerializer()); serializerMap.put(3, new HessianSerializer());
serializerMap.put(4, new ProtostuffSerializer()); serializerMap.put(4, new ProtostuffSerializer());
}
return serializerMap.get(code); // 如果不存在则返回 null return serializerMap.get(code); // 如果不存在则返回 null
} }
} }

View File

@ -61,6 +61,7 @@ public class ConsumerTest {
// Gracefully shutdown the executor service // Gracefully shutdown the executor service
executorService.shutdown(); executorService.shutdown();
clientProxy.close();
} }
} }

View File

@ -109,4 +109,11 @@ public class ClientProxy implements InvocationHandler {
} }
return sb.toString(); return sb.toString();
} }
//关闭创建的资源
//如果在需要C-S保持长连接的场景下无需调用close方法
public void close(){
rpcClient.close();
serviceCenter.close();
}
} }

View File

@ -14,4 +14,5 @@ import common.message.RpcResponse;
public interface RpcClient { public interface RpcClient {
RpcResponse sendRequest(RpcRequest request); RpcResponse sendRequest(RpcRequest request);
void close();
} }

View File

@ -68,7 +68,6 @@ public class NettyRpcClient implements RpcClient {
// 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener... // 其它场景也可以选择添加监听器的方式来异步获取结果 channelFuture.addListener...
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse"); AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
RpcResponse response = channel.attr(key).get(); RpcResponse response = channel.attr(key).get();
if (response == null) { if (response == null) {
log.error("服务响应为空,可能是请求失败或超时"); log.error("服务响应为空,可能是请求失败或超时");
return RpcResponse.fail("服务响应为空"); return RpcResponse.fail("服务响应为空");
@ -82,14 +81,13 @@ public class NettyRpcClient implements RpcClient {
} catch (Exception e) { } catch (Exception e) {
log.error("发送请求时发生异常: {}", e.getMessage(), e); log.error("发送请求时发生异常: {}", e.getMessage(), e);
} finally { } finally {
// 连接断开后优雅地关闭 Netty 资源 //
shutdown();
} }
return RpcResponse.fail("请求失败"); return RpcResponse.fail("请求失败");
} }
// 优雅关闭 Netty 资源 // 优雅关闭 Netty 资源
private void shutdown() { public void close() {
try { try {
if (eventLoopGroup != null) { if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().sync(); eventLoopGroup.shutdownGracefully().sync();

View File

@ -54,4 +54,9 @@ public class SimpleSocketRpcClient implements RpcClient {
return response; return response;
} }
@Override
public void close() {
}
} }

View File

@ -19,4 +19,7 @@ public interface ServiceCenter {
//判断是否可重试 //判断是否可重试
boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature); boolean checkRetry(InetSocketAddress serviceAddress, String methodSignature);
//关闭客户端
void close();
} }

View File

@ -104,6 +104,11 @@ public class ZKServiceCenter implements ServiceCenter {
return retryServiceCache.contains(methodSignature); return retryServiceCache.contains(methodSignature);
} }
@Override
public void close() {
client.close();
}
// 将InetSocketAddress解析为格式为ip:port的字符串 // 将InetSocketAddress解析为格式为ip:port的字符串
private String getServiceAddress(InetSocketAddress serverAddress){ private String getServiceAddress(InetSocketAddress serverAddress){
return serverAddress.getHostName() + ":" + serverAddress.getPort(); return serverAddress.getHostName() + ":" + serverAddress.getPort();