diff --git a/src/main/java/io/jboot/components/mq/Jbootmq.java b/src/main/java/io/jboot/components/mq/Jbootmq.java index e64a98f9e9f646dc4eee9ceb23138ec453a7fe97..552d4587a2d8eca3c57f8643255c861897dfecad 100644 --- a/src/main/java/io/jboot/components/mq/Jbootmq.java +++ b/src/main/java/io/jboot/components/mq/Jbootmq.java @@ -16,6 +16,7 @@ package io.jboot.components.mq; import java.util.Collection; +import java.util.Map; public interface Jbootmq { @@ -37,5 +38,9 @@ public interface Jbootmq { public Collection getListenersByChannel(String channel); public boolean startListening(); + + public void enqueue(Object message, String toChannel, Map arguments); + + public void publish(Object message, String toChannel, Map arguments); } diff --git a/src/main/java/io/jboot/components/mq/JbootmqBase.java b/src/main/java/io/jboot/components/mq/JbootmqBase.java index d725eb915c3c5c876137c7df4d8ba72643e5fbb0..422aea2894d91c39d6ba67bc92fd75ebd7e422cd 100644 --- a/src/main/java/io/jboot/components/mq/JbootmqBase.java +++ b/src/main/java/io/jboot/components/mq/JbootmqBase.java @@ -28,6 +28,7 @@ import io.jboot.utils.StrUtil; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.*; @@ -176,5 +177,15 @@ public abstract class JbootmqBase implements Jbootmq { protected abstract void onStartListening(); + + @Override + public void enqueue(Object message, String toChannel, Map arguments) { + + } + + @Override + public void publish(Object message, String toChannel, Map arguments) { + + } } diff --git a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java index 0c6ce9e50f561e5b1ebda6a211a53d2ae094d55c..47e351ef039b263b62dfdea539eee4b4eab7ce3a 100644 --- a/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java +++ b/src/main/java/io/jboot/components/mq/rabbitmq/JbootRabbitmqImpl.java @@ -174,5 +174,56 @@ public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq { } } - + @Override + public void enqueue(Object message, String toChannel, Map arguments) { + Channel channel = getChannel(toChannel, true, arguments); + try { + byte[] bytes = getSerializer().serialize(message); + channel.basicPublish("", toChannel, MessageProperties.BASIC, bytes); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void publish(Object message, String toChannel, Map arguments) { + Channel channel = getChannel(toChannel, false, arguments); + try { + byte[] bytes = getSerializer().serialize(message); + channel.basicPublish(toChannel, "", MessageProperties.BASIC, bytes); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public synchronized Channel getChannel(String toChannel, boolean queueMode, Map arguments) { + + Channel channel = channelMap.get(toChannel + queueMode); + if (channel == null) { + try { + channel = connection.createChannel(); + + // 队列模式,只需要创建 队列就可以了,不需要定义交换机 + if (queueMode) { + channel.queueDeclare(toChannel, true, false, false, arguments); + } + + // 广播模式,需要定义交换机,发送者直接把消息发送到交换机里 + else { + channel.queueDeclare(buildBroadcastChannelName(toChannel), false, false, true, arguments); + channel.exchangeDeclare(toChannel, BuiltinExchangeType.FANOUT, true); + channel.queueBind(buildBroadcastChannelName(toChannel), toChannel, rabbitmqConfig.getBroadcastChannelRoutingKey()); + } + + } catch (Exception ex) { + throw new JbootException("Can not create rabbit mq channel.", ex); + } + + if (channel != null) { + channelMap.put(toChannel + queueMode, channel); + } + } + + return channel; + } }