# PasteCluster
**Repository Path**: pastecode/paste-cluster
## Basic Information
- **Project Name**: PasteCluster
- **Description**: .net6.0之后使用的集群中间件
自动Master选举
自动修复集群
支持半入模式
支持查找模式
引入组件后,你只要专注自己的业务代码即可!
- **Primary Language**: C#
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 1
- **Created**: 2024-06-10
- **Last Updated**: 2025-04-30
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 组件适用
1.需要集群部署的服务,需要有主从的集群服务
2.如果是定时的任务,建议使用PasteTask任务调度器实现
3.组件适用于.net6.0以上的框架
4.集群对外的消息使用Channel队列发出
## 引入组件
1.先引入IHttpClientFactory,示例:context.Services.AddHttpClient();
2.添加对组件的应用 示例:
3.使用单例注入,示例:context.Services.AddSingleton();
4.配置配置文件,示例:context.Services.Configure(Configuration.GetSection("ClusterConfig"));
```
"ClusterConfig": {
"SloveToken": "zxcvfr43dr56hgt5",//集群密钥,可自定义,防止其他集群乱入!
"ClusterHost": "",//已有的节点地址链路,示例http://192.110.0.3:80;http://192.110.0.7:80
"CurrentHost": ""//当前节点的访问地址是多少 示例http://192.110.0.6:80
}
```
以上是基础配置,需要自定义更多的请查阅PasteSloveConfig的属性
5.如果使用了Route路由,注意本组件需要/api/cluster/的路由转发配置
## 开始使用
1.初始化集群组件,写入必要的信息,比如集群中还有谁,自己的HOST是多少
2.定义一个HostedService用于接收集群产生的事件
示例代码:
```
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using PasteCluster;
using TestCluster.redismodel;
namespace TestCluster
{
///
/// 系统初始化
///
public class StartHostedService : IHostedService
{
private readonly IServiceProvider _serviceProvider;
private PasteClusterHandler _cluster_handler;
private IAppCache _appCache;
private ILogger _logger;
///
///
///
///
///
///
///
public StartHostedService(IServiceProvider serviceProvider, PasteClusterHandler pasteClusterHandler, IAppCache appCache, ILogger logger)
{
_serviceProvider = serviceProvider;
_cluster_handler = pasteClusterHandler;
_appCache = appCache;
_logger = logger;
}
private const string node_list_cache_key = "cache:last:nodes";
//private System.Timers.Timer _timer;
///
///
///
///
///
///
public Task StartAsync(CancellationToken cancellationToken)
{
//可以从他处读取集群节点列表,然后把列表写入到系统中
var reads = _appCache.Get>(node_list_cache_key);
if (reads != null && reads.Count > 0)
{
foreach (var _node in reads)
{
_cluster_handler.AddNodeToList(_node).Wait();
//await _cluster_handler.AddNodeToList(_node);
}
}
//如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中
// -e ClusterConfig:CurrentHost="http://192.168.1.100"
//如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"
//_cluster_handler.Register("http://192.168.1.100",0,0);
//启动集群中的当前节点(在这之前必须要确认当前节点的host是多少)
_cluster_handler.StartCluster();
//读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息
ReadClusterChannel();
//_timer = new System.Timers.Timer();
//hit_index = new Random().Next(1,100);
//_timer.Interval = 1000;
//_timer.AutoReset = true;
//_timer.Elapsed += _timer_Elapsed;
//_timer.Start();
return Task.CompletedTask;
}
//private int hit_index = 5;
//测试产生数据进行交互
//private void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
//{
// try
// {
// hit_index--;
// if (hit_index == 0)
// {
// _cluster_handler.PushMsgToNode($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
// hit_index = new Random().Next(1, 20);
// }
// }
// catch (Exception exl)
// {
// _logger.LogException(exl);
// }
//}
///
/// 处理集群产生的数据
///
private async void ReadClusterChannel()
{
try
{
var _info = await _cluster_handler.ChannelCluster.Reader.ReadAsync();
if (_info != null && _info != default)
{
_logger.LogInformation("ClusterChannel.Message:" + Newtonsoft.Json.JsonConvert.SerializeObject(_info));
if (_info.msg_type == 1)
{
if (_cluster_handler.CurrentIsMaster)
{
//当前节点确定选举为Master 可以执行保存等
//var _nodes = _cluster_handler.Nodes();
//if (_nodes != null && _nodes.Length > 0)
//{
// await _appCache.SetAsync>(node_list_cache_key, _nodes.ToList());
//}
}
}
else
{
//业务代码处理
}
}
}
catch (Exception exl)
{
_logger.LogException(exl);
await Task.Delay(1000);
}
finally
{
ReadClusterChannel();
}
}
///
///
///
///
///
///
public Task StopAsync(CancellationToken cancellationToken)
{
_cluster_handler.Dispose();
return Task.CompletedTask;
}
}
}
```
3.在入口函数中启动这个HostedService
``
context.Services.AddHostedService();
``
## 相关问答
1.如何知道当前运行情况
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/status
2.获取所有节点信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/nodes
3.手动注入当前节点IP信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/joinin?host=http://192.168.1.5
4.如何查找当前节点?
有一种模式,就是知道所有集群的IP,需要找到A是A,那么有前提
a.先要把节点的信息写入到组件,所有的节点
b.给当前节点设定一个code,可以使用SetNodeCode
c.后者使用环境信息 -e ClusterConfig:CurrentCode="xxxxxuuuu323423"
d.StartCluster()后,组件会遍历节点,查找哪个节点是自己!主要他们的链接要能通!
5.如何防止多Master的情况出现
a.在每次交互的时候,会附带当前节点的信息,比如master是谁,有多少各节点等,通过这个信息进行判断各自节点的信息是否一致,不一致的时候触发信息同步!
b.master冲突的时候,会检查哪一个节点的选举时间更早,更早的为主,时间采用时间戳ms模式,如果时间戳一致,则再对比对应的Id信息
c.节点数据交互的时候,会进行master和count的消息确定
6.消息完整性?
a.消息分成2部分,一部分是组件内部消息,另一部分是涉及业务的部分
b.内部消息,其实在配置的时间周期内会重新发送重新检查,以便维护集群节点的正确性
c.业务消息,一般是节点发送消息给master,在发送失败的时候,会压入队列,等待选举完成后再解压队列发送,这里有疑问的是,业务消息是否也有可以丢弃的消息,比如如下场景:
x.已知有N个node节点,作为在线客服的websocket的服务端,内部链接又使用site分隔用户
xx.如果要获得某一个站点site有多少人在线,要么遍历所有的node节点,然后统计数据,要么所有节点有信息的时候告知某一个点,很显然在这个需求中,各node节点有在线数变更的时候主动通知是最合适的
xxx.比如node1告知node1:site1:30,node2告知node2:site1:25 信息归总到master中,master中本地缓存这个数据,当有变动的时候,推送给所有客服节点即可!
xxxx.这里要处理的就是,某一个节点宕机的情况,比如node5不可用了,如何告知master他掉了!
## 逻辑介绍
1.Master的选举逻辑
a.当前组件集群支持从1到N,这个数量理论上不是无上限的,要基于配置文件中的最长交互时间决定
b.当当前没有Master的时候,会从节点队列中问询谁是master,被问询的如果也没有master信息,则被问询方直接成为master,并进入master的选举环节
c.节点交互会重置最后交互时间,主要在于master和cluster
d.组件会定期检查这个交互时间是否过期,如果过期了,会尝试健康检查,如果不通,则进入选举模式
2.选举原则
a.谁先启动谁为主
b.谁被告知消息不符,谁放弃,进行master查找
## 写在最后
1.当前组件的代码还没有经过严格测试,我估计还有问题的地方在于
a.时效性的问题,虽然可以修改检查的间隔时间,但是这个时间也是有间隙的
b.选举间隔,产生太多消息的话?消息重复的话!压缩的消息是否对业务有影响
c.如果同时多个节点掉线,那么这个选举的时间会更长
d.当前集群模式将在后续更新到PasteSpider的集群策略中,替换到目前的版本!