티스토리 뷰

Language/JAVA

LMAX Disruptor 튜터리얼

KyeongRok Kim 2018. 5. 10. 17:59

LMAX Disruptor 튜터리얼


https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started 를 번역 하였다.



기본 이벤트 프로듀스(생산) 컨슘(소비)

Disruptor 대해 알아보기 위해 Disruptor 이용해 가장 간단하고 테스트를 하기 쉬운 예제로 알아볼 예정입니다.

예제 하나는 1개의 long타입의 값을 프로듀서(producer)에서 컨슈머(consumer) 보낼 것입니다.


컨슈머는 간단하게 값을 출력하기만 것입니다.

먼저 데이터를 들고갈 오브젝트를 만저 만들어 것입니다.


LongEvent.java

1
2
3
4
5
6
7
import lombok.Data;
 
@Data
public class LongEvent {
    private long value;
}
 
cs


Disruptor 이벤트를 우리에게 할당 하려면 EventFactory 만들어야 한다 EventFactory LongEvent를 만들어 줍니다.


LongEventFactory.java

1
2
3
4
5
6
7
8
9
import com.lmax.disruptor.EventFactory;
 
public class LongEventFactory implements EventFactory<LongEvent> {
 
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
cs


Event 만들어지면 만들어진 이벤트들을 처리할 Event 컨슈머(consumer) 만들어야 한다. 이번 예제에서 필요한건 콘솔에 값을 출력하는 컨슈머입니다.


LongEventHandler.java

1
2
3
4
5
6
7
8
9
import com.lmax.disruptor.EventHandler;
 
public class LongEventHandler implements EventHandler<LongEvent> {
 
    @Override
    public void onEvent(LongEvent longEvent, long sequence, boolean endofBatch) throws Exception {
        System.out.println("event:" + longEvent);
    }
}
cs


데이터는 주로 http request Queue에서 오지만 예제를 해보아야 하기 때문에 데이터를 주는 프로듀서(Producer) 만들어야 합니다.


LongEventProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;
 
 
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
 
    public void onData(ByteBuffer bb) {
        long sequence = ringBuffer.next();
 
        LongEvent event = ringBuffer.get(sequence);
        event.setValue(bb.getLong(0));
 
        ringBuffer.publish(sequence);
    }
}
cs



위와 같은 코드를 써야 하는데 이렇게 사용하면 이벤트 발행이 단순 (queue) 사용 하는 보다 복잡하고 의존성이 높아집니다.


이건 왜그러냐면 이벤트를 미리 할당 하기 때문입니다. 이벤트를 미리 할당 하려면 메세지 발행을 하는데 2개의 단계를 거쳐야 합니다. 예를 들어 버퍼에 슬롯을 만들고 데이터를 발행 해야 하는 과정이 필요 합니다.


그러면 데이터를 발행 하는 명령은 try/catch 걸어주어야 합니다.


링버퍼에 슬롯을 만들면(RingBuffer.next() 호출하면) 순서를 반드시 함께 넘겨 주어야 합니다.


과정대로 처리를 하지 않으면 디스럽터의 상태가 오염됩니다. 멀티 프로듀서 환경에서는 재시작 하지 않으면 복구가 되지 않는 문제가 발생할 있습니다.



버전3.0 트렌스레이터스(Translators) 사용하기

링버퍼의 위와 같은 문제를 해결하기 위해 버젼3.0 디스럽터에는 람다 스타일의 API 추가 되었습니다. API 이용하면 개발자가 위와 같이 복잡한 로직을 캡슐링 해서 처리 있습니다.


3.0버전은 Event Publisher/ Event Translator API 사용하는게 좋습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
 
import java.nio.ByteBuffer;
 
public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;
 
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
 
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
                    longEvent.setValue(byteBuffer.getLong(0));
                }
            };
 
    public void onData(ByteBuffer byteBuffer) {
        ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
    }
}
 
cs


람다 스타일 API 사용하는 다른 장점은 트렌스레이터 코드를 분리된 클래스로 뽑아내서 비교적 쉽게 유닛 테스트를 만들 있다는 것입니다.

디스럽터는 EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg 트렌스레이터를 만들 있는 여러가지 인터페이스를 제공합니다.


이게 가능한 이유는 트렌스레이터가 스테텍 클래스 또는 캡춰링 람다(외부 변수에 액세스 불가) 되어있기 때문에 트렌스레이터를 통해 링버퍼로 들어갈 있습니다.


마지막 단계는 위에 만들었던 클래스들을 한군데 모으는겁니다. 모든 컴포넌트를 수동으로 불러와도 되지만 이건 복잡하기 때문에 DSL 간단한 생성자를 제공하는걸 사용합니다.


DSL 이용하면 몇몇 복잡한 케이스는 불가능 하지만 대부분의 상황에서 사용할 있습니다.



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
 
import java.lang.reflect.Executable;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
 
public class LongEventMain {
    public static void main(String[] args) throws InterruptedException {
        Executor executor = Executors.newCachedThreadPool();
 
        LongEventFactory factory = new LongEventFactory();
 
        int bufferSize = 1024;
 
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
 
        disruptor.handleEventsWith(new LongEventHandler());
 
        disruptor.start();
 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
 
        LongEventProducer producer = new LongEventProducer(ringBuffer);
 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0true; l++ ) {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}
cs


end.



728x90
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함