본문 바로가기

programing

kafka connect에 대해

이번 포스팅에서는 카프카 커넥트에 대해서 알아보도록 하자.

 

Kafka Connect는 데이터베이스, 키-값 저장소, 검색 인덱스 및 파일 시스템 간의 간단한 데이터 통합을 위한 중앙 집중식 데이터 허브 역할을 하는 Apache Kafka의 무료 오픈소스 구성 요소이다.

kafka와 다른 데이터 시스템 간 데이터를 스트리밍 하고 대규모 데이터 셋을 이동시켜 주는 커넥터를 빠르게 생성할 수 있다.

 

카프카는 프로듀서와 컨슈머를 통해 데이터 파이프라인을 만들 수 있다. 만약 A서버의 DB에 저장한 데이터를 카프카 프로듀서와 컨슈머를 통해 B서버의 DB로 보낼 수 있는데 이러한 파이프 라이닝 여러 개라면 반복적으로 파이프라인을 구성해줘야 한다.

이런 반복적인 파이프 라인 구성을 쉽고 간편하게 만들 수 있게 만들어진 것이 Kafka Connect이다.

 

장점

- 데이터 중심 파이프라인

커넥트를 이용해 카프카로부터 데이터를 보내거나 카프카로부터 데이터를 가져올 수 있다.

 

- 유연성과 확장성

커넥트는 테스트 및 일회성 작업을 위한 단독 모드로 실행할 수 있고 대규모 운영 환경을 위한 분산모드(클러스터형)로 실행할 수 있다.

 

- 재사용성과 기능 확장

커넥트는 이미 만들어진 기존 커넥트들을 활용할 수 있고 운영 환경에서 요구 사항에 맞춰 빠르게 확장이 가능하다.

 

- 장애 및 복구

카프카 커넥트를 분산 모드로 실행하면 워커노드의 장애 상황에도 유연하게 대응 가능하므로 고가용성이 보장된다.

 

 

Kafka Connect 아키텍터 구성

Kafka Connect를 통해 왼쪽의 DB의 데이터를 Connect와 Source Connect를 사용해 kafka로 보내고 Connect와 Sink Connector를 사용해 카프카에 담긴 데이터를 DB에 저장하는 것을 위 이미지로 알 수 있다.

 

이때 중요한 것은 Connect와 Connector의 차이와 Source Connector와 Sink Connector다.


Connect: Connector를 동작하게 하는 프로세서(서버)
Connector:  Data Source(DB)의 데이터를 처리하는 소스가 들어있는 jar파일

Source Connector: data source에 담긴 데이터를 topic에 담는 역할(Producer)을 하는 connector
Sink Connector: topic에 담긴 데이터를 특정 data source로 보내는 역할(Consumer 역할)을 하는 connector

 

Connect는 단일 모드(Standalone)와 분산 모드(Distributed)로 이루어져 있다.

 

단일 모드(Standalone): 하나의 Connect만 사용하는 모드

분산 모드(Distributed): 여러 개의 Connect를 한 개의 클러스트로 묶어서 사용하는 모드.
 

특정 Connect가 장애가 발생해도 나머지 Connect가 대신 처리하도록 함


 

커넥트 내부를 상세히 보도록 하자.

 

 

3개의 워커(인스턴스)를 실행한 분산 모드 소스 커넥트를 나타낸 것이다.

 

만약 단독 모드로 실행한다면 위 이미지의 3개의 워커 중 하나의 워커만 동작하게 된다.

워커는 카프카 커넥트 프로세스가 실행되는 서버 또는 인스턴스 등을 의미하고 커넥터나 태스크들이 워커에서 실행하게 된다.

분산 모드는 특정 워커에 장애가 발생하더라도 해당 워커에서 동작 중인 커넥터나 태스크들이 다른 위치로 이동해 연속해서 동작할 수 있다는 장점이 있지만 단독 모드는 그렇지 않다.

 

Connector는 직접 데이터를 복사하지 않고 데이터를 어디에서 어디로 복사해야 하는지의 작업을 정의하고 관리하는 역할을 한다.

커넥터도 커넥트와 동일하게 소스에서 카프카로 전송하는 역할을 하는 소스 커넥터와 카프카에서 저장소로 싱크 하는 싱크 커넥터가 있다.

 

kafka Connect 내부 동작

분산 배치된 각 태스크들은 메시지들은 소스에서 카프카로, 카프카에서 싱크로 이동시킨다.

이때 커넥트는 파티셔닝 개념을 적용해 데이터들을 하위 집합으로 나뉜다.

카프카에서도 병렬 처리를 위해 토픽을 파티션으로 나누는데 커넥트도 동일하게 동작한다.

커넥트에서 나눈 파티션과 토픽의 파티션은 용어만 같을 뿐 서로 관계는 없다.

나뉜 파티션들에는 오프셋과 같이 순차적으로 레코드들이 정렬된다.

 

커넥터에서 복사 돼야 하는 데이터들은 레코드들의 순서에 맞춰 파티셔닝 돼야 하며 위 이미지에서 스트림 영역이 데이터가 파티셔닝 된 것을 나타낸다.

커넥터에 정의된 값을 보면 최대 태스크 수는 2로 정의되어 있다.

스트림에서 나눈 각 파티션은 각 태스크에 나뉘어 할당되고 태스크 들은 실제 데이터를 이동하는 동작을 처리한다.

태스크 1은 토픽 A, B에 데이터를 전송하고 태스크 2는 토픽 B와 C에 데이터를 전송한다.

각 파티션들에는 오프셋도 포함되어 있고 커넥트의 장애나 실패가 발생하는 경우 지정된 위치부터 데이터를 이동할 수 있다.

커넥터에 따라 오프셋의 기준이 달라질 수 있다 일반적인 파일을 전송하는 커넥터의 경우 오프셋이 파일의 위치를 나타내며 데이터베이스의 경우에는 타임스탬프나 시퀀스 ID를 나타낸다.

 

예제를 하나 해보고 넘어가도록 하자.

mysql table에 데이터를 insert 하면 다른 table에 데이터가 그대로 저장되는 예제를 해보자.

 

먼저 테스트할 테이블을 하나 만들어주었다.

 

이후 주키퍼와 카프카를 실행해 주자.

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties

 

필자는 이미 실행되어 있다.

kafka connect는 kafka에 기본적으로 포함되어 있다.

이동해서 설정 파일을 수정해 주자.

# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/Users/hanjihoon/study/plugins/confluentinc-kafka-connect-jdbc-10.7.6/lib

plugin이 저장될 폴더를 지정하면 된다. 이 부분은 아래에 connect를 설치하고 나서 해야 한다.

 

connect library를 다운로드하여서 정의한 플러그인 폴더에 넣는다.

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

 

JDBC Connector (Source and Sink)

Confluent, founded by the original creators of Apache Kafka®, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real-time.

www.confluent.io

 

 

압축을 해제한 후 실행해 주자.

그렇다면 카프카 실행한 서버에서 커넥트가 실행되었다는 것을 확인할 수 있다.

[2024-06-20 04:39:52,299] INFO [GroupCoordinator 0]: Stabilized group connect-cluster generation 1 (__consumer_offsets-13) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-06-20 04:39:52,304] INFO [GroupCoordinator 0]: Assignment received from leader connect-218.38.137.28:8083-79c83be2-29b7-486a-8591-18ded16b673c for group connect-cluster for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

 

이후 토픽이 생성되었는지 확인해야 하는데 아래처럼 확인할 수 있다.

 

확인이 되었다면 connect에 요청을 통해서 Source Connector를 생성하자.

 

{
    "name": "test-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/test",
        "connection.user": "",
        "connection.password": "",
        "mode":"incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist" : "users",
        "topic.prefix" : "example_topic_",
        "tasks.max" : "1"
    }
}
cUrl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

 

포스트맨 기준으로 위와 같이 생성하면 된다. 201 Created 응답과 함께 생성 정보가 나오게 된다.

 

 

다음은 사용한 각 속성을 확인해 보자.

name : source connector 이름(JdbcSourceConnector를 사용)
config.connector.class : 커넥터 종류(JdbcSourceConnector 사용)
config.connection.url : jdbc이므로 DB의 정보 입력
config.connection.user : DB 유저 정보
config.connection.password : DB 패스워드
config.mode : "테이블에 데이터가 추가됐을 때 데이터를 polling 하는 방식"(bulk, incrementing, timestamp, timestamp+incrementing)
config.incrementing.column.name : incrementing mode일 때 자동 증가 column 이름
config.table.whitelist : 데이터를 변경을 감지할 table 이름
config.topic.prefix : kafka 토픽에 저장될 이름 형식 지정 위 같은 경우 whitelist를 뒤에 붙여 example_topic_users에 데이터가 들어감
tasks.max : 커넥터에 대한 작업자 수

 

이후 GET 요청으로 생성된 Connectors List를 확인할 수 있다.

cUrl -X GET -d @- http://localhost:8083/connectors

 

 

여기까지 연결되는 것을 알 수 있지만 status를 확인하면 

 "name": "test-connect",
            "connector": {
                "state": "FAILED",

 

실패한다고 뜬다 이 부분을 어떻게 해결해야 할지 다방면으로 많이 찾아보았지만 결국 잘 되지 않았다.

어디서 문제가 생긴 지는 어느 정도 감은 잡았지만 해결하려 드니 밑천이 드러났다. 기본적인 동작은 알지만 내부적으로 커넥터로 DataSource와 SInk가 어떻게 동작하는지에 대해서 더욱 공부하고 나서 해야 할 것 같다.

 

필자의 부족함으로 인해서 구현할 수 없었다. 추후에 꼭 다시 도전해 보기로 하고 카프카는 이만 줄이고 다음은 도커를 사용해 볼 것이다. 멀티 모듈을 구성한 것을 도커로 구성하고 카프카도 도커로 띄울 수 있도록 만들어 보도록 하겠다.

'programing' 카테고리의 다른 글

알고리즘 (배열 커서 연결 리스트)  (0) 2024.06.26
알고리즘 LIST  (0) 2024.06.24
멀티 모듈-3  (1) 2024.06.10
멀티 모듈 - 2  (2) 2024.06.09
멀티 모듈에 대해  (1) 2024.06.09