diff --git a/db/dm8/module/maku-module-iot.sql b/db/dm8/module/maku-module-iot.sql index 78d560ab8d485bd8d60ae74626fbc240af1b4366..6c5efafbcb3630b3b36d2fb816311e61d5080002 100644 --- a/db/dm8/module/maku-module-iot.sql +++ b/db/dm8/module/maku-module-iot.sql @@ -10,6 +10,7 @@ CREATE TABLE iot_device ( temperature varchar(10), status int, running_status int DEFAULT 0, + protocol_type varchar(20), up_time datetime, down_time datetime, tenant_id bigint, @@ -33,6 +34,7 @@ COMMENT ON COLUMN iot_device.app_version IS 'App版本号'; COMMENT ON COLUMN iot_device.battery_percent IS '电池电量百分比'; COMMENT ON COLUMN iot_device.status IS '状态,0禁用,1启用'; COMMENT ON COLUMN iot_device.running_status IS '运行状态,0.离线状态 1.在线状态 2.正常待机 3.用户使用中 4.OTA升级中'; +COMMENT ON COLUMN iot_device.protocol_type IS '协议类型'; COMMENT ON COLUMN iot_device.up_time IS '上线时间'; COMMENT ON COLUMN iot_device.down_time IS '下线时间'; COMMENT ON COLUMN iot_device.tenant_id IS '租户ID'; diff --git a/db/mysql/module/maku-module-iot.sql b/db/mysql/module/maku-module-iot.sql index 7d307ed2f125d115953d010418b8407395d1032b..8bfd5f02644ed35d251e8e31cc77b75a21aba19b 100644 --- a/db/mysql/module/maku-module-iot.sql +++ b/db/mysql/module/maku-module-iot.sql @@ -11,6 +11,7 @@ CREATE TABLE iot_device ( temperature varchar(10) DEFAULT NULL COMMENT '温度', status tinyint NOT NULL DEFAULT '1' COMMENT '状态,0禁用,1启用', running_status int NOT NULL DEFAULT '0' COMMENT '运行状态,0.离线状态 1.在线状态 2.正常待机 3.用户使用中 4.OTA升级中', + protocol_type varchar(20) NOT NULL DEFAULT 'MQTT' COMMENT '协议类型', up_time datetime DEFAULT NULL COMMENT '上线时间', down_time datetime DEFAULT NULL COMMENT '下线时间', tenant_id bigint DEFAULT NULL COMMENT '租户ID', @@ -94,6 +95,17 @@ INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, r INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '登出', 'SIGN_OFF', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now()); INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'OTA升级', 'OTA_UPGRADE', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_protocol_type', '设备协议类型', '设备协议类型', 0, 10000, 0, 0, 10000, now(), 10000, now() ); +SET @typeId = @@identity; +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'MQTT', 'MQTT', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'TCP', 'TCP', NULL, NULL, 1, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'UDP', 'UDP', NULL, NULL, 2, NULL, 0, 1, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'BLE', 'BLE', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'CoAP', 'CoAP', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'LwM2M', 'LwM2M', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'Modbus', 'Modbus', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); + + INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_property', '设备属性', '设备通用属性:运行状态|APP版本|电池电量百分比|温度', 0, 10000, 0, 0, 10000, now(), 10000, now() ); SET @typeId = @@identity; INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '运行状态', 'RUNNING_STATUS', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now()); diff --git a/db/postgresql/module/maku-module-iot.sql b/db/postgresql/module/maku-module-iot.sql index d93d5529f187202cd2f8a4a0138c75c0b9d9ee51..7ad3984906616a7f9c09b61485ebf6f059cea8fa 100644 --- a/db/postgresql/module/maku-module-iot.sql +++ b/db/postgresql/module/maku-module-iot.sql @@ -10,6 +10,7 @@ CREATE TABLE iot_device ( temperature varchar(10), status int, running_status int DEFAULT 0, + protocol_type varchar(20), up_time timestamp, down_time timestamp, tenant_id int8, @@ -33,6 +34,7 @@ COMMENT ON COLUMN iot_device.app_version IS 'App版本号'; COMMENT ON COLUMN iot_device.battery_percent IS '电池电量百分比'; COMMENT ON COLUMN iot_device.status IS '状态,0禁用,1启用'; COMMENT ON COLUMN iot_device.running_status IS '运行状态,0.离线状态 1.在线状态 2.正常待机 3.用户使用中 4.OTA升级中'; +COMMENT ON COLUMN iot_device.protocol_type IS '协议类型'; COMMENT ON COLUMN iot_device.up_time IS '上线时间'; COMMENT ON COLUMN iot_device.down_time IS '下线时间'; COMMENT ON COLUMN iot_device.tenant_id IS '租户ID'; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java similarity index 72% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java index 2b01d9523d14b21b394d37238b50cf6104df7316..db67d94f9e2eae999b7595cd455412346e755ed8 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -9,7 +9,7 @@ import lombok.Data; * @author eden on 2024/6/17 */ @Data -public class BaseCommandResponse { +public class BaseCommandResponseDTO extends BaseDeviceID { /** * 命令ID */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java new file mode 100644 index 0000000000000000000000000000000000000000..5b323373d30f58ccffaf6d9ab7d966e288d14544 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java @@ -0,0 +1,17 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * 设备ID + * + * @author LSF maku_lsf@163.com + */ +@Data +@Schema(description = "设备ID") +public class BaseDeviceID { + + @Schema(description = "设备ID") + protected String deviceId; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/CommandResponseChan.java similarity index 41% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/CommandResponseChan.java index 79369653642b2c47c95dcf226baaa87bf17a729e..adb1adfeb83314560aef29187295ea08fa04573f 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/CommandResponseChan.java @@ -1,23 +1,28 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.dto; +import lombok.extern.slf4j.Slf4j; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** - * 数据生产消费者 + * 数据生产消费者通道 */ -public class Chan { +@Slf4j +public class CommandResponseChan { // 存储通道的 ConcurrentHashMap - private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); - private final CompletableFuture future = new CompletableFuture<>(); + private final CompletableFuture future = new CompletableFuture<>(); + + private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L; // 私有构造函数,不允许外部直接实例化 - private Chan() { + private CommandResponseChan() { } /** @@ -27,11 +32,21 @@ public class Chan { * @param isNeedCreate 是否需要创建新的通道实例 * @return 通道实例 */ - public static Chan getInstance(String commandId, boolean isNeedCreate) { + public static CommandResponseChan getInstance(String commandId, boolean isNeedCreate) { if (!isNeedCreate) { return CHANNEL.get(commandId); } - return CHANNEL.computeIfAbsent(commandId, k -> new Chan()); + return CHANNEL.computeIfAbsent(commandId, k -> new CommandResponseChan()); + } + + /** + * 从通道中获取数据,默认超时时间为 5 秒 + * + * @param commandId 通道标识 + * @return 获取的数据,如果超时返回 null + */ + public BaseCommandResponseDTO get(String commandId) { + return get(commandId, DEFAULT_WAIT_MILLISECONDS); } /** @@ -41,17 +56,24 @@ public class Chan { * @param timeout 超时时间(毫秒) * @return 获取的数据,如果超时返回 null */ - public Object get(String commandId, long timeout) { - Chan chan = CHANNEL.get(commandId); - if (Objects.isNull(chan)) { + public BaseCommandResponseDTO get(String commandId, long timeout) { + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { return null; } try { - return chan.future.get(timeout, TimeUnit.MILLISECONDS); + return channel.future.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // 超时异常处理 + log.error("Device response timeout. {}", commandId); + return null; } catch (Exception e) { + // 其他异常处理 + e.printStackTrace(); return null; } finally { - CHANNEL.remove(commandId, chan); + // 确保在获取数据后移除通道 + CHANNEL.remove(commandId, channel); } } @@ -60,15 +82,15 @@ public class Chan { * * @param response 要放入的数据 */ - public void put(BaseCommandResponse response) { + public void put(BaseCommandResponseDTO response) { String commandId = response.getCommandId(); if (commandId == null) { return; } - Chan chan = CHANNEL.get(commandId); - if (Objects.isNull(chan)) { + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { return; } - chan.future.complete(response); + channel.future.complete(response); } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java similarity index 86% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java index e304a294e9b88ec0dd0fb8b64009a6ba7bc4ae6f..7a921ff11ab8f9f1d6263817c26df764600fbc43 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -11,7 +11,7 @@ import net.maku.iot.enums.DeviceCommandEnum; */ @Data @Schema(description = "设备命令对象") -public class DeviceCommandDTO { +public class DeviceCommandDTO extends BaseDeviceID { /** * 命令类型 */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java similarity index 96% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java index e4f6133dc659bc6b8bc90b2c16d6d6a1ffec2dc6..1793647b77b591e068faf7708e309acc9c2df285 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.dto; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import io.swagger.v3.oas.annotations.media.Schema; @@ -15,7 +15,7 @@ import net.maku.iot.enums.DeviceCommandEnum; @Data @Schema(description = "设备命令响应DTO") @JsonIgnoreProperties(ignoreUnknown = true) -public class DeviceCommandResponseDTO extends BaseCommandResponse { +public class DeviceCommandResponseDTO extends BaseCommandResponseDTO { /** * 命令类型 */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java similarity index 85% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java index d75b32904441da81c03e12fbfdca11dd2d41f800..e43249e44124dae6b16c81bb415c2743be86df08 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -11,7 +11,7 @@ import net.maku.iot.enums.DevicePropertyEnum; */ @Data @Schema(description = "设备属性对象") -public class DevicePropertyDTO { +public class DevicePropertyDTO extends BaseDeviceID { /** * 设备属性类型 */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java new file mode 100644 index 0000000000000000000000000000000000000000..adf7658a3493313859dc02066f9f084cd900c073 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java @@ -0,0 +1,16 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * tcp通讯数据包 + * + * @author LSF maku_lsf@163.com + */ +@Data +@Schema(description = "tcp通讯数据包装类") +public class TcpMsgDTO { + private String topic; + private Object msg; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java similarity index 94% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java index d1d2febcebc4cd7c5096cd1cc1c39f2c7961b94f..e1a3fc41c1462c8e7fd820cf5b610aa687ae4bdc 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt; +package net.maku.iot.communication.mqtt; import jakarta.annotation.Resource; -import net.maku.iot.mqtt.config.MqttConfig; +import net.maku.iot.communication.mqtt.config.MqttConfig; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.support.MessageBuilder; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java similarity index 97% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java index ca3bf050a9c600749dbb60a5f5f3e8e22f5c1704..fea1398991db63b65538b070b8642cec31a864c4 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java @@ -1,11 +1,11 @@ -package net.maku.iot.mqtt.config; +package net.maku.iot.communication.mqtt.config; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.mqtt.factory.MqttMessageHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.factory.MqttMessageHandlerFactory; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java similarity index 89% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java index 742584d9bddb64345d03caa2f448d84d8946c8dd..61322d06aeb0d5f0446fee8d7c5d53df4b2da43f 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.factory; +package net.maku.iot.communication.mqtt.factory; import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler; +import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java similarity index 89% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java index e0d6bb427e1bd149cd78bab73f8540a6bdcbdef5..b84218cfc6a279206f1b918e653b5419ca3e9b0d 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.factory; +package net.maku.iot.communication.mqtt.factory; import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler; +import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java similarity index 91% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java index 368cf546cc33001cdef7aab5943dcee9a11546d6..d672227eddeb7f6a684090bf7fe9cfa6e8d503c9 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.factory; +package net.maku.iot.communication.mqtt.factory; import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.MqttMessageHandler; +import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java similarity index 72% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java index 2c04a66da5c544ba6983adb932ae87d85834686b..76bb2214d9575c212b25df6c2efff7e5904d8ab3 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; /** * 设备命令响应处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java similarity index 88% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java index 551a16b316821f182fb66f168d1fc8d5d9ae37d6..5cae0e2a1489a838b9f9c920d18eb6aec573d517 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java @@ -1,13 +1,13 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.service.MQTTService; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.mqtt.factory.DeviceCommandResponseHandlerFactory; -import net.maku.iot.mqtt.service.DeviceMqttService; import org.springframework.stereotype.Component; import java.util.Optional; @@ -21,10 +21,10 @@ import java.util.Optional; @Component @RequiredArgsConstructor public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandler { - private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory; + private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory; - private final DeviceMqttService deviceMqttService; + private final MQTTService deviceMqttService; @Override public boolean supports(String topic) { @@ -43,7 +43,7 @@ public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandl .ifPresent(responseDTO -> { // 调用设备命令执行器的命令响应处理逻辑 try { - deviceMqttService.commandReplied(topic, responseDTO); + deviceMqttService.commandReplied( responseDTO); } catch (Exception e) { log.error(StrUtil.format("调用设备命令执行器响应处理方法出错,topic:{}, message:{}", topic, message), e); } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java similarity index 73% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java index 739f266b447e05154a41d4db26eddb5c13c66375..17eee7295ca5eb930a997da82173181642d91971 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; /** * 设备属性变化处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java similarity index 88% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java index f47e205aea71f856b7777f4410df79f03b633e69..12998574cc10e1187e6ad61a39fc1e499c1110b1 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java @@ -1,12 +1,12 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; -import net.maku.iot.mqtt.factory.DevicePropertyChangeHandlerFactory; import org.springframework.stereotype.Component; import java.util.Optional; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java similarity index 88% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java index 33307ea332eba25cfe0160619bd14b22d3d83a86..a16367aa310e9dd40298e83c1acfd30fbe14d87a 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; /** * MQTT订阅消息处理接口 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java new file mode 100644 index 0000000000000000000000000000000000000000..0707cf457772a530809ae27259c8fdf463ae823f --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java @@ -0,0 +1,31 @@ +package net.maku.iot.communication.service; + +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; + + +/** + * 通信协议具备功能 + * + * @author LSF maku_lsf@163.com + */ +public interface BaseCommunication { + + // 异步发送指令,不等待设备响应 + String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应 + DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应,调试实现 + DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //模拟设备属性上报 + void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload); + + //模拟设备服务指令响应数据 + void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload); + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..b05cb4152f7b1a11a4c9606876359958164e1aa8 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java @@ -0,0 +1,36 @@ +package net.maku.iot.communication.service; + +import lombok.AllArgsConstructor; +import net.maku.framework.common.exception.ServerException; +import org.springframework.stereotype.Service; + +/** + * 设备协议服务工厂 + * + * @author LSF maku_lsf@163.com + */ +@Service +@AllArgsConstructor +public class CommunicationServiceFactory { + + private final MQTTService mqttService; + private final TCPService tcpService; + + public BaseCommunication getProtocol(String protocolType) { + if (protocolType == null) { + new ServerException("协议不存在!"); + } + switch (protocolType) { + case "MQTT": + return mqttService; + case "TCP": + return tcpService; +// case "Modbus": +// return tcpService; + default: + return null; + } + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java similarity index 71% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java index 0538be026686d619bcf71b601911a7dc308c6935..fda022627dc4b38aa8989d52d64e18944d016295 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java @@ -1,42 +1,36 @@ -package net.maku.iot.mqtt.service; +package net.maku.iot.communication.service; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.dto.CommandResponseChan; +import net.maku.iot.communication.dto.DeviceCommandDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.MqttGateway; import net.maku.iot.dto.DeviceClientDTO; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.enums.DeviceServiceEnum; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.MqttGateway; -import net.maku.iot.mqtt.dto.Chan; -import net.maku.iot.mqtt.dto.DeviceCommandDTO; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; import net.maku.iot.service.IotDeviceServiceLogService; -import net.maku.iot.utils.MqttUtils; import org.springframework.stereotype.Component; import java.util.UUID; -import java.util.concurrent.*; /** - * 设备命令发送服务 - **/ + * MQTT协议服务类 + * + * @author LSF maku_lsf@163.com + */ @Slf4j @Component @RequiredArgsConstructor -public class DeviceMqttService { - private final MqttUtils mqttUtils; +public class MQTTService implements BaseCommunication { + private final MqttGateway mqttGateway; private final IotDeviceServiceLogService iotDeviceEventLogService; - /** - * 命令等待exchanger缓存,key: command id - */ - private final ConcurrentMap> commandExchangers = new ConcurrentHashMap<>(); - /** * 异步发送命令,返回命令id * @@ -45,11 +39,11 @@ public class DeviceMqttService { * @param payload * @return */ + @Override public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { return asyncSendCommand(device, command, payload, Boolean.FALSE); } - /** * 异步发送命令,返回命令id * @@ -81,7 +75,6 @@ public class DeviceMqttService { return commandId; } - /** * 同步发送命令并返回响应结果 * @@ -106,16 +99,14 @@ public class DeviceMqttService { public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { // 构建并发送命令 String commandId = asyncSendCommand(device, command, payload, retained); - // 等待返回结果 超时3秒(可控) - Object receiver = Chan.getInstance(commandId, true).get(commandId, 3 * 1000L); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); if (receiver == null) { - log.error("Failed to receive the message. {}", device.getName()); throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); } return (DeviceCommandResponseDTO) receiver; } - /** * 发送命令并返回响应结果,模拟设备响应 * @@ -146,50 +137,18 @@ public class DeviceMqttService { } }).start(); - // 等待设备响应 - return waitCommandResponse(command, commandId); - } - - - /** - * 订阅设备命令响应主题并等待获取返回结果 - * - * @param command - * @param commandId - * @return - */ - private DeviceCommandResponseDTO waitCommandResponse(DeviceCommandEnum command, String commandId) { - // 创建命令响应等待exchanger - Exchanger commandExchanger = new Exchanger<>(); - commandExchangers.put(commandId, commandExchanger); - - try { - Object result = commandExchanger.exchange("", 10, TimeUnit.SECONDS); - return (DeviceCommandResponseDTO) result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServerException(StrUtil.format("{} <{}>,{} 命令中断", - DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e); - } catch (TimeoutException e) { - throw new ServerException(StrUtil.format("{} <{}>,{} 命令超时", - DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e); - } finally { - // 移除命令响应等待exchanger - commandExchangers.remove(commandId); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); } + return (DeviceCommandResponseDTO) receiver; } /** - * 设备命令响应处理 - * - * @param topic - * @param commandResponse + * 模拟设备属性上报 */ - public void commandReplied(String topic, DeviceCommandResponseDTO commandResponse) { - Chan chan = Chan.getInstance(commandResponse.getCommandId(), false); - chan.put(commandResponse); - } - + @Override public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { // 封装 设备属性上报的 topic String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); @@ -202,6 +161,14 @@ public class DeviceMqttService { } } + + /** + * 模拟设备服务指令响应数据 + * + * @param device + * @param payload + */ + @Override public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { // 封装 设备命令执行结果的 topic String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); @@ -209,9 +176,20 @@ public class DeviceMqttService { mqttGateway.sendToMqtt(commandTopic, payload); } catch (Exception e) { log.error(e.getMessage()); - throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟命令执行结果上报失败! Topic:{} ", device.getCode(), device.getName(), commandTopic)); } } + /** + * 设备命令响应处理,把设备响应结果放入通道中 + * + * @param commandResponse 设备命令响应 + */ + public void commandReplied(DeviceCommandResponseDTO commandResponse) { + CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false); + commandResponseChan.put(commandResponse); + } + + } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java new file mode 100644 index 0000000000000000000000000000000000000000..12f36e35dcabb407cdc8298be8a960e13a79ae2a --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java @@ -0,0 +1,139 @@ +package net.maku.iot.communication.service; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.dto.CommandResponseChan; +import net.maku.iot.communication.dto.DeviceCommandDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.tcp.TcpGateway; +import net.maku.iot.dto.DeviceClientDTO; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.enums.DeviceTopicEnum; +import net.maku.iot.service.IotDeviceServiceLogService; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +/** + * TCP协议服务类 + * + * @author LSF maku_lsf@163.com + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class TCPService implements BaseCommunication { + + private final TcpGateway tcpGateway; + private final IotDeviceServiceLogService iotDeviceEventLogService; + + @Override + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + // 构建命令对象 + String commandId = StrUtil.replaceChars(UUID.randomUUID().toString(), "-", ""); + DeviceCommandDTO commandDTO = new DeviceCommandDTO(); + commandDTO.setCommand(command); + commandDTO.setId(commandId); + commandDTO.setDeviceId(String.valueOf(device.getId())); + commandDTO.setPayload(payload); + String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device)); + + // 发送命令到设备命令主题 + try { + tcpGateway.sendCommandToDevice(device.getId(),commandTopic, JSONUtil.toJsonStr(commandDTO)); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 失败,原因:{}", + command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic,e.getMessage())); + } + log.info("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 成功", command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic); + iotDeviceEventLogService.createAndSaveDeviceServiceLog(device.getId(), device.getTenantId(), command, commandId, payload); + return commandId; + } + + @Override + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; + } + + @Override + public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload); + + // 2秒后模拟设备响应 + new Thread(() -> { + try { + //模拟设备正常响应 + Thread.sleep(2000); + //模拟设备超时响应 + //Thread.sleep(15000); + DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO(); + simulateResponseDto.setCommandId(commandId); + simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!"); + simulateResponseDto.setDeviceId(String.valueOf(device.getId())); + simulateResponseDto.setCommand(command); + simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("模拟设备响应线程被中断", e); + } + }).start(); + + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; + } + + @Override + public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { + // 封装 设备属性上报的 topic + String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); + try { + tcpGateway.simulateDeviceReport(device.getId(), commandTopic, payload, DevicePropertyDTO.class); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } + } + + @Override + public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { + // 封装 设备命令执行结果的 topic + String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); + try { + tcpGateway.simulateDeviceReport(device.getId(), commandTopic, payload, DeviceCommandResponseDTO.class); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟命令执行结果上报失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } + } + + + /** + * 设备命令响应处理,把设备响应结果放入通道中 + * + * @param commandResponse 设备命令响应 + */ + public void commandReplied(DeviceCommandResponseDTO commandResponse) { + CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false); + commandResponseChan.put(commandResponse); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java new file mode 100644 index 0000000000000000000000000000000000000000..b62034413e8e64f89b41bf8c2858b6cb5771095b --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java @@ -0,0 +1,74 @@ +package net.maku.iot.communication.tcp; + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.exception.ServerException; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.DeviceCommandDTO; +import net.maku.iot.communication.dto.TcpMsgDTO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentMap; + +/** + * TCP 网关 + * + * @author LSF maku_lsf@163.com + */ +@Component +@Slf4j +public class TcpGateway { + + @Autowired + private final ConcurrentMap deviceChannels; + + @Autowired + public TcpGateway(ConcurrentMap deviceChannels) { + this.deviceChannels = deviceChannels; + } + + /** + * 发送命令到设备 + * @param deviceId 设备ID + * @param commandTopic 命令主题 + * @param payload 命令内容 + */ + public void sendCommandToDevice(Long deviceId, String commandTopic, String payload) { + Channel channel = deviceChannels.get(deviceId.toString()); + if (channel != null && channel.isActive()) { + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setTopic(commandTopic); + DeviceCommandDTO deviceCommandDTO = JsonUtils.parseObject(payload, DeviceCommandDTO.class); + deviceCommandDTO.setDeviceId(deviceId.toString()); + tcpMsgDTO.setMsg(deviceCommandDTO); + + String commandJson = JsonUtils.toJsonString(tcpMsgDTO); +// channel.writeAndFlush(commandJson); + log.info("发送命令到设备 {}: {}", deviceId, payload); + } else { + throw new ServerException("设备"+deviceId+"不在线或通道无效"); + } + } + + public void simulateDeviceReport(Long deviceId, String commandTopic, String payload, Class reportDtoclazz) { + Channel channel = deviceChannels.get(deviceId.toString()); + if (channel != null && channel.isActive()) { + try { + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setTopic(commandTopic); + tcpMsgDTO.setMsg(JsonUtils.parseObject(payload, reportDtoclazz)); + String devicePropertyJson = JsonUtils.toJsonString(tcpMsgDTO); + // 模拟上报,触发 channelRead 处理 + channel.pipeline().fireChannelRead(devicePropertyJson); + log.info("模拟设备 {} 上报数据: {}", deviceId, devicePropertyJson); + } catch (Exception e) { + log.error("模拟设备上报数据时出现错误", e); + } + } else { + throw new ServerException("设备 " + deviceId + " 不在线或通道无效"); + } + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/DeviceClientStartupConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/DeviceClientStartupConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..bd3ad61ab4728561cb353e608b439bb789f48aef --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/DeviceClientStartupConfig.java @@ -0,0 +1,41 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.Bootstrap; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.ContextRefreshedEvent; + +/** + * Netty启动顺序配置 + * + * @author LSF maku_lsf@163.com + */ +@Configuration +@Slf4j +public class DeviceClientStartupConfig implements ApplicationListener { + + private final ObjectProvider deviceClientProvider; + private final DeviceEmulatorConfig deviceEmulatorConfig; + + public DeviceClientStartupConfig(ObjectProvider deviceClientProvider, DeviceEmulatorConfig deviceEmulatorConfig) { + this.deviceClientProvider = deviceClientProvider; + this.deviceEmulatorConfig = deviceEmulatorConfig; + } + + @Override + public void onApplicationEvent(ContextRefreshedEvent event) { + try { + Thread.sleep(5000); // 延迟5秒以确保服务器启动 + Bootstrap deviceClientBootstrap = deviceClientProvider.getIfAvailable(); + if (deviceClientBootstrap != null) { + deviceEmulatorConfig.configureBootstrap(deviceClientBootstrap); + deviceClientBootstrap.connect("127.0.0.1", 8888).sync(); + log.info("Connected to Netty server on port 8888"); + } + } catch (InterruptedException e) { + log.error("Failed to connect to Netty server", e); + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/DeviceEmulatorConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/DeviceEmulatorConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..8f4b4b0d52c341e909806e9a2699619548a614a5 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/DeviceEmulatorConfig.java @@ -0,0 +1,97 @@ +package net.maku.iot.communication.tcp.config; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.TcpMsgDTO; +import net.maku.iot.dto.DeviceClientDTO; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DevicePropertyEnum; +import net.maku.iot.enums.DeviceRunningStatusEnum; +import net.maku.iot.enums.DeviceTopicEnum; +import net.maku.iot.service.IotDeviceService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 设备模拟器配置,用于启动模拟设备,方便调试,默认启动系统所有的TCP设备 + * + * @author LSF maku_lsf@163.com + */ +@Slf4j +@Configuration +public class DeviceEmulatorConfig { + + @Autowired + private IotDeviceService deviceService; + + @Bean + public Bootstrap nettyClient() { + Bootstrap nettyClient = new Bootstrap(); + nettyClient.group(new NioEventLoopGroup()) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) // 设置为长连接 + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60*1000); // 设置连接超时时间 + return nettyClient; + } + + public void configureBootstrap(Bootstrap bootstrap) { + List devices = deviceService.list(new LambdaQueryWrapper().eq(IotDeviceEntity::getProtocolType, "TCP")); + for (IotDeviceEntity device : devices) { + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + //模拟设备认证 + Map authenticateMap = new HashMap(); + authenticateMap.put("authenticate", device.getId().toString()); + String authenticateMapJson = JsonUtils.toJsonString(authenticateMap); + log.info("------------------------> 发送认证信息到服务端: {}", authenticateMapJson); + ctx.writeAndFlush(authenticateMapJson); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) { + log.info("设备 {} 接收到服务端消息: {}", device.getId(), msg); + //模拟属性上报 + if(msg.contains("authenticate passed")){ + String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); + + DevicePropertyDTO devicePropertyDTO = new DevicePropertyDTO(); + devicePropertyDTO.setDeviceId(device.getId().toString()); + devicePropertyDTO.setPropertyType(DevicePropertyEnum.RUNNING_STATUS); + devicePropertyDTO.setPayload(String.valueOf(DeviceRunningStatusEnum.ONLINE.getValue())); + + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setTopic(commandTopic); + tcpMsgDTO.setMsg(devicePropertyDTO); + + String runningStatusjson = JsonUtils.toJsonString(tcpMsgDTO); + log.info("------------------------> 设备发送上线文本:{}",runningStatusjson); + ctx.writeAndFlush(runningStatusjson); + } + } + }); + } + }); + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..0add88cc836b0b751c5274d546da98cbbd39b922 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java @@ -0,0 +1,72 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory; +import net.maku.iot.communication.tcp.handler.ConnectionHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Netty服务配置 + * + * @author LSF maku_lsf@163.com + */ +@Configuration +@Slf4j +public class NettyServerConfig { + + @Bean + public ConcurrentMap deviceChannels() { + return new ConcurrentHashMap<>(); + } + + @Autowired + public TcpMessageHandlerFactory tcpMessageHandlerFactory; + + @Bean + public ServerBootstrap nettyServer(ConcurrentMap deviceChannels) { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast( + new StringDecoder(), + new StringEncoder(), + // 添加设备连接处理器 + new ConnectionHandler(deviceChannels,tcpMessageHandlerFactory) + ); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + return bootstrap; + } + + @Bean + public ChannelFuture serverChannelFuture(ServerBootstrap serverBootstrap) throws InterruptedException { + try { + ChannelFuture future = serverBootstrap.bind(8888).sync(); + log.info("------------------------ Netty 服务器在端口 8888 启动成功"); + return future; + } catch (Exception e) { + log.error("------------------------ Netty 服务器启动失败", e); + throw e; + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..a15fcd048f680296d40094f80eea13ddcb37082a --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java @@ -0,0 +1,47 @@ +package net.maku.iot.communication.tcp.factory; + +import lombok.RequiredArgsConstructor; +import net.maku.iot.communication.tcp.handler.TCPMessageHandler; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * TCP消息处理器工厂,自动获取所有实现的处理器实例 + * + * @author LSF maku_lsf@163.com + */ +@Component +@RequiredArgsConstructor +public class TcpMessageHandlerFactory { + private final ApplicationContext applicationContext; + + /** + * 所有消息处理器 + */ + private List messageHandlers; + + private List loadHandlers() { + if (messageHandlers != null) { + return messageHandlers; + } + messageHandlers = new ArrayList<>(applicationContext.getBeansOfType(TCPMessageHandler.class).values()); + return messageHandlers; + } + + /** + * 获取与主题对应的tcp消息处理器 + * + * @param topic 主题 + * @return 处理器列表 + */ + public List getHandlersForTopic(String topic) { + return Collections.unmodifiableList(loadHandlers().stream() + .filter(handler -> handler.supports(topic)) + .collect(Collectors.toList())); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..344df9dfba3f50115423ac54d8a64bc3a0d4433e --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java @@ -0,0 +1,129 @@ +package net.maku.iot.communication.tcp.handler; + +import cn.hutool.core.util.StrUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.AttributeKey; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.TcpMsgDTO; +import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory; + +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * TCP服务器连接处理器 + * + * @author LSF maku_lsf@163.com + */ +@Slf4j +public class ConnectionHandler extends ChannelInboundHandlerAdapter { + + public static final AttributeKey DEVICE_ID = AttributeKey.valueOf("DEVICE_ID"); + + private ConcurrentMap deviceChannels; + private final TcpMessageHandlerFactory tcpMessageHandlerFactory; + + public ConnectionHandler(ConcurrentMap deviceChannels,TcpMessageHandlerFactory tcpMessageHandlerFactory) { + this.deviceChannels = deviceChannels; + this.tcpMessageHandlerFactory = tcpMessageHandlerFactory; + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) { + System.out.printf("channelActive"); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg == null) { + return; + } + //鉴权认证 + if (authenticate(ctx, msg)) { + //这里可以根据业务自定义扩展消息处理 + if (StrUtil.contains(msg.toString(), "topic")) { + // 处理 TCP 消息 + handleTcpMessage( JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class)); + } else { + // 处理其他类型的消息 + log.warn("接收到未知的消息类型:{}", msg); + } + } else { + ctx.close(); + } + + } + + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + String deviceId = getDeviceId(ctx.channel()); + if (deviceId != null) { + deviceChannels.remove(deviceId); + } + log.info(" 设备 {} 断开连接", deviceId == null ? "未知设备" : deviceId); + } + + private void handleTcpMessage( TcpMsgDTO message) { + String topic = message.getTopic(); + if (topic != null) { + tcpMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> { + handler.handle(topic, message.getMsg()); + }); + } else { + log.warn("接收到主题为null的消息。"); + } + } + + + /** + * TCP连接鉴权,自行根据业务扩展 + */ + private boolean authenticate(ChannelHandlerContext ctx, Object message) { + String messageRegex = "\"(authenticate|deviceId)\"\\s*:\\s*\"\\d+\""; + Pattern messagePattern = Pattern.compile(messageRegex); + Matcher matcherPattern = messagePattern.matcher(message.toString()); + if (!matcherPattern.find()) { + ctx.writeAndFlush("设备消息无法识别!"); + return false; + } + if (StrUtil.contains(message.toString(), "authenticate")) { + Pattern pattern = Pattern.compile("\"authenticate\"\\s*:\\s*\"(\\d+)\""); + Matcher matcher = pattern.matcher(message.toString()); + if (matcher.find()) { + String deviceId = matcher.group(1); +// setDeviceId(ctx.channel(), deviceId); + ctx.channel().attr(DEVICE_ID).set(deviceId); + deviceChannels.put(deviceId, ctx.channel()); + ctx.writeAndFlush("authenticate passed"); + } + } + + if (StrUtil.contains(message.toString(), "deviceId")) { + Pattern pattern = Pattern.compile("\"deviceId\"\\s*:\\s*\"(\\d+)\""); + Matcher matcher = pattern.matcher(message.toString()); + if (matcher.find()) { + String deviceId = matcher.group(1); + Channel channel = deviceChannels.get(deviceId); + if (channel == null) { + ctx.writeAndFlush("设备连接不存在!请重新连接"); + return false; + } + } + } + return true; + } + + private String getDeviceId(Channel channel) { + return channel.attr(DEVICE_ID).get(); + } + +// private String setDeviceId(Channel channel, String deviceId) { +// return channel.attr(AttributeKey.valueOf("deviceId")).setIfAbsent(deviceId); +// } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..2ddf1d13352c8cdf33d41df2774834cf07274285 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java @@ -0,0 +1,72 @@ +package net.maku.iot.communication.tcp.handler; + +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; +import net.maku.iot.communication.service.TCPService; +import net.maku.iot.enums.DeviceTopicEnum; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * 设备命令响应处理器 + * + * @author LSF maku_lsf@163.com + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DeviceCommandResponseTCPMessageHandler implements TCPMessageHandler { + + private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory; + + private final TCPService deviceTCPService; + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic()); + } + + @Override + public void handle(String topic, Object message) { + DeviceCommandResponseDTO commandResponseDTO = parseCommandReplyMessage(topic, message); + Optional.ofNullable(commandResponseDTO.getCommand()) + .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令类型! 主题:'{}',消息:{}", topic, message))); + Optional.ofNullable(commandResponseDTO.getCommandId()) + .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令ID! 主题:'{}',消息:{}", topic, message))); + Optional.ofNullable(commandResponseDTO) + .ifPresent(responseDTO -> { + // 调用设备命令执行器的命令响应处理逻辑 + try { + deviceTCPService.commandReplied( responseDTO); + } catch (Exception e) { + log.error(StrUtil.format("调用设备命令执行器响应处理方法出错,topic:{}, message:{}", topic, message), e); + } + // 调用自定义命令响应处理器 + try { + deviceCommandResponseHandlerFactory.getHandlers().forEach(h -> h.handle(topic, responseDTO)); + } catch (Exception e) { + log.error(StrUtil.format("调用设备命令响应响应处理器出错,topic:{}, message:{}", topic, message), e); + } + }); + } + + private DeviceCommandResponseDTO parseCommandReplyMessage(String topic, Object message) { + try { + ObjectMapper mapper = new ObjectMapper(); + DeviceCommandResponseDTO commandResponse = mapper.convertValue(message, DeviceCommandResponseDTO.class); + if (StrUtil.isBlank(commandResponse.getCommandId())) { + log.error(StrUtil.format("主题'{}'的消息,缺失指令ID", topic)); + return null; + } + return commandResponse; + + } catch (Exception e) { + log.error(StrUtil.format("将主题'{}'的消息解析为设备命令响应对象失败", topic), e); + return null; + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..0307d3ad9ad5f8464286d71106298f211c6cd6b0 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java @@ -0,0 +1,48 @@ +package net.maku.iot.communication.tcp.handler; + +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory; +import net.maku.iot.enums.DeviceTopicEnum; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * 设备属性上报消息处理器 + * + * @author LSF maku_lsf@163.com + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DevicePropertyTCPMessageHandler implements TCPMessageHandler { + + private final DevicePropertyChangeHandlerFactory statusChangeHandlerFactory; + + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic()); + } + + @Override + public void handle(String topic, Object message) { + DevicePropertyDTO devicePropertyDTO = parseStatusMessage(topic, message); + Optional.ofNullable(devicePropertyDTO) + .ifPresent(deviceProperty -> statusChangeHandlerFactory.getHandlers() + .forEach(h -> h.handle(topic, deviceProperty))); + } + + private DevicePropertyDTO parseStatusMessage(String topic, Object message) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.convertValue(message, DevicePropertyDTO.class); + } catch (Exception e) { + log.error(StrUtil.format("将主题'{}'的消息解析为设备运行状态对象失败", topic), e); + return null; + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..5632532b7b7d52e8451acbdf458ac896cb5b3bf5 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java @@ -0,0 +1,26 @@ +package net.maku.iot.communication.tcp.handler; + + +/** + * TCP消息处理接口 + * + * @author LSF maku_lsf@163.com + */ +public interface TCPMessageHandler { + + /** + * 是否支持处理指定的topic + * + * @param topic + * @return + */ + boolean supports(String topic); + + /** + * TCP消息处理接口 + * + * @param topic + * @param message + */ + void handle(String topic, Object message); +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java index 9f0340991eb42d6d3725302017c3e5cc91775290..2632c9122169d58aef6acf3a59f74630a4ae06b8 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java @@ -32,6 +32,11 @@ public class IotDeviceEntity extends BaseEntity { */ private Integer type; + /** + * 设备和服务器通信协议类型 + */ + private String protocolType; + /** * 唯一标识码 */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java new file mode 100644 index 0000000000000000000000000000000000000000..2e15eeef8530964ca78868dea30b4d11803a09f9 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java @@ -0,0 +1,28 @@ +package net.maku.iot.enums; + +import cn.hutool.core.util.StrUtil; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Arrays; + +/** + * IOT常用的通信协议 + * + * @author LSF maku_lsf@163.com + */ +@Getter +@RequiredArgsConstructor +public enum IOTProtocolEnum { + + MQTT("MQTT"), + TCP("TCP"), + UDP("UDP"), + BLE("BLE"), + CoAP("CoAP"), + LwM2M("LwM2M"), + Modbus("Modbus"); + + private final String value; +} + diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java index eafbde48598b38c28d0072af3a1cb490586f1ecd..b2e0105d140cd20668c0d6298e48301ffae3753a 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java @@ -3,8 +3,9 @@ package net.maku.iot.service; import net.maku.framework.common.utils.PageResult; import net.maku.framework.mybatis.service.BaseService; import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.query.IotDeviceQuery; +import net.maku.iot.communication.service.BaseCommunication; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; import net.maku.iot.vo.DeviceCommandVO; import net.maku.iot.vo.DeviceReportAttributeDataVO; @@ -27,6 +28,27 @@ public interface IotDeviceService extends BaseService { void delete(List idList); + /** + * 根据设备的协议类型获取发送服务 + * @param device 设备 + * @return + */ + BaseCommunication getSendService(IotDeviceEntity device); + + /** + * 根据协议类型获取发送服务 + * @param protocolType + * @return + */ + BaseCommunication getSendService(String protocolType); + + /** + * 根据设备ID获取发送服务 + * @param deviceId + * @return + */ + BaseCommunication getSendService(Long deviceId); + /** * 对设备下发指令-同步响应模式 * diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java index d7bebfa59c6bca70a45fd519f4bb704963e97c70..0fc2e5e3a2e0dd6811389db2786d0b7e5e937501 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java @@ -16,12 +16,13 @@ import net.maku.iot.convert.IotDeviceConvert; import net.maku.iot.dao.IotDeviceDao; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.*; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; -import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler; -import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler; -import net.maku.iot.mqtt.service.DeviceMqttService; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; +import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; import net.maku.iot.query.IotDeviceQuery; +import net.maku.iot.communication.service.BaseCommunication; +import net.maku.iot.communication.service.CommunicationServiceFactory; import net.maku.iot.service.IotDeviceEventLogService; import net.maku.iot.service.IotDeviceService; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; @@ -35,7 +36,7 @@ import java.time.LocalDateTime; import java.util.List; /** - * 设备表 + * 设备服务类 * * @author LSF maku_lsf@163.com */ @@ -45,8 +46,7 @@ import java.util.List; public class IotDeviceServiceImpl extends BaseServiceImpl implements IotDeviceService, DevicePropertyChangeHandler, DeviceCommandResponseHandler { - //todo 后续版本更改为根据物模型自动选择不同的通信层Service - private final DeviceMqttService mqttService; + private final CommunicationServiceFactory communicationService; private final IotDeviceEventLogService deviceEventLogService; @Override @@ -84,6 +84,28 @@ public class IotDeviceServiceImpl extends BaseServiceImpl