From ef94e0fa2aeda707d6d36c16ef5855be40c7b761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E9=94=8B?= Date: Fri, 25 Mar 2022 14:49:27 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0RabbitMq=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E5=8F=82=E6=95=B0=EF=BC=8C=E4=BD=BF=E7=94=A8=E5=B9=BF?= =?UTF-8?q?=E6=92=AD=E6=88=96=E6=99=AE=E9=80=9A=E9=98=9F=E5=88=97=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E7=9A=84=E5=BC=80=E5=85=B3=EF=BC=8C=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E5=85=A8=E9=83=A8=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/rabbitmq/JbootRabbitmqConfig.java | 24 +++++++++++++++++++ .../mq/rabbitmq/JbootRabbitmqImpl.java | 19 ++++++++------- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java index 488f2c07..9a8007b3 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java @@ -46,6 +46,14 @@ public class JbootRabbitmqConfig { private boolean queueDeclareExclusive = false; private boolean queueDeclareAutoDelete = false; + /** + * 使用队列 + */ + private boolean useQueue=true; + /** + * 使用广播 + */ + private boolean useBroadcast=true; private String broadcastExchangeDeclareExchangeType = "fanout"; private boolean broadcastExchangeDeclareDurable = false; @@ -181,4 +189,20 @@ public class JbootRabbitmqConfig { public void setBroadcastQueueDeclareAutoDelete(boolean broadcastQueueDeclareAutoDelete) { this.broadcastQueueDeclareAutoDelete = broadcastQueueDeclareAutoDelete; } + + public boolean useQueue(){ + return useQueue; + } + + public void setUseQueue(boolean useQueue) { + this.useQueue = useQueue; + } + + public boolean useBroadcast() { + return useBroadcast; + } + + public void setUseBroadcast(boolean useBroadcast) { + this.useBroadcast=useBroadcast; + } } diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java index 15ffe4b0..6d9e44d3 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java @@ -85,15 +85,16 @@ public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq { @Override protected void onStartListening() { for (String toChannel : channels) { - - //广播通道 - Channel broadcastChannel = getChannel(toChannel, false); - bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel); - - - //队列通道 - final Channel queueChannel = getChannel(toChannel, true); - bindChannel(queueChannel, toChannel, toChannel); + if(rabbitmqConfig.useBroadcast()) { + //广播通道 + Channel broadcastChannel = getChannel(toChannel, false); + bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel); + } + if(rabbitmqConfig.useQueue()) { + //队列通道 + final Channel queueChannel = getChannel(toChannel, true); + bindChannel(queueChannel, toChannel, toChannel); + } } } -- Gitee From 3591dfa17fd9ec71f9ddc7e042ca035a45edeaeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E9=94=8B?= Date: Fri, 25 Mar 2022 16:30:46 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E7=9A=84Rabbitmq=E9=98=9F=E5=88=97=E3=80=81=E5=B9=BF=E6=92=AD?= =?UTF-8?q?=E9=80=9A=E9=81=93=E5=BC=80=E5=90=AF=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java | 2 ++ .../java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java | 2 +- .../java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java | 9 +++++---- .../java/io/jboot/test/mq/rabbit/RabbitMqSender.java | 1 + 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java index 6d9e44d3..33595fbb 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java @@ -89,11 +89,13 @@ public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq { //广播通道 Channel broadcastChannel = getChannel(toChannel, false); bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel); + System.out.println("广播通道"); } if(rabbitmqConfig.useQueue()) { //队列通道 final Channel queueChannel = getChannel(toChannel, true); bindChannel(queueChannel, toChannel, toChannel); + System.out.println("队列通道"); } } } diff --git a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java index 066fe9ff..3dba0e38 100644 --- a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java +++ b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java @@ -20,7 +20,7 @@ public class RabbitMqReceiver1 { //设置 mq 的相关信息 JbootApplication.setBootArg("jboot.mq.type", "rabbitmq"); - JbootApplication.setBootArg("jboot.mq.channel", "channel1"); + JbootApplication.setBootArg("jboot.mq.channel", "channel1,channel2"); //以下可以不用配置,是默认信息 diff --git a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java index c6fb0a1d..9baac9be 100644 --- a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java +++ b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java @@ -19,8 +19,9 @@ public class RabbitMqReceiver2 { //设置 mq 的相关信息 JbootApplication.setBootArg("jboot.mq.type", "rabbitmq"); - JbootApplication.setBootArg("jboot.mq.channel", "channel2"); - + JbootApplication.setBootArg("jboot.mq.channel", "channel1,channel2,myChannel"); +// JbootApplication.setBootArg("jboot.mq.rabbitmq.useQueue", false); + JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastExchangeDeclareExchangeType", "direct"); //以下可以不用配置,是默认信息 // JbootApplication.setBootArg("jboot.mq.rabbitmq.username", "guest"); @@ -30,7 +31,7 @@ public class RabbitMqReceiver2 { // JbootApplication.setBootArg("jboot.mq.rabbitmq.virtualHost", ""); //非常重要,多个应用如果同时接受同一个 channel 的广播,必须配置此项,而且必须不能相同,否则广播的时候只有一个应用能够接受到 - JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastChannelPrefix", "app2"); +// JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastChannelPrefix", "app2"); //启动应用程序 JbootApplication.run(args); @@ -43,7 +44,7 @@ public class RabbitMqReceiver2 { public void onMessage(String channel, Object message, MessageContext context) { System.out.println("Receive msg: " + message + ", from channel: " + channel); } - },"channel2"); + }); Jboot.getMq().startListening(); diff --git a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java index 9b5d5634..620b023a 100644 --- a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java +++ b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java @@ -17,6 +17,7 @@ public class RabbitMqSender { //设置 mq 的相关信息 JbootApplication.setBootArg("jboot.mq.type", "rabbitmq"); + JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastExchangeDeclareExchangeType", "direct"); //以下可以不用配置,是默认信息 // JbootApplication.setBootArg("jboot.mq.rabbitmq.username", "guest"); -- Gitee From a5e4e0c1292be885845b20c09732bfd1803475f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E9=94=8B?= Date: Fri, 25 Mar 2022 16:45:23 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=B8=85=E7=90=86=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java index 33595fbb..6d9e44d3 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java @@ -89,13 +89,11 @@ public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq { //广播通道 Channel broadcastChannel = getChannel(toChannel, false); bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel); - System.out.println("广播通道"); } if(rabbitmqConfig.useQueue()) { //队列通道 final Channel queueChannel = getChannel(toChannel, true); bindChannel(queueChannel, toChannel, toChannel); - System.out.println("队列通道"); } } } -- Gitee