diff --git a/README.md b/README.md index 4c86a1dc67da3f22cc49df02b0fa9b066bca7dc2..6c6d3f606972003d2ea4897a48a846fd2e757226 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,6 @@ REC(Rule Engine Component)规则引擎组件:提供统一的规则处理 - rec-engine-sdk:规则引擎SDK、SpringBootStarter模块(此处可能不太优雅) - rec-engine-sdk-http:规则引擎SDK的SPI实现,加载数据,HTTP实现 - rec-engine-sdk-netty:规则引擎SDK的SPI实现,加载数据,Netty实现 -- rec-socket:rec通信管理 - - rec-socket-client:rec通信客户端 - - rec-socket-server:rec通信服务端 @@ -269,4 +266,6 @@ REC(Rule Engine Component)规则引擎组件:提供统一的规则处理 - mvn versions:set -DnewVersion=0.0.0.2-SNAPSHOT - mvn versions:revert - mvn versions:commit -- mongodb数据库导出:mongodump -d rec -o /Users/icanci/Desktop \ No newline at end of file +- mongodb数据库导出:mongodump -d rec -o /Users/icanci/Desktop +- 思考:有个很扯淡的事情,既然已经支持SDK去加载数据了,那是否还需要定义SPI,然后提供默认实现加载数据呢? + - 这个问题值得商榷,因为服务端向客户端通知的时候,就可以携带数据了 \ No newline at end of file diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/config/RecAdminConfig.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/config/RecAdminConfig.java index cf6d64dc9041876989c2ced0afd780e6661bd25d..a61e44eda76cbd4280e60080a821fc754ac6e30a 100644 --- a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/config/RecAdminConfig.java +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/config/RecAdminConfig.java @@ -4,6 +4,7 @@ import cn.icanci.rec.spi.event.DefaultEventDispatcher; import cn.icanci.rec.spi.event.EventDispatcher; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; @@ -14,7 +15,7 @@ import org.springframework.context.annotation.Configuration; * @since 1.0 Created in 2022/11/13 14:06 */ @Configuration -public class RecAdminConfig implements ApplicationContextAware { +public class RecAdminConfig implements ApplicationContextAware, InitializingBean { /** * Spring 上下文 */ @@ -30,18 +31,13 @@ public class RecAdminConfig implements ApplicationContextAware { return new DefaultEventDispatcher(); } - // /** - // * 执行引擎 - // * - // * @return 返回脚本执行引擎 - // */ - // @Bean("recScriptEngine") - // public RecScriptEngine recScriptEngine() { - // return RecScriptEngineManager.getRecScriptEngine(); - // } - @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { context = applicationContext; } + + @Override + public void afterPropertiesSet() throws Exception { + // no op + } } diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/StrategyServiceImpl.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/StrategyServiceImpl.java index f9f3fea33b63fdffcf829ed2028976aed2bcb32e..77ec075446bf1d2b5b435e178a3b141d1163c250 100644 --- a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/StrategyServiceImpl.java +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/StrategyServiceImpl.java @@ -1,5 +1,21 @@ package cn.icanci.rec.admin.biz.service.impl; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.annotation.Resource; +import javax.script.CompiledScript; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import cn.icanci.rec.admin.biz.event.log.LogEvent; import cn.icanci.rec.admin.biz.mapper.config.StrategyMapper; import cn.icanci.rec.admin.biz.mapper.config.StrategyVoDtoMapper; @@ -17,29 +33,13 @@ import cn.icanci.rec.common.enums.ScriptTypeEnum; import cn.icanci.rec.common.model.config.StrategyVO; import cn.icanci.rec.common.utils.FastJsonUtils; import cn.icanci.rec.engine.script.RecScriptEngine; +import cn.icanci.rec.engine.sdk.actuator.RecRuleEngineActuator; import cn.icanci.rec.engine.sdk.actuator.RuleEngineRequest; import cn.icanci.rec.engine.sdk.actuator.RuleEngineResponse; -import cn.icanci.rec.engine.sdk.rule.EngineExecutor; import cn.icanci.rec.engine.sdk.rule.repository.DomainSceneKey; import cn.icanci.rec.engine.sdk.rule.repository.EngineRepositoryHolder; import cn.icanci.rec.spi.event.EventDispatcher; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import javax.annotation.Resource; -import javax.script.CompiledScript; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Service; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - /** * @author icanci * @since 1.0 Created in 2022/11/12 10:38 @@ -57,7 +57,7 @@ public class StrategyServiceImpl implements StrategyService { @Resource private EngineRepositoryHolder holder; @Resource - private EngineExecutor engineExecutor; + private RecRuleEngineActuator recRuleEngineActuator; @Resource private RecScriptEngine recScriptEngine; @Resource @@ -134,7 +134,7 @@ public class StrategyServiceImpl implements StrategyService { request.setParameters(StringUtils.isNotBlank(scriptContentTest) ? FastJsonUtils.fromJSONString(scriptContentTest, Map.class) : Maps.newHashMap()); // 8.执行 - RuleEngineResponse execute = engineExecutor.execute(request); + RuleEngineResponse execute = recRuleEngineActuator.executor(request); // 9.构建执行返回结果 result.setSuccess(execute.isSuccess()); diff --git a/rec-admin/rec-admin-views/src/main/resources/application.yml b/rec-admin/rec-admin-views/src/main/resources/application.yml index 5f7ace161c9d1b1506d5d78447a6c819b73ee6c7..221501e0b294805811387902733a7b55bc731af3 100644 --- a/rec-admin/rec-admin-views/src/main/resources/application.yml +++ b/rec-admin/rec-admin-views/src/main/resources/application.yml @@ -1,4 +1,3 @@ -# Tomcat server: tomcat: uri-encoding: UTF-8 @@ -9,15 +8,19 @@ server: logging: config: classpath:log4j2.xml +# rec.defaultPort 指的是服务端Netty监听端口 rec: env: test + server: + ip: 127.0.0.1 + port: 9999 + client: + port: 10000 + domain: + load-all: false + appName: rec-admin spring: - # datasource: - # driverClassName: com.mysql.cj.jdbc.Driver - # url: jdbc:mysql://127.0.0.1:3306/rec?useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC - # username: root - # password: root servlet: multipart: max-file-size: 10MB @@ -29,12 +32,4 @@ spring: data: mongodb: - uri: mongodb://127.0.0.1:27017/rec - -# 指定别名设置的包为所有entity -# type-aliases-package: cn.icanci.rec.admin.dal.model -# configuration: -# map-underscore-to-camel-case: true # 驼峰命名规范 -# mapper-locations: # mapper映射文件位置 -# - classpath:mybatis/mapper/*.xml - + uri: mongodb://127.0.0.1:27017/rec \ No newline at end of file diff --git a/rec-admin/rec-admin-web/src/main/java/cn/icanci/rec/admin/web/controller/trigger/TriggerController.java b/rec-admin/rec-admin-web/src/main/java/cn/icanci/rec/admin/web/controller/trigger/TriggerController.java new file mode 100644 index 0000000000000000000000000000000000000000..9078f841273f37151843f08a1090aa7561d7d196 --- /dev/null +++ b/rec-admin/rec-admin-web/src/main/java/cn/icanci/rec/admin/web/controller/trigger/TriggerController.java @@ -0,0 +1,31 @@ +package cn.icanci.rec.admin.web.controller.trigger; + +import cn.icanci.rec.admin.web.form.PublishForm; +import cn.icanci.rec.common.model.socket.RegisterDTO; +import cn.icanci.rec.common.result.R; + +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * TODO 基于域发布更新 + * + * @author icanci + * @since 1.0 Created in 2022/11/20 19:53 + */ +@RestController +@RequestMapping("/rec/trigger") +public class TriggerController { + + @PostMapping("publish") + public R debug(@RequestBody PublishForm publish) { + return R.builderOk().build(); + } + + @PostMapping("register") + public R register(@RequestBody RegisterDTO register) { + return R.builderOk().data("register", register).build(); + } +} diff --git a/rec-admin/rec-admin-web/src/main/java/cn/icanci/rec/admin/web/form/PublishForm.java b/rec-admin/rec-admin-web/src/main/java/cn/icanci/rec/admin/web/form/PublishForm.java new file mode 100644 index 0000000000000000000000000000000000000000..b10e30e955e6855771b647b098e48547ed707a66 --- /dev/null +++ b/rec-admin/rec-admin-web/src/main/java/cn/icanci/rec/admin/web/form/PublishForm.java @@ -0,0 +1,24 @@ +package cn.icanci.rec.admin.web.form; + +import java.io.Serializable; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 19:54 + */ +public class PublishForm implements Serializable { + private static final long serialVersionUID = -3531200558118766368L; + + /** + * 发布的域 + */ + private String domainCode; + + public String getDomainCode() { + return domainCode; + } + + public void setDomainCode(String domainCode) { + this.domainCode = domainCode; + } +} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/model/socket/PublishDTO.java b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/PublishDTO.java new file mode 100644 index 0000000000000000000000000000000000000000..e1448bd293c257d15706497d7e1d164235cdca78 --- /dev/null +++ b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/PublishDTO.java @@ -0,0 +1,22 @@ +package cn.icanci.rec.common.model.socket; + +import java.io.Serializable; +import java.util.Set; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 21:17 + */ +public class PublishDTO implements Serializable { + private static final long serialVersionUID = 1223193630071803254L; + + private Set domainCodes; + + public Set getDomainCodes() { + return domainCodes; + } + + public void setDomainCodes(Set domainCodes) { + this.domainCodes = domainCodes; + } +} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/model/socket/RegisterDTO.java b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/RegisterDTO.java new file mode 100644 index 0000000000000000000000000000000000000000..079d96e7dcf34dcac7b92e36f92d8eceae3f34ba --- /dev/null +++ b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/RegisterDTO.java @@ -0,0 +1,57 @@ +package cn.icanci.rec.common.model.socket; + +import java.io.Serializable; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 21:35 + */ +public class RegisterDTO implements Serializable { + private static final long serialVersionUID = -2030921699127783725L; + + /** + * SDK 服务ip地址 + */ + private String clientAddress; + /** + * SDK 服务端口地址 + */ + private int clientPort; + /** + * SDK 服务服务名字 + */ + private String appName; + + public RegisterDTO() { + } + + public RegisterDTO(String clientAddress, int clientPort, String appName) { + this.clientAddress = clientAddress; + this.clientPort = clientPort; + this.appName = appName; + } + + public String getClientAddress() { + return clientAddress; + } + + public void setClientAddress(String clientAddress) { + this.clientAddress = clientAddress; + } + + public int getClientPort() { + return clientPort; + } + + public void setClientPort(int clientPort) { + this.clientPort = clientPort; + } + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } +} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/model/socket/SocketMessage.java b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/SocketMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..a3feaefd31e0965cb8ea403830518ff9cc0c30f0 --- /dev/null +++ b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/SocketMessage.java @@ -0,0 +1,49 @@ +package cn.icanci.rec.common.model.socket; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 21:01 + */ +@SuppressWarnings("all") +public class SocketMessage { + private boolean success; + private String errorMessage; + private T content; + + public static SocketMessage fail(String errorMessage) { + SocketMessage message = new SocketMessage(); + message.setSuccess(false); + message.setErrorMessage(errorMessage); + return message; + } + + public static SocketMessage success() { + SocketMessage message = new SocketMessage(); + message.setSuccess(true); + return message; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + public T getContent() { + return content; + } + + public void setContent(T content) { + this.content = content; + } +} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/model/socket/UriConstant.java b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/UriConstant.java new file mode 100644 index 0000000000000000000000000000000000000000..efbb75ac5aa690fa08852cf280ea16f7d5ab4065 --- /dev/null +++ b/rec-common/src/main/java/cn/icanci/rec/common/model/socket/UriConstant.java @@ -0,0 +1,15 @@ +package cn.icanci.rec.common.model.socket; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 21:06 + */ +public interface UriConstant { + + /** 心跳 */ + String heartbeat = "/rec/trigger/heartbeat"; + /** 刷新 */ + String refresh = "/rec/trigger/refresh"; + /** 注册 */ + String register = "/rec/trigger/register"; +} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/result/R.java b/rec-common/src/main/java/cn/icanci/rec/common/result/R.java index 84a2b9bc201ef60a85f75166826c332ee1310a81..fa7e6fca6c5fc96053b57e101a9f7b157ad6cdcf 100644 --- a/rec-common/src/main/java/cn/icanci/rec/common/result/R.java +++ b/rec-common/src/main/java/cn/icanci/rec/common/result/R.java @@ -1,5 +1,6 @@ package cn.icanci.rec.common.result; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.StringJoiner; @@ -10,7 +11,8 @@ import java.util.StringJoiner; * @author icanci * @since 1.0 Created in 2022/04/04 19:11 */ -public class R { +public class R implements Serializable { + private static final long serialVersionUID = -1343013883236338104L; /** 是否成功 */ private boolean ok; /** 错误码 */ @@ -18,7 +20,10 @@ public class R { /** 错误信息 */ private String message; /** 返回前端数据 */ - private Map data = new HashMap<>(); + private Map data = new HashMap<>(); + + public R() { + } /** * Builder diff --git a/rec-common/src/main/java/cn/icanci/rec/common/utils/IPUtils.java b/rec-common/src/main/java/cn/icanci/rec/common/utils/IPUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..ace584a225f1f76c0a3405ea43f2ec34fcae8a0a --- /dev/null +++ b/rec-common/src/main/java/cn/icanci/rec/common/utils/IPUtils.java @@ -0,0 +1,54 @@ +package cn.icanci.rec.common.utils; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; + +/** + * ip 地址工具类 + * + * @author icanci + * @since 1.0 Created in 2022/11/20 15:37 + */ +public class IPUtils { + + /** + * 获取主机地址 + */ + public static String getHostIpAddress() { + String realIp = null; + try { + InetAddress address = InetAddress.getLocalHost(); + // 如果是回环网卡地址, 则获取ipv4地址 + if (address.isLoopbackAddress()) { + address = getInet4Address(); + } + realIp = address.getHostAddress(); + } catch (Exception e) { + // no op + } + return realIp; + } + + /** + * 获取IPV4网络配置 + */ + private static InetAddress getInet4Address() throws SocketException { + // 获取所有网卡信息 + Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces(); + while (networkInterfaces.hasMoreElements()) { + NetworkInterface netInterface = networkInterfaces.nextElement(); + Enumeration addresses = netInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + InetAddress ip = addresses.nextElement(); + if (ip instanceof Inet4Address) { + return ip; + } + } + } + return null; + } + +} \ No newline at end of file diff --git a/rec-core/src/main/java/cn/icanci/rec/core/facade/RuleExecutorFacade.java b/rec-core/src/main/java/cn/icanci/rec/core/facade/RuleExecutorFacade.java index 64421775aa079ba3aaed60479330e43cb57394a2..28806e928ec580b49c7b38959d1fc002a3638d7f 100644 --- a/rec-core/src/main/java/cn/icanci/rec/core/facade/RuleExecutorFacade.java +++ b/rec-core/src/main/java/cn/icanci/rec/core/facade/RuleExecutorFacade.java @@ -3,10 +3,10 @@ package cn.icanci.rec.core.facade; import cn.icanci.rec.core.model.ExecutorRequest; import cn.icanci.rec.core.model.ExecutorResponse; import cn.icanci.rec.core.utils.ParamUtils; +import cn.icanci.rec.engine.sdk.actuator.RecRuleEngineActuator; import cn.icanci.rec.engine.sdk.actuator.RuleEngineRequest; import cn.icanci.rec.engine.sdk.actuator.RuleEngineResponse; import cn.icanci.rec.engine.sdk.exception.ValidatorException; -import cn.icanci.rec.engine.sdk.rule.EngineExecutor; import javax.annotation.Resource; @@ -26,7 +26,7 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("/rec/core") public class RuleExecutorFacade { @Resource - private EngineExecutor engineExecutor; + private RecRuleEngineActuator recRuleEngineActuator; @PostMapping("doExecutor") public ExecutorResponse doExecutor(@RequestBody ExecutorRequest request) { @@ -35,7 +35,7 @@ public class RuleExecutorFacade { validator(request); // 2.数据转换 RuleEngineRequest req = mapper(request); - RuleEngineResponse execute = engineExecutor.execute(req); + RuleEngineResponse execute = recRuleEngineActuator.executor(req); // 3.执行结果转换 boolean success = execute.isSuccess(); if (success) { diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/RecEngineSDKAutoConfig.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/RecEngineSDKAutoConfig.java index 303f8b3703b13908d16c4c0e555718cbef05f90c..f705a553b8c26938fd97158ff8ce90ab8b3899dd 100644 --- a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/RecEngineSDKAutoConfig.java +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/RecEngineSDKAutoConfig.java @@ -7,6 +7,7 @@ import cn.icanci.rec.engine.sdk.exception.InjectionBeanException; import cn.icanci.rec.engine.sdk.extensions.RecExtensionLoader; import cn.icanci.rec.engine.sdk.extensions.SpringBean; import cn.icanci.rec.engine.sdk.properties.RecProperties; +import cn.icanci.rec.engine.sdk.rule.repository.EngineRepositoryHolder; import cn.icanci.rec.engine.sdk.spi.*; import java.lang.reflect.Field; @@ -14,7 +15,9 @@ import java.util.Set; import javax.annotation.Resource; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; @@ -34,14 +37,19 @@ import com.google.common.collect.Sets; @ComponentScan({ "cn.icanci.rec.engine.sdk" }) @EnableConfigurationProperties(RecProperties.class) //@ConditionalOnClass(RuleAggregationCluster.class) -public class RecEngineSDKAutoConfig implements ApplicationContextAware, CommandLineRunner { +public class RecEngineSDKAutoConfig implements ApplicationContextAware, CommandLineRunner, InitializingBean { + + @Resource + private RecProperties recProperties; + @Resource + private DomainSPI domainSPI; @Resource - private RecProperties recProperties; + private EngineRepositoryHolder engineRepositoryHolder; /** * Spring 上下文 */ - private ApplicationContext context; + private ApplicationContext context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { @@ -180,4 +188,21 @@ public class RecEngineSDKAutoConfig implements ApplicationContextAware, CommandL public void run(String... args) throws Exception { // no op } + + @Override + public void afterPropertiesSet() throws Exception { + // 项目启动加载缓存 + boolean loadAll = recProperties.isLoadAll(); + if (loadAll) { + Set domainCodes = domainSPI.loadAllDomainCodes(); + engineRepositoryHolder.refresh(domainCodes); + return; + } + + String domain = recProperties.getDomain(); + if (StringUtils.isNotBlank(domain)) { + Set domainCodes = Sets.newHashSet(domain.replaceAll("\\s*", StringUtils.EMPTY).split(",")); + engineRepositoryHolder.refresh(domainCodes); + } + } } diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineConfigHolder.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineConfigHolder.java index 7934cd016329d726a4c13eba55d3ba54fe3ccc94..f26ea00ff1859aa4a2f8212cc0bc9efe21ab9a44 100644 --- a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineConfigHolder.java +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineConfigHolder.java @@ -1,7 +1,7 @@ package cn.icanci.rec.engine.sdk.context; /** - * 规则引擎上下文配置信息 + * TODO 规则引擎上下文配置信息 * * @author icanci * @since 1.0 Created in 2022/11/13 15:31 diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineContext.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineContext.java index 9d5ada6834126670981d7c0e96822765d278b0ad..ae42f16edd040ccf1a14b19c3dbac40713eb54a7 100644 --- a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineContext.java +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/context/RecRuleEngineContext.java @@ -1,7 +1,7 @@ package cn.icanci.rec.engine.sdk.context; /** - * 规则引擎上下文信息 + * TODO 规则引擎上下文信息 * * @author icanci * @since 1.0 Created in 2022/11/13 15:31 diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecClient.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecClient.java index 65bf19ece01656f826c5595fcc2b3609a8366b71..887019fa7340512ef097b94c20221590025bcd1c 100644 --- a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecClient.java +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecClient.java @@ -5,16 +5,10 @@ package cn.icanci.rec.engine.sdk.properties; * @since 1.0 Created in 2022/11/20 17:34 */ public class RecClient { - private String ip; - private int port; - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } + /** + * 客户端向服务端暴露的服务端口 + */ + private int port; public int getPort() { return port; diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecProperties.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecProperties.java index 6bc351d12c9e9d71c8b4a0a13b26da1111168697..d3bf921f556f7f26e0259c0a184a739adb608a6f 100644 --- a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecProperties.java +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecProperties.java @@ -11,6 +11,11 @@ import org.springframework.stereotype.Component; @ConfigurationProperties(prefix = "rec") public class RecProperties { + /** + * AppName信息 + */ + private String appName; + /** * 需要加载的域Code * 以英文,分隔 @@ -37,6 +42,14 @@ public class RecProperties { */ private RecServer server; + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + public String getDomain() { return domain; } diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecServer.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecServer.java index 62b576c7b267c32924fe164fefdd527541e4c105..3d994789abda756dc93ab747c636aba354512c97 100644 --- a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecServer.java +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/properties/RecServer.java @@ -5,8 +5,15 @@ package cn.icanci.rec.engine.sdk.properties; * @since 1.0 Created in 2022/11/20 17:35 */ public class RecServer { + /** + * 服务端IP + * 以英文,分隔 + */ private String ip; - private int port; + /** + * 服务端端口 + */ + private int port; public String getIp() { return ip; diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/rule/repository/EngineRepositoryHolder.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/rule/repository/EngineRepositoryHolder.java index ad3bb4b44a0e26fa12e87e860c7224ff38cdb898..ba74e6d71ec713380eb862670137c626be2c8366 100644 --- a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/rule/repository/EngineRepositoryHolder.java +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/rule/repository/EngineRepositoryHolder.java @@ -5,6 +5,7 @@ import cn.icanci.rec.common.enums.DataSourceTypeEnum; import cn.icanci.rec.common.enums.ScriptTypeEnum; import cn.icanci.rec.engine.script.RecScriptEngine; import cn.icanci.rec.engine.sdk.rule.EngineRepositoryLoader; +import cn.icanci.rec.engine.sdk.server.RecNettyClientHandler; import java.util.List; import java.util.Map; @@ -18,6 +19,7 @@ import javax.script.CompiledScript; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; import com.google.common.collect.Maps; @@ -27,7 +29,7 @@ import com.google.common.collect.Maps; * @since 1.0 Created in 2022/11/16 08:38 */ @Service -public class EngineRepositoryHolder { +public class EngineRepositoryHolder implements InitializingBean { Logger logger = LoggerFactory.getLogger(EngineRepositoryHolder.class); @Resource private EngineRepositoryLoader engineRepositoryLoader; @@ -118,7 +120,9 @@ public class EngineRepositoryHolder { repository.getDataSourceRepository().putAll(refreshDataSources); repository.getStrategyRepository().putAll(refreshStrategies); } catch (Throwable e) { + // 有一个刷新失败,则中断启动 logger.error("[EngineRepositoryHolder][refresh] refresh domain:{} fail,error message:{}", domain, e.getMessage()); + throw e; } } @@ -281,4 +285,9 @@ public class EngineRepositoryHolder { .orElse(Maps.newHashMap()) // .get(new DomainSceneKey(domainCode, sceneCode)); } + + @Override + public void afterPropertiesSet() throws Exception { + RecNettyClientHandler.setEngineRepositoryHolder(this); + } } diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RecNettyClient.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RecNettyClient.java new file mode 100644 index 0000000000000000000000000000000000000000..bc742d91083836abd1263c30ce78be289c55e6cb --- /dev/null +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RecNettyClient.java @@ -0,0 +1,112 @@ +package cn.icanci.rec.engine.sdk.server; + +import cn.icanci.rec.engine.sdk.exception.ValidatorException; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 18:17 + */ +public class RecNettyClient { + + private static final Logger logger = LoggerFactory.getLogger(RecNettyClient.class); + + private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(40, // + 200, // + 60L, // + TimeUnit.SECONDS, // + new LinkedBlockingQueue<>(2000), // + runnable -> new Thread(runnable, "RECServerThreadPool-" + runnable.hashCode()), // + (r, executor) -> { + throw new RuntimeException("RECServerThreadPool is EXHAUSTED!"); + }); + + private static RegisterService registerService; + + public static void setRegisterService(RegisterService registerService) { + RecNettyClient.registerService = registerService; + } + + /** + * 需要将本SDK所在的客户端注册到服务端 + * + * @param serverAddress 服务端ip地址 + * @param serverPort 服务端端口 + * @param clientPort 客户端端口 + * @param appName 客户端服务名 + */ + public static void startClient(String serverAddress, int serverPort, int clientPort, String appName) { + if (StringUtils.isBlank(appName)) { + throw new ValidatorException("App Name cannot be empty"); + } + startClient0(serverAddress, serverPort, clientPort, appName); + } + + private static void startClient0(String serverAddress, int serverPort, int clientPort, String appName) { + Thread recThread = new Thread(() -> { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + ServerBootstrap bootstrap = new ServerBootstrap(); + + bootstrap.group(bossGroup, workerGroup) // + .channel(NioServerSocketChannel.class) // + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS)); + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new HttpObjectAggregator(5 * 1024 * 1024)); + pipeline.addLast(new RecNettyClientHandler(pool)); + } + }).childOption(ChannelOption.SO_KEEPALIVE, true); + + try { + ChannelFuture future = bootstrap.bind(clientPort).sync(); + + doRegistry(serverAddress, serverPort, clientPort, appName); + + future.channel().closeFuture().sync(); + + } catch (InterruptedException e) { + logger.info("REC remoting server interruptedException", e); + } catch (Exception e) { + logger.info("REC remoting server error", e); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + + }); + recThread.setDaemon(true); + recThread.start(); + } + + /** + * 将SDK所在服务注册到注册中心 + * + * @param serverAddress 服务端ip地址 + * @param serverPort 服务端端口 + * @param appName 客户端服务名 + */ + private static void doRegistry(String serverAddress, int serverPort, int clientPort, String appName) { + registerService.register(serverAddress, serverPort, clientPort, appName); + } + +} diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RecNettyClientHandler.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RecNettyClientHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..fb43a9c7767ad341681e44febdf28787db5b5fbd --- /dev/null +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RecNettyClientHandler.java @@ -0,0 +1,119 @@ +package cn.icanci.rec.engine.sdk.server; + +import cn.icanci.rec.common.model.socket.PublishDTO; +import cn.icanci.rec.common.model.socket.SocketMessage; +import cn.icanci.rec.common.model.socket.UriConstant; +import cn.icanci.rec.common.utils.FastJsonUtils; +import cn.icanci.rec.engine.sdk.rule.repository.EngineRepositoryHolder; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.*; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; +import io.netty.util.internal.ThrowableUtil; + +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * REC的网络交互参考了xxl-job的设计 + * - xxl-job的网络设计比较简洁,此处就没有造轮子了 + * + * @author icanci + * @since 1.0 Created in 2022/11/20 18:22 + */ +public class RecNettyClientHandler extends SimpleChannelInboundHandler { + + private static final Logger logger = LoggerFactory.getLogger(RecNettyClientHandler.class); + + private static EngineRepositoryHolder engineRepositoryHolder; + + private final ThreadPoolExecutor pool; + + public RecNettyClientHandler(ThreadPoolExecutor pool) { + this.pool = pool; + } + + public static void setEngineRepositoryHolder(EngineRepositoryHolder engineRepositoryHolder) { + RecNettyClientHandler.engineRepositoryHolder = engineRepositoryHolder; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { + String requestData = msg.content().toString(CharsetUtil.UTF_8); + String uri = msg.uri(); + HttpMethod httpMethod = msg.method(); + boolean keepAlive = HttpUtil.isKeepAlive(msg); + // 对于这种配置数据,会有很频繁的变更 + pool.execute(() -> { + Object responseObj = process(httpMethod, uri, requestData); + + String responseJson = FastJsonUtils.toJSONString(responseObj); + + writeResponse(ctx, keepAlive, responseJson); + }); + } + + private Object process(HttpMethod httpMethod, String uri, String requestData) { + if (HttpMethod.POST != httpMethod) { + return SocketMessage.fail("Only post requests are supported"); + } + if (StringUtils.isBlank(uri)) { + return SocketMessage.fail("Request uri is null"); + } + try { + switch (uri) { + case UriConstant.heartbeat: + return SocketMessage.success(); + case UriConstant.refresh: + PublishDTO publish = FastJsonUtils.fromJSONString(requestData, PublishDTO.class); + Set domainCodes = publish.getDomainCodes(); + + if (CollectionUtils.isEmpty(domainCodes)) { + return SocketMessage.fail("The domain code list to refresh is empty"); + } + engineRepositoryHolder.refresh(domainCodes); + default: + return SocketMessage.fail("Invalid request, uri-mapping(" + uri + ") not found"); + } + } catch (Exception e) { + return SocketMessage.fail(ThrowableUtil.stackTraceToString(e)); + } + } + + private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + if (keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } + ctx.writeAndFlush(response); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error(ThrowableUtil.stackTraceToString(cause)); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); + } else { + super.userEventTriggered(ctx, evt); + } + } +} diff --git a/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RegisterService.java b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RegisterService.java new file mode 100644 index 0000000000000000000000000000000000000000..fcbad261c52f2ac116052d0b5ca1d3615a883181 --- /dev/null +++ b/rec-engine/rec-engine-sdk/src/main/java/cn/icanci/rec/engine/sdk/server/RegisterService.java @@ -0,0 +1,82 @@ +package cn.icanci.rec.engine.sdk.server; + +import cn.icanci.rec.common.model.socket.RegisterDTO; +import cn.icanci.rec.common.model.socket.UriConstant; +import cn.icanci.rec.common.result.R; +import cn.icanci.rec.common.utils.FastJsonUtils; +import cn.icanci.rec.common.utils.IPUtils; +import cn.icanci.rec.engine.script.client.Client; +import cn.icanci.rec.engine.script.client.RemoteException; +import cn.icanci.rec.engine.script.client.http.HttpMethod; +import cn.icanci.rec.engine.script.client.http.OkHttpClientImpl; +import cn.icanci.rec.engine.script.client.serializer.SerializerEnum; +import cn.icanci.rec.engine.sdk.properties.RecClient; +import cn.icanci.rec.engine.sdk.properties.RecProperties; +import cn.icanci.rec.engine.sdk.properties.RecServer; + +import java.util.concurrent.TimeUnit; + +import javax.annotation.Resource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Maps; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/20 21:29 + */ +@Service +public class RegisterService implements InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(RegisterService.class); + /** http实例 */ + private static final Client CLIENT = OkHttpClientImpl.getInstance(); + @Resource + private RecProperties recProperties; + /** DEFAULT_APPLICATION_JSON_VALUE */ + public static final String DEFAULT_APPLICATION_JSON_VALUE = "application/json"; + + /** 注册请求地址 */ + private static final String REQ_URL_FORMAT = "http://%s:%s%s"; + + /** + * 将SDK所在服务注册到注册中心 + * + * @param serverAddress 服务端ip地址 + * @param serverPort 服务端端口 + * @param appName 客户端服务名 + */ + public void register(String serverAddress, int serverPort, int clientPort, String appName) { + try { + String[] addresses = serverAddress.split(","); + for (String address : addresses) { + RegisterDTO registerDTO = new RegisterDTO(IPUtils.getHostIpAddress(), clientPort, appName); + String reqUrl = String.format(REQ_URL_FORMAT, address, serverPort, UriConstant.register); + + Client.RpcRequest rpcRequest = new Client.RpcRequest(registerDTO, DEFAULT_APPLICATION_JSON_VALUE, Maps.newHashMap(), reqUrl, HttpMethod.POST.name(), false, 3, + TimeUnit.SECONDS, 3, SerializerEnum.FASTJSON, SerializerEnum.FASTJSON); + + R call = CLIENT.call(rpcRequest, R.class); + + logger.info("Register result:{}", FastJsonUtils.toJSONString(call)); + } + } catch (RemoteException e) { + logger.warn("Register RemoteException:{}", e.getMessage()); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + + // 注入注册bean + RecNettyClient.setRegisterService(this); + + // 启动服务器 + RecServer server = recProperties.getServer(); + RecClient client = recProperties.getClient(); + RecNettyClient.startClient(server.getIp(), server.getPort(), client.getPort(), recProperties.getAppName()); + } +}