# BMsgQueue **Repository Path**: bytes33/BMsgQueue ## Basic Information - **Project Name**: BMsgQueue - **Description**: 1. 主要是为了实现处理大量的SQL连接。(连接不同的数据服务器。处理一些DB事物) 2. 学习使用BOOST的一些库 - **Primary Language**: C++ - **License**: LGPL-2.1 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 7 - **Forks**: 7 - **Created**: 2015-02-08 - **Last Updated**: 2023-09-04 ## Categories & Tags **Categories**: message-server **Tags**: None ## README # BMsgQueue = lockMq + boost::bind ##第一阶段: 1. 实现高并发处理大量的SQL操作。(连接不同的数据服务器。处理一些DB事物) 2. 学习使用BOOST的一些库。 3. 试图在公司的某个应用中使用。(。。。。) ##完成情况: 1. 实现加锁消息队列 2. 用一个交换队列+N多个处理队列,实现消费处理 注:最近看到一个实现,使用二级消息队列能在高并发的协同处理,不会使某些消息饿死?有空试试 2. 把DB操作等方法,用boost::bind 绑定上一个callback,形成一个执行节点,实现一种异步DB操作 ##day day up ##第二阶段: 1. 在生产者、消费者之间加入Broker,实现Producer ==> Broker ==> Consumer { 使得逻辑上解耦分离: 生产者只需要知道Broker的存在,负责生产消息到Broker,不需要关心消费者的行为; 消费者也只需要知道Broker的存在,负责消费处理Broker上某个MQ队列的消息,不需要关心生产者的行为。 } 2. 独立DB模块,使得以后可以方便扩展使用其他数据库。(目前这个只适用Win下链接MySQL) 3. 优化生产者、消费者。(感觉条理不清晰。有可能重构。。。。。) 4. 优化boost::asio 异步Socket。感觉在处理分帧、粘包等问题上处理的不好 5. 每个独立模块加入单元测试代码。 6. 迁移开发环境到linux ##完成情况: 1. 增加一个用协程处理的异步socket 2. 改动目录结构 ------------------- --bin --obj --src ----net ----log ----test --win ------------------- ##更新记录: * DB执行节点 * 简易DBPool * 加锁MSG队列 * 增加读取配置文件 * 拆分BMsgQueue 为两个类 线程类BMsgThread和工作队列BMsgQueue * 使用boost:bind 实现多态 * 加入测试 * 加入vs2010编译环境 * 加入毫秒级事件定时器BEventTimer * 改DBPool为单例模式 * 定时器timer bug * 用boost::circular_buffer 替代list * 一些比较有意思的开源项目地址,作兴趣备注 * boost::asio实现异步socket //commit b7b3c89acad940efc307bd4de20741f82834a1f1 //适用于这个版本 ##测试代码: // int main() { zone.Init(); boost::thread_group group; //开一个交换处理线程 group.create_thread(BMsgThread(SWAP_TYPE)); //执行处理线程 group.create_thread(BMsgThread(RUN_TYPE)); group.create_thread(BMsgThread(RUN_TYPE)); group.create_thread(BMsgThread(RUN_TYPE)); group.create_thread(&test); // boost::asio::io_service ios; //定时器 // 增加一个work对象 boost::asio::io_service::work work(ios); boost::function func = boost::bind(&BDbWorkZone::TestEvent, zone); BEventTimer::GetTimerInstance()->SetEventListener(ios, func, -1, 1000); // ios.run(); group.join_all(); return 0; }