
发布日期:2025-03-18 03:36 点击次数:94
在C++中收场无锁队伍是一个复杂的任务,因为无锁编程波及到复杂的内存操作和同步机制,需要真切衔接并发编程和硬件级别的原子操作。无锁队伍的上风在于高并发场景下不错提供更好的性能高跟美女,因为它幸免了传统锁机制带来的线程竞争和高下文切换支拨。
C++无锁队伍是一种多线程编程技艺,它不错在不使用锁的情况下收场线程安全的队伍。它不错提高多线程关节的性能,无锁队伍的主要念念想是让多个线程同期拜访队伍,而不需要使用锁来保护分享资源。这不错幸免锁竞争和死锁等问题,从而提高关节的拆伙。收场无锁队伍闲居需要使用C++11或更高版块的原子操作库(如<atomic>),以及一些高等的并发收尾技艺。
一、无锁队伍综合无锁队伍(Lock-Free Queue)是一种并发数据结构,用于在多线程环境下收场高效的数据交换。与传统的基于锁的队伍比拟,无锁队伍使用了一些特殊的算法和技艺,幸免了线程之间的互斥操作,从而提高了并发性能和反应性。
无锁队伍闲居基于原子操作(atomic operations)或其他底层同步原语来收场,而且它们遴选一些奥秘的措施来确保操作的正确性。主要念念想是通过使用原子读写操作或雷同的机制,在莫得显式锁定通盘队伍的情况下收场线程安全。
典型的无锁队伍算法有轮回缓冲区(Circular Buffer)和链表(Linked List)等。轮回缓冲区闲居使用两个指针(head 和 tail)来闪现队伍的起首和收尾位置,应用自旋、CAS (Compare-and-Swap) 等原子操作来进行入队和出队操作。链表则通过应用 CAS 操作插入或删除节点来收场并发拜访。
优点:
提供了更好的并发性能,幸免了互斥操作带来的性能瓶颈。
对于高度竞争情况下不错提供更好的可伸缩性。
情欲九歌快播短处:
收场相对复杂,需要探究并发安全和正确性问题。
在高度竞争的情况下可能出现自旋恭候导致的性能赔本。
二、无锁队伍旨趣无锁队伍的旨趣是通过使用原子操作(atomic operations)或其他底层同步原语来收场并发安全。它们幸免了传统锁机制中的互斥操作,以提高并发性能和反应性。
典型的无锁队伍算法有轮回缓冲区(Circular Buffer)和链表(Linked List)等。
在轮回缓冲区的收场中,闲居使用两个指针来闪现队伍的起首位置和收尾位置,即头指针(head)和尾指针(tail)。入队时,通过自旋、CAS (Compare-and-Swap) 等原子操作更新尾指针,并将元素放入相应位置。出队时,一样应用原子操作更新头指针,并复返对应位置上的元素。
链表收场无锁队伍时,在插入或删除节点时使用 CAS 操作来确保唯有一个线程生效修改节点的指针值。这样不错幸免对通盘链表进行加锁操作。
不管是轮回缓冲区如故链表收场,要害点在于怎么应用原子操作确保不同线程之间的互助与一致性。需要仔细处理并发情况下可能出现的竞争条目,并打算合适的算法来保证正确性和性能。
2.1队伍操作模子队伍是一种尽头蹙迫的数据结构,其秉性是先进先出(FIFO),合适活水线业务历程。在程度间通讯、采集通讯间闲居遴选队伍作念缓存,缓解数据处理压力。笔据操作队伍的场景分为:单坐蓐者——单铺张者、多坐蓐者——单铺张者、单坐蓐者——多铺张者、多坐蓐者——多铺张者四大模子。笔据队伍中数据分为:队伍中的数据是定长的、队伍中的数据是变长的。
(1)单坐蓐者——单铺张者
图片
(2)多坐蓐者——单铺张者
图片
(3)单坐蓐者——多铺张者
图片
(4)多坐蓐者——多铺张者
图片
2.2CAS操作CAS即Compare and Swap,是通盘CPU辅导都解救CAS的原子操作(X86中CMPXCHG汇编辅导),用于收场收场多样无锁(lock free)数据结构。
CAS操作的C言语收场如下:
bool compare_and_swap ( int *memory_location, int expected_value, int new_value){ if (*memory_location == expected_value) { *memory_location = new_value; return true; } return false;}
CAS用于查验一个内存位置是否包含预期值,要是包含,则把新值复赋值到内存位置。生效复返true,失败复返false。
(1)GGC对CAS解救,GCC4.1+版块中解救CAS原子操作。
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
(2)Windows对CAS解救,Windows中使用Windows API解救CAS。
LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand);
(3)C11对CAS解救,C11 STL中atomic函数解救CAS并不错跨平台。
template< class T >bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );template< class T >bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
其它原子操作如下:
Fetch-And-Add:一般用来对变量作念+1的原子操作
Test-and-set:写值到某个内存位置并传回其旧值
2.3队伍数据定长与变长(1)队伍数据定长
图片
(2)队伍数据变长
图片
三、无锁队伍决策3.1boost决策boost提供了三种无锁决策,别离适用不同使用场景。
boost::lockfree::queue是解救多个坐蓐者和多个铺张者线程的无锁队伍。
boost::lockfree::stack是解救多个坐蓐者和多个铺张者线程的无锁栈。
boost::lockfree::spsc_queue是仅解救单个坐蓐者和单个铺张者线程的无锁队伍,比boost::lockfree::queue性能更好。
Boost无锁数据结构的API通过轻量级原子锁收场lock-free,不是的确赞佩的无锁。
Boost提供的queue不错竖立运转容量,添加新元素时要是容量不够,则总容量自动增长;但对于无锁数据结构,添加新元素时要是容量不够,总容量不会自动增长。
3.2并发队伍ConcurrentQueue遴选了无锁算法来收场并发操作。它基于CAS(Compare-and-Swap)原子操作和其他底层同步原语来保证线程安全性。具体来说,它使用自旋锁和原子辅导来确保对队伍的修改是原子的,而且在多个线程之间分享数据时提供正确性保证。
ConcurrentQueue是基于C++收场的工业级无锁队伍决策。GitHub:https://github.com/cameron314/concurrentqueueReaderWriterQueue是基于C++收场的单坐蓐者单铺张者场景的无锁队伍决策。GitHub:https://github.com/cameron314/readerwriterqueue
ConcurrentQueue具有以下特色:
线程安全:多个线程不错同期对队伍进行操作而无需极端加锁。
无阻难:入队和出队操作闲居詈骂阻难的,而且具有较低的支拨。
先进先出(FIFO)规矩:元素按照插入规矩陈设,在出队时会复返最早入队的元素。
使用ConcurrentQueue不错轻便地处理多个线程之间分享数据,并减少由于加锁引起的性能支拨。但需要小心,固然ConcurrentQueue提供了高效、线程安全的并发操作,但在某些特定情况下可能不得当通盘应用场景,因此在选拔数据结构时需要笔据具体需求进行评估。
3.3DisruptorDisruptor是一种高性能的并发编程框架,用于收场无锁(lock-free)的并发数据结构。它当先由LMAX Exchange拓荒,并成为了其中枢交游引擎的要害组件。
Disruptor旨在搞定在高度多线程环境下的数据分享和通讯问题。它基于环形缓冲区(Ring Buffer)和事件驱动模子,通过优化内存拜访和线程退换,提供了尽头高效的讯息传递机制。
Disruptor是英外洋汇交游公司LMAX基于JAVA拓荒的一个高性能队伍。GitHub:https://github.com/LMAX-Exchange/disruptor
主要特色如下:
无锁打算:Disruptor使用CAS(Compare-and-Swap)等无锁算法来幸免使用传统锁带来的竞争和阻难。
高婉曲量:Disruptor应用环形缓冲区和预分派内存等技艺,在保证正确性前提下追求尽可能高的处理速率。
低蔓延:由于无锁打算和紧凑的内存布局,Disruptor大概收场尽头低的讯息处理蔓延。
线程间互助:Disruptor提供了生动而巨大的事件发布、铺张者恭候及触发机制,可用于收场复杂的线程间通讯阵势。
使用Disruptor不错有用地搞定坐蓐者-铺张者模子中数据传递过程中的性能瓶颈,稀薄适用于高并发、低蔓延的应用场景,举例金融交游系统、讯息队伍等。干系词,由于Disruptor对编程模子和衔接要求较高,使用时需要仔细探究,并笔据具体需求评估是否得当。
四、无锁队伍收场4.1环形缓冲区RingBuffer是坐蓐者和铺张者模子中常用的数据结构,坐蓐者将数据追加到数组尾端,当达到数组的尾部时,坐蓐者绕回到数组的头部;铺张者从数组头端取走数据,当到达数组的尾部时,铺张者绕回到数组头部。
要是唯有一个坐蓐者和一个铺张者,环形缓冲区不错无锁拜访,环形缓冲区的写入index只允许坐蓐者拜访并修改,只须坐蓐者在更新index前将新的值保存到缓冲区中,则铺张者将恒久看到一致的数据结构;读取index也只允许铺张者拜访并修改,铺张者只须在取走数据后更新读index,则坐蓐者将恒久看到一致的数据结构。
空队伍时,front与rear相等;当有元素进队,则rear后移;有元素出队,则front后移。
空队伍时,rear等于front;满队伍时,队伍尾部空一个位置,因此判断轮回队伍满时使用(rear-front+maxn)%maxn。
入队操作:
data[rear] = x;rear = (rear+1)%maxn;
出队操作:
x = data[front];front = (front+1)%maxn;
单坐蓐者单铺张者
对于单坐蓐者和单铺张者场景,由于read_index和write_index都只会有一个线程写,因此不需要加锁也不需要原子操作,凯旋修改即可,但读写数据时需要探究遭受数组尾部的情况。
线程对write_index和read_index的读写操作如下:
(1)写操作。先判断队伍时否为满,要是队伍未满,则先写数据,写完数据后再修改write_index。
(2)读操作。先判断队伍是否为空,要是队伍不为空,则先读数据,读完再修改read_index。
多坐蓐者单铺张者
多坐蓐者和单铺张者场景中,由于多个坐蓐者都会修改write_index,是以在不加锁的情况下必须使用原子操作。
4.2RingBuffer收场RingBuffer.hpp文献:
#pragma once template <class T>class RingBuffer{public: RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0) { m_data = new T[size]; } ~RingBuffer() { delete [] m_data; m_data = NULL; } inline bool isEmpty() const { return m_front == m_rear; } inline bool isFull() const { return m_front == (m_rear + 1) % m_size; } bool push(const T& value) { if(isFull()) { return false; } m_data[m_rear] = value; m_rear = (m_rear + 1) % m_size; return true; } bool push(const T* value) { if(isFull()) { return false; } m_data[m_rear] = *value; m_rear = (m_rear + 1) % m_size; return true; } inline bool pop(T& value) { if(isEmpty()) { return false; } value = m_data[m_front]; m_front = (m_front + 1) % m_size; return true; } inline unsigned int front()const { return m_front; } inline unsigned int rear()const { return m_rear; } inline unsigned int size()const { return m_size; }private: unsigned int m_size;// 队伍长度 int m_front;// 队伍头部索引 int m_rear;// 队伍尾部索引 T* m_data;// 数据缓冲区};
RingBufferTest.cpp测试代码:
#include <stdio.h>#include <thread>#include <unistd.h>#include <sys/time.h>#include "RingBuffer.hpp" class Test{public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); }private: int id; int value; char data[128];}; double getdetlatimeofday(struct timeval *begin, struct timeval *end){ return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);} RingBuffer<Test> queue(1 << 12);2u000 #define N (10 * (1 << 20)) void produce(){ struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i % 1024, i))) { i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} void consume(){ sleep(1); Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { // test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} int main(int argc, char const *argv[]){ std::thread producer1(produce); std::thread consumer(consume); producer1.join(); consumer.join(); return 0;}
编译:
g++ --std=c++11 RingBufferTest.cpp -o test -pthread
单坐蓐者单铺张者场景下,讯息婉曲量为350万条/秒傍边。
4.3LockFreeQueue收场LockFreeQueue.hpp:
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <fcntl.h>#include <stdbool.h>#include <sys/stat.h>#include <sys/types.h>#include <sys/time.h>#include <sys/mman.h> #define SHM_NAME_LEN 128#define MIN(a, b) ((a) > (b) ? (b) : (a))#define IS_POT(x) ((x) && !((x) & ((x)-1)))#define MEMORY_BARRIER __sync_synchronize() template <class T>class LockFreeQueue{protected: typedef struct { int m_lock; inline void spinlock_init() { m_lock = 0; } inline void spinlock_lock() { while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {} } inline void spinlock_unlock() { __sync_lock_release(&m_lock); } } spinlock_t; public: // size:队伍大小 // name:分享内存key的旅途称呼,默许为NULL,使用数组动作底层缓冲区。 LockFreeQueue(unsigned int size, const char* name = NULL) { memset(shm_name, 0, sizeof(shm_name)); createQueue(name, size); } ~LockFreeQueue() { if(shm_name[0] == 0) { delete [] m_buffer; m_buffer = NULL; } else { if (munmap(m_buffer, m_size * sizeof(T)) == -1) { perror("munmap"); } if (shm_unlink(shm_name) == -1) { perror("shm_unlink"); } } } bool isFull()const {#ifdef USE_POT return m_head == (m_tail + 1) & (m_size - 1);#else return m_head == (m_tail + 1) % m_size;#endif } bool isEmpty()const { return m_head == m_tail; } unsigned int front()const { return m_head; } unsigned int tail()const { return m_tail; } bool push(const T& value) {#ifdef USE_LOCK m_spinLock.spinlock_lock();#endif if(isFull()) {#ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return false; } memcpy(m_buffer + m_tail, &value, sizeof(T));#ifdef USE_MB MEMORY_BARRIER;#endif #ifdef USE_POT m_tail = (m_tail + 1) & (m_size - 1);#else m_tail = (m_tail + 1) % m_size;#endif #ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return true; } bool pop(T& value) {#ifdef USE_LOCK m_spinLock.spinlock_lock();#endif if (isEmpty()) {#ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return false; } memcpy(&value, m_buffer + m_head, sizeof(T));#ifdef USE_MB MEMORY_BARRIER;#endif #ifdef USE_POT m_head = (m_head + 1) & (m_size - 1);#else m_head = (m_head + 1) % m_size;#endif #ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return true; } protected: virtual void createQueue(const char* name, unsigned int size) {#ifdef USE_POT if (!IS_POT(size)) { size = roundup_pow_of_two(size); }#endif m_size = size; m_head = m_tail = 0; if(name == NULL) { m_buffer = new T[m_size]; } else { int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666); if (shm_fd < 0) { perror("shm_open"); } if (ftruncate(shm_fd, m_size * sizeof(T)) < 0) { perror("ftruncate"); close(shm_fd); } void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (addr == MAP_FAILED) { perror("mmap"); close(shm_fd); } if (close(shm_fd) == -1) { perror("close"); exit(1); } m_buffer = static_cast<T*>(addr); memcpy(shm_name, name, SHM_NAME_LEN - 1); }#ifdef USE_LOCK spinlock_init(m_lock);#endif } inline unsigned int roundup_pow_of_two(size_t size) { size |= size >> 1; size |= size >> 2; size |= size >> 4; size |= size >> 8; size |= size >> 16; size |= size >> 32; return size + 1; }protected: char shm_name[SHM_NAME_LEN]; volatile unsigned int m_head; volatile unsigned int m_tail; unsigned int m_size;#ifdef USE_LOCK spinlock_t m_spinLock;#endif T* m_buffer;};
#define USE_LOCK开启spinlock锁,多坐蓐者多铺张者场景#define USE_MB开启Memory Barrier#define USE_POT开启队伍大小的2的幂对王人
LockFreeQueueTest.cpp测试文献:
#include "LockFreeQueue.hpp"#include <thread> //#define USE_LOCK class Test{public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); }private: int id; int value; char data[128];}; double getdetlatimeofday(struct timeval *begin, struct timeval *end){ return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);} LockFreeQueue<Test> queue(1 << 10, "/shm"); #define N ((1 << 20)) void produce(){ struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i >> 10, i))) i++; } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} void consume(){ Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { //test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} int main(int argc, char const *argv[]){ std::thread producer1(produce); //std::thread producer2(produce); std::thread consumer(consume); producer1.join(); //producer2.join(); consumer.join(); return 0;}
多线程场景下,需要界说USE_LOCK宏,开启锁保护。
编译:
g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread五、kfifo内核队伍
计较机科学家照旧阐发注解,当唯有一个读线程和一个写线程并发操作时,不需要任何极端的锁,就不错确保是线程安全的,也即kfifo使用了无锁编程技艺,以提高kernel的并发。
Linux kernel内部从来就不清寒简略,优雅和高效的代码,仅仅咱们清寒发现和回味的眼神。在Linux kernel内部,简略并不闪现代码使用神出鬼没的超然技能,相悖,它使用的不外是人人尽头熟识的基础数据结构,然则kernel拓荒者能从基础的数据结构中,索求出优好意思的秉性。
kfifo即是这样的一类优好意思代码,它十分简略,绝无饱和的一滑代码,却尽头高效。
对于kfifo信息如下:
本文分析的原代码版块: 2.6.24.4kfifo的界说文献: kernel/kfifo.ckfifo的头文献: include/linux/kfifo.h
kfifo是Linux内核的一个FIFO数据结构,遴选环形轮回队伍的数据结构来收场,提供一个无垠界的字节流工作,而且使用并行无锁编程技艺,即单坐蓐者单铺张者场景下两个线程不错并发操作,不需要任何加锁行径就不错保证kfifo线程安全。
kfifo代码既然肩负着这样多秉性,那咱们先一敝它的代码:
struct kfifo { unsigned char *buffer; /* the buffer holding the data */ unsigned int size; /* the size of the allocated buffer */ unsigned int in; /* data is added at offset (in % size) */ unsigned int out; /* data is extracted from off. (out % size) */ spinlock_t *lock; /* protects concurrent modifications */};
这是kfifo的数据结构,kfifo主要提供了两个操作,__kfifo_put(入队操作)和__kfifo_get(出队操作)。 它的各个数据成员如下:
buffer: 用于存放数据的缓存
size: buffer空间的大小,在初化时,将它朝上膨大成2的幂(如5,朝上膨大 与它最接近的值且是2的n次方的值是2^3,即8)
lock: 要是使用不成保证任何时代最多唯有一个读线程和写线程,需要使用该lock实施同步。
in, out: 和buffer一齐组成一个轮回队伍。 in指向buffer中队头,而且out指向buffer中的队尾
它的结构如示图如下:
+--------------------------------------------------------------+| |<----------data---------->| |+--------------------------------------------------------------+ ^ ^ ^ | | | out in size
天然,内核拓荒者使用了一种更好的技艺处理了in, out和buffer的关系,咱们将鄙人面进行刺目分析。
5.1kfifo功能姿首kfifo提供如下对外功能规格
只解救一个读者和一个读者并发操作
无阻难的读写操作,要是空间不够,则复返内容拜访空间
(1)kfifo_alloc 分派kfifo内存和运升沉责任
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ unsigned char *buffer; struct kfifo *ret; /* * round up to the next power of 2, since our 'let the indices * wrap' tachnique works only in this case. */ if (size & (size - 1)) { BUG_ON(size > 0x80000000); size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret;}
这里值得一提的是,kfifo->size的值老是在调用者传进来的size参数的基础上向2的幂膨大(roundup_pow_of_two,我我方的收场在著述末尾),这是内核一贯的作念法。这样的平允了然于目——对kfifo->size取模运算不错升沉为与运算,如下:
kfifo->in % kfifo->size 不错升沉为 kfifo->in & (kfifo->size – 1)
在kfifo_alloc函数中,使用size & (size – 1)来判断size 是否为2幂,要是条目为真,则闪现size不是2的幂,然后调用roundup_pow_of_two将之朝上膨大为2的幂。
这都是常用的技能,只不外人人莫得将它们辘集起来使用费力,底下要分析的__kfifo_put和__kfifo_get则是将kfifo->size的特色推崇到了极致。
(2)__kfifo_put和__kfifo_get奥秘的入队和出队
__kfifo_put是入队操作,它先将数据放入buffer内部,临了才修改in参数;__kfifo_get是出队操作,它先将数据从buffer中移走,临了才修改out。(确保即使in和out修改失败,也不错再来一遍)
你会发现in和out两者各司其职。底下是__kfifo_put和__kfifo_get的代码
unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; len = min(len, fifo->size - fifo->in + fifo->out); /* * Ensure that we sample the fifo->out index -before- we * start putting bytes into the kfifo. */ smp_mb(); /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. */ smp_wmb(); fifo->in += len; return len;}
奇怪吗?代码实足是线性结构,莫得任何if-else分支来判断是否有足够的空间存放数据。内核在这里的代码尽头简略,莫得一滑饱和的代码。
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
这个抒发式计较现时写入的空间,换成东谈主可衔接的言语即是:
l = kfifo可写空间和预期写入空间的最小值
(3)使用min宏来代if-else分支
__kfifo_get也应用了一样技能,代码如下:
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; len = min(len, fifo->in - fifo->out); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. */ smp_rmb(); /* first get the data from fifo->out until the end of the buffer */ l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, fifo->buffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. */ smp_mb(); fifo->out += len; return len;}
肃肃读两遍吧,我也读了屡次,每次老是有新发现,因为in, out和size的关系太奥秘了,果然能应用上unsigned int回绕的秉性。
原来,kfifo每次入队或出队,kfifo->in或kfifo->out仅仅不祥地kfifo->in/kfifo->out += len,并莫得对kfifo->size 进行取模运算。因此kfifo->in和kfifo->out老是一直增大,直到unsigned in最大值时,又会绕回到0这一肇始端。但恒久称心:
kfifo->in - kfifo->out <= kfifo->size即使kfifo->in回绕到了0的那一端,这个性质仍然是保执的。
对于给定的kfifo:
数据空间长度为:kfifo->in - kfifo->out而剩余空间(可写入空间)长度为:kfifo->size - (kfifo->in - kfifo->out)
尽管kfifo->in和kfofo->out一直突出kfifo->size进行增长,但它对应在kfifo->buffer空间的下标却是如下:
kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))
往kfifo内部写一块数据时,数据空间、写入空间和kfifo->size的关系要是称心:
kfifo->in % size + len > size
那就要作念写拆分了,见下图:
kfifo_put(写)空间起首地址 | \_/ |XXXXXXXXXXXXXXXXXX| +--------------------------------------------------------------+| |<----------data---------->| |+--------------------------------------------------------------+ ^ ^ ^ | | | out%size in%size size ^ | 写空间收尾地址
第一块天然是: [kfifo->in % kfifo->size, kfifo->size]第二块天然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]
底下是代码,细细体味吧:
/* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l);
对于kfifo_get过程,亦然雷同的,请诸君自行分析。
(4)kfifo_get和kfifo_put无锁并发操作
计较机科学家照旧阐发注解,当唯有一个读经程和一个写线程并发操作时,不需要任何极端的锁,就不错确保是线程安全的,也即kfifo使用了无锁编程技艺,以提高kernel的并发。
kfifo使用in和out两个指针来姿首写入和读取游标,对于写入操作,只更新in指针,而读取操作,只更新out指针,可谓詈骂分明,闪现图如下:
|<--写入-->|+--------------------------------------------------------------+| |<----------data----->| |+--------------------------------------------------------------+ |<--读取-->| ^ ^ ^ | | | out in size
为了幸免读者看到写者预测写入,但内容莫得写入数据的空间,写者必须保证以下的写入规矩:
往[kfifo->in, kfifo->in + len]空间写入数据更新kfifo->in指针为 kfifo->in + len
在操作1完成时,读者是还莫得看到写入的信息的,因为kfifo->in莫得变化,觉得读者还莫得起首写操作,唯有更新kfifo->in之后,读者才能看到。
那么怎么保证1必须在2之前完成,心事即是使用内存障蔽:smp_mb(),smp_rmb(), smp_wmb(),来保证对方不雅察到的内存操作规矩。
5.2kfifo内核队伍收场kfifo数据结构界说如下:
struct kfifo{ unsigned char *buffer; unsigned int size; unsigned int in; unsigned int out; spinlock_t *lock;}; // 创建队伍struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ struct kfifo *fifo; // 判断是否为2的幂 BUG_ON(!is_power_of_2(size)); fifo = kmalloc(sizeof(struct kfifo), gfp_mask); if (!fifo) return ERR_PTR(-ENOMEM); fifo->buffer = buffer; fifo->size = size; fifo->in = fifo->out = 0; fifo->lock = lock; return fifo;} // 分派空间struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ unsigned char *buffer; struct kfifo *ret; // 判断是否为2的幂 if (!is_power_of_2(size)) { BUG_ON(size > 0x80000000); // 朝上膨大成2的幂 size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret;} void kfifo_free(struct kfifo *fifo){ kfree(fifo->buffer); kfree(fifo);} // 入队操作static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_put(fifo, buffer, len); spin_unlock_irqrestore(fifo->lock, flags); return ret;} // 出队操作static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_get(fifo, buffer, len); //当fifo->in == fifo->out时,buufer为空 if (fifo->in == fifo->out) fifo->in = fifo->out = 0; spin_unlock_irqrestore(fifo->lock, flags); return ret;} // 入队操作unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len){ unsigned int l; //buffer中空的长度 len = min(len, fifo->size - fifo->in + fifo->out); // 内存障蔽:smp_mb(),smp_rmb(), smp_wmb()来保证对方不雅察到的内存操作规矩 smp_mb(); // 将数据追加到队伍尾部 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); memcpy(fifo->buffer, buffer + l, len - l); smp_wmb(); //每次累加,到达最大值后溢出,自动转为0 fifo->in += len; return len;} // 出队操作unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; //稀有据的缓冲区的长度 len = min(len, fifo->in - fifo->out); smp_rmb(); l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); memcpy(buffer + l, fifo->buffer, len - l); smp_mb(); fifo->out += len; //每次累加,到达最大值后溢出,自动转为0 return len;} static inline void __kfifo_reset(struct kfifo *fifo){ fifo->in = fifo->out = 0;} static inline void kfifo_reset(struct kfifo *fifo){ unsigned long flags; spin_lock_irqsave(fifo->lock, flags); __kfifo_reset(fifo); spin_unlock_irqrestore(fifo->lock, flags);} static inline unsigned int __kfifo_len(struct kfifo *fifo){ return fifo->in - fifo->out;} static inline unsigned int kfifo_len(struct kfifo *fifo){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_len(fifo); spin_unlock_irqrestore(fifo->lock, flags); return ret;}5.3kfifo打算重心
(1)保证buffer size为2的幂
kfifo->size值在调用者传递参数size的基础上向2的幂膨大,观点是使kfifo->size取模运算不错升沉为位与运算(提高运行拆伙)。kfifo->in % kfifo->size升沉为 kfifo->in & (kfifo->size – 1)
保证size是2的幂不错通过位运算的神情求余,在频繁操作队伍的情况下不错大大提高拆伙。
(2)使用spin_lock_irqsave与spin_unlock_irqrestore 收场同步。
Linux内核中有spin_lock、spin_lock_irq和spin_lock_irqsave保证同步。
static inline void __raw_spin_lock(raw_spinlock_t *lock){ preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);} static inline void __raw_spin_lock_irq(raw_spinlock_t *lock){ local_irq_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);}
spin_lock比spin_lock_irq速率快,但并不是线程安全的。spin_lock_irq加多调用local_irq_disable函数,即扼制土产货中断,是线程安全的,既扼制土产货中断,又扼制内核霸占。
spin_lock_irqsave是基于spin_lock_irq收场的一个接济接口,在插足和离开临界区后,不会篡改中断的开启、关闭景色。
要是自旋锁在中断处理函数中被用到,在得回自旋锁前需要关闭土产货中断,spin_lock_irqsave收场如下:
A、保存土产货中断景色;
B、关闭土产货中断;
C、得回自旋锁。
解锁时通过 spin_unlock_irqrestore完成开释锁、复本土产货中断到原来景色等责任。
(3)线性代码结构
代码中莫得任何if-else分支来判断是否有足够的空间存放数据,kfifo每次入队或出队仅仅不祥的 +len 判断剩余空间,并莫得对kfifo->size 进行取模运算,是以kfifo->in和kfifo->out老是一直增大,直到unsigned in突出最大值时绕回到0这一肇始端,但恒久称心:kfifo->in - kfifo->out <= kfifo->size。
(4)使用Memory Barrier
mb():适用于多处理器和单处理器的内存障蔽。
rmb():适用于多处理器和单处理器的读内存障蔽。
wmb():适用于多处理器和单处理器的写内存障蔽。
smp_mb():适用于多处理器的内存障蔽。
smp_rmb():适用于多处理器的读内存障蔽。
smp_wmb():适用于多处理器的写内存障蔽。
Memory Barrier使用场景如下:
A、收场同步原语(synchronization primitives)
B、收场无锁数据结构(lock-free data structures)
C、驱动关节
关节在运行时内存内容拜访规矩和关节代码编写的拜访规矩不一定一致,即内存乱序拜访。内存乱序拜访行径出现是为了普及关节运行时的性能。内存乱序拜访主要发生在两个阶段:
A、编译时,编译器优化导致内存乱序拜访(辅导重排)。
B、运行时,多CPU间交互引起内存乱序拜访。
Memory Barrier大概让CPU或编译器在内存拜访上有序。Memory barrier前的内存拜访操作必定先于自后的完成。Memory Barrier包括两类:
A、编译器Memory Barrier。
B、CPU Memory Barrier。
闲居,编译器和CPU引起内存乱序拜访不会带来问题,但要是关节逻辑的正确性依赖于内存拜访规矩,内存乱序拜访会带来逻辑上的失实。
在编译时,编译器对代码作念出优化时可能篡改内容奉行辅导的规矩(如GCC的O2或O3都会篡改内容奉行辅导的规矩)。
在运行时,CPU固然会乱序奉行辅导,但在单个CPU上高跟美女,硬件大概保证关节奉行时通盘的内存拜访操作都是按关节代码编写的规矩奉行的,Memory Barrier莫得必要使用(不探究编译器优化)。为了更快奉行辅导,CPU遴选活水线的奉行神情,编译器在编译代码时为了使辅导更得当CPU的活水线奉行神情以及多CPU奉行,正本辅导就会出现乱序的情况。在乱序奉行时,CPU的确奉行辅导的规矩由可用的输入数据决定,而非关节员编写的规矩。
本站仅提供存储工作,通盘内容均由用户发布,如发现存害或侵权内容,请点击举报。