포스트

Kafka - Message Queuing Service

kafka 개요

  • 링크드인에서 개발한 서비스로, 오픈 소스화하여 Apache Kafka가 되었다.
  • kafka를 개발한 일부 개발자들이 설립한 confluent에서 kafka 관련 상업 서비스를 개발
  • 분산형 스트리밍 플랫폼
  • 대용량 데이터를 처리 가능한 메세징 시스템
  • 초당 100K건의 이벤트를 안정적으로 처리 가능
    • rabbitMQ는 초당 20개 이상
  • publisher / consumer 형태로 topic에 메세지를 전달하고 소비
  • acks1를 기다리지 않고 메세지 생산이 가능
    • 생산자 중심의 메세지 큐잉 서비스
  • kafka 서버를 broker라고 부른다
  • 일반적으로 3개 이상의 브로커 클러스터링을 구성하는 것을 권장한다.
    • 하나에 문제가 생기면 다른 브로커가 대신할 수 있기 때문
  • zookeeper 서버를 먼저 실행하고 브로커를 실행한다.
    • zookeeper는 브로커를 컨트롤하는 역할
    • 리더격인 컨트롤러 브로커에 문제가 생기면 컨트롤러 브로커를 중지 시키고
      정상 동작하는 워커 브로커에 역할을 위임하는 과정을 zookeeper가 담당한다.


동작 과정 실습 1. topic에 메세지 발행/구독

1. kafka container 실행

apache/kafka 이미지를 이용, 싱글 브로커 클러스터링 구축

apache/kafka 이미지는 Kraft라는 기능을 제공하며 zookeeper 없이 kafka 스스로 메타데이터를 관리할 수 있다.
단순히 topic에 메세지를 발행하고, 구독하는 동작을 확인하기 위해 사용

apache/kafka 보다는 confluent에서 제공하는 이미지를 사용하는 것이 권장된다!

1
2
3
4
5
6
7
docker run -d --rm `
 --name study-kafka `
 -p 9092:9092 `
 apache/kafka:3.7.0

# container 실행 확인
docker ps


2. kafka container 진입

1
docker exec -it study-kafka bash


3. Topic 생성

1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-issue --partitions 1

응답 값

1
Created topic first-issue.


4. Topic 목록 확인

1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

응답 값

1
first-issue


5. topic 정보 확인

1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first-issue

응답 값

1
2
Topic: first-issue      TopicId: w0x4UutkTYa6ygIYBKRNJA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
       Topic: first-issue      Partition: 0    Leader: 1       Replicas: 1     Isr: 1


6. topic 생산, 소비

소비와 생산을 실시간으로 확인하기 위해 kafka container에 새로운 창으로 진입한다.

1
docker exec -it study-kafka bash

consumer 준비

띄운 창 두 개 중 하나로 producer를 관찰하고 나머지 창으로 consumer를 관찰

1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-issue --from-beginning
  • --from-beginning 옵션을 추가하면 토픽에 처음 생성된 메세지부터 받게된다.

producer 메세지 발행

1
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first-issue
  • 위 명령어 입력 후 > 문자가 나타나면 텍트스를 입력하고 엔터키를 입력하면 된다.


실행 예시 image


7. Topic 삭제

1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first-issue



동작 과정 실습 2. connector로 db에 데이터 넣기

mariadb에 데이터를 입력하면 kafka source connect에 의해 해당 내용의 topic이 발행되고
kafka sink connect로 topic을 구독하면 mariadb에 구독 정보로 테이블을 생성하고 발행된 데이터를 추가해보는 실습

confluent에서 제공하는 docker compose 설정 파일을 이용


github repo https://github.com/confluentinc/cp-all-in-one

위 사이트에서 docker-compose.yml 파일을 다운 받아 수정해서 사용한다.


1. docker compose.yml 수정

connect 서비스의 /usr/share/java 하위에 디렉터리를 하나 생성해주고 이 경로를 호스트와 마운팅해서 jdbc 관련 jar 파일을 추가해야 한다.
(공식문서에서 connect 관련 jar 추가 파일은 /usr/share/java/kafka-connect-* 경로로 추가하는 것을 권장)

그리고 네트워크도 설정해줘야 한다.

나열되어 있던 서비스들 중 당장 사용 할 zookeeper와 broker, schema-registry, connect 서비스만 남기고 다른 서비스는 사용하지 않을 것이라 제거해주었다.

compose.yml 실행은 jdbc 관련 jar 파일을 다운 받은 후 진행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - my-network

  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - my-network

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    networks:
      - my-network

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    volumes:
      - ./connect/jdbc:/usr/share/java/kafka-connect-jdbc
    networks:
      - my-network


networks:
  my-network:
    external: true


2. kafka-connect-jdbc 다운로드

compose.yml 파일의 connect 서비스에서 마운팅 한 경로인 ./connect/jdbc 위치에 kafka-connect-jdbc를 다운 받고 안에 들어있는 lib 내용만 살린 뒤 나머지는 삭제한다.

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

1
2
3
4
5
mkdir -p ./connect/jdbc \
 && wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip \
 && unzip confluentinc-kafka-connect-jdbc-10.7.6.zip \
 && mv confluentinc-kafka-connect-jdbc-10.7.6/lib/* connect/jdbc \
 && rm -rf confluentinc-kafka-connect-jdbc-10.7.6*


3. mariadb-java-client.jar 다운로드

compose.yml 파일의 connect 서비스에서 마운팅 한 경로인 ./connect/jdbc 위치에 mariadb-java-client.jar 파일을 다운 받는다.

https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector

1
2
wget https://dlm.mariadb.com/3824147/Connectors/java/connector-java-3.4.0/mariadb-java-client-3.4.0.jar \
 && mv mariadb-java-client-3.4.0.jar connect/jdbc


3. compose.yml 실행

1
docker compose up -d --build


4. mariadb 세팅

database : test_db_2

1
create database test_db_2;

table : user_info

1
2
3
4
5
6
create table user_info(
   id int auto_increment primary key,
   user_id varchar(20),
   pwd varchar(20),
   name varchar(20)
);

그리고 mariaDB도 docker container로 실행되고 있기 때문에 kafka와 같은 네트워크로 연결

1
docker network connect my-network study-maria


5. source connect 추가

postman, talend API 등을 이용해서 connector의 rest api로 source connect 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 echo '
{
   "name" : "first-source-connect",
   "config" : {
     "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
     "connection.url":"jdbc:mariadb://study-maria:3306/test_db_2",
     "connection.user":"root",
     "connection.password":"1234",
     "mode": "incrementing",
     "incrementing.column.name" : "id",
     "table.whitelist":"test_db_2.user_info",
     "topic.prefix" : "my_topic_",
     "tasks.max" : "1",
     "key.converter": "org.apache.kafka.connect.json.JsonConverter",
     "key.converter.schemas.enable": "true",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable": "true"
   }
 }
 ' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
  • name : source connector 이름
  • connection.url : db url
  • connection.user : db username
  • connection.password : db password
  • table.whitelist : 감시할 테이블 이름 (db.table)
  • topic.prefix : 발행 할 토픽 이름의 prefix


실행 결과 image


생성된 sorce connect 확인

1
curl http://localhost:8083/connectors/first-source-connect/status

image


6. mariadb에서 데이터 추가하여 topic 생성 확인

test_db_2.user_info 테이블에 데이터를 입력하면 my_topic_user_info topic이 생성되는 것을 확인

mariaDB insert

1
insert into test_db_2.user_info(user_id, pwd, name) values('user1', '1111', 'tester');

broker - topic 조회

1
docker exec broker /bin/kafka-topics --bootstrap-server localhost:9092 --list
  • my_topic_user_info 이름의 토픽이 생성된 것을 확인


7. sink connect 추가

postman, talend API 등을 이용해서 connector의 rest api로 sink connect 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
echo '
{
 "name":"first-sink-connect",
 "config":{
   "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
   "connection.url":"jdbc:mariadb://study-maria:3306/test_db_2",
   "connection.user":"root",
   "connection.password":"1234",
   "auto.create":"true",
   "auto.evolve":"true",
   "insert.mode": "insert",
   "delete.enabled":"false",
   "tasks.max":"1",
   "topics":"my_topic_user_info",
   "key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "key.converter.schemas.enable": "true",
   "value.converter.schemas.enable": "true"
 }
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
  • name : sink connector 이름
  • connection.url : db url
  • connection.user : db username
  • connection.password : db password
  • topics : source connect가 발행한 topic 이름


실행 결과 image

sink connect 생성 확인

1
curl -X GET http://localhost:8083/connectors

mariadb table 생성 및 데이터 확인

1
show tables;

image

topic 내용 확인

broker 컨테이너에서 my_topic_user_info 토픽에 발행된 내용을 consumer console을 통해 확인

1
docker exec broker /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic_user_info --from-beginning

실행 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": false,
    "name": "user_info"
  },
  "payload": {
    "id": 1,
    "user_id": "user1",
    "pwd": "1111",
    "name": "tester"
  }
}




참고한 사이트




  1. acks: acknowledgments 의 줄임말로 승인이라는 뜻을 가지고 있다. publisher가 보낸 메세지를 kafka가 수신했는지 여부를 의미한다. 

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.