From c6eecb91c30f039a43c5449bd6fd0e967ce09d43 Mon Sep 17 00:00:00 2001 From: icanci Date: Sat, 26 Nov 2022 18:30:33 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rec/admin/biz/service/LockService.java | 10 +- .../biz/service/impl/LockServiceImpl.java | 34 +++++++ .../rec/admin/biz/thread/TriggerThread.java | 20 +++- .../admin/biz/thread/TriggerThreadStart.java | 31 ++++++ .../dal/mongodb/daointerface/LockDAO.java | 39 ++++++++ .../admin/dal/mongodb/dateobject/LockDO.java | 47 +++++++++ .../admin/dal/mongodb/mongo/MongoLockDAO.java | 96 +++++++++++++++++++ .../icanci/rec/common/model/lock/LockVO.java | 44 +++++++++ .../rec/common/utils/DateFormatUtils.java | 10 -- .../utils/GenerateRegisterKeyUtils.java | 16 ++++ 10 files changed, 327 insertions(+), 20 deletions(-) create mode 100644 rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/LockServiceImpl.java create mode 100644 rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThreadStart.java create mode 100644 rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/LockDAO.java create mode 100644 rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java create mode 100644 rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java create mode 100644 rec-common/src/main/java/cn/icanci/rec/common/model/lock/LockVO.java delete mode 100644 rec-common/src/main/java/cn/icanci/rec/common/utils/DateFormatUtils.java create mode 100644 rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/LockService.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/LockService.java index fc893be..3b1bbf9 100644 --- a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/LockService.java +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/LockService.java @@ -6,21 +6,15 @@ package cn.icanci.rec.admin.biz.service; * Tips: 分布式锁的实现是必要的,因为客户端不知道有多少;服务端也不知道有多少 * 所以在进行心跳检测的时候,需要这样进行处理,否则会消耗无所谓的带宽 * - * https://cloud.tencent.com/developer/article/1856998 * @author icanci * @since 1.0 Created in 2022/11/25 21:10 */ public interface LockService { - /** - * 加锁 - * - * @param key key - * @param expiration expiration - * @return 返回Token - */ + String acquire(String key, long expiration); boolean release(String key, String token); boolean refresh(String key, String token, long expiration); + } diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/LockServiceImpl.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/LockServiceImpl.java new file mode 100644 index 0000000..ebad0ec --- /dev/null +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/service/impl/LockServiceImpl.java @@ -0,0 +1,34 @@ +package cn.icanci.rec.admin.biz.service.impl; + +import cn.icanci.rec.admin.biz.service.LockService; +import cn.icanci.rec.admin.dal.mongodb.daointerface.LockDAO; + +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/26 16:37 + */ +@Service +public class LockServiceImpl implements LockService { + + @Resource + private LockDAO lockDAO; + + @Override + public String acquire(String key, long expiration) { + return lockDAO.lock(key, expiration); + } + + @Override + public boolean release(String key, String token) { + return lockDAO.release(key, token); + } + + @Override + public boolean refresh(String key, String token, long expiration) { + return lockDAO.refresh(key, token, expiration); + } +} diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java index 96ded62..25ee1c7 100644 --- a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThread.java @@ -1,10 +1,12 @@ package cn.icanci.rec.admin.biz.thread; +import cn.icanci.rec.admin.biz.service.LockService; import cn.icanci.rec.admin.biz.service.RegisterService; import cn.icanci.rec.common.model.config.RegisterVO; import cn.icanci.rec.common.model.socket.PublishDTO; import cn.icanci.rec.common.model.socket.SocketMessage; import cn.icanci.rec.common.model.socket.UriConstant; +import cn.icanci.rec.common.utils.GenerateRegisterKeyUtils; import cn.icanci.rec.engine.script.client.Client; import cn.icanci.rec.engine.script.client.RemoteException; import cn.icanci.rec.engine.script.client.http.HttpMethod; @@ -20,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +49,8 @@ public class TriggerThread { private static RegisterService registerService; + private static LockService lockService; + private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); private static final int REGISTER_TIME_OUT = 90; @@ -70,8 +75,10 @@ public class TriggerThread { public static void setRegisterService(RegisterService registerService) { TriggerThread.registerService = registerService; + } - start(); + public static void setLockService(LockService lockService) { + TriggerThread.lockService = lockService; } /** @@ -164,8 +171,15 @@ public class TriggerThread { @Override @SuppressWarnings("all") public void run() { + // 加锁 + String key = GenerateRegisterKeyUtils.generateKey(register); + String token = lockService.acquire(key, 3000); try { - // TODO 加分布式锁 + + if (StringUtils.isBlank(token)) { + return; + } + String address = register.getClientAddress(); int port = register.getClientPort(); String reqUrl = String.format(HEARTBEAT_REQUEST_FORMAT, address, port); @@ -187,6 +201,8 @@ public class TriggerThread { } catch (Throwable ex) { // no op logger.error("[HeartbeatRunner][Throwable] error message:{}", ex.getMessage()); + } finally { + lockService.release(key, token); } } } diff --git a/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThreadStart.java b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThreadStart.java new file mode 100644 index 0000000..43a09dd --- /dev/null +++ b/rec-admin/rec-admin-biz/src/main/java/cn/icanci/rec/admin/biz/thread/TriggerThreadStart.java @@ -0,0 +1,31 @@ +package cn.icanci.rec.admin.biz.thread; + +import cn.icanci.rec.admin.biz.service.LockService; +import cn.icanci.rec.admin.biz.service.RegisterService; + +import javax.annotation.Resource; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Service; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/26 17:34 + */ +@Service +public class TriggerThreadStart implements InitializingBean { + @Resource + private RegisterService registerService; + @Resource + private LockService lockService; + + @Override + public void afterPropertiesSet() throws Exception { + + TriggerThread.setRegisterService(registerService); + + TriggerThread.setLockService(lockService); + + TriggerThread.start(); + } +} diff --git a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/LockDAO.java b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/LockDAO.java new file mode 100644 index 0000000..12ac43f --- /dev/null +++ b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/daointerface/LockDAO.java @@ -0,0 +1,39 @@ +package cn.icanci.rec.admin.dal.mongodb.daointerface; + +import cn.icanci.rec.admin.dal.mongodb.dateobject.LockDO; + +/** + * 分布式锁实现参考:https://www.cnblogs.com/xiaoqi/p/mongodb-lock.html + * + * @author icanci + * @since 1.0 Created in 2022/11/26 16:39 + */ +public interface LockDAO { + /** + * 文档对应的名字 + */ + String COLLECTION_NAME = "rec-lock"; + /** + * 文档对应的Class + */ + Class COLLECTION_CLASS = LockDO.class; + + String lock(String key, long expireTime); + + boolean release(String key, String token); + + boolean refresh(String key, String token, long expiration); + + interface LockColumn { + /** id */ + String _id = "_id"; + /** key */ + String key = "key"; + /** expireAt */ + String expireAt = "expireAt"; + /** token */ + String token = "token"; + /** env */ + String env = "env"; + } +} diff --git a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java new file mode 100644 index 0000000..614419c --- /dev/null +++ b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java @@ -0,0 +1,47 @@ +package cn.icanci.rec.admin.dal.mongodb.dateobject; + +import org.springframework.data.annotation.Id; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/26 16:39 + */ +public class LockDO { + @Id + private String id; + private long expireAt; + private String token; + private String env; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public long getExpireAt() { + return expireAt; + } + + public void setExpireAt(long expireAt) { + this.expireAt = expireAt; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + + public String getEnv() { + return env; + } + + public void setEnv(String env) { + this.env = env; + } +} diff --git a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java new file mode 100644 index 0000000..6604aa3 --- /dev/null +++ b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java @@ -0,0 +1,96 @@ +package cn.icanci.rec.admin.dal.mongodb.mongo; + +import cn.icanci.rec.admin.dal.mongodb.daointerface.LockDAO; +import cn.icanci.rec.admin.dal.mongodb.dateobject.LockDO; +import cn.icanci.rec.admin.dal.utils.EnvUtils; +import cn.icanci.rec.admin.dal.utils.IDHolder; + +import javax.annotation.Resource; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.mongodb.core.FindAndModifyOptions; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.stereotype.Service; + +import com.mongodb.client.result.DeleteResult; +import com.mongodb.client.result.UpdateResult; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/26 16:54 + */ +@Service("lockDAO") +public class MongoLockDAO implements LockDAO { + private static final Logger logger = LoggerFactory.getLogger(MongoLockDAO.class); + + @Resource + protected MongoTemplate mongoTemplate; + + @Override + public String lock(String key, long expireTime) { + Query query = Query.query(Criteria.where(LockColumn.key).is(key)); + String token = IDHolder.generateNoBySnowFlake("LOCK"); + Update update = new Update().setOnInsert(LockColumn.key, key).setOnInsert(LockColumn.env, EnvUtils.getEnv()) + .setOnInsert(LockColumn.expireAt, System.currentTimeMillis() + expireTime).setOnInsert(LockColumn.token, token); + + FindAndModifyOptions options = new FindAndModifyOptions().upsert(true).returnNew(true); + LockDO lock = mongoTemplate.findAndModify(query, update, options, COLLECTION_CLASS, COLLECTION_NAME); + if (lock == null) { + return StringUtils.EMPTY; + } + boolean locked = StringUtils.equals(token, lock.getToken()); + + // 如果已过期 + if (!locked && lock.getExpireAt() < System.currentTimeMillis()) { + DeleteResult deleted = this.mongoTemplate.remove( + Query.query(Criteria.where(LockColumn.key).is(key).and(LockColumn.token).is(lock.getToken()).and(LockColumn.expireAt).is(lock.getExpireAt())), COLLECTION_CLASS, + COLLECTION_NAME); + if (deleted.getDeletedCount() >= 1) { + // 成功释放锁, 再次尝试获取锁 + return lock(key, expireTime); + } + } + + return locked ? token : StringUtils.EMPTY; + } + + @Override + public boolean release(String key, String token) { + Query query = Query.query(Criteria.where(LockColumn.key).is(key).and(LockColumn.token).is(token).and(LockColumn.env).is(EnvUtils.getEnv())); + DeleteResult deleted = mongoTemplate.remove(query, COLLECTION_CLASS, COLLECTION_NAME); + boolean released = deleted.getDeletedCount() == 1; + if (released) { + logger.info("Remove query successfully affected 1 record for key {} with token {}", key, token); + } else if (deleted.getDeletedCount() > 0) { + logger.error("Unexpected result from release for key {} with token {}, released {}", key, token, deleted); + } else { + logger.error("Remove query did not affect any records for key {} with token {}", key, token); + } + return released; + } + + @Override + public boolean refresh(String key, String token, long expiration) { + Query query = Query.query(Criteria.where(LockColumn.key).is(key).and(LockColumn.token).is(token).and(LockColumn.env).is(EnvUtils.getEnv())); + Update update = Update.update(LockColumn.expireAt, System.currentTimeMillis() + expiration); + UpdateResult updated = mongoTemplate.updateFirst(query, update, COLLECTION_CLASS, COLLECTION_NAME); + + final boolean refreshed = updated.getModifiedCount() == 1; + if (refreshed) { + logger.info("Refresh query successfully affected 1 record for key {} " + "with token {}", key, token); + } else if (updated.getModifiedCount() > 0) { + logger.error("Unexpected result from refresh for key {} with token {}, " + "released {}", key, token, updated); + } else { + logger.warn("Refresh query did not affect any records for key {} with token {}. " + "This is possible when refresh interval fires for the final time " + + "after the lock has been released", + key, token); + } + return refreshed; + } + +} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/model/lock/LockVO.java b/rec-common/src/main/java/cn/icanci/rec/common/model/lock/LockVO.java new file mode 100644 index 0000000..9184d32 --- /dev/null +++ b/rec-common/src/main/java/cn/icanci/rec/common/model/lock/LockVO.java @@ -0,0 +1,44 @@ +package cn.icanci.rec.common.model.lock; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/26 16:38 + */ +public class LockVO { + /** + * 文档id + */ + private String id; + /** + * 过期时间 + */ + private long expireAt; + /** + * token + */ + private String token; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public long getExpireAt() { + return expireAt; + } + + public void setExpireAt(long expireAt) { + this.expireAt = expireAt; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } +} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/utils/DateFormatUtils.java b/rec-common/src/main/java/cn/icanci/rec/common/utils/DateFormatUtils.java deleted file mode 100644 index 6db0a6c..0000000 --- a/rec-common/src/main/java/cn/icanci/rec/common/utils/DateFormatUtils.java +++ /dev/null @@ -1,10 +0,0 @@ -package cn.icanci.rec.common.utils; - -/** - * TODO 为脚本执行提供一些公共的日期方法 - * - * @author icanci - * @since 1.0 Created in 2022/11/13 15:23 - */ -public class DateFormatUtils { -} diff --git a/rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java b/rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java new file mode 100644 index 0000000..24e8a17 --- /dev/null +++ b/rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java @@ -0,0 +1,16 @@ +package cn.icanci.rec.common.utils; + +import cn.icanci.rec.common.model.config.RegisterVO; + +/** + * @author icanci + * @since 1.0 Created in 2022/11/26 17:38 + */ +public class GenerateRegisterKeyUtils { + + private static final String FORMAT = "%s#%s#%s"; + + public static String generateKey(RegisterVO register) { + return String.format(FORMAT, register.getClientAddress(), register.getClientPort(), register.getAppName()); + } +} -- Gitee From eb9d6f92a43c251d700157a4f3184671bab92f2b Mon Sep 17 00:00:00 2001 From: icanci Date: Sat, 26 Nov 2022 18:40:08 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=9F=BA=E4=BA=8EMongoDB=E7=9A=84=E5=88=86?= =?UTF-8?q?=E5=B8=83=E5=BC=8F=E9=94=81=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../icanci/rec/admin/dal/mongodb/dateobject/LockDO.java | 9 +++++++++ .../icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java | 2 +- .../rec/common/utils/GenerateRegisterKeyUtils.java | 4 ++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java index 614419c..89df80d 100644 --- a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java +++ b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/dateobject/LockDO.java @@ -11,6 +11,7 @@ public class LockDO { private String id; private long expireAt; private String token; + private String key; private String env; public String getId() { @@ -37,6 +38,14 @@ public class LockDO { this.token = token; } + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + public String getEnv() { return env; } diff --git a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java index 6604aa3..0d8b87d 100644 --- a/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java +++ b/rec-admin/rec-admin-dal/src/main/java/cn/icanci/rec/admin/dal/mongodb/mongo/MongoLockDAO.java @@ -69,7 +69,7 @@ public class MongoLockDAO implements LockDAO { } else if (deleted.getDeletedCount() > 0) { logger.error("Unexpected result from release for key {} with token {}, released {}", key, token, deleted); } else { - logger.error("Remove query did not affect any records for key {} with token {}", key, token); + logger.warn("Remove query did not affect any records for key {} with token {}", key, token); } return released; } diff --git a/rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java b/rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java index 24e8a17..b7cdd2b 100644 --- a/rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java +++ b/rec-common/src/main/java/cn/icanci/rec/common/utils/GenerateRegisterKeyUtils.java @@ -8,9 +8,9 @@ import cn.icanci.rec.common.model.config.RegisterVO; */ public class GenerateRegisterKeyUtils { - private static final String FORMAT = "%s#%s#%s"; + private static final String FORMAT = "%s#%s#%s#%s"; public static String generateKey(RegisterVO register) { - return String.format(FORMAT, register.getClientAddress(), register.getClientPort(), register.getAppName()); + return String.format(FORMAT, register.getAppName(), register.getClientAddress(), register.getClientPort(), register.getDomain()); } } -- Gitee