티스토리 뷰
Broker : 카프카 에플리케이션 서버 단위
Topic : 데이터 분리 단위
Topic Group : 토픽 그룹별로 offset을 관리 할 수 있다.
Partition : 하나의 토픽은 여러개의 Partion으로 구성 되어 있다. 늘리면 다시 줄일 수 없다. Consumer개수를 늘려서 분산처리를 할 수 있게 한다.
Prerequisite
이 포스트에서 사용한 kafka의 dir은 c:\tools\kafka_2.13-2.7.0 입니다.
kafka서버의 ip는 192.168.111.131입니다.
윈도우 명령어 보기
dir c:\tools\kafka_2.13-2.7.0\bin\windows
Kafka서버 띄우기
kafka가 설치된 루트로 갑니다. bin과 config가 보입니다.
먼저 zookeeper를 띄웁니다.
>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
그 다음 kafka를 실행 해주어야 합니다.
>bin\windows\kafka-server-start.bat config\server.properties
test라는 이름으로 파티션이 3개인 topic만들기
C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-topics --create --bootstrap-server 192.168.111.131:9092 --replication-factor 1 --partitions 3 --topic test
위 명령어에서 --partitions 3은 파티션의 개수를 3개로 만들겠다는 뜻입니다.
Topic이 만들어 졌는지 확인하기
bin\windows\kafka-topics --list --bootstrap-server localhost:9092
test라는 이름의 topic에 메세지 보내기
C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-console-producer --bootstrap-server 192.168.111.131:9092 --topic test
한 후
> 가 나오면
>hello
>world
>bye
등을 보내면 됩니다.
test라는 이름의 topic에 메세지 받기
C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-console-consumer --bootstrap-server 192.168.111.131:9092 --topic test --from-beginning
testgroup에서 메세지 받아오기
C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-console-consumer --bootstrap-server 192.168.111.131:9092 --topic test -group testgroup --from-beginning
consumer group list보기
C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-consumer-groups --bootstrap-server 192.168.111.131:9092 --list
bin\windows\kafka-consumer-groups --bootstrap-server localhost:9092 --list
testgroup의 상태 보기
C:\Users\ocean>C:\tools\kafka_2.13-2.7.0\bin\windows\kafka-consumer-groups --bootstrap-server 192.168.111.131:9092 --group testgroup --describe
Python Producer code
producer.py
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost'})
p.produce('adc-100', 'hello', 'world')
p.flush()
위 코드는 localhost에 있는 Broker의 adc-100이라는 이름의 토픽에 'hello'라는 메세지를 보내는 코드 입니다.
import time
from confluent_kafka import Producer, Consumer
t_id = 'abc-def-efg'
_conf = {'bootstrap.servers': 'localhost',
'enable.auto.commit': True,
'auto.offset.reset': 'earliest',
'session.timeout.ms': 6000,
"message.max.bytes": 50000000,
'max.poll.interval.ms': '10800000'}
p = Producer(_conf)
with open('c:/git/python/myproj/img.csv') as f:
t = f.read()
print(len(t))
p.produce(f'{t_id}-16', t.encode('utf-8'))
p.poll(0)
p.flush()
위 코드는 img.csv를 읽어서 그 내용을 abc-def-efg라는 topic에 pub하는 코드 입니다.
Python Consumer code
consumer.py
from confluent_kafka import Consumer
import time
consumer_group_id = 'foo'
consumer = Consumer({
'bootstrap.servers': 'localhost',
'group.id': consumer_group_id,
"enable.auto.commit":False,
'auto.offset.reset': 'earliest'})
topic_name = 'adc-100'
consumer.subscribe([topic_name])
while True:
msg = consumer.poll(timeout=1.0)
if msg != None:
print(msg.value())
else:
print('Msg is none.')
time.sleep(0.1)
위 코드는 adc-100이라는 topic에 있는 데이터를 'foo'라는 그룹 id를 이용해 가지고 오는 코드 입니다.
CMAK에서 클러스터 생성
Cluster Zookeeper Hosts에 Kafka서버의 ip와 포트번호를 넣어줍니다. 기본은 2181입니다.
- Total
- Today
- Yesterday
- 2017 티스토리 결산
- vim
- 도커컨테이너
- Sh
- docker container tissue box
- 도커티슈박스
- shellscript
- docker container
- 도커각티슈박스
- docker container whale
- docker container case
- 도커각티슈케이스
- 이직
- 개발자
- Linux
- docker container tissue
- 도커티슈케이스
- 싱가폴
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |