From dde4f941474dd0931eaf01db2e819fab5cf174f0 Mon Sep 17 00:00:00 2001 From: whp <124653847@qq.com> Date: Fri, 24 Dec 2021 13:42:08 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=89=A9=E5=B1=95=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=A4=E6=8D=A2=E6=9C=BA/=E9=98=9F?= =?UTF-8?q?=E5=88=97=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/io/jboot/components/mq/Jbootmq.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/io/jboot/components/mq/Jbootmq.java b/src/main/java/io/jboot/components/mq/Jbootmq.java index e64a98f9..552d4587 100644 --- a/src/main/java/io/jboot/components/mq/Jbootmq.java +++ b/src/main/java/io/jboot/components/mq/Jbootmq.java @@ -16,6 +16,7 @@ package io.jboot.components.mq; import java.util.Collection; +import java.util.Map; public interface Jbootmq { @@ -37,5 +38,9 @@ public interface Jbootmq { public Collection getListenersByChannel(String channel); public boolean startListening(); + + public void enqueue(Object message, String toChannel, Map arguments); + + public void publish(Object message, String toChannel, Map arguments); } -- Gitee From 69b727a17fe33751341d3a3aa56844246b3a97ed Mon Sep 17 00:00:00 2001 From: whp <124653847@qq.com> Date: Fri, 24 Dec 2021 13:46:41 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=89=A9=E5=B1=95=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=A4=E6=8D=A2=E6=9C=BA/=E9=98=9F?= =?UTF-8?q?=E5=88=97=E5=8F=82=E6=95=B0-1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/jboot/components/mq/JbootmqBase.java | 11 ++++ .../mq/rabbitmq/JbootRabbitmqImpl.java | 53 ++++++++++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/jboot/components/mq/JbootmqBase.java b/src/main/java/io/jboot/components/mq/JbootmqBase.java index d725eb91..422aea28 100644 --- a/src/main/java/io/jboot/components/mq/JbootmqBase.java +++ b/src/main/java/io/jboot/components/mq/JbootmqBase.java @@ -28,6 +28,7 @@ import io.jboot.utils.StrUtil; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.*; @@ -176,5 +177,15 @@ public abstract class JbootmqBase implements Jbootmq { protected abstract void onStartListening(); + + @Override + public void enqueue(Object message, String toChannel, Map arguments) { + + } + + @Override + public void publish(Object message, String toChannel, Map arguments) { + + } } diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java index 0c6ce9e5..47e351ef 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java @@ -174,5 +174,56 @@ public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq { } } - + @Override + public void enqueue(Object message, String toChannel, Map arguments) { + Channel channel = getChannel(toChannel, true, arguments); + try { + byte[] bytes = getSerializer().serialize(message); + channel.basicPublish("", toChannel, MessageProperties.BASIC, bytes); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void publish(Object message, String toChannel, Map arguments) { + Channel channel = getChannel(toChannel, false, arguments); + try { + byte[] bytes = getSerializer().serialize(message); + channel.basicPublish(toChannel, "", MessageProperties.BASIC, bytes); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public synchronized Channel getChannel(String toChannel, boolean queueMode, Map arguments) { + + Channel channel = channelMap.get(toChannel + queueMode); + if (channel == null) { + try { + channel = connection.createChannel(); + + // 队列模式,只需要创建 队列就可以了,不需要定义交换机 + if (queueMode) { + channel.queueDeclare(toChannel, true, false, false, arguments); + } + + // 广播模式,需要定义交换机,发送者直接把消息发送到交换机里 + else { + channel.queueDeclare(buildBroadcastChannelName(toChannel), false, false, true, arguments); + channel.exchangeDeclare(toChannel, BuiltinExchangeType.FANOUT, true); + channel.queueBind(buildBroadcastChannelName(toChannel), toChannel, rabbitmqConfig.getBroadcastChannelRoutingKey()); + } + + } catch (Exception ex) { + throw new JbootException("Can not create rabbit mq channel.", ex); + } + + if (channel != null) { + channelMap.put(toChannel + queueMode, channel); + } + } + + return channel; + } } -- Gitee