# transflow **Repository Path**: Sunleader/transflow ## Basic Information - **Project Name**: transflow - **Description**: java实现 类似于 logstash 的数据流转平台,插件化 input output - **Primary Language**: Java - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: http://sunleader1997.top:18987/#/ - **GVP Project**: No ## Statistics - **Stars**: 15 - **Forks**: 2 - **Created**: 2025-03-06 - **Last Updated**: 2025-06-17 ## Categories & Tags **Categories**: big-data **Tags**: None ## README ## Transflow 集成 reactor 类似于 logstash,插件化数据流转服务 ``` 注意: 因为安全问题,已经升级到 springboot3 ,所以只支持 jdk17+ 版本 ``` ## 为什么脑子一热 - 多个产品线研发交付后,客户厂商往往需要将一些业务数据对接到第三方的系统上,定制化版本的维护(需要基于某个版本源码进行修改打包以及部署升级)耗费时间与人力成本 - 传统ELK都是通过配置文件的方式修改数据处理与链路,学习成本较高,配置文件复杂,阅读困难,配置文件维护成本也高
例如 logstash 的配置文件, 往往无法快速理解数据链路是如何构建的 ```nginx.conf input{ file{ path => "/var/log/nginx/access.log" start_position => "beginning" type => "nginx_access_log" } kafka{ bootstrap_servers => ["localhost:9092"] group_id => "logstash_group" topic => "nginx_access_log" codec => json{ support_multiple_values => true } type => "file_audit_log" } } filter{ grok{ match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}"} match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""} } if [request] { urldecode { field => "request" } ruby { init => "@kname = ['url_path','url_arg']" code => " new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))]) event.append(new_event)" } if [url_arg] { ruby { init => "@kname = ['key', 'value']" code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})" } } } geoip{ source => "clientip" } useragent{ source => "user_agent" target => "ua" remove_field => "user_agent" } date { match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"] locale => "en" } mutate{ remove_field => ["message","timestamp","request","url_arg"] } } output{ elasticsearch { hosts => "localhost:9200" index => "nginx-access-log-%{+YYYY.MM.dd}" } if [type] == "syslog1" and [logType] == "file_audit_log"{ syslog { facility => "local7" appname => "nginx-access-log" } } } ```
- 参考调度系统的设计理念,公司各组件/服务之间的数据交互,应该尽可能低耦合,对于复杂的业务场景下,可以动态插拔组件 - ``` 例如文件传输的场景下,对于文件同步服务,仅需告诉该服务哪些文件需要同步,而对于同步之前的审批等业务,可以根据具体业务场景进行串连 ``` - 参考 NIFI 后发现性能不高,二次开发困难
NIFI ![img.png](nifi.png)
### 功能特点: - 拖拽式编辑数据流 - 拖拽/编辑 - 脚本化数据处理 - 中间节点通过 if 判断数据该怎么流向下一个节点 - 也可以实现赋值,groovy 能支持的操作都可以 - 数据批量处理 - es 批量存储 - kafka 批量生产消息 - 数据流实时监控 - 就是可以实时看到数据流,方便看到到底哪个环节出了问腿 - 还可以打开控制台看到每个节点的输出(开监控的话会降低性能) - 基于 reactor 实现高吞吐数据流 - 目前我那个1GHz4核的小机器能跑2w/s ### 当前已支持的插件列表 | input | 功能 | output | 功能 | |------------------------------|-------------------|---------------------------------|-----------------| | plugin-demo-input | 测试输入 | plugin-stdout | 打印到控制台 | | gitlab-hook-input | gitlab mr hook | plugin-gitlab-ai-codeview-outer | gitlab 评论器 | | plugin-kafka-input | kafka 消息消费端 | plugin-kafka-output | 输出到kafka指定TOPIC | | plugin-netty-file-input | 文件传输接收端 | plugin-netty-file-output | 文件发送端,发送到input | | plugin-netty-demo-input | netty实现的动态rest接口 | plugin-http-output | 支持调用HTTP接口API | | plugin-transflow-agent-input | java agent插装器(停用) | | | | ❌ | ❌ | plugin-es-output | 输出到es存储 | | plugin-syslog-input | syslog接收器 | plugin-syslog-output | syslog 输出器 | | filter | 功能 | |----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| | plugin-json-filter | 使用 aviator 实现的脚本化过滤器,主要为了做简单的单行if判断,或者赋值,速度会快一些
因为是单行所以脚本不需要加 return

如果返回 ture(if行为)或者其他对象(默认为赋值行为)则下发到下一个节点,
脚本返回 false 则交给下一个处理器 | | plugin-json-gateway | 使用 groovy 实现的脚本化过滤器,可以实现多行的脚本化操作
如果返回 true,则交给下一个节点
如果返回 false 或者其他对象(注意此处跟 filter 不一样),则交给下一个处理器 | | plugin-openai-filter | 调用ai进行对话,可以预设系统prompt等参数,
将输入的某一个字段作为问题,
ai的回复赋值到指定字段
再将所有数据下发到下一个节点
作者在此处的应用即是 gitlab mr hook 监听代码合并,
然后调用ai审批代码,
再输出到gitlab的指定代码行作为评论给开发参考 | | | | ## 演示 ![gif](20250327183942.gif) # 打包 * transflow-all 下 执行 mvn clean package, 最终成品在 transflow-app 的 target 下 .zip * 注意,会打包前端资源以及plugin,plugin放在 /plugins下,也会被打进 zip * 发布版本因为大小受限,只提供基础demo插件 # Linux 安装 ``` unzip transflow-app-0.1.0-distribution.zip -d / ``` # Linux 启动 - 通过 systemctl 启动 ``` systemctl start transflow ``` - 通过脚本启动 ``` cd /opt/transflow sh startup.sh start ``` - 手动启动 ``` cd /opt/transflow java -jar transflow-app-0.1.0.jar ``` # 访问页面 http://localhost:18987/#/mgmt/job