commit da1d09a2d4cfb13ad6ddbaaff688169019aaa33c parent 5b01c31c7175b094789378216cf794a5f980785a Author: d.levin256@gmail.com <d.levin256@gmail.com> Date: Tue, 4 Apr 2017 02:48:10 +0300 Lock-free ring buffer Diffstat:
M | include/kfr/base/univector.hpp | | | 65 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
1 file changed, 65 insertions(+), 0 deletions(-)
diff --git a/include/kfr/base/univector.hpp b/include/kfr/base/univector.hpp @@ -383,6 +383,71 @@ CMT_INLINE univector<T> render(Expr&& expr, size_t size) result = expr; return result; } + +/// @brief Single producer single consumer lock-free ring buffer +template <typename T> +struct lockfree_ring_buffer +{ + lockfree_ring_buffer() : front(0), tail(0) {} + + size_t size() const + { + return tail.load(std::memory_order_relaxed) - front.load(std::memory_order_relaxed); + } + + template <size_t Tag> + size_t try_enqueue(const T* source, size_t size, univector<T, Tag>& buffer, bool partial = false) + { + const size_t cur_tail = tail.load(std::memory_order_relaxed); + const size_t avail_size = buffer.size() - (cur_tail - front.load(std::memory_order_relaxed)); + if (size > avail_size) + { + if (!partial) + return 0; + size = std::min(size, avail_size); + } + std::atomic_thread_fence(std::memory_order_acquire); + + const size_t real_tail = cur_tail % buffer.size(); + const size_t first_size = std::min(buffer.size() - real_tail, size); + internal::builtin_memcpy(buffer.data() + real_tail, source, first_size * sizeof(T)); + internal::builtin_memcpy(buffer.data(), source + first_size, (size - first_size) * sizeof(T)); + + std::atomic_thread_fence(std::memory_order_release); + + tail.store(cur_tail + size, std::memory_order_relaxed); + return size; + } + + template <size_t Tag> + size_t try_dequeue(T* dest, size_t size, const univector<T, Tag>& buffer, bool partial = false) + { + const size_t cur_front = front.load(std::memory_order_relaxed); + const size_t avail_size = tail.load(std::memory_order_relaxed) - cur_front; + if (size > avail_size) + { + if (!partial) + return 0; + size = std::min(size, avail_size); + } + std::atomic_thread_fence(std::memory_order_acquire); + + const size_t real_front = cur_front % buffer.size(); + const size_t first_size = std::min(buffer.size() - real_front, size); + internal::builtin_memcpy(dest, buffer.data() + real_front, first_size * sizeof(T)); + internal::builtin_memcpy(dest + first_size, buffer.data(), (size - first_size) * sizeof(T)); + + std::atomic_thread_fence(std::memory_order_release); + + front.store(cur_front + size, std::memory_order_relaxed); + return size; + } + +private: + std::atomic<size_t> front; + char cacheline_filler[64 - sizeof(std::atomic<size_t>)]; + std::atomic<size_t> tail; +}; } CMT_PRAGMA_MSVC(warning(pop))