diff --git a/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java b/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java index 26087821f56c755d45e26368b19c1c476a180a6d..75206e6472e22f2a9ca782cfe970b3b5e12e6121 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java +++ b/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java @@ -3,10 +3,12 @@ package com.jd.platform.jlog.client; import com.alibaba.fastjson.JSON; import com.jd.platform.jlog.client.mdc.Mdc; +import com.jd.platform.jlog.client.modeholder.ModeHolder; import com.jd.platform.jlog.client.task.Monitor; import com.jd.platform.jlog.client.udp.HttpSender; import com.jd.platform.jlog.client.udp.UdpClient; import com.jd.platform.jlog.client.udp.UdpSender; +import com.jd.platform.jlog.common.constant.SendMode; import com.jd.platform.jlog.common.handler.TagConfig; import com.jd.platform.jlog.core.ClientHandlerBuilder; import com.jd.platform.jlog.core.Configurator; @@ -37,7 +39,7 @@ public class TracerClientStarter { */ private TagConfig tagConfig; - + private SendMode sendMode; /** * TracerClientStarter */ @@ -52,6 +54,7 @@ public class TracerClientStarter { private String appName; private Mdc mdc; private TagConfig tagConfig; + private SendMode sendMode; public Builder() { } @@ -66,6 +69,11 @@ public class TracerClientStarter { return this; } + public Builder setSendMode(SendMode sendMode) { + this.sendMode = sendMode; + return this; + } + public Builder setTagConfig(TagConfig tagConfig) { this.tagConfig = tagConfig; return this; @@ -75,6 +83,7 @@ public class TracerClientStarter { TracerClientStarter tracerClientStarter = new TracerClientStarter(appName); tracerClientStarter.tagConfig = tagConfig; tracerClientStarter.mdc = mdc; + tracerClientStarter.sendMode=sendMode; return tracerClientStarter; } } @@ -88,6 +97,8 @@ public class TracerClientStarter { Context.MDC = mdc; + ModeHolder.setSendMode(this.sendMode); + Monitor starter = new Monitor(); starter.start(); diff --git a/client/src/main/java/com/jd/platform/jlog/client/modeholder/ModeHolder.java b/client/src/main/java/com/jd/platform/jlog/client/modeholder/ModeHolder.java new file mode 100644 index 0000000000000000000000000000000000000000..ad90de921c42d1ad2272fa01ea99721e4c277571 --- /dev/null +++ b/client/src/main/java/com/jd/platform/jlog/client/modeholder/ModeHolder.java @@ -0,0 +1,32 @@ +package com.jd.platform.jlog.client.modeholder; + +import com.alibaba.ttl.TransmittableThreadLocal; +import com.jd.platform.jlog.common.constant.SendMode; + +/** + * 线程间传递通讯模式(单播、多播) + */ +public class ModeHolder { + /** + * 用于在线程池间也能透传SendMode + */ + private static TransmittableThreadLocal context = new TransmittableThreadLocal<>(); + /** + * 设置SendMode到线程里 + */ + public static void setSendMode(SendMode mode) { + context.set(mode); + } + + /** + * 如果没有SendMode,说明没设置上,则返回一个默认值,默认是单播模式 + */ + public static SendMode getSendMode() { + try { + return context.get(); + } catch (Exception e) { + return new SendMode(); + } + } + +} diff --git a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java index e7c93a1488e381bd7947aa90c90bae29af856cb6..7bf45a4027120050839bc10c0a7b49256fefec1b 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java +++ b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java @@ -1,6 +1,7 @@ package com.jd.platform.jlog.client.udp; import com.jd.platform.jlog.client.Context; +import com.jd.platform.jlog.client.modeholder.ModeHolder; import com.jd.platform.jlog.client.worker.WorkerInfoHolder; import com.jd.platform.jlog.common.constant.Constant; import com.jd.platform.jlog.common.model.TracerData; @@ -91,15 +92,19 @@ public class UdpClient { ByteBuf buf = channelHandlerContext.alloc().buffer(compressBytes.length); buf.writeBytes(compressBytes); - - //挑一个worker - String workerIpPort = WorkerInfoHolder.chooseWorker(); - if (workerIpPort == null) { - return; + InetSocketAddress remoteAddress=null; + if(ModeHolder.getSendMode().getUnicast()){ + //挑选worker + String workerIpPort = WorkerInfoHolder.chooseWorker(); + if (workerIpPort == null) { + return; + } + String[] ipPort = workerIpPort.split(Constant.SPLITER); + //发往worker的ip + remoteAddress= new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1])); + }else{ + remoteAddress=tracerData.getAddress(); } - String[] ipPort = workerIpPort.split(Constant.SPLITER); - //发往worker的ip - InetSocketAddress remoteAddress = new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1])); DatagramPacket packet = new DatagramPacket(buf, remoteAddress); list.add(packet); } diff --git a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java index 71805aae16b7972399016da3ef8e4c84ddcb0b7d..2fa36d0611a98080884436c582c1643f81339ddd 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java +++ b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java @@ -1,14 +1,20 @@ package com.jd.platform.jlog.client.udp; import com.jd.platform.jlog.client.Context; +import com.jd.platform.jlog.client.modeholder.ModeHolder; +import com.jd.platform.jlog.client.worker.WorkerInfoHolder; +import com.jd.platform.jlog.common.constant.Constant; import com.jd.platform.jlog.common.model.RunLogMessage; import com.jd.platform.jlog.common.model.TracerBean; import com.jd.platform.jlog.common.model.TracerData; import com.jd.platform.jlog.common.utils.AsyncPool; import com.jd.platform.jlog.common.utils.AsyncWorker; +import io.netty.channel.ChannelFuture; +import io.netty.channel.socket.DatagramPacket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -140,9 +146,23 @@ public class UdpSender { /** * 往worker发traceBean */ - private static void send(List tracerBeans) { + private static void send(List tracerBeans) throws InterruptedException { TracerData tracerData = new TracerData(); tracerData.setTracerBeanList(tracerBeans); - Context.CHANNEL.writeAndFlush(tracerData); + if(!ModeHolder.getSendMode().getUnicast()){ + Listips= WorkerInfoHolder.selectWorkers(); + for(String ip:ips){ + String[] ipPort = ip.split(Constant.SPLITER); + //发往worker的ip + InetSocketAddress remoteAddress = new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1])); + tracerData.setAddress(remoteAddress); + ChannelFuture future = Context.CHANNEL.writeAndFlush(tracerData); + //同步操作,否则会出现bug + future.sync(); + } + return; + }else { + Context.CHANNEL.writeAndFlush(tracerData); + } } } diff --git a/client/src/main/java/com/jd/platform/jlog/client/worker/WorkerInfoHolder.java b/client/src/main/java/com/jd/platform/jlog/client/worker/WorkerInfoHolder.java index e10f6992c2dded7bf37272cc9f2544c0b96a5495..164cc9daf2d444304343a504bb1fcb86791a4e2a 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/worker/WorkerInfoHolder.java +++ b/client/src/main/java/com/jd/platform/jlog/client/worker/WorkerInfoHolder.java @@ -1,6 +1,6 @@ package com.jd.platform.jlog.client.worker; - +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -33,12 +33,12 @@ public class WorkerInfoHolder { if (size == 0) { return workerIp; } + //按本机ip对worker数量进行hash // int index = Math.abs(IpUtils.getIp().hashCode() % size); if (index >= WORKER_HOLDER.size()) { index = 0; } - try { workerIp = WORKER_HOLDER.get(index); } catch (Exception e) { @@ -49,7 +49,15 @@ public class WorkerInfoHolder { return workerIp; } - + //多播模式 返回所有注册在注册中心为Work的地址 + public static List selectWorkers(){ + ListdefaultIps=new ArrayList<>(); + defaultIps.add("127.0.0.1:9999"); + if(WORKER_HOLDER.size()==0){ + return defaultIps; + } + return WORKER_HOLDER; + } /** * 监听到worker信息变化后 * 将新的worker信息和当前的进行合并,并且连接新的address diff --git a/common/src/main/java/com/jd/platform/jlog/common/constant/SendMode.java b/common/src/main/java/com/jd/platform/jlog/common/constant/SendMode.java new file mode 100644 index 0000000000000000000000000000000000000000..38cf7ccb79b9b7af39c49e23f071959643d3bb6e --- /dev/null +++ b/common/src/main/java/com/jd/platform/jlog/common/constant/SendMode.java @@ -0,0 +1,19 @@ +package com.jd.platform.jlog.common.constant; + +import java.io.Serializable; + +//通讯方式实体类 +public class SendMode implements Serializable { + //true为单播,false为多播 + private Boolean unicast=true; + public SendMode() { + } + public boolean getUnicast() { + return unicast; + } + + public void setUnicast(boolean unicast) { + this.unicast = unicast; + } + +} diff --git a/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java b/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java index 3866676c4593301ffb5e4675f56d5ca60325f06f..9354891e2dc9c809398b660bfdecbabf7743cf27 100644 --- a/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java +++ b/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java @@ -1,6 +1,7 @@ package com.jd.platform.jlog.common.model; import java.io.Serializable; +import java.net.InetSocketAddress; import java.util.List; /** @@ -15,12 +16,17 @@ public class TracerData implements Serializable { */ private List tracerBeanList; - @Override + private transient InetSocketAddress address; + /*@Override public String toString() { return "TracerData{" + "tracerBeanList=" + tracerBeanList + '}'; - } + }*/ + + public InetSocketAddress getAddress() { return address; } + + public void setAddress(InetSocketAddress address) { this.address = address; } public List getTracerBeanList() { return tracerBeanList; diff --git a/example/src/main/java/com/jd/platform/jlog/clientdemo/custom/Starter.java b/example/src/main/java/com/jd/platform/jlog/clientdemo/custom/Starter.java index 44c714a57aac42220da0f0c1061cf0849f28a03f..5d83af5068c5281da65f62af8018687fdb336ce0 100644 --- a/example/src/main/java/com/jd/platform/jlog/clientdemo/custom/Starter.java +++ b/example/src/main/java/com/jd/platform/jlog/clientdemo/custom/Starter.java @@ -2,6 +2,7 @@ package com.jd.platform.jlog.clientdemo.custom; import com.jd.platform.jlog.client.TracerClientStarter; import com.jd.platform.jlog.client.filter.HttpFilter; +import com.jd.platform.jlog.common.constant.SendMode; import com.jd.platform.jlog.common.handler.TagConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +25,16 @@ public class Starter { private Logger logger = LoggerFactory.getLogger(getClass()); private TagConfig tagConfig ; + //通讯方式(单播、多播),默认是单播 + private static SendMode sendMode=new SendMode(); + + public void setSendMode(SendMode sendMode) { + this.sendMode = sendMode; + } + + public SendMode getSendMode() { + return sendMode; + } public TagConfig getTagConfig() { return tagConfig; @@ -38,18 +49,16 @@ public class Starter { TracerClientStarter tracerClientStarter = new TracerClientStarter.Builder() .setAppName("demo") .setTagConfig(tagConfig) + .setSendMode(sendMode) .build(); logger.info("初始化tagConfig: {}",tagConfig); tracerClientStarter.startPipeline(); - - } @Bean public FilterRegistrationBean urlFilter() { FilterRegistrationBean registration = new FilterRegistrationBean(); HttpFilter userFilter = new HttpFilter(); - registration.setFilter(userFilter); registration.addUrlPatterns("/*"); registration.setName("UserTraceFilter"); diff --git a/example/src/main/resources/application.properties b/example/src/main/resources/application.properties index 2c4119351017ccc6aea461d8707bce50cc9aa8ec..1f447c71d467b8588fa26d461dcf47c4e599bb2c 100644 --- a/example/src/main/resources/application.properties +++ b/example/src/main/resources/application.properties @@ -9,6 +9,10 @@ apollo.meta=http://127.0.0.1:8080 apollo.config-service=http://127.0.0.1:8080 app.id=order +#设置通讯方式(单播或者多播),不设置则为单播(默认) +#单播为true,多播为false +#send-mode.unicast=true + tag-config.reqTags[0]=uid tag-config.reqTags[1]=url @@ -25,3 +29,6 @@ tag-config.extract=41 compress=68 threshold=10 +#worker的地址 +#workers=['127.0.0.1:9999','127.0.0.1:10000'] +