diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java index 488f2c07a854e2130b53c7d7756cf308bb573032..9a8007b31de813fc5efd0ac54c10b27e32c0f278 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqConfig.java @@ -46,6 +46,14 @@ public class JbootRabbitmqConfig { private boolean queueDeclareExclusive = false; private boolean queueDeclareAutoDelete = false; + /** + * 使用队列 + */ + private boolean useQueue=true; + /** + * 使用广播 + */ + private boolean useBroadcast=true; private String broadcastExchangeDeclareExchangeType = "fanout"; private boolean broadcastExchangeDeclareDurable = false; @@ -181,4 +189,20 @@ public class JbootRabbitmqConfig { public void setBroadcastQueueDeclareAutoDelete(boolean broadcastQueueDeclareAutoDelete) { this.broadcastQueueDeclareAutoDelete = broadcastQueueDeclareAutoDelete; } + + public boolean useQueue(){ + return useQueue; + } + + public void setUseQueue(boolean useQueue) { + this.useQueue = useQueue; + } + + public boolean useBroadcast() { + return useBroadcast; + } + + public void setUseBroadcast(boolean useBroadcast) { + this.useBroadcast=useBroadcast; + } } diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java index 15ffe4b056e7c4bf919db64ed487286174b10a0f..6d9e44d320781b1844d7fa7284d8370a3c51f55a 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java @@ -85,15 +85,16 @@ public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq { @Override protected void onStartListening() { for (String toChannel : channels) { - - //广播通道 - Channel broadcastChannel = getChannel(toChannel, false); - bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel); - - - //队列通道 - final Channel queueChannel = getChannel(toChannel, true); - bindChannel(queueChannel, toChannel, toChannel); + if(rabbitmqConfig.useBroadcast()) { + //广播通道 + Channel broadcastChannel = getChannel(toChannel, false); + bindChannel(broadcastChannel, buildBroadcastChannelName(toChannel), toChannel); + } + if(rabbitmqConfig.useQueue()) { + //队列通道 + final Channel queueChannel = getChannel(toChannel, true); + bindChannel(queueChannel, toChannel, toChannel); + } } } diff --git a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java index 066fe9ff5cada3f47fe679e8964bba118aa8897b..3dba0e389caed82807c33fb06e9ab52c218285b7 100644 --- a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java +++ b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver1.java @@ -20,7 +20,7 @@ public class RabbitMqReceiver1 { //设置 mq 的相关信息 JbootApplication.setBootArg("jboot.mq.type", "rabbitmq"); - JbootApplication.setBootArg("jboot.mq.channel", "channel1"); + JbootApplication.setBootArg("jboot.mq.channel", "channel1,channel2"); //以下可以不用配置,是默认信息 diff --git a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java index c6fb0a1d58bdb4563e18ca27d21e05ed5cd52741..9baac9beb306c9481d53065d530ecd8fa9a447e4 100644 --- a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java +++ b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqReceiver2.java @@ -19,8 +19,9 @@ public class RabbitMqReceiver2 { //设置 mq 的相关信息 JbootApplication.setBootArg("jboot.mq.type", "rabbitmq"); - JbootApplication.setBootArg("jboot.mq.channel", "channel2"); - + JbootApplication.setBootArg("jboot.mq.channel", "channel1,channel2,myChannel"); +// JbootApplication.setBootArg("jboot.mq.rabbitmq.useQueue", false); + JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastExchangeDeclareExchangeType", "direct"); //以下可以不用配置,是默认信息 // JbootApplication.setBootArg("jboot.mq.rabbitmq.username", "guest"); @@ -30,7 +31,7 @@ public class RabbitMqReceiver2 { // JbootApplication.setBootArg("jboot.mq.rabbitmq.virtualHost", ""); //非常重要,多个应用如果同时接受同一个 channel 的广播,必须配置此项,而且必须不能相同,否则广播的时候只有一个应用能够接受到 - JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastChannelPrefix", "app2"); +// JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastChannelPrefix", "app2"); //启动应用程序 JbootApplication.run(args); @@ -43,7 +44,7 @@ public class RabbitMqReceiver2 { public void onMessage(String channel, Object message, MessageContext context) { System.out.println("Receive msg: " + message + ", from channel: " + channel); } - },"channel2"); + }); Jboot.getMq().startListening(); diff --git a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java index 9b5d5634888dfb016c5747aab2b48f7e03c9f8ba..620b023a8fa9be0e419ae7fd02b9237d037eb98a 100644 --- a/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java +++ b/src/test/java/io/jboot/test/mq/rabbit/RabbitMqSender.java @@ -17,6 +17,7 @@ public class RabbitMqSender { //设置 mq 的相关信息 JbootApplication.setBootArg("jboot.mq.type", "rabbitmq"); + JbootApplication.setBootArg("jboot.mq.rabbitmq.broadcastExchangeDeclareExchangeType", "direct"); //以下可以不用配置,是默认信息 // JbootApplication.setBootArg("jboot.mq.rabbitmq.username", "guest");