From c566ac051211f5d6a2c216a73a6b5d5811ef159a Mon Sep 17 00:00:00 2001 From: liyunfeng Date: Sat, 19 Mar 2022 12:34:41 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=8E=BB=E9=99=A4base64?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jlog/dashboard/utils/CharTest.java | 293 ++++++++++++++++++ client/pom.xml | 8 +- .../jlog/client/TracerClientStarter.java | 7 +- .../platform/jlog/client/udp/UdpClient.java | 2 - .../platform/jlog/client/udp/UdpSender.java | 2 +- common/pom.xml | 6 - .../jlog/common/handler/CompressHandler.java | 14 +- .../jlog/common/handler/ExtractHandler.java | 11 +- .../platform/jlog/common/utils/IdWorker.java | 12 +- config/pom.xml | 15 - example/pom.xml | 18 -- .../jlog/clientdemo/web/TestController.java | 8 +- .../src/main/resources/application.properties | 20 +- .../com/jd/platform/jlog/test/Common.java | 67 ---- .../jlog/test/EtcdConfiguratorTest.java | 85 ----- .../jlog/test/FileConfiguratorTest.java | 69 ----- .../jlog/test/NacosConfiguratorTest.java | 81 ----- .../platform/jlog/test/TracerPacketTest.java | 79 ----- .../jlog/test/ZKConfiguratorTest.java | 79 ----- worker/pom.xml | 2 +- .../jlog/worker/WorkerApplication.java | 6 +- .../jlog/worker/config/CenterStarter.java | 19 +- .../com/jd/platform/jlog/worker/db/Db.java | 10 +- .../jlog/worker/disruptor/Producer.java | 4 +- .../jlog/worker/disruptor/TracerConsumer.java | 27 +- .../worker/store/TracerModelToDbStore.java | 4 +- .../platform/jlog/worker/udp/UdpServer.java | 3 +- worker/src/main/resources/application.yml | 19 +- 28 files changed, 390 insertions(+), 580 deletions(-) create mode 100644 Dashboard/src/main/java/com/jd/platform/jlog/dashboard/utils/CharTest.java delete mode 100644 example/src/test/java/com/jd/platform/jlog/test/Common.java delete mode 100644 example/src/test/java/com/jd/platform/jlog/test/EtcdConfiguratorTest.java delete mode 100644 example/src/test/java/com/jd/platform/jlog/test/FileConfiguratorTest.java delete mode 100644 example/src/test/java/com/jd/platform/jlog/test/NacosConfiguratorTest.java delete mode 100644 example/src/test/java/com/jd/platform/jlog/test/TracerPacketTest.java delete mode 100644 example/src/test/java/com/jd/platform/jlog/test/ZKConfiguratorTest.java diff --git a/Dashboard/src/main/java/com/jd/platform/jlog/dashboard/utils/CharTest.java b/Dashboard/src/main/java/com/jd/platform/jlog/dashboard/utils/CharTest.java new file mode 100644 index 0000000..8c56d39 --- /dev/null +++ b/Dashboard/src/main/java/com/jd/platform/jlog/dashboard/utils/CharTest.java @@ -0,0 +1,293 @@ +package com.jd.platform.jlog.dashboard.utils; + +import com.alibaba.fastjson.JSON; +import org.springframework.util.StringUtils; +import ru.yandex.clickhouse.ClickHouseConnection; +import ru.yandex.clickhouse.ClickHouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.sql.*; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 测试类 别用 + */ +@Deprecated +public class CharTest { + public static void main2(String[] args) throws SQLException, UnsupportedEncodingException, InterruptedException { + + List> list = new ArrayList<>(); + String sql = "select * from tracer_model"; + + Connection connection = getConn(); + Statement statement = connection.createStatement(); + + try { + int id = new Random().nextInt(10000); + String str = "SOSOSOSOSA1232哈哈哈"; + byte[] bt = ZstdUtils.compress(str.getBytes()); + + PreparedStatement pstmt = connection.prepareStatement("insert into test values(?, ?)"); + for (int i = 0; i < 1; i++) { + pstmt.setInt(1, id); + pstmt.setString(2, new String(bt,"ISO8859-1")); + pstmt.addBatch(); + } + // pstmt.executeBatch(); + } catch (SQLException e) { + e.printStackTrace(); + } + + ResultSet results = statement.executeQuery(sql); + ResultSetMetaData rsmd = results.getMetaData(); + + while(results.next()){ + Map row = new HashMap<>(); + for(int i = 1; i <= rsmd.getColumnCount(); i++){ + row.put(rsmd.getColumnName(i), results.getObject(rsmd.getColumnName(i))); + } + list.add(row); + } + + + String str4 = "SOSOSOSOSA1232哈哈哈"; + byte[] bt4 = ZstdUtils.compress(str4.getBytes()); + System.out.println("压缩后的 bt4 ==> "+Arrays.toString(bt4)); + System.out.println("list => "+list); + for (Map objectMap : list) { + if("7648".equals((objectMap.get("id").toString()))){ + System.out.println("bt4 ==> "+Arrays.toString(objectMap.get("name").toString() + .getBytes("ISO8859-1"))); + String result = ZstdUtils.decompress(objectMap.get("name").toString() + .getBytes("ISO8859-1")); + System.out.println("解密之后的结果: "+result); + } + + } + + + System.out.println("=================================="); + cutString(str4,8); + + } + + public static Connection getConn() { + + String username = "default"; + // String password = "123456"; + String address = "jdbc:clickhouse://101.42.242.201:8123"; + String db = "default"; + int socketTimeout = 600000; + + ClickHouseProperties properties = new ClickHouseProperties(); + properties.setUser(username); + // properties.setPassword(password); + properties.setDatabase(db); + properties.setSocketTimeout(socketTimeout); + ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(address, properties); + + ClickHouseConnection conn = null; + try { + conn = clickHouseDataSource.getConnection(); + return conn; + } catch (SQLException e) { + e.printStackTrace(); + } + + return null; + } + + private static byte[] format(String dbStr){ + String[] arr0 = dbStr.split(","); + + byte[] bt = new byte[arr0.length]; + for (int i = 0; i < arr0.length; i++) { + if(i == 0){ + String first = arr0[0].replace("[", ""); + System.out.println("first --> "+first); + bt[i] = Byte.valueOf(first); + continue; + } + if(i == arr0.length-1){ + String last = arr0[arr0.length-1].replace("]", ""); + bt[i] = Byte.valueOf(last.trim()); + continue; + } + System.out.println(arr0[i]+" len => "+arr0[i].length()); + bt[i] = Byte.parseByte(arr0[i].trim()); + } + return bt; + } + + + + + public static void cutString(String str, int n) throws UnsupportedEncodingException, InterruptedException { + + + AtomicInteger fail = new AtomicInteger(0); + AtomicInteger ok = new AtomicInteger(0); + + for (int j = 0; j < 2; j++) { + new Thread(() -> { + for (int i = 0; i < 5; i++) { + int id = new Random().nextInt(99000000); + String ml = String.valueOf(getRandomChar()) +getRandomChar()+getRandomChar()+getRandomChar()+id; + System.out.println("old--> "+ml); + byte[] zipBt= ZstdUtils.compress(ml.getBytes()); + String newStr = null; + + try { + newStr = new String(zipBt,StandardCharsets.ISO_8859_1); + } catch (Exception e) {} + + // byte[] resul = newStr.getBytes(); + // System.out.println("新字符串的bt => "+Arrays.toString(resul)); + + try { + byte[] resul = sub(newStr, 4); + if(resul[0]==40 && resul[1]==-62 && resul[2]==-75 && resul[3]==47 && resul[4]==-61 && resul[5]==-67 && resul[6]==32){ + ok.incrementAndGet(); + }else{ + fail.incrementAndGet(); + } + + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + try { + String val = ZstdUtils.decompress(newStr.getBytes(StandardCharsets.ISO_8859_1)); + System.out.println("result--> "+val); + } catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + } + + + Thread.sleep(5000); + + System.out.println("ok == > "+ok.get()); + System.out.println("fail == > "+fail.get()); + + } + + private static byte[] sub(String str, int n) throws UnsupportedEncodingException { + // System.out.println("str => "+str); + StringBuilder sb = new StringBuilder(); + int count=0; + for (int i = 0; i < str.length(); i++) { + String c = String.valueOf(str.charAt(i)); + sb.append(c); + count+=c.getBytes("ISO8859-1").length; + if(count>n){ + // System.out.println("count=> "+count); + break; + } + } + return sb.toString().getBytes(); + // System.out.println(" bs==> "+ Arrays.toString(sb.toString().getBytes())); + } + + public static char getRandomChar() { + return (char) (0x4e00 + (int) (Math.random() * (0x9fa5 - 0x4e00 + 1))); + } + + + public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException, SQLException { + testChar(); + + + String str = "SOSOSOSOSA1232哈哈哈"; + byte[] bt = ZstdUtils.compress(str.getBytes()); + + String newnewStr = new String(bt, "ISO8859-1"); + System.out.println("bt4 ==> "+Arrays.toString(newnewStr + .getBytes("ISO8859-1"))); + String result = ZstdUtils.decompress(newnewStr + .getBytes("ISO8859-1")); + System.out.println("解密之后的结果: "+result); + + if(1==1){ + return; + } + // insert(); + List> list = new ArrayList<>(); + String sql = "select * from tracer_model"; + + Connection connection = getConn(); + Statement statement = connection.createStatement(); + ResultSet results = statement.executeQuery(sql); + ResultSetMetaData rsmd = results.getMetaData(); + + while(results.next()){ + Map row = new HashMap<>(); + for(int i = 1; i <= rsmd.getColumnCount(); i++){ + Object obj = results.getObject(rsmd.getColumnName(i)); + if(isZip(obj.toString())){ + System.out.println("%%%%%%%%### is zip"+obj.toString()); + obj = obj.toString().getBytes(StandardCharsets.ISO_8859_1); + } + // System.out.println("obj=> "+obj); + row.put(rsmd.getColumnName(i), obj); + } + list.add(row); + } + System.out.println(JSON.toJSONString(list)); + } + + public static boolean isZip(String str) throws UnsupportedEncodingException { + + if(StringUtils.isEmpty(str)){ + return false; + } + StringBuilder sb = new StringBuilder(); + int count=0; + for (int i = 0; i < str.length(); i++) { + String c = String.valueOf(str.charAt(i)); + sb.append(c); + count += c.getBytes("ISO8859-1").length; + if(count > 4){ + break; + } + } + byte[] bs = str.getBytes("ISO8859-1"); + // byte[] bs = sb.toString().getBytes("ISO8859-1"); + System.out.println(str+ " <=== ### ==> "+ Arrays.toString(bs)); + + return bs[0] == 40 && bs[1] == -62 && bs[2] == -75 && bs[3] == 47 && bs[4] == -61 && bs[5] == -67 && bs[6] == 32; + } + + private static void insert() throws SQLException, UnsupportedEncodingException { + int id = new Random().nextInt(10000); + String str = "滴滴员工tangbohu的终身代号是什么???是9527"; + byte[] bt = ZstdUtils.compress(str.getBytes()); + + PreparedStatement pstmt = getConn().prepareStatement("insert into tracer_model (responseContent) values(?)"); + for (int i = 0; i < 1; i++) { + pstmt.setString(1, new String(bt,"ISO8859-1")); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + + //[40, -75, 47, -3, 32, + private static void testChar() throws UnsupportedEncodingException { + for (int i = 0; i < 10; i++) { + String str =String.valueOf(getRandomChar())+ String.valueOf(getRandomChar()) +i; + byte[] initBs = ZstdUtils.compress(str.getBytes()); + String zipStr = new String(initBs, "ISO8859-1"); + System.out.println("@@@ => "+ Arrays.toString(zipStr.getBytes("ISO8859-1"))); + } + + /* String str = "滴滴员工tangbohu的终身代号是什么???是9527"; + byte[] initBs = ZstdUtils.compress(str.getBytes()); + String zipStr = new String(initBs, "ISO8859-1"); + System.out.println("@@@ => "+ Arrays.toString(zipStr.getBytes("ISO8859-1"))); + */ + } +} diff --git a/client/pom.xml b/client/pom.xml index 47d1a0e..f65e2bf 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -34,16 +34,16 @@ 1.4-SNAPSHOT --> - + + com.jd.platfrom.jlog config-core 1.4-SNAPSHOT - --> + javax.servlet servlet-api diff --git a/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java b/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java index 32f41c3..2608782 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java +++ b/client/src/main/java/com/jd/platform/jlog/client/TracerClientStarter.java @@ -4,6 +4,9 @@ package com.jd.platform.jlog.client; import com.alibaba.fastjson.JSON; import com.jd.platform.jlog.client.mdc.Mdc; import com.jd.platform.jlog.client.task.Monitor; +import com.jd.platform.jlog.client.udp.HttpSender; +import com.jd.platform.jlog.client.udp.UdpClient; +import com.jd.platform.jlog.client.udp.UdpSender; import com.jd.platform.jlog.common.handler.TagConfig; import com.jd.platform.jlog.core.ClientHandlerBuilder; import com.jd.platform.jlog.core.Configurator; @@ -88,14 +91,14 @@ public class TracerClientStarter { Monitor starter = new Monitor(); starter.start(); - /* UdpClient udpClient = new UdpClient(); + UdpClient udpClient = new UdpClient(); udpClient.start(); //开启发送 UdpSender.uploadToWorker(); //开启大对象http发送 - HttpSender.uploadToWorker();*/ + HttpSender.uploadToWorker(); } diff --git a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java index af4645b..e7c93a1 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java +++ b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java @@ -68,8 +68,6 @@ public class UdpClient { //7.关闭group group.shutdownGracefully(); } catch (InterruptedException e) { - System.out.println("NioEventLoopGroup ==> "+e.toString()); - e.printStackTrace(); } } diff --git a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java index c563181..d8e174b 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java +++ b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpSender.java @@ -119,7 +119,7 @@ public class UdpSender { List tempTracers = new ArrayList<>(); TracerBean tracerBean = new TracerBean(); - tracerBean.setTracerId("-99999"); + tracerBean.setTracerId("-1"); List> tracerObject = new ArrayList<>(); Map map = new HashMap<>(); diff --git a/common/pom.xml b/common/pom.xml index 1999299..e569fde 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -8,7 +8,6 @@ 1.4-SNAPSHOT 4.0.0 - common @@ -18,11 +17,6 @@ 1.5.0-4 1.2.70 1.7.2 - - diff --git a/common/src/main/java/com/jd/platform/jlog/common/handler/CompressHandler.java b/common/src/main/java/com/jd/platform/jlog/common/handler/CompressHandler.java index 5d13f83..e7eaa17 100644 --- a/common/src/main/java/com/jd/platform/jlog/common/handler/CompressHandler.java +++ b/common/src/main/java/com/jd/platform/jlog/common/handler/CompressHandler.java @@ -1,10 +1,6 @@ package com.jd.platform.jlog.common.handler; -import com.alibaba.fastjson.JSON; import com.jd.platform.jlog.common.utils.ZstdUtils; -import com.sun.istack.internal.NotNull; - -import java.util.Base64; import java.util.Map; import static com.jd.platform.jlog.common.constant.Constant.MIN; @@ -56,7 +52,7 @@ public class CompressHandler { } public static Map compressReq(Map map){ - if(instance == null || !isMatched(instance.compress, E_REQ)){ + if(instance == null || !isMatched(instance.compress, C_REQ)){ return map; } @@ -67,12 +63,12 @@ public class CompressHandler { } public static byte[] compressLog(byte[] contentBytes){ - if(instance == null || !isMatched(instance.compress, E_LOG)){ return contentBytes; } + if(instance == null || !isMatched(instance.compress, C_LOG)){ return contentBytes; } return doCompress(contentBytes); } public static byte[] compressResp(byte[] contentBytes){ - if(instance == null || !isMatched(instance.compress, E_RESP)){ return contentBytes; } + if(instance == null || !isMatched(instance.compress, C_RESP)){ return contentBytes; } return doCompress(contentBytes); } @@ -80,9 +76,7 @@ public class CompressHandler { if(contentBytes.length < instance.threshold){ return contentBytes; } - //最终的要发往worker的response,经历了base64压缩 - byte[] bytes = ZstdUtils.compress(contentBytes); - return Base64.getEncoder().encode(bytes); + return ZstdUtils.compress(contentBytes); } diff --git a/common/src/main/java/com/jd/platform/jlog/common/handler/ExtractHandler.java b/common/src/main/java/com/jd/platform/jlog/common/handler/ExtractHandler.java index b722076..fae5205 100644 --- a/common/src/main/java/com/jd/platform/jlog/common/handler/ExtractHandler.java +++ b/common/src/main/java/com/jd/platform/jlog/common/handler/ExtractHandler.java @@ -1,10 +1,8 @@ package com.jd.platform.jlog.common.handler; import com.alibaba.fastjson.JSON; -import com.jd.platform.jlog.common.utils.CollectionUtil; import com.jd.platform.jlog.common.utils.ConfigUtil; import com.jd.platform.jlog.common.utils.StringUtil; -import com.sun.istack.internal.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,18 +85,13 @@ public class ExtractHandler { public static Map extractReqTag(Map reqMap) { if(instance == null || !isMatched(instance.extract, E_REQ)){ return null; } - - System.out.println("### REQ INSTANCE :"+instance.toString()); - Map tagMap = new HashMap<>(instance.reqTags.size()); for (String tag : instance.reqTags) { Object val = reqMap.get(tag); if(val != null){ tagMap.put(tag, val); } - } - System.out.println("提取到了请求入参日志标签:"+JSON.toJSONString(tagMap)); - return tagMap; + }return tagMap; } @@ -129,7 +122,6 @@ public class ExtractHandler { } } } - System.out.println("提取到了请求log日志标签:"+JSON.toJSONString(tagMap)); return tagMap; } @@ -150,7 +142,6 @@ public class ExtractHandler { requestMap.put(tag, val); } } - System.out.println("提取到了请求出参日志标签:"+JSON.toJSONString(requestMap)); return requestMap; } diff --git a/common/src/main/java/com/jd/platform/jlog/common/utils/IdWorker.java b/common/src/main/java/com/jd/platform/jlog/common/utils/IdWorker.java index 4b60305..38c250d 100644 --- a/common/src/main/java/com/jd/platform/jlog/common/utils/IdWorker.java +++ b/common/src/main/java/com/jd/platform/jlog/common/utils/IdWorker.java @@ -1,6 +1,5 @@ package com.jd.platform.jlog.common.utils; -import com.sun.tools.javac.util.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,14 +85,19 @@ public class IdWorker { * @param workerId 工作进程Id */ private static void setWorkerId(final Long workerId) { - Assert.check(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE); - IdWorker.workerId = workerId; + if(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE){ + IdWorker.workerId = workerId; + }else{ + throw new RuntimeException("workerId is illegal"); + } } //下一个ID生成算法 public static long nextId() { long time = System.currentTimeMillis(); - Assert.check(lastTime <= time, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds"+lastTime); + if(lastTime > time){ + throw new RuntimeException("Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds"+lastTime); + } if (lastTime == time) { if (0L == (sequence = ++sequence & SEQUENCE_MASK)) { time = waitUntilNextTime(time); diff --git a/config/pom.xml b/config/pom.xml index e9885c4..616aa0b 100644 --- a/config/pom.xml +++ b/config/pom.xml @@ -1,19 +1,4 @@ - diff --git a/example/pom.xml b/example/pom.xml index b177a8e..14a4cb1 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -40,24 +40,6 @@ spring-boot-configuration-processor true - - org.springframework.boot - spring-boot-test - 2.5.5 - test - - - junit - junit - 4.13.2 - test - - - org.springframework - spring-test - 5.3.10 - test - org.projectlombok lombok diff --git a/example/src/main/java/com/jd/platform/jlog/clientdemo/web/TestController.java b/example/src/main/java/com/jd/platform/jlog/clientdemo/web/TestController.java index 0a3b385..26fe6e0 100644 --- a/example/src/main/java/com/jd/platform/jlog/clientdemo/web/TestController.java +++ b/example/src/main/java/com/jd/platform/jlog/clientdemo/web/TestController.java @@ -66,11 +66,11 @@ public class TestController { @PostMapping(value = "/test", consumes = MediaType.APPLICATION_JSON_VALUE) public Object test(@RequestParam Integer uid, @RequestParam Integer newKey,@RequestBody TestReq req) { - String config = ConfiguratorFactory.getInstance().getString("reqTags"); - // System.out.println("tagConfig ===> " + tagConfig.toString()); RequestLog.info("|errno=val3||node=val4||这是随便的log|"); - - return 1; + if(newKey == 1){ + return 1; + } + return "滴滴员工tangbohu的终身代号是什么???是9527"; } diff --git a/example/src/main/resources/application.properties b/example/src/main/resources/application.properties index 19b36d2..3050b8f 100644 --- a/example/src/main/resources/application.properties +++ b/example/src/main/resources/application.properties @@ -3,4 +3,22 @@ serverAddr=101.42.242.201:2379 apollo.meta=http://127.0.0.1:8080 apollo.config-service=http://127.0.0.1:8080 -app.id=order \ No newline at end of file +app.id=order + + +tag-config.reqTags[0]=uid +tag-config.reqTags[1]=url + +tag-config.logTags[0]=node +tag-config.logTags[1]=bizType + +tag-config.respTags[0]=errno +tag-config.respTags[1]=msg +tag-config.delimiter=| +tag-config.join== +tag-config.extract=41 + +compress=68 +threshold=10 + +workers=[1,2,3] diff --git a/example/src/test/java/com/jd/platform/jlog/test/Common.java b/example/src/test/java/com/jd/platform/jlog/test/Common.java deleted file mode 100644 index b6034fb..0000000 --- a/example/src/test/java/com/jd/platform/jlog/test/Common.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.jd.platform.jlog.test; - -import com.alibaba.fastjson.JSON; -import com.jd.platform.jlog.common.handler.TagConfig; -import com.jd.platform.jlog.core.Configurator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.List; -import java.util.Random; - -/** - * @author tangbohu - * @version 1.0.0 - * @ClassName Common.java - * @Description TODO - * @createTime 2022年03月01日 07:36:00 - */ -public class Common { - - private final static Logger LOGGER = LoggerFactory.getLogger(Common.class); - - - public static void getTest(Configurator configurator){ - - LOGGER.info("配置器类型:{}", configurator.getType()); - String addr = configurator.getString("serverAddr"); - LOGGER.info("配置器get addr:{}", addr); - TagConfig tagConfig = configurator.getObject("tag-config", TagConfig.class); - LOGGER.info("配置器get tagConfig:{}", tagConfig.toString()); - List workers = configurator.getList("workers"); - LOGGER.info("配置器get workers:{}", JSON.toJSONString(workers)); - } - - - public static void modifyFile(String path)throws Exception{ - - String temp; - File file = new File(path); - FileInputStream fis = new FileInputStream(file); - InputStreamReader isr = new InputStreamReader(fis); - BufferedReader br = new BufferedReader(isr); - StringBuffer buf = new StringBuffer(); - - int id = new Random().nextInt(1000); - int num = 0; - // 保存该行前面的内容 - while ((temp = br.readLine()) != null) { - if(num == 0){ - buf = buf.append("testKey: ").append(id); - }else{ - buf = buf.append(temp); - } - num++; - buf = buf.append(System.getProperty("line.separator")); - } - - br.close(); - FileOutputStream fos = new FileOutputStream(file); - PrintWriter pw = new PrintWriter(fos); - pw.write(buf.toString().toCharArray()); - pw.flush(); - pw.close(); - } - -} diff --git a/example/src/test/java/com/jd/platform/jlog/test/EtcdConfiguratorTest.java b/example/src/test/java/com/jd/platform/jlog/test/EtcdConfiguratorTest.java deleted file mode 100644 index 4a5b575..0000000 --- a/example/src/test/java/com/jd/platform/jlog/test/EtcdConfiguratorTest.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.jd.platform.jlog.test; - -import com.alibaba.fastjson.JSON; -import com.jd.platform.jlog.clientdemo.ExampleApplication; -import com.jd.platform.jlog.core.Configurator; -import com.jd.platform.jlog.core.ConfiguratorFactory; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -import java.util.List; -import java.util.Random; - -import static com.jd.platform.jlog.test.Common.getTest; - -/** - * @author tangbohu - * @version 1.0.0 - * @ClassName EtcdConfiguratorTest.java - * @Description TODO - * @createTime 2022年03月03日 07:35:00 - */ -@SpringBootTest(classes = ExampleApplication.class) -@RunWith(SpringRunner.class) -public class EtcdConfiguratorTest { - - - - private final static Logger LOGGER = LoggerFactory.getLogger(EtcdConfiguratorTest.class); - - - private Configurator configurator = null; - - - - @Before - public void init() { - configurator = ConfiguratorFactory.getInstance(); - getTest(configurator); - } - - // @Test - public void testUpdateCFG() throws Exception { - List workers = configurator.getList("workers"); - LOGGER.info("初始化的workers:{}", JSON.toJSONString(workers)); - String myIp = "121.1.1.0"; - if(workers.contains(myIp)){ - // do nothing - LOGGER.info("自己的IP还在配置list里 什么也不做"); - return; - }else{ - LOGGER.info("自己的IP不在配置list里 添加进去并发布"); - workers.add(myIp); - } - configurator.putConfig("workers",JSON.toJSONString(workers)); - List workers2 = configurator.getList("workers"); - LOGGER.info("最新的workers:{}", JSON.toJSONString(workers2)); - } - - @Test - public void testAddConfigListener() throws Exception { - int i1 = new Random().nextInt(2000); - int i2 = new Random().nextInt(2000); - - String val1 = configurator.getString("testKey"); - LOGGER.info("初始化的testKey的val:{}", val1); - LOGGER.info("添加监听器后, 修改配置testKey = {}", i1); - configurator.putConfig("testKey",i1 + ""); - LOGGER.info("修改完毕 准备触发监听器"); - Thread.sleep(5000); - String val2 = configurator.getString("testKey"); - LOGGER.info("第一次修改后的的val:{}", val2); - Thread.sleep(5000); - // LOGGER.info("移除监听器后:修改配置testKey = {}",i2); - // configurator.putConfig("testKey",i2 + ""); - LOGGER.info("准备验证监听器是否停止 最新testKey={}", configurator.getString("testKey")); - LOGGER.info("第二次添加监听器"); - Thread.sleep(22000); - - } -} diff --git a/example/src/test/java/com/jd/platform/jlog/test/FileConfiguratorTest.java b/example/src/test/java/com/jd/platform/jlog/test/FileConfiguratorTest.java deleted file mode 100644 index e3eca92..0000000 --- a/example/src/test/java/com/jd/platform/jlog/test/FileConfiguratorTest.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.jd.platform.jlog.test; - -import com.alibaba.fastjson.JSON; -import com.jd.platform.jlog.clientdemo.ExampleApplication; -import com.jd.platform.jlog.core.Configurator; -import com.jd.platform.jlog.core.ConfiguratorFactory; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; -import org.yaml.snakeyaml.Yaml; - -import java.io.*; -import java.util.Map; -import java.util.Properties; - -import static com.jd.platform.jlog.test.Common.getTest; -import static com.jd.platform.jlog.test.Common.modifyFile; - - -/** - * @author tangbohu - * @version 1.0.0 - * @ClassName FileConfiguratorTest.java - * @Description TODO - * @createTime 2022年02月28日 19:45:00 - */ -@SpringBootTest(classes = ExampleApplication.class) -@RunWith(SpringRunner.class) -public class FileConfiguratorTest { - - - private final static Logger LOGGER = LoggerFactory.getLogger(FileConfiguratorTest.class); - - - private Configurator configurator = null; - - - - @Before - public void init() { - configurator = ConfiguratorFactory.getInstance(); - getTest(configurator); - } - - - - @Test - public void testAddConfigListener() throws Exception { - String path = "/Users/didi/Desktop/jlog/example/target/classes/bakapplication.yml"; - Properties props = new Properties(); - FileInputStream fis = new FileInputStream(new File(path)); - if (path.contains("yml")) { - props.putAll(new Yaml().loadAs(fis, Map.class)); - } else { - props.load(fis); - } - LOGGER.info("读取文件:{} 最新配置:{}", path, JSON.toJSONString(props)); - modifyFile(path); - LOGGER.info("修改文件完毕 准备触发监听器"); - Thread.sleep(10000); - } - - - -} diff --git a/example/src/test/java/com/jd/platform/jlog/test/NacosConfiguratorTest.java b/example/src/test/java/com/jd/platform/jlog/test/NacosConfiguratorTest.java deleted file mode 100644 index 38c77c9..0000000 --- a/example/src/test/java/com/jd/platform/jlog/test/NacosConfiguratorTest.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.jd.platform.jlog.test; - -import com.alibaba.fastjson.JSON; -import com.jd.platform.jlog.clientdemo.ExampleApplication; -import com.jd.platform.jlog.core.Configurator; -import com.jd.platform.jlog.core.ConfiguratorFactory; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -import java.util.List; -import java.util.Random; - -import static com.jd.platform.jlog.test.Common.getTest; - -/** - * @author tangbohu - * @version 1.0.0 - * @ClassName NacosConfiguratorTest.java - * @Description TODO - * @createTime 2022年03月01日 07:35:00 - */ -@SpringBootTest(classes = ExampleApplication.class) -@RunWith(SpringRunner.class) -public class NacosConfiguratorTest { - - - - private final static Logger LOGGER = LoggerFactory.getLogger(NacosConfiguratorTest.class); - - - private Configurator configurator = null; - - - - @Before - public void init() { - configurator = ConfiguratorFactory.getInstance(); - getTest(configurator); - } - - @Test - public void testUpdateCFG() throws Exception { - List workers = configurator.getList("workers"); - LOGGER.info("初始化的workers:{}", JSON.toJSONString(workers)); - String myIp = "121.1.1.0"; - if(workers.contains(myIp)){ - // do nothing - LOGGER.info("自己的IP还在配置list里 什么也不做"); - return; - }else{ - LOGGER.info("自己的IP不在配置list里 添加进去并发布"); - workers.add(myIp); - } - configurator.putConfig("workers",JSON.toJSONString(workers)); - List workers2 = configurator.getList("workers"); - LOGGER.info("最新的workers:{}", JSON.toJSONString(workers2)); - } - - @Test - public void testAddConfigListener() throws Exception { - int i1 = new Random().nextInt(2000); - int i2 = new Random().nextInt(2000); - - String val1 = configurator.getString("testKey"); - LOGGER.info("初始化的testKey的val:{}", val1); - // configurator.putConfig("testKey",i1 + ""); - LOGGER.info("修改完毕 准备触发监听器"); - Thread.sleep(1000); - String val2 = configurator.getString("testKey"); - LOGGER.info("第一次修改后的的val:{}", val2); - configurator.putConfig("testKey",i2 + ""); - LOGGER.info("准备验证监听器是否停止 最新testKey={}", configurator.getString("testKey")); - - Thread.sleep(35000); - } -} diff --git a/example/src/test/java/com/jd/platform/jlog/test/TracerPacketTest.java b/example/src/test/java/com/jd/platform/jlog/test/TracerPacketTest.java deleted file mode 100644 index 0ef5a23..0000000 --- a/example/src/test/java/com/jd/platform/jlog/test/TracerPacketTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.jd.platform.jlog.test; - -import com.alibaba.fastjson.JSON; -import com.jd.platform.jlog.client.udp.UdpSender; -import com.jd.platform.jlog.clientdemo.ExampleApplication; -import com.jd.platform.jlog.common.model.TracerBean; -import com.jd.platform.jlog.common.utils.IpUtils; -import com.jd.platform.jlog.common.utils.ZstdUtils; -import org.apache.tomcat.util.codec.binary.Base64; - -import java.nio.charset.StandardCharsets; -import java.util.*; - -/** - * 跳过过滤器,手动发送供worker消费的日志 - * - * @author shenkaiwen5 - * @version 1.0 - * @date 2021-12-27 - */ -//@SpringBootTest(classes = ExampleApplication.class) -//@RunWith(SpringRunner.class) -public class TracerPacketTest { - - //@Test - public void testSendUdp() { - TracerBean tracerBean = new TracerBean(); - List> tracerObject = new ArrayList<>(); - tracerBean.setTracerObject(tracerObject); - //将request信息保存 - Map requestMap = new HashMap<>(16); - requestMap.put("tracerId", "1"); - requestMap.put("pin", UUID.randomUUID()); - requestMap.put("appName", "myTest"); - requestMap.put("uuid", "uuid" + UUID.randomUUID()); - requestMap.put("client", "android"); - requestMap.put("clientVersion", "10.3.2"); - requestMap.put("ip", "127.0.0.1"); - requestMap.put("serverIp", "127.0.0.1"); - requestMap.put("uri", "test"); - tracerObject.add(requestMap); - - //设置耗时 - tracerBean.setCostTime((int) (System.currentTimeMillis() - tracerBean.getCreateTime())); - - try { - byte[] contentBytes = testString().getBytes(StandardCharsets.UTF_8); - - //最终的要发往worker的response - byte[] bytes = ZstdUtils.compress(contentBytes); - byte[] base64Bytes = Base64.encodeBase64(bytes); - Map responseMap = new HashMap<>(8); - responseMap.put("response", base64Bytes); - - tracerObject.add(responseMap); - UdpSender.offerBean(tracerBean); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * 生成测试的返回值 - */ - private String testString() { - return JSON.toJSONString(new MyRes(IpUtils.getIp() + " send a test udp message.")); - } - - /** - * 测试返回值类 - */ - class MyRes { - String context; - - MyRes(String s) { - this.context = s; - } - } -} diff --git a/example/src/test/java/com/jd/platform/jlog/test/ZKConfiguratorTest.java b/example/src/test/java/com/jd/platform/jlog/test/ZKConfiguratorTest.java deleted file mode 100644 index 26582a4..0000000 --- a/example/src/test/java/com/jd/platform/jlog/test/ZKConfiguratorTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.jd.platform.jlog.test; - -import com.alibaba.fastjson.JSON; -import com.jd.platform.jlog.clientdemo.ExampleApplication; -import com.jd.platform.jlog.core.Configurator; -import com.jd.platform.jlog.core.ConfiguratorFactory; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -import java.util.List; - -import java.util.Random; - -import static com.jd.platform.jlog.test.Common.getTest; - -/** - * @author tangbohu - * @version 1.0.0 - * @ClassName ZKConfiguratorTest.java - * @Description TODO - * @createTime 2022年03月01日 07:35:00 - */ -@SpringBootTest(classes = ExampleApplication.class) -@RunWith(SpringRunner.class) -public class ZKConfiguratorTest { - - - - private final static Logger LOGGER = LoggerFactory.getLogger(ZKConfiguratorTest.class); - - - private Configurator configurator = null; - - - - @Before - public void init() { - configurator = ConfiguratorFactory.getInstance(); - getTest(configurator); - } - - @Test - public void testUpdateCFG() throws Exception { - List workers = configurator.getList("workers"); - LOGGER.info("初始化的workers:{}", JSON.toJSONString(workers)); - String myIp = "121.0"; - if(workers.contains(myIp)){ - LOGGER.info("自己的IP还在配置list里 什么也不做"); - return; - }else{ - LOGGER.info("自己的IP不在配置list里 添加进去并发布"); - workers.add(myIp); - } - configurator.putConfig("workers",JSON.toJSONString(workers)); - List workers2 = configurator.getList("workers"); - LOGGER.info("最新的workers:{}", JSON.toJSONString(workers2)); - } - - @Test - public void testAddConfigListener() throws Exception { - int i1 = new Random().nextInt(2000); - int i2 = new Random().nextInt(2000); - - String val1 = configurator.getString("testKey"); - LOGGER.info("初始化的testKey的val:{}", val1); - LOGGER.info("添加监听器后, 修改配置testKey = {}", i1); - // configurator.putConfig("testKey",i1 + ""); - LOGGER.info("修改完毕 准备触发监听器"); - Thread.sleep(1000); - String val2 = configurator.getString("testKey"); - LOGGER.info("第一次修改后的的val:{}", val2); - Thread.sleep(1000); - } -} diff --git a/worker/pom.xml b/worker/pom.xml index 8e02fe6..d924778 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -24,7 +24,7 @@ com.jd.platfrom.jlog - config-zk + config-core 1.4-SNAPSHOT diff --git a/worker/src/main/java/com/jd/platform/jlog/worker/WorkerApplication.java b/worker/src/main/java/com/jd/platform/jlog/worker/WorkerApplication.java index cc237b2..9fe285a 100644 --- a/worker/src/main/java/com/jd/platform/jlog/worker/WorkerApplication.java +++ b/worker/src/main/java/com/jd/platform/jlog/worker/WorkerApplication.java @@ -10,7 +10,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class WorkerApplication { public static void main(String[] args) { - SpringApplication.run(WorkerApplication.class, args); + try { + SpringApplication.run(WorkerApplication.class, args); + }catch (Exception e){ + e.printStackTrace(); + } } } diff --git a/worker/src/main/java/com/jd/platform/jlog/worker/config/CenterStarter.java b/worker/src/main/java/com/jd/platform/jlog/worker/config/CenterStarter.java index 9c234f4..68c725a 100644 --- a/worker/src/main/java/com/jd/platform/jlog/worker/config/CenterStarter.java +++ b/worker/src/main/java/com/jd/platform/jlog/worker/config/CenterStarter.java @@ -24,22 +24,6 @@ import java.util.concurrent.TimeUnit; */ @Component public class CenterStarter { - /** - * 该worker为哪个app服务 - */ - @Value("${config.workerPath}") - private String workerPath; - /** - * 配置中心地址 - */ - @Value("${config.server}") - private String configServer; - /** - * 机房 - */ - @Value("${config.mdc}") - private String mdc; - /** * 上报自己的ip到配置中心 @@ -68,8 +52,7 @@ public class CenterStarter { * 在配置中心存放的key */ private String buildKey() { - String hostName = IpUtils.getHostName(); - return Constant.WORKER_PATH + workerPath + "/" + hostName; + return IpUtils.getHostName(); } /** diff --git a/worker/src/main/java/com/jd/platform/jlog/worker/db/Db.java b/worker/src/main/java/com/jd/platform/jlog/worker/db/Db.java index be59585..9a22a18 100644 --- a/worker/src/main/java/com/jd/platform/jlog/worker/db/Db.java +++ b/worker/src/main/java/com/jd/platform/jlog/worker/db/Db.java @@ -1,10 +1,12 @@ package com.jd.platform.jlog.worker.db; +import com.alibaba.fastjson.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -13,6 +15,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static java.nio.charset.StandardCharsets.*; + /** * https://blog.csdn.net/linglongxin24/article/details/53769957 * @@ -81,7 +85,11 @@ public class Db { // logger.info(sql.toString()); for (Map data : datas) { for (int k = 0; k < keys.length; k++) { - preparedStatement.setObject(k + 1, data.get(keys[k])); + Object val = data.get(keys[k]); + if(val instanceof byte[]){ + val = new String((byte[]) val, ISO_8859_1); + } + preparedStatement.setObject(k + 1, val); } preparedStatement.addBatch(); } diff --git a/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/Producer.java b/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/Producer.java index bea30f1..474726f 100644 --- a/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/Producer.java +++ b/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/Producer.java @@ -46,8 +46,8 @@ public class Producer { logger.info("生产消费队列,已接收:" + totalReceive); } try { - OneTracer OneTracer = ringBuffer.get(sequence); - OneTracer.setBytes(bytes); + OneTracer oneTracer = ringBuffer.get(sequence); + oneTracer.setBytes(bytes); } finally { ringBuffer.publish(sequence); } diff --git a/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/TracerConsumer.java b/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/TracerConsumer.java index 167bcc0..dfb9f4e 100644 --- a/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/TracerConsumer.java +++ b/worker/src/main/java/com/jd/platform/jlog/worker/disruptor/TracerConsumer.java @@ -1,5 +1,6 @@ package com.jd.platform.jlog.worker.disruptor; +import com.alibaba.fastjson.JSON; import com.jd.platform.jlog.common.model.RunLogMessage; import com.jd.platform.jlog.common.model.TracerBean; import com.jd.platform.jlog.common.model.TracerData; @@ -13,10 +14,11 @@ import io.netty.util.internal.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; import java.util.concurrent.atomic.LongAdder; /** @@ -44,6 +46,9 @@ public class TracerConsumer implements WorkHandler { */ private TracerLogToDbStore tracerLogToDbStore; + private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + public TracerConsumer(TracerModelToDbStore tracerModelToDbStore, TracerLogToDbStore tracerLogToDbStore) { this.tracerModelToDbStore = tracerModelToDbStore; this.tracerLogToDbStore = tracerLogToDbStore; @@ -59,6 +64,9 @@ public class TracerConsumer implements WorkHandler { byte[] decompressBytes = ZstdUtils.decompressBytes(oneTracer.getBytes()); TracerData tracerData = ProtostuffUtils.deserialize(decompressBytes, TracerData.class); + + System.out.println("从事件中获取并解压的数据="+ JSON.toJSONString(tracerData)); + //包含了多个tracer对象 List tracerBeanList = tracerData.getTracerBeanList(); buildTracerModel(tracerBeanList); @@ -78,9 +86,8 @@ public class TracerConsumer implements WorkHandler { //遍历传过来的 for (TracerBean tracerBean : tracerBeanList) { //普通日志 - if ("-99999".equals(tracerBean.getTracerId())) { + if ("-1".equals(tracerBean.getTracerId())) { dealTracerLog(tracerBean); - } else { dealFilterModel(tracerBean); } @@ -121,7 +128,7 @@ public class TracerConsumer implements WorkHandler { Map map = new HashMap<>(requestMap); - long tracerId = requestMap.get("tracerId") == null ? 0 : Long.valueOf(requestMap.get("tracerId").toString()); + long tracerId = Long.parseLong(tracerBean.getTracerId()); //filter的出入参 Map responseMap = mapList.get(mapList.size() - 1); @@ -129,13 +136,17 @@ public class TracerConsumer implements WorkHandler { if (responseMap.get("response") != null) { responseBytes = (byte[]) responseMap.get("response"); } - map.put("responseContent", responseBytes); map.put("costTime", tracerBean.getCostTime()); map.put("tracerId", tracerId); + map.put("createTime", formatLongTime(tracerBean.getCreateTime())); responseMap.remove("response"); map.putAll(responseMap); tracerModelToDbStore.offer(map); } + private static String formatLongTime(long time) { + return DEFAULT_FORMATTER.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(time),ZoneId.systemDefault())); + } + } diff --git a/worker/src/main/java/com/jd/platform/jlog/worker/store/TracerModelToDbStore.java b/worker/src/main/java/com/jd/platform/jlog/worker/store/TracerModelToDbStore.java index 8a54550..8f3e4ac 100644 --- a/worker/src/main/java/com/jd/platform/jlog/worker/store/TracerModelToDbStore.java +++ b/worker/src/main/java/com/jd/platform/jlog/worker/store/TracerModelToDbStore.java @@ -1,7 +1,7 @@ package com.jd.platform.jlog.worker.store; -import com.google.common.collect.Queues; import com.jd.platform.jlog.common.utils.AsyncPool; +import com.jd.platform.jlog.common.utils.AsyncWorker; import com.jd.platform.jlog.common.utils.CollectionUtil; import com.jd.platform.jlog.worker.db.Db; import org.slf4j.Logger; @@ -93,7 +93,7 @@ public class TracerModelToDbStore { try { List> tempModels = new ArrayList<>(); //每1s入库一次 - Queues.drain(modelQueue, tempModels, Integer.valueOf(batchSize), interval, TimeUnit.SECONDS); + AsyncWorker.drain(modelQueue, tempModels, Integer.valueOf(batchSize), interval, TimeUnit.SECONDS); if (CollectionUtil.isEmpty(tempModels)) { continue; } diff --git a/worker/src/main/java/com/jd/platform/jlog/worker/udp/UdpServer.java b/worker/src/main/java/com/jd/platform/jlog/worker/udp/UdpServer.java index 5acf57f..6378bdd 100644 --- a/worker/src/main/java/com/jd/platform/jlog/worker/udp/UdpServer.java +++ b/worker/src/main/java/com/jd/platform/jlog/worker/udp/UdpServer.java @@ -12,6 +12,7 @@ import io.netty.handler.codec.MessageToMessageDecoder; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.Arrays; import java.util.List; /** @@ -76,8 +77,6 @@ public class UdpServer { byte[] body = new byte[buf.readableBytes()]; buf.readBytes(body); - -// tracerBeanStore.offer(body); producer.sendData(body); } } diff --git a/worker/src/main/resources/application.yml b/worker/src/main/resources/application.yml index 924c7ae..79b569c 100644 --- a/worker/src/main/resources/application.yml +++ b/worker/src/main/resources/application.yml @@ -5,17 +5,17 @@ thread: queue: maxSize: ${queueSize:16384} preDbSize: ${preDbSize:10000} -#etcd的地址,如有多个用逗号分隔 -serverAddr: 101.42.242.201:2181 + +#serverAddr: 101.42.242.201:2181 server: - port: 8080 + port: 8086 #ck信息,自行修改 clickhouse: - url: jdbc:clickhouse://${MYSQL_HOST:127.0.0.1}:${MYSQL_PORT:8123} - db: ${DB_NAME:test} - username: ${MYSQL_USER:test} - password: ${MYSQL_PASS:test} + url: jdbc:clickhouse://101.42.242.201:8123 + db: default + username: default + password: batchSize: ${BATCH_SIZE:5000} poolSize: ${POOL_SIZE:5} insertInterval: ${INSERT_INTERVAL:5} @@ -24,4 +24,7 @@ log: batchSize: ${BATCH_SIZE:5000} poolSize: ${POOL_SIZE:2} insertInterval: ${INSERT_INTERVAL:5} - preDbSize: ${preDbSize:10000} \ No newline at end of file + preDbSize: ${preDbSize:10000} + + +workers: "['192.268.1.2:8888','192.268.1.3:8888']" \ No newline at end of file -- Gitee From f077adb9311745e3fcd612b9128fbfe27b198ca3 Mon Sep 17 00:00:00 2001 From: liyunfeng Date: Sat, 19 Mar 2022 12:43:58 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E7=A4=BA=E4=BE=8B=E8=A1=A8sql=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- worker/src/main/resources/jlog.sql | 36 ++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 worker/src/main/resources/jlog.sql diff --git a/worker/src/main/resources/jlog.sql b/worker/src/main/resources/jlog.sql new file mode 100644 index 0000000..171a1c2 --- /dev/null +++ b/worker/src/main/resources/jlog.sql @@ -0,0 +1,36 @@ + +CREATE TABLE tracer_log ( + tracerId Int64, + className String, + threadName String, + methodName String, + logLevel String, + content String, + node String, + normal String, + tag1 String, + createTime DateTime +) ENGINE = MergeTree() + PARTITION BY toYYYYMMDD ( createTime ) + ORDER BY createTime + PRIMARY key createTime + TTL createTime + toIntervalDay ( 15 ) + + + + CREATE TABLE tracer_model ( + tracerId Int64, + requestContent String, + responseContent String, + costTime String, + uid String, + errno String, + errmsg String, + app String, + uri String, + createTime DateTime + ) ENGINE = MergeTree() + PARTITION BY toYYYYMMDD ( createTime ) + ORDER BY ( uid, createTime ) + PRIMARY key uid + TTL createTime + toIntervalDay ( 15 ) -- Gitee From d7187ceadba53aa14130dc1ce382195ea888f065 Mon Sep 17 00:00:00 2001 From: liyunfeng Date: Sat, 19 Mar 2022 13:06:52 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E8=BF=90=E8=A1=8C=E8=AF=B4=E6=98=8E?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...7\220\350\241\214\350\257\264\346\230\216.md" | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git "a/\350\277\220\350\241\214\350\257\264\346\230\216.md" "b/\350\277\220\350\241\214\350\257\264\346\230\216.md" index 8aecfdc..bb906d3 100644 --- "a/\350\277\220\350\241\214\350\257\264\346\230\216.md" +++ "b/\350\277\220\350\241\214\350\257\264\346\230\216.md" @@ -1,8 +1,7 @@ ### 安装概要 -- 在etcd下载页面下载对应操作系统的etcd,[https://github.com/etcd-io/etcd/releases](https://gitee.com/link?target=https%3A%2F%2Fgithub.com%2Fetcd-io%2Fetcd%2Freleases) 使用3.4.x以上。相关搭建细节 +- 参照example项目,启动引入了client.jar的client。 (目前支持file,zk,apolo,etcd,nacos等配置器,file配置器需重写putconfig用以维护worker) - 启动worker -- 启动client - 启动DashBoard @@ -15,7 +14,7 @@ export MYSQL_HOST = test export MYSQL_USER user export MYSQL_PASS pass export INSERT_INTERVAL 1 -export etcdServer http://127.0.0.1:2379,http://127.0.0.1:xxxx +export serverAddr http://127.0.0.1:2379,http://127.0.0.1:xxxx export BATCH_SIZE 7000 export MYSQL_PORT 8123 export NODE_NUMBER 15 @@ -79,7 +78,7 @@ java -jar $JAVA_OPTS $JAVA_ENV xxx.jar --etcd.server=${etcdServer} `JLog`总结了常用的日志参数,在Client端用**规范**的格式记录参数 (具体记录逻辑可见`UserFIlter`类) 。在Worker端,针对定义的日志格式又有一些特殊处理(具体见`TracerConsumer`类)再入库。**那如何自定义日志记录格式和处理入库格式呢?** -- 仿照`UserFIlter`自定义日志**记录格式**, 修改放入传输对象`tracerBean`的元素逻辑 +- 仿照`HttpFIlter`自定义日志**记录格式**, 修改放入传输对象`tracerBean`的元素逻辑 - 仿照`TracerConsumer`自定义日志**入库格式**, 修改中途日志(`dealTracerLog`) / 出入参日志(`dealFilterModel`)的 入库对象构造方法 - 可能涉及更改`DashBoard`项目的**展示逻辑** @@ -93,7 +92,7 @@ java -jar $JAVA_OPTS $JAVA_ENV xxx.jar --etcd.server=${etcdServer} - 配置ETCD - 构建TracerClientStarter, 设置etcd,应用名,mdc(机房名, 可有可无) + 构建TracerClientStarter, 设置配置器,应用名 ```java @Configuration @@ -105,8 +104,7 @@ java -jar $JAVA_OPTS $JAVA_ENV xxx.jar --etcd.server=${etcdServer} @PostConstruct public void begin() { TracerClientStarter tracerClientStarter = new TracerClientStarter.Builder() - .setAppName("demo") - .setEtcdServer(etcdServer).build(); + tracerClientStarter.startPipeline(); } } @@ -118,11 +116,11 @@ java -jar $JAVA_OPTS $JAVA_ENV xxx.jar --etcd.server=${etcdServer} @Bean public FilterRegistrationBean urlFilter() { FilterRegistrationBean registration = new FilterRegistrationBean(); - UserFilter userFilter = new UserFilter(); + HttpFilter userFilter = new HttpFilter(); registration.setFilter(userFilter); registration.addUrlPatterns("/*"); - registration.setName("UserTraceFilter"); + registration.setName("HttpTraceFilter"); registration.setOrder(1); return registration; } -- Gitee