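# lock_free_ring_queue

**Repository Path**: jerrygit/lock_free_ring_queue

## Basic Information

- **Project Name**: lock_free_ring_queue
- **Description**: An implementation of a lock-free ring queue (LockFreeRingQueue)
- **Primary Language**: C++
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 1
- **Forks**: 1
- **Created**: 2024-08-21
- **Last Updated**: 2024-08-21

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

## 0. Overview

In modern multithreaded programming, efficient concurrent data structures are key to system performance, especially under high concurrency. This document walks through an implementation of a lock-free ring queue (`LockFreeRingQueue`) and discusses its advantages and limitations in practice.

## 1. Lock-Free Ring Queue Overview

A lock-free ring queue is a concurrent data structure for the producer-consumer pattern in multithreaded programs. Instead of locks, it coordinates threads with atomic operations such as compare-and-swap (CAS), which removes the overhead of lock contention and the risk of lock-induced deadlock.

### 1.1 Characteristics

1. **Thread safety**: atomic operations keep the queue's state consistent when several threads access it.
2. **Efficiency**: without lock contention, threads block and context-switch less often.
3. **ABA mitigation**: the read and write positions are monotonically increasing 32-bit counters that are mapped to array slots with a mask only when they are compared or used to index the array. A stale CAS on a counter can therefore succeed only if that counter has wrapped through all 2^32 values in the meantime, which makes the ABA problem negligible in practice.

### 1.2 Advantages and Limitations

**Advantages**:

1. **High concurrency**: without locks, multiple threads can make progress at the same time, which raises throughput.
2. **Low latency**: under heavy contention, fewer blocking-related context switches lead to lower latency.
3. **No lock deadlocks**: since there are no locks, deadlocks caused by lock ordering cannot occur.

**Limitations**:

1. **Complexity**: lock-free structures are usually harder to implement than lock-based ones, and their concurrency bugs are hard to debug.
2. **Hardware dependence**: atomic operations such as CAS rely on hardware support, and their behavior and performance can differ across platforms.
3. **Limited applicability**: a lock-free queue is not the right choice everywhere; under low contention, or when there are no real-time requirements, a conventional lock-based queue may be the better fit.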
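The CAS retry loop mentioned above is the basic building block of this kind of code. The minimal sketch below is not part of the project: `BoundedIncrement` and `RunContendedDemo` are hypothetical names chosen for illustration. It advances a shared counter only while the counter is below a limit, much as `Enqueue` refuses to advance the write index of a full queue, and it relies on two properties of `compare_exchange_weak`: it may fail spuriously, and on failure it writes the counter's current value back into `expected`.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical helper (not part of LockFreeRingQueue): atomically increments
// `counter` unless it has already reached `limit`.
// Returns true if this call advanced the counter.
bool BoundedIncrement(std::atomic<uint32_t>& counter, uint32_t limit) {
  uint32_t expected = counter.load(std::memory_order_relaxed);
  do {
    if (expected >= limit) {
      return false;  // Refuse to advance, like Enqueue on a full queue
    }
    // If another thread changed the counter (or the weak CAS failed spuriously),
    // compare_exchange_weak stores the current value into `expected`, and the
    // loop re-checks the limit against that fresh value.
  } while (!counter.compare_exchange_weak(expected, expected + 1U,
                                          std::memory_order_relaxed,
                                          std::memory_order_relaxed));
  return true;
}

// Hypothetical demo: four threads make 100 attempts each against a limit of 250.
// Returns how many attempts succeeded.
uint32_t RunContendedDemo() {
  std::atomic<uint32_t> counter{0U};
  std::atomic<uint32_t> wins{0U};
  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) {
    threads.emplace_back([&counter, &wins] {
      for (int i = 0; i < 100; ++i) {
        if (BoundedIncrement(counter, 250U)) {
          wins.fetch_add(1U, std::memory_order_relaxed);
        }
      }
    });
  }
  for (auto& th : threads) {
    th.join();
  }
  // Every success advanced the counter by exactly one
  assert(wins.load() == counter.load());
  return wins.load();
}
```

However the four threads interleave, `RunContendedDemo()` reports exactly 250 successes out of 400 attempts, because no CAS can ever move the counter past the limit.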
## 2. `LockFreeRingQueue` Implementation

Below is a C++ implementation of the lock-free ring queue. It supports concurrent access from multiple producer threads and multiple consumer threads.

```cpp
#ifndef RING_QUEUE_HPP
#define RING_QUEUE_HPP

#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <thread>

template <typename T>
class LockFreeRingQueue {
 public:
  // Constructor that initializes the ring queue with the specified size
  explicit LockFreeRingQueue(uint32_t size);

  // Default destructor
  ~LockFreeRingQueue() = default;

  // Disable copy constructor and assignment operator
  LockFreeRingQueue(const LockFreeRingQueue&) = delete;
  LockFreeRingQueue& operator=(const LockFreeRingQueue&) = delete;

  // Enqueue operation to add an element to the queue
  bool Enqueue(const T& data);

  // Dequeue operation to remove an element from the queue
  bool Dequeue(T* data);

  // Check if the queue is empty
  bool IsEmpty() const noexcept;

  // Check if the queue is full
  bool IsFull() const noexcept;

  // Get the current size of the queue (approximate while other threads are active)
  uint32_t Size() const noexcept;

 private:
  // Check if the given number is a power of two
  static bool IsPowerOfTwo(uint32_t num) noexcept;

  // Return the largest power of two less than or equal to the given number
  static uint32_t CeilPowerOfTwo(uint32_t num) noexcept;

  // Round up the given number to the nearest power of two
  static uint32_t RoundupPowerOfTwo(uint32_t num) noexcept;

  // Get the index within the queue
  uint32_t IndexOfQueue(uint32_t index) const noexcept;

 private:
  const uint32_t size_;                     // Size of the queue, must be a power of two
  std::atomic<uint32_t> length_;            // Current length of the queue
  std::atomic<uint32_t> read_index_;        // Index for the consumer to read
  std::atomic<uint32_t> write_index_;       // Index for the producer to write
  std::atomic<uint32_t> last_write_index_;  // Last confirmed write index
  std::unique_ptr<T[]> queue_;              // Array to store the queue elements
};

template <typename T>
LockFreeRingQueue<T>::LockFreeRingQueue(uint32_t size)
    : size_(size <= 1U ? 2U : IsPowerOfTwo(size) ?
                                                  size : RoundupPowerOfTwo(size)),
      length_(0U),
      read_index_(0U),
      write_index_(0U),
      last_write_index_(0U),
      queue_(std::make_unique<T[]>(size_)) {
  if (size == 0U) {
    throw std::out_of_range("Queue size must be greater than 0");
  }
}

template <typename T>
bool LockFreeRingQueue<T>::Enqueue(const T& data) {
  uint32_t current_read_index;
  uint32_t current_write_index;

  do {
    // Acquire pairs with the consumer's release CAS on read_index_, so a slot
    // is reused only after the consumer has finished copying out of it
    current_read_index = read_index_.load(std::memory_order_acquire);
    current_write_index = write_index_.load(std::memory_order_relaxed);

    // Check if the queue is full
    if (IndexOfQueue(current_write_index + 1U) == IndexOfQueue(current_read_index)) {
      return false;  // Queue is full
    }
  } while (!write_index_.compare_exchange_weak(current_write_index, current_write_index + 1U,
                                               std::memory_order_release,
                                               std::memory_order_relaxed));

  queue_[IndexOfQueue(current_write_index)] = data;

  // Confirm the write operation. Commits happen in claim order, so wait until
  // every earlier producer has advanced last_write_index_ to our index.
  // compare_exchange_weak overwrites `expected` on failure, so reset it each
  // time; otherwise we could commit an earlier producer's unfinished slot.
  uint32_t expected = current_write_index;
  while (!last_write_index_.compare_exchange_weak(expected, current_write_index + 1U,
                                                  std::memory_order_release,
                                                  std::memory_order_relaxed)) {
    expected = current_write_index;
    std::this_thread::yield();  // Yield CPU to avoid busy-waiting
  }

  length_.fetch_add(1U, std::memory_order_relaxed);
  return true;
}

template <typename T>
bool LockFreeRingQueue<T>::Dequeue(T* data) {
  if (data == nullptr) {
    throw std::invalid_argument("Null pointer passed to Dequeue");
  }

  uint32_t current_read_index;
  uint32_t current_last_write_index;

  do {
    current_read_index = read_index_.load(std::memory_order_relaxed);
    // Acquire pairs with the producer's release CAS on last_write_index_,
    // making the element written before the commit visible here
    current_last_write_index = last_write_index_.load(std::memory_order_acquire);

    // Check if the queue is empty
    if (IndexOfQueue(current_last_write_index) == IndexOfQueue(current_read_index)) {
      return false;  // Queue is empty
    }

    *data = queue_[IndexOfQueue(current_read_index)];

    if (read_index_.compare_exchange_weak(current_read_index, current_read_index + 1U,
                                          std::memory_order_release,
                                          std::memory_order_relaxed)) {
      length_.fetch_sub(1U, std::memory_order_relaxed);
      return true;
    }
  } while (true);
}

template <typename T>
bool LockFreeRingQueue<T>::IsEmpty() const noexcept {
  return
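      length_.load(std::memory_order_relaxed) == 0U;
}

template <typename T>
bool LockFreeRingQueue<T>::IsFull() const noexcept {
  uint32_t next_write_index = IndexOfQueue(write_index_.load(std::memory_order_relaxed) + 1U);
  // read_index_ is an unmasked counter, so it must be masked before comparing
  return next_write_index == IndexOfQueue(read_index_.load(std::memory_order_acquire));
}

template <typename T>
uint32_t LockFreeRingQueue<T>::Size() const noexcept {
  return length_.load(std::memory_order_relaxed);
}

template <typename T>
bool LockFreeRingQueue<T>::IsPowerOfTwo(uint32_t num) noexcept {
  return (num != 0U) && ((num & (num - 1U)) == 0U);
}

template <typename T>
uint32_t LockFreeRingQueue<T>::CeilPowerOfTwo(uint32_t num) noexcept {
  // Copy the highest set bit into all lower bits, then keep only that bit
  num |= (num >> 1U);
  num |= (num >> 2U);
  num |= (num >> 4U);
  num |= (num >> 8U);
  num |= (num >> 16U);
  return num - (num >> 1U);
}

template <typename T>
uint32_t LockFreeRingQueue<T>::RoundupPowerOfTwo(uint32_t num) noexcept {
  // The largest power of two <= 2 * (num - 1) is the smallest power of two >= num
  // (valid for 2 <= num <= 2^31)
  return CeilPowerOfTwo((num - 1U) << 1U);
}

template <typename T>
uint32_t LockFreeRingQueue<T>::IndexOfQueue(uint32_t index) const noexcept {
  return index & (size_ - 1U);
}

#endif  // RING_QUEUE_HPP
```

### 2.1 `Enqueue` Flowchart

```mermaid
graph TD
    A[Start enQueue] --> B{Check if Full}
    B --> |Full| C[Return false]
    B --> |Not Full| D[CAS Update Write Index]
    D --> |Success| E[Write Data]
    D --> |Fail| B
    E --> F[CAS Update Last Write Index]
    F --> |Success| G[Increment Length]
    G --> H[End enQueue]
    F --> |Fail| I[Yield Thread]
    I --> F
```

### 2.2 `Dequeue` Flowchart

```mermaid
graph TD
    A[Start deQueue] --> B{Check if Empty}
    B --> |Empty| C[Return false]
    B --> |Not Empty| D[Read Data from Queue]
    D --> E[CAS Update Read Index]
    E --> |Success| F[Decrement Length]
    F --> G[End deQueue]
    E --> |Fail| B
```

## 3. Core Implementation Details

### 3.1 Sizing the Ring

The ring size is kept at a power of two so that indices can be computed with a cheap bit operation. If the requested size is not a power of two, the constructor rounds it up with `RoundupPowerOfTwo` (for example, 5 becomes 8); sizes 0 and 1 are promoted to 2, and a size of 0 additionally throws `std::out_of_range`.

### 3.2 Index Computation

`IndexOfQueue` maps the ever-increasing read and write counters to array slots with a bitwise AND (`index & (size_ - 1)`) instead of a modulo (`%`), which is faster. Because one slot is always left empty to distinguish a full queue from an empty one, a queue of size N holds at most N - 1 elements.

### 3.3 Atomic Operations and CAS

The core logic is built on `std::atomic<T>::compare_exchange_weak`. CAS (compare-and-swap), the foundation of lock-free data structures, atomically replaces a value only if it still equals the expected value. When several threads race to advance `write_index_` or `read_index_`, exactly one of them wins each step and the others retry instead of corrupting the shared state; because the weak form may also fail spuriously, it is always called in a loop. For the element copies to be visible across threads, each release-ordered CAS on an index must be paired with an acquire load of that index on the other side (the consumer's load of `last_write_index_` and the producer's load of `read_index_`). One caveat: `Dequeue` copies an element before its CAS confirms ownership, so with multiple consumers a thread that stalls for a full lap of the ring may copy a slot that a producer is overwriting (its CAS then fails and it retries). The queue is therefore best suited to small, trivially copyable element types.

### 3.4 Yielding the Thread

Producers commit their writes in the order in which they claimed slots: a producer that has finished writing must wait until every earlier producer has advanced `last_write_index_`. Instead of spinning, the waiting producer calls `std::this_thread::yield()` to give up its time slice, which reduces contention. As a consequence, a producer that is descheduled between claiming and committing a slot temporarily holds up the producers behind it.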
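The sizing and indexing helpers from Sections 3.1 and 3.2 can be checked in isolation. The sketch below copies their bit tricks into free functions with hypothetical names: `FloorPowerOfTwo` performs the same computation as the class's private `CeilPowerOfTwo` (which, despite its name, returns the largest power of two not exceeding its argument), `RoundUpPowerOfTwo` mirrors `RoundupPowerOfTwo`, and `IndexOf` mirrors `IndexOfQueue`. `MaskMatchesModulo` compares masking with `%`, including across the 32-bit wraparound of the counters.

```cpp
#include <cassert>
#include <cstdint>

// Largest power of two <= num (0 for num == 0); same bit trick as the class's
// private CeilPowerOfTwo, renamed here to describe what it returns.
uint32_t FloorPowerOfTwo(uint32_t num) {
  num |= num >> 1U;  // Copy the highest set bit into every lower bit...
  num |= num >> 2U;
  num |= num >> 4U;
  num |= num >> 8U;
  num |= num >> 16U;
  return num - (num >> 1U);  // ...then clear everything below it
}

// Smallest power of two >= num, valid for 2 <= num <= 2^31 (mirrors RoundupPowerOfTwo)
uint32_t RoundUpPowerOfTwo(uint32_t num) {
  return FloorPowerOfTwo((num - 1U) << 1U);
}

// Maps an ever-increasing counter to a slot (mirrors IndexOfQueue)
uint32_t IndexOf(uint32_t counter, uint32_t size) {
  assert(size != 0U && (size & (size - 1U)) == 0U);  // size must be a power of two
  return counter & (size - 1U);
}

// True if masking agrees with % for every counter in [first, first + count);
// the counter wraps around 2^32 just like the queue's indices do
bool MaskMatchesModulo(uint32_t first, uint32_t count, uint32_t size) {
  for (uint32_t i = 0U; i < count; ++i) {
    const uint32_t c = first + i;
    if (IndexOf(c, size) != c % size) {
      return false;
    }
  }
  return true;
}
```

For example, `RoundUpPowerOfTwo(5)` returns 8, and `IndexOf(0xFFFFFFFF, 8)` returns 7 while the wrapped counter 0 maps to slot 0, so the slot sequence stays continuous because 2^32 is a multiple of every power-of-two size. With a size of 6, `0xFFFFFFFF % 6` is 3 while the next counter value maps to 0, so slots 4 and 5 would be skipped; with free-running 32-bit counters, this is arguably as important a reason for the power-of-two size as the speed of the mask.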
## 4. Example Program

The following simple C++ program shows how to use the `LockFreeRingQueue` class. It performs basic enqueue and dequeue operations, first with a single producer and a single consumer, then with multiple threads on each side.

### 4.1 Code

```cpp
#include <iostream>
#include <thread>

#include "lock_free_ring_queue.h"  // Assumes the class definition above is saved in this header

void SingleProducerSingleConsumerExample() {
  LockFreeRingQueue<int> queue(4);

  // A single producer thread enqueues 0-3
  std::thread producer([&queue]() {
    for (int i = 0; i < 4; ++i) {
      // A size-4 queue holds at most 3 elements, so retry while it is full;
      // giving up would leave the consumer waiting forever for the missing value
      while (!queue.Enqueue(i)) {
        std::this_thread::yield();
      }
      std::cout << "Producer enqueued: " << i << std::endl;
    }
  });

  // A single consumer thread dequeues four elements
  std::thread consumer([&queue]() {
    int value = 0;
    for (int i = 0; i < 4; ++i) {
      while (!queue.Dequeue(&value)) {
        std::this_thread::yield();  // Wait until data is available
      }
      std::cout << "Consumer dequeued: " << value << std::endl;
    }
  });

  producer.join();
  consumer.join();
}

void MultiProducerMultiConsumerExample() {
  LockFreeRingQueue<int> queue(8);

  auto producer = [&queue](int id) {
    for (int i = 0; i < 4; ++i) {
      int value = id * 10 + i;
      // The queue holds at most 7 of the 8 values, so retry while it is full
      while (!queue.Enqueue(value)) {
        std::this_thread::yield();
      }
      std::cout << "Producer " << id << " enqueued: " << value << std::endl;
    }
  };

  auto consumer = [&queue](int id) {
    int value = 0;
    for (int i = 0; i < 4; ++i) {
      while (!queue.Dequeue(&value)) {
        std::this_thread::yield();  // Wait until data is available
      }
      std::cout << "Consumer " << id << " dequeued: " << value << std::endl;
    }
  };

  std::thread producers[2];
  std::thread consumers[2];

  // Start two producer threads
  for (int i = 0; i < 2; ++i) {
    producers[i] = std::thread(producer, i);
  }

  // Start two consumer threads
  for (int i = 0; i < 2; ++i) {
    consumers[i] = std::thread(consumer, i);
  }

  for (auto& t : producers) {
    t.join();
  }
  for (auto& t : consumers) {
    t.join();
  }
}

int main() {
  std::cout << "Single Producer, Single Consumer Example:" << std::endl;
  SingleProducerSingleConsumerExample();

  std::cout << "\nMulti Producer, Multi Consumer Example:" << std::endl;
  MultiProducerMultiConsumerExample();

  return 0;
}
```

### 4.2 Code Walkthrough

+ **`SingleProducerSingleConsumerExample`**:
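  - Creates a queue of size 4, which holds at most 3 elements.
  - Uses one producer thread to enqueue the numbers `0`-`3`.
  - Uses one consumer thread to dequeue the numbers and print them.
+ **`MultiProducerMultiConsumerExample`**:
  - Creates a queue of size 8.
  - Starts two producer threads; each combines its thread ID and loop counter into a value (`id * 10 + i`) and enqueues it.
  - Starts two consumer threads that dequeue the values and print them.

### 4.3 Running the Program

Save the code above as `main.cpp`, and save the class definition from Section 2 as `lock_free_ring_queue.h` in the same directory. Because `LockFreeRingQueue` is a header-only class template, there is no separate implementation to compile or link.

Build and run:

```bash
g++ -std=c++14 main.cpp -pthread -o lock_free_ring_queue_example
./lock_free_ring_queue_example
```

### 4.4 Output

The order of the lines depends on thread scheduling and differs between runs. Because the threads write to `std::cout` without synchronization, their output can also interleave, as at the start of the single-producer run below.

```plaintext
$ ./lock_free_ring_queue_example
Single Producer, Single Consumer Example:
Producer enqueued: Consumer dequeued: 0
0
Producer enqueued: 1
Producer enqueued: 2
Producer enqueued: 3
Consumer dequeued: 1
Consumer dequeued: 2
Consumer dequeued: 3

Multi Producer, Multi Consumer Example:
Producer 0 enqueued: 0
Producer 0 enqueued: 1
Producer 0 enqueued: 2
Producer 0 enqueued: 3
Consumer 1 dequeued: 0
Consumer 1 dequeued: 1
Consumer 1 dequeued: 2
Consumer 1 dequeued: 3
Producer 1 enqueued: 10
Producer 1 enqueued: 11
Producer 1 enqueued: 12
Producer 1 enqueued: 13
Consumer 0 dequeued: 10
Consumer 0 dequeued: 11
Consumer 0 dequeued: 12
Consumer 0 dequeued: 13
```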
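Note that each example queue is no larger than the number of values pushed through it, yet a ring of size N holds at most N - 1 elements. The hypothetical `RingIndices` struct below is a single-threaded model of the queue's index arithmetic (no atomics, no element storage) that applies the same full and empty tests as `Enqueue` and `Dequeue`, and the hypothetical `Capacity` function counts how many pushes succeed on an empty ring. It shows that one slot always stays free, so that equal read and write positions can only mean "empty".

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical single-threaded model of LockFreeRingQueue's index arithmetic:
// no atomics and no element storage, only the read and write counters.
struct RingIndices {
  explicit RingIndices(uint32_t ring_size) : size(ring_size) {}

  uint32_t Index(uint32_t i) const { return i & (size - 1U); }

  // Same test as Enqueue: full when advancing `write` would land on `read`
  bool Full() const { return Index(write + 1U) == Index(read); }

  // Same test as Dequeue (which compares against last_write_index_; that
  // equals `write` here because every write commits immediately)
  bool Empty() const { return Index(write) == Index(read); }

  bool Push() {
    if (Full()) return false;
    ++write;
    return true;
  }

  bool Pop() {
    if (Empty()) return false;
    ++read;
    return true;
  }

  uint32_t size;        // Power of two, as in LockFreeRingQueue
  uint32_t read = 0U;   // Counter of the next element to dequeue
  uint32_t write = 0U;  // Counter of the next slot to fill
};

// Counts how many pushes succeed on an empty ring of the given size
uint32_t Capacity(uint32_t size) {
  RingIndices ring(size);
  uint32_t pushed = 0U;
  while (ring.Push()) {
    ++pushed;
  }
  assert(ring.Full() && !ring.Empty());
  return pushed;
}
```

`Capacity(4)` returns 3 and `Capacity(8)` returns 7, so in either example a producer can find the queue full before the consumers catch up.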
## 5. Unit Tests

The following is a complete set of `gtest` unit tests for the `LockFreeRingQueue` class. They cover initialization, enqueue, dequeue, boundary conditions, and behavior with multiple threads.

```cpp
#include "lock_free_ring_queue.h"  // Assumes the class definition above is saved in this header

#include <gtest/gtest.h>

#include <algorithm>
#include <iterator>
#include <memory>
#include <thread>
#include <vector>

class LockFreeRingQueueTest : public ::testing::Test {
 protected:
  void SetUp() override {
    // Initialize the queue size
    queue_size_ = 64;
    queue_ = std::make_unique<LockFreeRingQueue<int>>(queue_size_);
  }

  std::unique_ptr<LockFreeRingQueue<int>> queue_;
  uint32_t queue_size_;
};

// Test queue initialization
TEST_F(LockFreeRingQueueTest, Initialization) {
  EXPECT_EQ(queue_->Size(), 0U);
  EXPECT_TRUE(queue_->IsEmpty());
}

// Test enqueueing and dequeueing a single element
TEST_F(LockFreeRingQueueTest, SingleEnqueueDequeue) {
  int value_in = 42;
  int value_out = 0;

  EXPECT_TRUE(queue_->Enqueue(value_in));
  EXPECT_EQ(queue_->Size(), 1U);
  EXPECT_FALSE(queue_->IsEmpty());

  EXPECT_TRUE(queue_->Dequeue(&value_out));
  EXPECT_EQ(value_out, value_in);
  EXPECT_EQ(queue_->Size(), 0U);
  EXPECT_TRUE(queue_->IsEmpty());
}

// Test enqueueing into a full queue
TEST_F(LockFreeRingQueueTest, EnqueueFullQueue) {
  for (uint32_t i = 0; i < queue_size_ - 1; ++i) {  // One slot stays free, so capacity is size - 1
    EXPECT_TRUE(queue_->Enqueue(static_cast<int>(i)));
  }
  EXPECT_EQ(queue_->Size(), queue_size_ - 1);
  EXPECT_FALSE(queue_->Enqueue(100));  // Queue is full, so the enqueue fails
}

// Test dequeueing from an empty queue
TEST_F(LockFreeRingQueueTest, DequeueEmptyQueue) {
  int value_out = 0;
  EXPECT_FALSE(queue_->Dequeue(&value_out));  // Queue is empty, so the dequeue fails
}

// Multithreaded test
TEST_F(LockFreeRingQueueTest, MultiThreadedEnqueueDequeue) {
  const int num_threads = 4;
  const int num_elements_per_thread = 10;

  auto enqueue_function = [&](int thread_id) {
    for (int i = 0; i < num_elements_per_thread; ++i) {
      queue_->Enqueue(thread_id * num_elements_per_thread + i);
    }
  };

  auto dequeue_function = [&](int thread_id, int* result_array) {
    for (int i = 0; i < num_elements_per_thread; ++i) {
      int value = 0;
      while (!queue_->Dequeue(&value)) {
        std::this_thread::yield();
      }
      result_array[thread_id * num_elements_per_thread + i] = value;
    }
  };

  std::vector<std::thread> threads;
  int results[num_threads * num_elements_per_thread] = {0};

  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back(enqueue_function,
                         i);
  }
  for (auto& thread : threads) {
    thread.join();
  }

  threads.clear();
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back(dequeue_function, i, results);
  }
  for (auto& thread : threads) {
    thread.join();
  }

  EXPECT_EQ(queue_->Size(), 0U);
  EXPECT_TRUE(queue_->IsEmpty());

  // Check that every element was dequeued exactly once
  std::sort(std::begin(results), std::end(results));
  for (int i = 0; i < num_threads * num_elements_per_thread; ++i) {
    EXPECT_EQ(results[i], i);
  }
}

// Boundary test: a queue initialized with size 1
TEST(LockFreeRingQueueBoundaryTest, InitializationWithSizeOne) {
  LockFreeRingQueue<int> small_queue(1);  // Promoted to size 2 internally, so capacity is 1
  EXPECT_EQ(small_queue.Size(), 0U);
  EXPECT_TRUE(small_queue.IsEmpty());

  int value_in = 99;
  EXPECT_TRUE(small_queue.Enqueue(value_in));
  EXPECT_FALSE(small_queue.Enqueue(value_in));  // The queue should now be full
}

// Boundary test: enqueue and dequeue a single element
TEST(LockFreeRingQueueBoundaryTest, SingleElementQueue) {
  LockFreeRingQueue<int> small_queue(1);
  int value_in = 123;
  int value_out = 0;

  EXPECT_TRUE(small_queue.Enqueue(value_in));
  EXPECT_FALSE(small_queue.Enqueue(value_in));  // Queue is full

  EXPECT_TRUE(small_queue.Dequeue(&value_out));
  EXPECT_EQ(value_out, value_in);
  EXPECT_FALSE(small_queue.Dequeue(&value_out));  // Queue is empty
}

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
```

### 5.1 Test Walkthrough

+ **Basic functionality tests**:
  - `Initialization`: checks that a new queue is empty.
  - `SingleEnqueueDequeue`: enqueues and dequeues a single element.
  - `EnqueueFullQueue`: fills the queue to its capacity of `size - 1` elements and verifies that one more enqueue fails.
  - `DequeueEmptyQueue`: verifies that dequeueing from an empty queue fails.
+ **Multithreaded test**:
  - `MultiThreadedEnqueueDequeue`: four threads enqueue 10 values each (40 in total, within the queue's capacity of 63); after they finish, four threads dequeue 10 values each. The test then checks that every value from 0 to 39 was dequeued exactly once. Because producers and consumers run in separate phases, this test does not exercise concurrent enqueue and dequeue.
+ **Boundary tests**:
  - `InitializationWithSizeOne`: a queue requested with size 1 (promoted to 2 internally) accepts exactly one element.
  - `SingleElementQueue`: enqueue and dequeue on a size-1 queue.

### 5.2 Running the Tests

Save the code above as a test file (for example, `lock_free_ring_queue_test.cpp`) and make sure the `gtest` library is installed. Then compile and run the tests.

Build and run:

```bash
g++ -std=c++14 lock_free_ring_queue_test.cpp -lgtest -lgtest_main -pthread -o lock_free_ring_queue_test
./lock_free_ring_queue_test
```

**Test output**:

```plaintext
$ ./lock_free_ring_queue_test
[==========] Running 7 tests from 2 test suites.
[----------] Global test environment set-up.
[----------] 5 tests from LockFreeRingQueueTest
[ RUN      ] LockFreeRingQueueTest.Initialization
[       OK ] LockFreeRingQueueTest.Initialization (0 ms)
[ RUN      ] LockFreeRingQueueTest.SingleEnqueueDequeue
[       OK ] LockFreeRingQueueTest.SingleEnqueueDequeue (0 ms)
[ RUN      ] LockFreeRingQueueTest.EnqueueFullQueue
[       OK ] LockFreeRingQueueTest.EnqueueFullQueue (0 ms)
[ RUN      ] LockFreeRingQueueTest.DequeueEmptyQueue
[       OK ] LockFreeRingQueueTest.DequeueEmptyQueue (0 ms)
[ RUN      ] LockFreeRingQueueTest.MultiThreadedEnqueueDequeue
[       OK ] LockFreeRingQueueTest.MultiThreadedEnqueueDequeue (10 ms)
[----------] 5 tests from LockFreeRingQueueTest (10 ms total)
[----------] Global test environment tear-down
[==========] 7 tests from 2 test suites ran. (10 ms total)
[  PASSED  ] 7 tests.
```

## 6. Summary

Lock-free ring queues can offer significant performance advantages under high concurrency, and their design builds on the atomic operations and memory model of modern hardware. In practice, however, developers need to weigh these benefits against the added complexity and potential hardware dependence of lock-free structures.