# PasteCluster **Repository Path**: pastecode/paste-cluster ## Basic Information - **Project Name**: PasteCluster - **Description**: .net6.0之后使用的集群中间件 自动Master选举 自动修复集群 支持半入模式 支持查找模式 引入组件后,你只要专注自己的业务代码即可! - **Primary Language**: C# - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2024-06-10 - **Last Updated**: 2025-04-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 组件适用 1.需要集群部署的服务,需要有主从的集群服务 2.如果是定时的任务,建议使用PasteTask任务调度器实现 3.组件适用于.net6.0以上的框架 4.集群对外的消息使用Channel队列发出 ## 引入组件 1.先引入IHttpClientFactory,示例:context.Services.AddHttpClient(); 2.添加对组件的应用 示例: 3.使用单例注入,示例:context.Services.AddSingleton(); 4.配置配置文件,示例:context.Services.Configure(Configuration.GetSection("ClusterConfig")); ``` "ClusterConfig": { "SloveToken": "zxcvfr43dr56hgt5",//集群密钥,可自定义,防止其他集群乱入! "ClusterHost": "",//已有的节点地址链路,示例http://192.110.0.3:80;http://192.110.0.7:80 "CurrentHost": ""//当前节点的访问地址是多少 示例http://192.110.0.6:80 } ``` 以上是基础配置,需要自定义更多的请查阅PasteSloveConfig的属性 5.如果使用了Route路由,注意本组件需要/api/cluster/的路由转发配置 ## 开始使用 1.初始化集群组件,写入必要的信息,比如集群中还有谁,自己的HOST是多少 2.定义一个HostedService用于接收集群产生的事件 示例代码: ``` using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using PasteCluster; using TestCluster.redismodel; namespace TestCluster { /// /// 系统初始化 /// public class StartHostedService : IHostedService { private readonly IServiceProvider _serviceProvider; private PasteClusterHandler _cluster_handler; private IAppCache _appCache; private ILogger _logger; /// /// /// /// /// /// /// public StartHostedService(IServiceProvider serviceProvider, PasteClusterHandler pasteClusterHandler, IAppCache appCache, ILogger logger) { _serviceProvider = serviceProvider; _cluster_handler = pasteClusterHandler; _appCache = appCache; _logger = logger; } private const string node_list_cache_key = "cache:last:nodes"; //private System.Timers.Timer _timer; /// /// /// /// /// /// public Task StartAsync(CancellationToken cancellationToken) { //可以从他处读取集群节点列表,然后把列表写入到系统中 var reads = _appCache.Get>(node_list_cache_key); if (reads != null && reads.Count > 0) { foreach (var _node in reads) { _cluster_handler.AddNodeToList(_node).Wait(); //await _cluster_handler.AddNodeToList(_node); } } //如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中 // -e ClusterConfig:CurrentHost="http://192.168.1.100" //如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}" //_cluster_handler.Register("http://192.168.1.100",0,0); //启动集群中的当前节点(在这之前必须要确认当前节点的host是多少) _cluster_handler.StartCluster(); //读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息 ReadClusterChannel(); //_timer = new System.Timers.Timer(); //hit_index = new Random().Next(1,100); //_timer.Interval = 1000; //_timer.AutoReset = true; //_timer.Elapsed += _timer_Elapsed; //_timer.Start(); return Task.CompletedTask; } //private int hit_index = 5; //测试产生数据进行交互 //private void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) //{ // try // { // hit_index--; // if (hit_index == 0) // { // _cluster_handler.PushMsgToNode($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"); // hit_index = new Random().Next(1, 20); // } // } // catch (Exception exl) // { // _logger.LogException(exl); // } //} /// /// 处理集群产生的数据 /// private async void ReadClusterChannel() { try { var _info = await _cluster_handler.ChannelCluster.Reader.ReadAsync(); if (_info != null && _info != default) { _logger.LogInformation("ClusterChannel.Message:" + Newtonsoft.Json.JsonConvert.SerializeObject(_info)); if (_info.msg_type == 1) { if (_cluster_handler.CurrentIsMaster) { //当前节点确定选举为Master 可以执行保存等 //var _nodes = _cluster_handler.Nodes(); //if (_nodes != null && _nodes.Length > 0) //{ // await _appCache.SetAsync>(node_list_cache_key, _nodes.ToList()); //} } } else { //业务代码处理 } } } catch (Exception exl) { _logger.LogException(exl); await Task.Delay(1000); } finally { ReadClusterChannel(); } } /// /// /// /// /// /// public Task StopAsync(CancellationToken cancellationToken) { _cluster_handler.Dispose(); return Task.CompletedTask; } } } ``` 3.在入口函数中启动这个HostedService `` context.Services.AddHostedService(); `` ## 相关问答 1.如何知道当前运行情况 curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/status 2.获取所有节点信息 curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/nodes 3.手动注入当前节点IP信息 curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/joinin?host=http://192.168.1.5 4.如何查找当前节点? 有一种模式,就是知道所有集群的IP,需要找到A是A,那么有前提 a.先要把节点的信息写入到组件,所有的节点 b.给当前节点设定一个code,可以使用SetNodeCode c.后者使用环境信息 -e ClusterConfig:CurrentCode="xxxxxuuuu323423" d.StartCluster()后,组件会遍历节点,查找哪个节点是自己!主要他们的链接要能通! 5.如何防止多Master的情况出现 a.在每次交互的时候,会附带当前节点的信息,比如master是谁,有多少各节点等,通过这个信息进行判断各自节点的信息是否一致,不一致的时候触发信息同步! b.master冲突的时候,会检查哪一个节点的选举时间更早,更早的为主,时间采用时间戳ms模式,如果时间戳一致,则再对比对应的Id信息 c.节点数据交互的时候,会进行master和count的消息确定 6.消息完整性? a.消息分成2部分,一部分是组件内部消息,另一部分是涉及业务的部分 b.内部消息,其实在配置的时间周期内会重新发送重新检查,以便维护集群节点的正确性 c.业务消息,一般是节点发送消息给master,在发送失败的时候,会压入队列,等待选举完成后再解压队列发送,这里有疑问的是,业务消息是否也有可以丢弃的消息,比如如下场景: x.已知有N个node节点,作为在线客服的websocket的服务端,内部链接又使用site分隔用户 xx.如果要获得某一个站点site有多少人在线,要么遍历所有的node节点,然后统计数据,要么所有节点有信息的时候告知某一个点,很显然在这个需求中,各node节点有在线数变更的时候主动通知是最合适的 xxx.比如node1告知node1:site1:30,node2告知node2:site1:25 信息归总到master中,master中本地缓存这个数据,当有变动的时候,推送给所有客服节点即可! xxxx.这里要处理的就是,某一个节点宕机的情况,比如node5不可用了,如何告知master他掉了! ## 逻辑介绍 1.Master的选举逻辑 a.当前组件集群支持从1到N,这个数量理论上不是无上限的,要基于配置文件中的最长交互时间决定 b.当当前没有Master的时候,会从节点队列中问询谁是master,被问询的如果也没有master信息,则被问询方直接成为master,并进入master的选举环节 c.节点交互会重置最后交互时间,主要在于master和cluster d.组件会定期检查这个交互时间是否过期,如果过期了,会尝试健康检查,如果不通,则进入选举模式 2.选举原则 a.谁先启动谁为主 b.谁被告知消息不符,谁放弃,进行master查找 ## 写在最后 1.当前组件的代码还没有经过严格测试,我估计还有问题的地方在于 a.时效性的问题,虽然可以修改检查的间隔时间,但是这个时间也是有间隙的 b.选举间隔,产生太多消息的话?消息重复的话!压缩的消息是否对业务有影响 c.如果同时多个节点掉线,那么这个选举的时间会更长 d.当前集群模式将在后续更新到PasteSpider的集群策略中,替换到目前的版本!