Programing

원형 잠금없는 버퍼

lottogame 2020. 11. 5. 07:41
반응형

원형 잠금없는 버퍼


하나 이상의 데이터 피드 스트림에 연결하고 결과를 기반으로 이벤트를 트리거하는 것보다 데이터에 대한 분석을 수행하는 시스템을 설계하는 중입니다. 일반적인 다중 스레드 생산자 / 소비자 설정에서는 여러 생산자 스레드가 데이터를 대기열에 넣고 여러 소비자 스레드가 데이터를 읽고 있으며 소비자는 최신 데이터 포인트와 n 개의 포인트에만 관심이 있습니다. 생산자 스레드는 느린 소비자가 따라 잡을 수없는 경우 차단해야하며, 처리되지 않은 업데이트가 없으면 당연히 소비자 스레드가 차단됩니다. 리더 / 라이터 잠금과 함께 일반적인 동시 대기열을 사용하면 잘 작동하지만 들어오는 데이터 속도는 엄청날 수 있으므로 잠금 오버 헤드, 특히 생산자를위한 작성자 잠금을 줄이고 싶었습니다. 순환 잠금없는 버퍼가 필요하다고 생각합니다.

이제 두 가지 질문이 있습니다.

  1. 순환 잠금없는 버퍼가 답입니까?

  2. 만약 그렇다면, 내가 직접 시작하기 전에 내 필요에 맞는 공개 구현을 알고 있습니까?

순환 잠금없는 버퍼를 구현하는 모든 포인터는 항상 환영합니다.

BTW, Linux의 C ++에서이 작업을 수행합니다.

추가 정보 :

응답 시간은 내 시스템에 매우 중요합니다. 이상적으로 소비자 스레드는 추가 1 밀리 초 지연이 시스템을 쓸모 없게 만들거나 훨씬 덜 가치있게 만들 수 있기 때문에 가능한 한 빨리 들어오는 업데이트를보고 싶어 할 것입니다.

제가 기대하고있는 디자인 아이디어는 생산자 스레드가 가능한 한 빨리 버퍼에 데이터를 넣는 반 잠금없는 순환 버퍼입니다. 버퍼가 가득 차 있지 않으면 차단하지 않고 버퍼 A의 헤드를 호출합시다. A가 버퍼 Z의 끝을 충족합니다. 소비자 스레드는 각각 순환 버퍼 P 및 P n에 대한 두 개의 포인터를 보유합니다 . 여기서 P는 스레드의 로컬 버퍼 헤드이고 P nP 다음의 n 번째 항목입니다. 각 소비자 스레드는 P를 전진 시킵니다. 그리고 P n 일단 현재 P 처리를 마치고 버퍼 포인터 Z의 끝이 가장 느린 P n으로 진행됩니다.. P가 A를 따라 잡으면 더 이상 처리 할 새로운 업데이트가 없음을 의미하며 소비자는 회전하고 A가 다시 진행될 때까지 바쁘게 기다립니다. 소비자 스레드가 너무 오래 회전하면 잠자기 상태가되어 조건 변수를 기다릴 수 있지만, 지연 시간을 늘리지 않기 때문에 소비자가 업데이트를 기다리는 CPU주기를 차지해도 괜찮습니다 (더 많은 CPU 코어가있을 것입니다). 스레드보다). 순환 트랙이 있고 생산자가 많은 소비자 앞에서 실행 중이라고 상상해보십시오. 핵심은 생산자가 일반적으로 소비자보다 몇 발 앞서 실행되도록 시스템을 조정하는 것입니다. 이러한 작업의 대부분은 잠금없는 기술을 사용하여 수행됩니다. 구현에 대한 세부 사항을 올바르게 이해하는 것이 쉽지 않다는 것을 이해합니다 ... 좋아요, 매우 어렵습니다. 그래서 제가 몇 가지 실수를하기 전에 다른 사람의 실수로부터 배우고 싶습니다.


지난 몇 년 동안 잠금없는 데이터 구조에 대한 특정 연구를했습니다. 나는 현장에서 대부분의 논문을 읽었습니다 (약 40 개 정도 밖에 없습니다 – 실제로 사용하는 것은 10 개 또는 15 개 정도입니다. :-)

잠금없는 순환 버퍼 인 AFAIK는 개발되지 않았습니다. 문제는 독자가 작가를 추월하거나 그 반대의 복잡한 상황을 다루는 것입니다.

잠금없는 데이터 구조를 연구하는 데 최소 6 개월을 소비하지 않았다면 직접 작성하지 마십시오. 당신은 그것을 잘못 이해하고 새로운 플랫폼에서 배포 후 코드가 실패 할 때까지 오류가 존재한다는 것이 분명하지 않을 수 있습니다.

그러나 귀하의 요구 사항에 대한 해결책이 있다고 생각합니다.

잠금없는 대기열을 잠금없는 사용 가능 목록과 페어링해야합니다.

무료 목록은 사전 할당을 제공하므로 잠금없는 할당 자에 대한 (재정적으로 비싼) 요구 사항을 제거합니다. free-list가 비어 있으면 큐에서 요소를 즉시 빼고 대신 사용하여 순환 버퍼의 동작을 복제합니다.

(물론 잠금 기반 순환 버퍼에서 일단 잠금이 확보되면 요소를 얻는 것이 매우 빠르다 (기본적으로 포인터 역 참조 일뿐).하지만 잠금없는 알고리즘에서는이를 얻을 수 없습니다. 작업을 수행하는 방법에서 잘 벗어났습니다. 자유 목록 팝에 실패하고 대기열에서 빼는 오버 헤드는 잠금없는 알고리즘이 수행해야하는 작업의 양과 동등합니다.

Michael과 Scott은 1996 년에 정말 좋은 잠금없는 대기열을 개발했습니다. 아래 링크는 논문의 PDF를 추적 할 수있는 충분한 세부 정보를 제공합니다. Michael 및 Scott, FIFO

잠금없는 자유 목록은 가장 간단한 잠금없는 알고리즘이며 실제로 이에 대한 실제 논문을 본 적이 없다고 생각합니다.


원하는 것에 대한 예술 용어는 잠금없는 대기열 입니다. Ross Bencina의 코드 및 논문 에 대한 링크 가 포함 된 훌륭한 메모 세트가 있습니다 . 내가 가장 신뢰하는 사람은 Maurice Herlihy입니다 (미국인의 경우 이름을 "Morris"처럼 발음).


버퍼가 비어 있거나 가득 찬 경우 생산자 또는 소비자가 차단하는 요구 사항은 데이터를 사용할 수있을 때까지 생산자와 소비자가 차단되도록 세마포어 또는 조건 변수와 함께 일반 잠금 데이터 구조를 사용해야 함을 제안합니다. 잠금없는 코드는 일반적으로 이러한 조건에서 차단되지 않습니다. OS를 사용하여 차단하는 대신 수행 할 수없는 작업을 회전하거나 포기합니다. (다른 스레드가 데이터를 생성하거나 소비 할 때까지 기다릴 여유가 있다면 다른 스레드가 데이터 구조 업데이트를 완료 할 때까지 잠금을 기다리는 이유는 무엇입니까?)

(x86 / x64) Linux에서 뮤텍스를 사용한 스레드 내 동기화는 경합이없는 경우 상당히 저렴합니다. 생산자와 소비자가 잠금을 유지하는 데 필요한 시간을 최소화하는 데 집중하십시오. 마지막으로 기록 된 N 개의 데이터 포인트에만 관심이 있다고 말씀 하셨으므로 원형 버퍼가이 작업을 합리적으로 잘 수행 할 것이라고 생각합니다. 그러나 이것이 차단 요구 사항과 소비자가 실제로 읽은 데이터를 소비 (제거)한다는 아이디어에 어떻게 부합하는지 실제로 이해하지 못합니다. (소비자가 마지막 N 개의 데이터 포인트 만보고 제거하지 않기를 원합니까? 생산자가 소비자가 따라갈 수 없는지 신경 쓰지 않고 오래된 데이터 만 덮어 쓰도록 하시겠습니까?)

또한 Zan Lynx가 언급했듯이 데이터가 많이 들어올 때 데이터를 더 큰 덩어리로 집계 / 버퍼링 할 수 있습니다. 고정 된 수의 포인트 또는 일정 시간 내에 수신 된 모든 데이터를 버퍼링 할 수 있습니다. . 이는 동기화 작업이 더 적다는 것을 의미합니다. 그러나 지연 시간이 발생하지만 실시간 Linux를 사용하지 않는 경우 어쨌든 어느 정도 처리해야합니다.


DDJ 에 대한 꽤 좋은 기사 시리즈가 있습니다. 이 물건이 얼마나 어려울 수 있는지에 대한 신호로, 이전 기사대한 수정 이 잘못되었습니다. 자신을 굴리기 전에 실수를 이해했는지 확인하십시오)-;


부스트 라이브러리의 구현은 고려할 가치가 있습니다. 사용하기 쉽고 상당히 높은 성능입니다. 테스트를 작성하고 쿼드 코어 i7 랩톱 (8 스레드)에서 실행 한 후 1 초에 최대 4M 대기열에 넣거나 대기열에서 빼기 작업을 수행했습니다. 지금까지 언급되지 않은 또 다른 구현은 http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue 의 MPMC 대기열 입니다. 32 명의 제작자와 32 명의 소비자가있는 동일한 랩톱에서이 구현으로 간단한 테스트를 수행했습니다. 광고 된 바와 같이 부스트 잠금없는 큐보다 빠릅니다.

다른 답변의 대부분은 상태 잠금 프로그래밍이 어렵습니다. 대부분의 구현은 수정하기 위해 많은 테스트 및 디버깅이 필요한 코너 케이스를 감지하기 어렵습니다. 이러한 문제는 일반적으로 코드에서 메모리 장벽을 신중하게 배치하여 수정됩니다. 또한 많은 학술 기사에 게시 된 정확성 증명을 찾을 수 있습니다. 나는 무차별 대입 도구로 이러한 구현을 테스트하는 것을 선호합니다. 프로덕션에서 사용하려는 잠금없는 알고리즘은 http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html 과 같은 도구를 사용하여 정확성을 확인해야합니다 .


경합을 줄이는 한 가지 유용한 기술은 항목을 여러 대기열로 해시하고 각 소비자가 "주제"에 전념하도록하는 것입니다.

소비자가 관심을 갖는 가장 최근의 항목 수의 경우 전체 대기열을 잠그고 재정의 할 항목을 찾기 위해 반복하고 싶지 않습니다. 항목을 N- 튜플, 즉 N 개의 최근 항목 모두에 게시하면됩니다. 생산자가 시간 초과로 전체 대기열에서 차단하고 (소비자가 따라 잡을 수없는 경우) 로컬 튜플 캐시를 업데이트하는 구현에 대한 보너스 포인트는 데이터 소스에 역압을 가하지 않습니다.


이 기사에 동의하고 잠금없는 데이터 구조를 사용하지 않는 것이 좋습니다. 잠금없는 fifo 대기열에 대한 비교적 최근의 논문은 이것입니다 . 동일한 저자의 추가 논문을 검색합니다. 잠금없는 데이터 구조에 관한 Chalmers 박사 논문도 있습니다 (링크를 잃었습니다). 그러나 요소의 크기를 말하지 않았습니다. 잠금없는 데이터 구조는 단어 크기의 항목에서만 효율적으로 작동하므로 기계어 (32 또는 64)보다 큰 경우 요소를 동적으로 할당해야합니다. 비트). 요소를 동적으로 할당하는 경우 (예 : 프로그램을 프로파일 링하지 않았고 기본적으로 조기 최적화를 수행하고 있기 때문에 가정) 병목 현상을 메모리 할당 자로 이동 시키므로 잠금없는 메모리 할당자가 필요합니다 (예 : Streamflow)., 애플리케이션과 통합합니다.


Sutter의 대기열은 차선책이며 그는 그것을 알고 있습니다. The Art of Multicore programming은 훌륭한 참고 자료이지만 메모리 모델에 대한 Java 사용자를 신뢰하지 마십시오. Ross의 링크는 그러한 문제에 라이브러리가 있었기 때문에 명확한 대답을 얻지 못할 것입니다.

문제를 해결하기 전에 분명히 과도하게 엔지니어링하고있는 것에 많은 시간을 소비하고 싶지 않다면 잠금없는 프로그래밍을하는 것은 문제를 요구합니다. '캐시 일관성). 수년이 걸리고 문제를 먼저 해결하지 않고 나중에는 일반적인 질병을 최적화하는 데 도움이됩니다.


저는 하드웨어 메모리 모델과 잠금없는 데이터 구조의 전문가가 아니며 프로젝트에서 사용하지 않는 경향이 있으며 기존의 잠금 데이터 구조를 사용합니다.

그러나 최근에 비디오 : 링 버퍼를 기반으로 한 Lockless SPSC 대기열

이것은 거래 시스템에서 사용하는 LMAX distruptor라는 오픈 소스 고성능 Java 라이브러리를 기반으로합니다. LMAX Distruptor

위의 프레젠테이션을 기반으로 머리와 꼬리 포인터를 원자 적으로 만들고 머리가 뒤에서 꼬리를 잡거나 그 반대의 상황을 원자 적으로 확인합니다.

아래에서 매우 기본적인 C ++ 11 구현을 볼 수 있습니다.

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;
                write.store(next_tail);
                return true;
            }

            return false;  
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int n)
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.push(i);
             }
         }  // End of lambda expression
                                                );
    std::thread read_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.pop();
             }
         }  // End of lambda expression
                                                );
    write_thread.join();
    read_thread.join();

     return 0;
}

Although this is an old question, no one mentioned DPDK's lockless ring buffer. It's a high throughput ring buffer that supports multiple producers and multiple consumers. It also provides single consumer and single producer modes, and the ring buffer is wait-free in SPSC mode. It's written in C and supports multiple architectures.

In addition, it supports Bulk and Burst modes where items can be enqueued/dequeued in bulk. The design let's multiple consumers or multiple producers write to the queue at the same time by simple reserving the space through moving an atomic pointer.


Just for completeness: there's well tested lock-free circular buffer in OtlContainers, but it is written in Delphi (TOmniBaseBoundedQueue is circular buffer and TOmniBaseBoundedStack is bounded stack). There's also an unbounded queue in the same unit (TOmniBaseQueue). The unbounded queue is described in Dynamic lock-free queue – doing it right. The initial implementation of the bounded queue (circular buffer) was described in A lock-free queue, finally! but the code was updated since then.


This is an old thread, but since it hasn't been mentioned, yet - there is a lock-free, circular, 1 producer -> 1 consumer, FIFO available in the JUCE C++ framework.

https://www.juce.com/doc/classAbstractFifo#details


Check out Disruptor (How to use it) which is a ring-buffer that multiple threads can subscribe to:


Here is how I would do it:

  • map the queue into an array
  • keep state with a next read and next next write indexes
  • keep an empty full bit vector around

Insertion consists of using a CAS with an increment and roll over on the next write. Once you have a a slot, add your value and then set the empty/full bit that matches it.

Removals require a check of the bit before to test on underflows but other than that, are the same as for the write but using read index and clearing the empty/full bit.

Be warned,

  1. I'm no expert in these things
  2. atomic ASM ops seem to be very slow when I've used them so if you end up with more than a few of them, you might be faster to use locks embedded inside the insert/remove functions. The theory is that a single atomic op to grab the lock followed by (very) few non atomic ASM ops might be faster than the same thing done by several atomic ops. But to make this work would require manual or automatic inlineing so it's all one short block of ASM.

You may try lfqueue

It is simple to use, it is circular design lock free

int *ret;

lfqueue_t results;

lfqueue_init(&results);

/** Wrap This scope in multithread testing **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&results, int_data) != 1) ;

/*Dequeue*/
while ( (ret = lfqueue_deq(&results)) == NULL);

// printf("%d\n", *(int*) ret );
free(ret);
/** End **/

lfqueue_clear(&results);

There are situations that you don't need locking to prevent race condition, especially when you have only one producer and consumer.

Consider this paragraph from LDD3:

When carefully implemented, a circular buffer requires no locking in the absence of multiple producers or consumers. The producer is the only thread that is allowed to modify the write index and the array location it points to. As long as the writer stores a new value into the buffer before updating the write index, the reader will always see a consistent view. The reader, in turn, is the only thread that can access the read index and the value it points to. With a bit of care to ensure that the two pointers do not overrun each other, the producer and the consumer can access the buffer concurrently with no race conditions.

참고URL : https://stackoverflow.com/questions/871234/circular-lock-free-buffer

반응형