From b64647ceb0515ad72c0aeda81c98fb85ac34c0f1 Mon Sep 17 00:00:00 2001 From: icanci Date: Sat, 7 Jan 2023 13:19:48 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BD=91=E7=BB=9C=E5=B1=82=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ddk/client/DdkClientAutoConfig.java | 20 +++ .../ddk/client/properties/DdkProperties.java | 88 ++++++++++++ .../repository/DdkRepositoryHolder.java | 60 ++++++++ .../server/NamedNettyServerHandler.java | 122 ++++++++++++++++ .../NamedServer.java} | 5 +- .../ddk/client/server/RegisterServer.java | 130 ++++++++++++++++++ .../main/resources/META-INF/spring.factories | 1 + .../ddk/common/socket/PublishDTO.java | 14 ++ .../ddk/common/socket/PublishTypeEnum.java | 18 +++ .../ddk/common/socket/RegisterDTO.java | 54 ++++++++ .../ddk/common/socket/SocketMessage.java | 49 +++++++ .../ddk/common/socket/UriConstant.java | 31 +++++ 12 files changed, 590 insertions(+), 2 deletions(-) create mode 100644 client/src/main/java/cn/icanci/loopstack/ddk/client/DdkClientAutoConfig.java create mode 100644 client/src/main/java/cn/icanci/loopstack/ddk/client/properties/DdkProperties.java create mode 100644 client/src/main/java/cn/icanci/loopstack/ddk/client/repository/DdkRepositoryHolder.java create mode 100644 client/src/main/java/cn/icanci/loopstack/ddk/client/server/NamedNettyServerHandler.java rename client/src/main/java/cn/icanci/loopstack/ddk/client/{ClientServer.java => server/NamedServer.java} (48%) create mode 100644 client/src/main/java/cn/icanci/loopstack/ddk/client/server/RegisterServer.java create mode 100755 client/src/main/resources/META-INF/spring.factories create mode 100644 common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishDTO.java create mode 100644 common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishTypeEnum.java create mode 100644 common/src/main/java/cn/icanci/loopstack/ddk/common/socket/RegisterDTO.java create mode 100644 common/src/main/java/cn/icanci/loopstack/ddk/common/socket/SocketMessage.java create mode 100644 common/src/main/java/cn/icanci/loopstack/ddk/common/socket/UriConstant.java diff --git a/client/src/main/java/cn/icanci/loopstack/ddk/client/DdkClientAutoConfig.java b/client/src/main/java/cn/icanci/loopstack/ddk/client/DdkClientAutoConfig.java new file mode 100644 index 0000000..3b65564 --- /dev/null +++ b/client/src/main/java/cn/icanci/loopstack/ddk/client/DdkClientAutoConfig.java @@ -0,0 +1,20 @@ +package cn.icanci.loopstack.ddk.client; + +import cn.icanci.loopstack.ddk.client.properties.DdkProperties; + +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:27 + */ +@Configuration +@ComponentScan({ "cn.icanci.loopstack.ddk.client" }) +@EnableConfigurationProperties(DdkProperties.class) +@AutoConfigureBefore +public class DdkClientAutoConfig { + +} diff --git a/client/src/main/java/cn/icanci/loopstack/ddk/client/properties/DdkProperties.java b/client/src/main/java/cn/icanci/loopstack/ddk/client/properties/DdkProperties.java new file mode 100644 index 0000000..cb4034c --- /dev/null +++ b/client/src/main/java/cn/icanci/loopstack/ddk/client/properties/DdkProperties.java @@ -0,0 +1,88 @@ +package cn.icanci.loopstack.ddk.client.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:28 + */ +@Component +@ConfigurationProperties(prefix = "ddk") +public class DdkProperties { + + /** + * 是否加载 + */ + private boolean load = true; + /** + * appId + */ + private String appId; + + /** + * 加载的环境 + */ + private String env = "test"; + + /** + * 客户端注册的port + */ + private int clientPort = 11000; + /** + * 服务端ip,以,分隔 + */ + private String serverIps = "127.0.0.1"; + /** + * 服务端port + */ + private int serverPort = 9998; + + public boolean isLoad() { + return load; + } + + public void setLoad(boolean load) { + this.load = load; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public String getEnv() { + return env; + } + + public void setEnv(String env) { + this.env = env; + } + + public int getClientPort() { + return clientPort; + } + + public void setClientPort(int clientPort) { + this.clientPort = clientPort; + } + + public String getServerIps() { + return serverIps; + } + + public void setServerIps(String serverIps) { + this.serverIps = serverIps; + } + + public int getServerPort() { + return serverPort; + } + + public void setServerPort(int serverPort) { + this.serverPort = serverPort; + } +} diff --git a/client/src/main/java/cn/icanci/loopstack/ddk/client/repository/DdkRepositoryHolder.java b/client/src/main/java/cn/icanci/loopstack/ddk/client/repository/DdkRepositoryHolder.java new file mode 100644 index 0000000..9377b70 --- /dev/null +++ b/client/src/main/java/cn/icanci/loopstack/ddk/client/repository/DdkRepositoryHolder.java @@ -0,0 +1,60 @@ +package cn.icanci.loopstack.ddk.client.repository; + +import cn.icanci.loopstack.ddk.common.client.Client; +import cn.icanci.loopstack.ddk.common.client.http.HttpClientImpl; +import cn.icanci.loopstack.ddk.common.socket.PublishDTO; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Service; + +/** + * DdkRepositoryHolder + * 本地仓储缓存器 + * - 在load的时候,需要判断是否 + * @author icanci + * @since 1.0 Created in 2023/01/06 22:53 + */ +@Service +public class DdkRepositoryHolder implements InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(DdkRepositoryHolder.class); + + private static final Client CLIENT = HttpClientImpl.getInstance(); + + // 字段map + + // 方法map + + @Override + public void afterPropertiesSet() throws Exception { + + } + + /** + * 进行全局的加载 + * 1、判断是否具备权限 + * 2、加载字段数据,构建map + * 3、调用缓存方法 + */ + public synchronized void load() { + + } + + /** + * 刷新缓存数据 + * + * @param publish publish + */ + public synchronized void refresh(PublishDTO publish) { + } + + /** + * 调用执行的方法 + * + * @param publish publish + */ + public synchronized void invoke(PublishDTO publish) { + + } +} diff --git a/client/src/main/java/cn/icanci/loopstack/ddk/client/server/NamedNettyServerHandler.java b/client/src/main/java/cn/icanci/loopstack/ddk/client/server/NamedNettyServerHandler.java new file mode 100644 index 0000000..ae2b008 --- /dev/null +++ b/client/src/main/java/cn/icanci/loopstack/ddk/client/server/NamedNettyServerHandler.java @@ -0,0 +1,122 @@ +package cn.icanci.loopstack.ddk.client.server; + +import cn.hutool.json.JSONUtil; +import cn.icanci.loopstack.ddk.client.repository.DdkRepositoryHolder; +import cn.icanci.loopstack.ddk.common.socket.PublishDTO; +import cn.icanci.loopstack.ddk.common.socket.SocketMessage; +import cn.icanci.loopstack.ddk.common.socket.UriConstant; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.*; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; +import io.netty.util.internal.ThrowableUtil; + +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:49 + */ +public class NamedNettyServerHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(NamedNettyServerHandler.class); + + private final ThreadPoolExecutor pool; + + private static DdkRepositoryHolder ddkRepositoryHolder; + + public NamedNettyServerHandler(ThreadPoolExecutor pool) { + this.pool = pool; + } + + public static void setEngineRepositoryHolder(DdkRepositoryHolder engineRepositoryHolder) { + NamedNettyServerHandler.ddkRepositoryHolder = engineRepositoryHolder; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { + String requestData = msg.content().toString(CharsetUtil.UTF_8); + String uri = msg.uri(); + HttpMethod httpMethod = msg.method(); + boolean keepAlive = HttpUtil.isKeepAlive(msg); + // 对于这种配置数据,会有很频繁的变更 + pool.execute(() -> { + Object responseObj = process(httpMethod, uri, requestData); + + String responseJson = JSONUtil.toJsonStr(responseObj); + + writeResponse(ctx, keepAlive, responseJson); + }); + } + + /** + * 后置处理 + * + * @param httpMethod httpMethod + * @param uri uri + * @param requestData requestData + * @return Object + */ + private Object process(HttpMethod httpMethod, String uri, String requestData) { + if (HttpMethod.POST != httpMethod) { + return SocketMessage.fail("Only post requests are supported"); + } + if (StringUtils.isBlank(uri)) { + return SocketMessage.fail("Request uri is null"); + } + try { + switch (uri) { + case UriConstant.ToClient.HEARTBEAT: + logger.info("[{}][NamedNettyServerHandler][process] heartbeat", Thread.currentThread().getName()); + return SocketMessage.success(); + case UriConstant.ToClient.REFRESH: + ddkRepositoryHolder.refresh(JSONUtil.toBean(requestData, PublishDTO.class)); + logger.info("[{}][NamedNettyServerHandler][process] {} was refreshed!", Thread.currentThread().getName(), requestData); + return SocketMessage.success(); + case UriConstant.ToClient.INVOKE: + ddkRepositoryHolder.invoke(JSONUtil.toBean(requestData, PublishDTO.class)); + logger.info("[{}][NamedNettyServerHandler][process] {} was invoked!", Thread.currentThread().getName(), requestData); + return SocketMessage.success(); + default: + return SocketMessage.fail("Invalid request, uri-mapping(" + uri + ") not found"); + } + } catch (Exception e) { + return SocketMessage.fail(ThrowableUtil.stackTraceToString(e)); + } + } + + private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + if (keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } + ctx.writeAndFlush(response); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error(ThrowableUtil.stackTraceToString(cause)); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); + } else { + super.userEventTriggered(ctx, evt); + } + } +} diff --git a/client/src/main/java/cn/icanci/loopstack/ddk/client/ClientServer.java b/client/src/main/java/cn/icanci/loopstack/ddk/client/server/NamedServer.java similarity index 48% rename from client/src/main/java/cn/icanci/loopstack/ddk/client/ClientServer.java rename to client/src/main/java/cn/icanci/loopstack/ddk/client/server/NamedServer.java index 7f9a394..52ed09f 100644 --- a/client/src/main/java/cn/icanci/loopstack/ddk/client/ClientServer.java +++ b/client/src/main/java/cn/icanci/loopstack/ddk/client/server/NamedServer.java @@ -1,8 +1,9 @@ -package cn.icanci.loopstack.ddk.client; +package cn.icanci.loopstack.ddk.client.server; /** * @author icanci * @since 1.0 Created in 2023/01/01 09:55 */ -public class ClientServer { +public class NamedServer { + } diff --git a/client/src/main/java/cn/icanci/loopstack/ddk/client/server/RegisterServer.java b/client/src/main/java/cn/icanci/loopstack/ddk/client/server/RegisterServer.java new file mode 100644 index 0000000..31bf487 --- /dev/null +++ b/client/src/main/java/cn/icanci/loopstack/ddk/client/server/RegisterServer.java @@ -0,0 +1,130 @@ +package cn.icanci.loopstack.ddk.client.server; + +import java.util.concurrent.*; + +import javax.annotation.Resource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Maps; + +import cn.hutool.http.Method; +import cn.hutool.json.JSONUtil; +import cn.icanci.loopstack.ddk.client.properties.DdkProperties; +import cn.icanci.loopstack.ddk.common.client.Client; +import cn.icanci.loopstack.ddk.common.client.RemoteException; +import cn.icanci.loopstack.ddk.common.client.http.HttpClientImpl; +import cn.icanci.loopstack.ddk.common.result.R; +import cn.icanci.loopstack.ddk.common.socket.RegisterDTO; +import cn.icanci.loopstack.ddk.common.socket.UriConstant; +import cn.icanci.loopstack.ddk.common.utils.IPUtils; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:30 + */ +@Service +@SuppressWarnings("all") +public class RegisterServer implements InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(RegisterServer.class); + /** http实例 */ + private static final Client CLIENT = HttpClientImpl.getInstance(); + @Resource + private DdkProperties ddkProperties; + + private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); + + private static final ThreadPoolExecutor REGISTER_POOL = new ThreadPoolExecutor(CORE_SIZE, // + CORE_SIZE << 1, // + 60L, // + TimeUnit.SECONDS, // + new LinkedBlockingQueue<>(2000), // + runnable -> new Thread(runnable, "RegisterClient Biz Pool-" + runnable.hashCode()), // + (r, executor) -> { + throw new RuntimeException("RegisterClient Biz Pool is EXHAUSTED!"); + }); + + /** 注册请求地址 */ + private static final String REQ_URL_FORMAT = "http://%s:%s%s"; + + /** + * 将SDK所在服务注册到注册中心 + * + * @param serverAddress 服务端ip地址 + * @param serverPort 服务端端口 + * @param clientPort 客户端端口 + */ + public void register(String serverAddress, int serverPort, int clientPort) { + String appId = ddkProperties.getAppId(); + + String[] addresses = serverAddress.split(","); + + // TODO 注册地址打散,防止压力放在同一个机器上 + for (String address : addresses) { + try { + FutureTask task = new FutureTask<>(new RegisterCallable(address, clientPort, appId, serverPort)); + REGISTER_POOL.execute(task); + R r = task.get(10, TimeUnit.SECONDS); + if (r.isOk()) { + break; + } else { + logger.error("Task Register Exception:{}", r.getMessage()); + } + } catch (RemoteException | ExecutionException | InterruptedException | TimeoutException e) { + logger.warn("Register Exception:{}", e.getMessage()); + } + } + } + + @Override + public void afterPropertiesSet() throws Exception { + if (!ddkProperties.isLoad()) { + return; + } + // 项目启动加载缓存 + refreshCache(); + + // 注入注册bean + NamedServer.setRegisterService(this); + + // 启动服务器 + NamedServer.startClient(ddkProperties.getServerIps(), ddkProperties.getServerPort(), ddkProperties.getClientPort(), ddkProperties.getAppId()); + } + + private void refreshCache() { + // TODO 刷新全局缓存,进行处理 + } + + /** 注册器 */ + private static class RegisterCallable implements Callable { + + private final String address; + private final int clientPort; + private final String appId; + private final int serverPort; + + public RegisterCallable(String address, int clientPort, String appId, int serverPort) { + this.address = address; + this.clientPort = clientPort; + this.appId = appId; + this.serverPort = serverPort; + } + + @Override + public R call() throws Exception { + RegisterDTO registerDTO = new RegisterDTO(IPUtils.getHostIpAddress(), clientPort, appId); + String reqUrl = String.format(REQ_URL_FORMAT, address, serverPort, UriConstant.ToServer.REGISTER); + + Client.RpcRequest rpcRequest = new Client.RpcRequest(reqUrl, registerDTO, Maps.newHashMap(), Method.POST, 3, TimeUnit.SECONDS, 3); + + R call = CLIENT.call(rpcRequest, R.class); + + logger.info("[RegisterCallable][call] Register result:{}", JSONUtil.toJsonStr(call)); + + return call; + } + } +} diff --git a/client/src/main/resources/META-INF/spring.factories b/client/src/main/resources/META-INF/spring.factories new file mode 100755 index 0000000..b15672d --- /dev/null +++ b/client/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.icanci.loopstack.ddk.client.DdkClientAutoConfig \ No newline at end of file diff --git a/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishDTO.java b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishDTO.java new file mode 100644 index 0000000..2ab124c --- /dev/null +++ b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishDTO.java @@ -0,0 +1,14 @@ +package cn.icanci.loopstack.ddk.common.socket; + +import java.io.Serializable; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:57 + */ +public class PublishDTO implements Serializable { + /** + * 发布类型 + */ + private PublishTypeEnum publishType; +} diff --git a/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishTypeEnum.java b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishTypeEnum.java new file mode 100644 index 0000000..f2668ee --- /dev/null +++ b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/PublishTypeEnum.java @@ -0,0 +1,18 @@ +package cn.icanci.loopstack.ddk.common.socket; + +/** + * 发布类型 + * + * @author icanci + * @since 1.0 Created in 2023/01/07 09:35 + */ +public enum PublishTypeEnum { + /** + * 值刷新 + */ + VALUE_REFRESH, + /** + * 方法调用 + */ + METHOD_INVOKE; +} diff --git a/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/RegisterDTO.java b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/RegisterDTO.java new file mode 100644 index 0000000..425b8e4 --- /dev/null +++ b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/RegisterDTO.java @@ -0,0 +1,54 @@ +package cn.icanci.loopstack.ddk.common.socket; + +import java.io.Serializable; + +/** + * @author icanci + * @since 1.0 Created in 2023/01/06 22:42 + */ +public class RegisterDTO implements Serializable { + private static final long serialVersionUID = -2030921699127783725L; + + /** + * SDK 服务ip地址 + */ + private String clientAddress; + /** + * SDK 服务端口地址 + */ + private int clientPort; + /** + * SDK 服务服务ID + */ + private String appId; + + public RegisterDTO(String clientAddress, int clientPort, String appId) { + this.clientAddress = clientAddress; + this.clientPort = clientPort; + this.appId = appId; + } + + public String getClientAddress() { + return clientAddress; + } + + public void setClientAddress(String clientAddress) { + this.clientAddress = clientAddress; + } + + public int getClientPort() { + return clientPort; + } + + public void setClientPort(int clientPort) { + this.clientPort = clientPort; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } +} diff --git a/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/SocketMessage.java b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/SocketMessage.java new file mode 100644 index 0000000..deef580 --- /dev/null +++ b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/SocketMessage.java @@ -0,0 +1,49 @@ +package cn.icanci.loopstack.ddk.common.socket; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 21:01 + */ +@SuppressWarnings("all") +public class SocketMessage { + private boolean success; + private String errorMessage; + private T content; + + public static SocketMessage fail(String errorMessage) { + SocketMessage message = new SocketMessage(); + message.setSuccess(false); + message.setErrorMessage(errorMessage); + return message; + } + + public static SocketMessage success() { + SocketMessage message = new SocketMessage(); + message.setSuccess(true); + return message; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + public T getContent() { + return content; + } + + public void setContent(T content) { + this.content = content; + } +} diff --git a/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/UriConstant.java b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/UriConstant.java new file mode 100644 index 0000000..d240ce1 --- /dev/null +++ b/common/src/main/java/cn/icanci/loopstack/ddk/common/socket/UriConstant.java @@ -0,0 +1,31 @@ +package cn.icanci.loopstack.ddk.common.socket; + +/** + * TODO 发布的处理 + * + * @author icanci + * @since 1.0 Created in 2023/01/06 22:41 + */ +public interface UriConstant { + /** + * 服务端调用Server端 + */ + interface ToClient { + /** 心跳 */ + String HEARTBEAT = "/ddk/register/heartbeat"; + /** 刷新 */ + String REFRESH = "/ddk/register/refresh"; + /** 方法调用 */ + String INVOKE = "/ddk/register/invoke"; + } + + /** + * 客户端调用Server + */ + interface ToServer { + /** 注册 */ + String REGISTER = "/ddk/register/doRegister"; + /** 初始化加载数据 包含仓储的refresh和方法的调用配置 */ + String LOAD = "/ddk/register/load"; + } +} -- Gitee