# rabbit-mq-plus **Repository Path**: pangshujiangjava/rabbit-mq-plus ## Basic Information - **Project Name**: rabbit-mq-plus - **Description**: Rabbitmq 消息发送与消费增强组件,100%不丢失解决方案 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 7 - **Created**: 2024-01-16 - **Last Updated**: 2024-06-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### rabbit-mq-plus #### 旨在解决 * 生产者:消息100%投递 * 消费者:消费失败自动按阶梯评率0分钟、5分钟、10分钟、20分钟 ......等策略进行补偿 *** #### 接入步骤: ##### 1. 引用rabbit-mq-plus的maven配置 ```python com.cfpamf.inf rabbit-mq-plus 0.0.1-opensource ``` ##### 2. 在@SpringBootApplication启动类上增加,扫描包路径com.cfpamf.athean.rabbit.mq.plus ```python 例:@ComponentScan(basePackages = { "com.cfpamf.athean.rabbit.mq.plus"}) ``` ##### 3. 配置数据库操作类扫描路径 ```python ##配置mybatis扫描目录 @MapperScan("com.cfpamf.athean.rabbit.mq.plus.mapper") ##配置mybatisPlus扫描目录 typeAliasesPackage: com.cfpamf.athean.rabbit.mq.plus.domain.po typeEnumsPackage: com.cfpamf.athean.rabbit.mq.plus.common.enums ##参考代码 MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSource); sqlSessionFactoryBean.setTypeAliasesPackage("com.cfpamf.athean.rabbit.mq.plus.domain.po"); sqlSessionFactoryBean.setTypeEnumsPackage("com.cfpamf.athean.rabbit.mq.plus.common.enums"); 注意rabbitmq要开启ack,开启发送确认和发送失败回退 spring: rabbitmq: host: ${host} port: 5672 username: ${username} password: ${password} publisher-confirm-type: correlated publisher-confirms: true #开启发送确认,开启publisher-confirm,且选择correlated,则MQ发送结果,会异步回调ConfirmCallback方法 publisher-returns: true #开启发送失败回退 #template: #mandatory: true #定义当消息从交换机路由到队列失败时的策略。【true,则调用ReturnCallback;false:则直接丢弃消息】 #开启ack listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual #采取手动应答 # concurrency: 1 #指定最小的消费者数量 # max-concurrency: 1 #指定最大的消费者数量 retry: enabled: true # 是否支持重试 ``` ##### 4. 创建下面数据库表(待补充索引) ```python CREATE TABLE `rabbit_mq_plus_event_task` ( `id` bigint(20) NOT NULL COMMENT '编号', `event_no` varchar(64) NOT NULL COMMENT '事件编号', `event_type_code` varchar(20) NOT NULL COMMENT '事件类型代码:', `event_type_desc` varchar(20) NOT NULL COMMENT '事件类型描述:', `status_code` int(11) NOT NULL COMMENT '状态:10、待处理 20、处理中 30、已处理', `status_desc` varchar(10) NOT NULL COMMENT '状态描述:10、待处理 20、处理中 30、已处理', `max_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '最大重试次数', `retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数', `next_execute_time` datetime NOT NULL COMMENT '下一次执行时间', `message_body` varchar(1024) NOT NULL COMMENT '数据', `event_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '事件时间', `create_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '创建人', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '修改人', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `enabled_flag` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否可用 1:是 0:否(已删除)', PRIMARY KEY (`id`), KEY `idx_event_no_event_type_code` (`event_no`,`event_type_code`), KEY `idx_event_no_event_type_code_status_code` (`event_no`,`event_type_code`,`status_code`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='rabbitMqPlus事件任务表'; CREATE TABLE `rabbit_mq_plus_consumer_task` ( `id` bigint(20) NOT NULL COMMENT '编号', `queue_name` varchar(64) NOT NULL COMMENT '消息编号', `consumer_type_code` varchar(20) NOT NULL COMMENT '消费类型代码', `consumer_type_desc` varchar(20) NOT NULL COMMENT '消费类型描述', `consumer_tag` varchar(20) DEFAULT NULL COMMENT '消费标记', `status_code` int(11) NOT NULL COMMENT '状态:10、待处理 20、处理中 30、已处理', `status_desc` varchar(10) NOT NULL COMMENT '状态描述:10、待处理 20、处理中 30、已处理', `max_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '最大重试次数', `retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数', `consumer_listener_impl_class_name` varchar(128) NOT NULL COMMENT '消费者监听实现类', `next_execute_time` datetime NOT NULL COMMENT '下一次执行时间', `first_consumer_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '首次消费时间', `consumer_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '消费时间', `message_body` varchar(2048) NOT NULL COMMENT '数据', `create_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '创建人', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '修改人', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `enabled_flag` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否可用 1:是 0:否(已删除)', PRIMARY KEY (`id`), KEY `idx_queue_name_consumer_type_code_status_code` (`queue_name`,`consumer_type_code`,`status_code`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='rabbitMqPlus消费任务表'; ``` ##### 5. 使用方法 ###### 生产者: ```python @Resource RabbitMqPlusProducer rabbitMqPlusProducer; rabbitMqPlusProducer.sendTransactionalMessage(RabbitMqMessageDTO); ``` 补偿机制: com.cfpamf.athean.rabbit.mq.plus.producer.core.schedule.RabbitMqPlusProducerMessageRetryJob.runAutoRetryCheck()方法 ###### 消费者: 1.项目启动运行,消费者监听器 ```python public class XxxxRabbitMqPlusConsumerRun implements ApplicationRunner { @Resource RabbitMqConsumer rabbitMqConsumer; @Override public void run(ApplicationArguments args) throws Exception { rabbitMqConsumer.runConsumeListener(RabbitMqPlusConsumerDTO); } ``` 2.消费者实现RabbitMqPlusConsumerListener接口onMessage方法 3.失败补偿 定时调度com.cfpamf.athean.rabbit.mq.plus.consumer.core.schedule.RabbitMqPlusConsumerFailureRetryJob.runAutoRetryCheck()方法 *** 系统默认,删除30天后的发送成功消息记录,可以自定义时间 ```python rabbitMqPlus: delete: invalid: data: day:60 ``` #### Demo project 参考 demo目录中的demo.zip文件 [link](demo/demo.zip) #### 联系我们 ![输入图片说明](images/%E4%B8%AD%E5%92%8C%E5%BC%80%E6%BA%90%E4%BA%A4%E6%B5%81%E7%BE%A4.jpg)