티스토리 뷰

Broker : 카프카 에플리케이션 서버 단위

Topic : 데이터 분리 단위

Topic Group : 토픽 그룹별로 offset을 관리 할 수 있다.

Partition : 하나의 토픽은 여러개의 Partion으로 구성 되어 있다. 늘리면 다시 줄일 수 없다. Consumer개수를 늘려서 분산처리를 할 수 있게 한다.

Prerequisite

이 포스트에서 사용한 kafka의 dir은 c:\tools\kafka_2.13-2.7.0 입니다.

kafka서버의 ip는 192.168.111.131입니다.

윈도우 명령어 보기

dir c:\tools\kafka_2.13-2.7.0\bin\windows

Kafka서버 띄우기

kafka가 설치된 루트로 갑니다. bin과 config가 보입니다.

먼저 zookeeper를 띄웁니다.

>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

그 다음 kafka를 실행 해주어야 합니다.

>bin\windows\kafka-server-start.bat config\server.properties

test라는 이름으로 파티션이 3개인 topic만들기

C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-topics --create --bootstrap-server 192.168.111.131:9092 --replication-factor 1 --partitions 3 --topic test

위 명령어에서 --partitions 3은 파티션의 개수를 3개로 만들겠다는 뜻입니다.

Topic이 만들어 졌는지 확인하기

bin\windows\kafka-topics --list --bootstrap-server localhost:9092

test라는 이름의 topic에 메세지 보내기

C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-console-producer --bootstrap-server 192.168.111.131:9092 --topic test

한 후

> 가 나오면

>hello

>world

>bye

등을 보내면 됩니다.

test라는 이름의 topic에 메세지 받기

C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-console-consumer --bootstrap-server 192.168.111.131:9092 --topic test --from-beginning

testgroup에서 메세지 받아오기

C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-console-consumer --bootstrap-server 192.168.111.131:9092 --topic test -group testgroup --from-beginning

consumer group list보기

C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-consumer-groups --bootstrap-server 192.168.111.131:9092 --list

bin\windows\kafka-consumer-groups --bootstrap-server localhost:9092 --list

testgroup의 상태 보기

C:\Users\ocean>C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-consumer-groups --bootstrap-server 192.168.111.131:9092 --group testgroup --describe

Python Producer code

producer.py

from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost'})

p.produce('adc-100', 'hello', 'world')
p.flush()

위 코드는 localhost에 있는 Broker의 adc-100이라는 이름의 토픽에 'hello'라는 메세지를 보내는 코드 입니다.

 

import time
from confluent_kafka import Producer, Consumer

t_id = 'abc-def-efg'
_conf = {'bootstrap.servers': 'localhost',
        'enable.auto.commit': True,
        'auto.offset.reset': 'earliest',
        'session.timeout.ms': 6000,
        "message.max.bytes": 50000000,
        'max.poll.interval.ms': '10800000'}
p = Producer(_conf)

with open('c:/git/python/myproj/img.csv') as f:
    t = f.read()
    print(len(t))
    p.produce(f'{t_id}-16', t.encode('utf-8'))
    p.poll(0)
    p.flush()

위 코드는 img.csv를 읽어서 그 내용을 abc-def-efg라는 topic에 pub하는 코드 입니다.

 

Python Consumer code

consumer.py

from confluent_kafka import Consumer
import time

consumer_group_id = 'foo'
consumer = Consumer({
    'bootstrap.servers': 'localhost',
    'group.id': consumer_group_id,
    "enable.auto.commit":False,
    'auto.offset.reset': 'earliest'})

topic_name = 'adc-100'
consumer.subscribe([topic_name])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg != None:
        print(msg.value())
    else:
        print('Msg is none.')

    time.sleep(0.1)

위 코드는 adc-100이라는 topic에 있는 데이터를 'foo'라는 그룹 id를 이용해 가지고 오는 코드 입니다.

 

 

CMAK에서 클러스터 생성

Cluster Zookeeper Hosts에 Kafka서버의 ip와 포트번호를 넣어줍니다. 기본은 2181입니다.

728x90
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함