# transflow
**Repository Path**: Sunleader/transflow
## Basic Information
- **Project Name**: transflow
- **Description**: java实现 类似于 logstash 的数据流转平台,插件化 input output
- **Primary Language**: Java
- **License**: AGPL-3.0
- **Default Branch**: master
- **Homepage**: http://sunleader1997.top:18987/#/
- **GVP Project**: No
## Statistics
- **Stars**: 15
- **Forks**: 2
- **Created**: 2025-03-06
- **Last Updated**: 2025-06-17
## Categories & Tags
**Categories**: big-data
**Tags**: None
## README
## Transflow
集成 reactor 类似于 logstash,插件化数据流转服务
```
注意: 因为安全问题,已经升级到 springboot3 ,所以只支持 jdk17+ 版本
```
## 为什么脑子一热
- 多个产品线研发交付后,客户厂商往往需要将一些业务数据对接到第三方的系统上,定制化版本的维护(需要基于某个版本源码进行修改打包以及部署升级)耗费时间与人力成本
- 传统ELK都是通过配置文件的方式修改数据处理与链路,学习成本较高,配置文件复杂,阅读困难,配置文件维护成本也高
例如 logstash 的配置文件, 往往无法快速理解数据链路是如何构建的
```nginx.conf
input{
file{
path => "/var/log/nginx/access.log"
start_position => "beginning"
type => "nginx_access_log"
}
kafka{
bootstrap_servers => ["localhost:9092"]
group_id => "logstash_group"
topic => "nginx_access_log"
codec => json{
support_multiple_values => true
}
type => "file_audit_log"
}
}
filter{
grok{
match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}"}
match => {"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""}
}
if [request] {
urldecode {
field => "request"
}
ruby {
init => "@kname = ['url_path','url_arg']"
code => "
new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
event.append(new_event)"
}
if [url_arg] {
ruby {
init => "@kname = ['key', 'value']"
code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
}
}
}
geoip{
source => "clientip"
}
useragent{
source => "user_agent"
target => "ua"
remove_field => "user_agent"
}
date {
match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
locale => "en"
}
mutate{
remove_field => ["message","timestamp","request","url_arg"]
}
}
output{
elasticsearch {
hosts => "localhost:9200"
index => "nginx-access-log-%{+YYYY.MM.dd}"
}
if [type] == "syslog1" and [logType] == "file_audit_log"{
syslog {
facility => "local7"
appname => "nginx-access-log"
}
}
}
```
- 参考调度系统的设计理念,公司各组件/服务之间的数据交互,应该尽可能低耦合,对于复杂的业务场景下,可以动态插拔组件
- ```
例如文件传输的场景下,对于文件同步服务,仅需告诉该服务哪些文件需要同步,而对于同步之前的审批等业务,可以根据具体业务场景进行串连
```
- 参考 NIFI 后发现性能不高,二次开发困难
NIFI

### 功能特点:
- 拖拽式编辑数据流
- 拖拽/编辑
- 脚本化数据处理
- 中间节点通过 if 判断数据该怎么流向下一个节点
- 也可以实现赋值,groovy 能支持的操作都可以
- 数据批量处理
- es 批量存储
- kafka 批量生产消息
- 数据流实时监控
- 就是可以实时看到数据流,方便看到到底哪个环节出了问腿
- 还可以打开控制台看到每个节点的输出(开监控的话会降低性能)
- 基于 reactor 实现高吞吐数据流
- 目前我那个1GHz4核的小机器能跑2w/s
### 当前已支持的插件列表
| input | 功能 | output | 功能 |
|------------------------------|-------------------|---------------------------------|-----------------|
| plugin-demo-input | 测试输入 | plugin-stdout | 打印到控制台 |
| gitlab-hook-input | gitlab mr hook | plugin-gitlab-ai-codeview-outer | gitlab 评论器 |
| plugin-kafka-input | kafka 消息消费端 | plugin-kafka-output | 输出到kafka指定TOPIC |
| plugin-netty-file-input | 文件传输接收端 | plugin-netty-file-output | 文件发送端,发送到input |
| plugin-netty-demo-input | netty实现的动态rest接口 | plugin-http-output | 支持调用HTTP接口API |
| plugin-transflow-agent-input | java agent插装器(停用) | | |
| ❌ | ❌ | plugin-es-output | 输出到es存储 |
| plugin-syslog-input | syslog接收器 | plugin-syslog-output | syslog 输出器 |
| filter | 功能 |
|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| plugin-json-filter | 使用 aviator 实现的脚本化过滤器,主要为了做简单的单行if判断,或者赋值,速度会快一些
因为是单行所以脚本不需要加 return
如果返回 ture(if行为)或者其他对象(默认为赋值行为)则下发到下一个节点,
脚本返回 false 则交给下一个处理器 |
| plugin-json-gateway | 使用 groovy 实现的脚本化过滤器,可以实现多行的脚本化操作
如果返回 true,则交给下一个节点
如果返回 false 或者其他对象(注意此处跟 filter 不一样),则交给下一个处理器 |
| plugin-openai-filter | 调用ai进行对话,可以预设系统prompt等参数,
将输入的某一个字段作为问题,
ai的回复赋值到指定字段
再将所有数据下发到下一个节点
作者在此处的应用即是 gitlab mr hook 监听代码合并,
然后调用ai审批代码,
再输出到gitlab的指定代码行作为评论给开发参考 |
| | |
## 演示

# 打包
* transflow-all 下 执行 mvn clean package, 最终成品在 transflow-app 的 target 下 .zip
* 注意,会打包前端资源以及plugin,plugin放在 /plugins下,也会被打进 zip
* 发布版本因为大小受限,只提供基础demo插件
# Linux 安装
```
unzip transflow-app-0.1.0-distribution.zip -d /
```
# Linux 启动
- 通过 systemctl 启动
```
systemctl start transflow
```
- 通过脚本启动
```
cd /opt/transflow
sh startup.sh start
```
- 手动启动
```
cd /opt/transflow
java -jar transflow-app-0.1.0.jar
```
# 访问页面
http://localhost:18987/#/mgmt/job