troubleShooting

kafka 트러블 슈팅

HJHStudy 2024. 6. 20. 00:37
728x90

본 포스팅은 프로젝트에 카프카 기능 구현하던 중 생긴 트러블로 짧은 오류로 잠깐 확인하고 가도록 하겠다.

Caused by: org.apache.kafka.common.KafkaException: class com.fasterxml.jackson.databind.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

카프카 진행하던 도중 위와 같은 에러가 생기게 됐다.

 

이는 직렬화와 역직렬화에서 문제가 생긴 것이다.

com.fasterxml.jackson.databind.deser.std.StringDeserializer 클래스가 org.apache.kafka.common.serialization.Deserializer 인터페이스의 구현체가 아니라고 나와 있다.

 

확인했을 때에 코드 상에 문제는 없었다.

@EnableKafka //리스너를 사용하기 위한 필수적인 애노테이션
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    private final Environment env;

    @Bean
    public Map<String, Object> consumerConfig() {
        // Kafka Consumer 설정을 담고 있는 맵을 생성한다.
        Map<String, Object> props = new HashMap<>();

        // Kafka 클러스터에 연결할 서버 정보다. 여러 서버를 ,로 구분하여 지정할 수 있다.
        // 예: "localhost:9092,localhost:9093"
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("kafka.consumer.bootstrap-servers"));

        // Consumer 그룹 ID이다. 같은 그룹에 속한 Consumer들은 동일한 토픽의 메시지를 분산해서 처리한다.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("kafka.consumer.group-id"));

        // Consumer가 읽기 시작할 오프셋을 지정한다. 옵션은 "earliest", "latest", "none" 중 하나다. yml에 earliest로 설정했다.
        // "earliest"는 가장 오래된 메시지부터 읽고, "latest"는 가장 최근 메시지부터 읽는다.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("kafka.consumer.auto-offset-reset"));

        // 메시지 키의 역직렬화 클래스다. Kafka에서는 메시지를 전송할 때 바이트 배열로 전송하므로
        // 이를 다시 Java 객체로 변환하는 과정이 필요하다. 이 예제에서는 문자열로 역직렬한다.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 메시지 값의 역직렬화 클래스다. 키와 마찬가지로, 값을 바이트 배열에서 문자열로 변환한다.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;
    }
    //DefaultKafkaConsumerFactory는 Consumer 인스턴스를 생성하는 팩토리다.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory는 Kafka 메시지 리스너 컨테이너를 관리하는 팩토리다.
        // 여러 개의 Kafka 리스너를 병렬로 실행할 수 있도록 설정할 수 있다.
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // 리스너 컨테이너가 메시지를 소비할 수 있도록 Consumer 팩토리를 설정한다.
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

ConsumerConfig에서 생긴 문제로 yml에 직렬화와 역직렬화에 쓰일 값으로 아래와 같이 적시했었다.

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

여기서 문제는 이전에 사용하던 카프카 설정을 복사로 가져와서 사용했을 때 com.fasterxml.jackson.databind의 JsonDeserializer를 import 하고 있었기에 생기는 문제였다. 이후 패키지를 없애고 다시 org.apache.kafka.common.serialization.Deserializer를 사용하니 제대로 작동이 되었다.

알고 보면 되게 별거 아닌 문제였다. 오류 로그는 조금 길었고 사이에 nullpointException도 있었어서 당황했지만 침착하게 해석하면 됐었다. 다음부턴 복사하고 붙여 넣을 때 import에 대해서 주의해야겠다.

 

728x90

'troubleShooting' 카테고리의 다른 글

트러블 슈팅 git master, main  (0) 2024.06.11