# zbus **Repository Path**: wangscript/zbus ## Basic Information - **Project Name**: zbus - **Description**: 小巧而极速的MQ, RPC实现, 支持HTTP/TCP代理,开放易扩展,多语言支撑微服务,系统总线架构 - **Primary Language**: 消息服务器/消息队列 - **License**: MIT - **Default Branch**: master - **Homepage**: http://zbus.io - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 913 - **Created**: 2018-01-06 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README /\\\ \/\\\ \/\\\ /\\\\\\\\\\\ \/\\\ /\\\ /\\\ /\\\\\\\\\\ \///////\\\/ \/\\\\\\\\\ \/\\\ \/\\\ \/\\\////// /\\\/ \/\\\////\\\ \/\\\ \/\\\ \/\\\\\\\\\\ /\\\/ \/\\\ \/\\\ \/\\\ \/\\\ \////////\\\ /\\\\\\\\\\\ \/\\\\\\\\\ \//\\\\\\\\\ /\\\\\\\\\\ \/////////// \///////// \///////// \////////// QQ Group: 467741880 # ZBUS zbus核心是一个独立实现的小巧极速的消息队列(MQ),支持持久化与内存队列, 支持单播、广播、组播等多种消息通信模式;在MQ之上 zbus完备地支持了RPC服务,RPC支持独立伺服,基于总线两种模式;同时zbus支持代理服务,基于MQ的HttpProxy实现了类Nginx的HTTP代理服务(支持DMZ网络结构),TcpProxy则支持透明的TCP协议代理,可以代理任何基于TCP的协议,比如代理MySQL数据库。 zbus内建分布式高可用(HA),解决单点问题;Java/.NET/JS/C++/PHP等主流语言接入能力为zbus充当SOA服务总线提供跨平台支持; 在设计上,zbus拥抱KISS准则,所有特性浓缩在一个小小的400K左右的jar包中(非常少的依赖);轻量,MQ核心,方便二次开发,zbus为微服务架构、系统整合、弹性计算、消息推送等场景提供开箱即用的功能支持。 ## 特性 - **高速磁盘/内存MQ**,支持单播,组播,广播多种消息模式 - **RPC开箱即用**,支持同步异步,动态类代理 - **多语言客户端**,Java(服务器)/.NET/JavaScript/PHP/Python/C++/Go(服务器) - **轻量级**,非常少依赖,整体大小 ~3M - **无应用故障单点**,分布式高可用的内置支持 - **简洁的协议设计**,类HTTP头部扩展协议,长短连接,WebSocket支持 - **内置监控**,不断丰富的监控指标 ## 客户端 [Java](https://gitee.com/rushmore/zbus) [C#](https://gitee.com/rushmore/zbus-dotnet) [JavaScript](https://gitee.com/rushmore/zbus-javascript) [Python](https://gitee.com/rushmore/zbus-python) [PHP](https://gitee.com/rushmore/zbus-php) [C++](https://gitee.com/rushmore/zbus-cpp) [Go](https://gitee.com/rushmore/zbus-go) ## 性能概览 快速AB测试(Apache Benchmark),基础硬件:Mac i7-4870HQ 2.5GHz SSD 创建队列: http://localhost:15555/MyTopic?cmd=declare 生产消息 ab -k -c 32 -n 1000000 http://localhost:15555/MyTopic?cmd=produce 消费消息 ab -k -c 32 -n 1000000 http://localhost:15555/MyTopic?cmd=consume 生产: ~80,000 w/s 消费: ~90,000 r/s RPC: ~60,000 /s Netty/GoLang网络栈同机比对值: ~150,000/s 测试数据均未使用批量操作,数据为空,核心耗时在网络栈与处理逻辑栈 ## 快速起步 zbus服务器基于Java实现(另有Go版本,推荐使用Java版本),要求JDK6+。 Maven官方下载 io.zbus zbus 0.11.3 Clone项目到本地后,默认客户端模块不下载,如果对客户端感兴趣,请执行 git submodule update --init --recursive 各个语言都包括大量使用例子代码,最好的起步了解方法是直接跑示例。 **监控** 启动起来(下载后找到zbus-dist目录,直接启动zbus.bat/zbus.sh),监控可以直接访问[http://localhost:15555](http://localhost:15555) ![Monitor](/doc/monitor.png?raw=true "Monitor") ## 消息队列(MQ) 消息队列MQ是zbus的核心功能,消息队列的模型,与同类产品诸如RabbitMQ概念类似,但zbus消息队列没有采用AMQP相对复杂的协议,而是更加简洁的协议。 消息队列组件包括 - Topic, 全局唯一标识消息队列,消息需指定 - ConsumeGroup,仅消费者使用到的分组通道,用于实现多种通信模型 - Producer, 生产消息方 - Consumer, 消费消息方,需指定消费分组通道,带默认值。 - Broker,消息存储服务器(注意:API层面的Broker指的是服务器接入抽象) Topic: |||||||||||||||||||||||||||| <-- Producer写入 ^ ^ | | +---+------ConsumeGroup_1 --> Consumer1/Consumer2 | +--------ConsumeGroup_n --> Consumer_3/...Consumer_n **规则设计**: 1. 队列消息数据单份,分组通道任意多个 2. 分组通道之内的消费者共享分组读指针,读指针改变影响本分组通道上的所有消费者 3. 分组通道之间的消费者互不影响 4. 分组通道的支持消息过滤,基于消息头部Tag 简而言之: *分组之内共享,分组之间独享,分组读指针支持消息标签过滤* 分组通道的设计类似同类产品Kafka,但是在消息过滤以及细节上有些差异,相对更加简单。 **消息模式**: 1. 单播 仅一个分组通道,所有的消费者共享一个分组通道,每条消息只送达其中一个消费者 2. 广播 每个消费一个分组通道,每条消息抵达所有的消费者 3. 组播 单播与广播的混合,多个分组通道,每个分组通道上多个消费者负载均衡 4. 订阅模式 广播的分组上设置消息过滤主题(注意区别队列主题Topic) 基于以上设计,zbus的消息队列能够有效的支持各种业务场景,同时保持底层数据模型的简洁设计。为了便于扩展,消息队列增加MASK标记,可以方便应用在队列上打标签来支持队列个性化,比如支持内存队列与磁盘队列的选择,支持标记队列为RPC作用,支持标记队列使用于代理作用等等。 **API示例** 生产者 //Broker是对zbus服务器的本地抽象,多地址支持HA Broker broker = new Broker("localhost:15555"); Producer p = new Producer(broker); p.declareTopic("MyTopic"); //当确定队列不存在需创建 Message msg = new Message(); msg.setTopic("MyTopic"); //设置消息主题 //msg.setTag("oo.account.pp"); //可以设置消息标签 msg.setBody("hello " + System.currentTimeMillis()); Message res = p.publish(msg); System.out.println(res); broker.close(); 消费者 Broker broker = new Broker("localhost:15555"); ConsumerConfig config = new ConsumerConfig(broker); config.setTopic("MyTopic"); //指定消息队列主题,同时可以指定分组通道 config.setMessageHandler(new MessageHandler() { @Override public void handle(Message msg, MqClient client) throws IOException { System.out.println(msg); //消费处理 } }); Consumer consumer = new Consumer(config); consumer.start(); 详细的控制请参考更多项目示例,其他语言客户端保持几乎相同的命名,减少了理解复杂度。 ## 远程方法调用(RPC) 远程方法调用RPC使用广泛,虽然跟MQ本质无必然联系,但是在分布式应用中经常会互相融合。MQ重点解决系统耦合问题,提供丰富的消息模式支持,也因此MQ的消息更偏向于单向与责任链设计。RPC则依然耦合了客户端与服务端时间序列,消息是双向的。 RPC的实现可以非常多选择,也无标准可言,底层通道可以是TCP,也可以基于HTTP,甚至也可以是更底层协议。目标可以简化为: 封装远程方法[名字+参数] -->--+ +--->解析出方法+参数调用本地方法 |<==> 网络传输<==> | 调用结果 <------------------+ +<---封装调用结果 网络传输协议有各种选择(TCP/HTTP...),如何封装方法+参数也可以各种选择,比如JSON,XML甚至二进制底层的协议(ProtoBuffer,Thrift等等)。 ZBUS的选择, 1) 底层通信协议: 基于MQ消息传输(TCP),之上支持HTTP直接访问 2)RPC请求应答协议: 默认JSON封装, 可替换 ZBUS的RPC的优势在于,继承了MQ消息特性,直接支持了HA高可用特性,减少NameServer等重复开发,天生融合MQ与RPC。 选择JSON是一个折中: 动态能力、通信效率和协议标准化三项指标的平衡。当然也支持个性化选择,需要扩展zbus的RpcCodec来完成。 RPC可讨论的主题众多,诸如高可用,超时,重试,熔断,限流,版本控制等等,zbus并没有全部解决,比如熔断。因为这些主题有不少仍然存在争议,在不同应用场景下会有完全不同的结论,比如自动重试在交易系统中是一个非常危险的选择。 ZBUS的RPC在应用设计上追求了极少依赖的目标,给应用程序提供更换的更大可能,也给zbus本身升级带来平滑。 仍然以Java语言示例,其他语言请参考各自示例。 **RPC服务端** 手动加载服务 public static void main(String[] args) throws Exception { ServiceBootstrap b = new ServiceBootstrap(); //手动增加模块,使用默认构造,也可以指定模块细节 b.addModule(InterfaceExampleImpl.class); b.addModule(GenericMethodImpl.class); b.addModule(SubService1.class); b.addModule(SubService2.class); b.serviceName("MyRpc") .port(15555) //内部启动了zbus服务器,zbus与rpc服务之间不通过网络协议栈 //.serviceAddress("localhost:15555") //也可以通过网络链接到远程服务器上 //.ssl("ssl/zbus.crt", "ssl/zbus.key") //启用SSL //.serviceToken("myrpc_service") //启用Token权限验证 .start(); } 自动加载服务 public static void main(String[] args) throws Exception { ServiceBootstrap b = new ServiceBootstrap(); b.serviceName("MyRpc") .port(15555) .autoDiscover(true) .start(); } 是的,没有任何业务相关的,你已经启动了zbus的RPC服务,而业务相关的代码自动加载进来了。 侵入点:服务需要注释@io.zbus.rpc.Remote 对Spring的支持,zbus可以让应用简单几行配置享用zbus的RPC特性 除此之外你需要关心的可能就是监控与问题排查的时候了解zbus。 **RPC客户端** ClientBootstrap b = new ClientBootstrap(); b.serviceAddress("localhost:15555") .serviceName("MyRpc") .serviceToken("myrpc_service"); RpcInvoker rpc = b.invoker(); //可以通过该RpcInvoker调用底层同步、异步各种API能力 InterfaceExample api = rpc.createProxy(InterfaceExample.class); //最常用是动态代理出实现类,隔离掉zbus细节 通过Spring进一步完全隔离 另外,高可用HA的配置并没有发生本质变化,serviceAddress地址填写多个完毕,细节参考详细示例。 ## HTTP、TCP代理(Proxy) 代理也是zbus提供的经常使用到的功能之一,在这个层面zbus并不是与Nginx,Apache等反向代理竞争产品,而是提供了一个更加简洁的实现选择,同时弥补了在特殊网络要求下的代理模式,比如在金融行业中经常会使用到的安全隔离DMZ中,Apache和Nginx等反向代理均失效,因为目标服务器根本无法连接,而zbus的HttpProxy正是为这种代理而生,当然zbus的弹性能力,使得用zbus部署传统的反向代理一样简单。 **HttpProxy代理配置** 启动类 io.zbus.proxy.http.HttpProxy http://localhost:15555/?cmd=server http://localhost:15556/?cmd=server zbus的代理能力还体现在其扩展性,sendFilter与recvFilter提供了消息代理的中间逻辑拦截介入,来完成个性化需求,诸如打印日志,截断请求,URL重写等等。 除了HTTP层面的代理,zbus也提供了TCP层的透明代理 **TcpProxy TCP透明代理** 可以方便代理任何TCP协议,示例简单自我解释。 启动类 io.zbus.proxy.tcp.TcpProxy 0.0.0.0:80 localhost:15555 3000 60000 本地启动80端口服务,所有访问80的端口的数据转发到15555端口上。