spring

멀티 모듈 kafka 추가하기

HJHStudy 2024. 6. 19. 23:14
728x90

이번 포스팅은 저번 멀티 모듈을 구성한 프로젝트에서 카프카를 추가해 볼 것이다.

 

카프카 프로듀서와 컨슈머를 설정을 통해 각 생성해서 사용할 것이다.

이를 어느 모듈에 둘지 고민을 하다 인프라 모듈에 위치하기로 했다. 기능에 대한 설정 정보를 작성하기 적절하다 생각하기 때문이다.

 

2~3개 정도의 토픽을 생성해서 카프카를 활용해 볼 생각이다.

이전 카프카 포스팅에서는 카프카 커넥트에 대해서 다루지 않았었는데 이번 포스팅에서 카프카 커넥트를 사용하지 않는 카프카로 사용자가 구매한 구매 목록과 배송 정보를 메시지로 발행하고 구독하는 컨슈머에서 소비하도록 하고 카프카 커넥트를 사용해서 DB와 연결해서 DB의 데이터로 소비할 수 있도록 할 것이다.

소비 시에 이메일로 구매 정보와 배송에 대한 정보를 보내도록 하겠다.

실제 서비스라면 회원 가입 시에 이메일을 적는 곳이 있는데 이 이메일로 전송하도록 하고 현재 서비스는 하고 있지 않으니 필자의 이메일로 전송하도록 만들겠다.

기본적인 카프카에 대한 것은 필자의 카프카 포스팅에서 확인할 수 있다.

이미 포스팅한 내용을 제외하고는 설명을 기입하도록 하겠다.

이번에는 주키퍼를 사용하도록 하겠다. 현재 크래프트를 사용하고 있는 곳이 많지 않은 것 같고 아직은 주키퍼를 지원하니 주키퍼로 진행하도록 하겠다.

커넥트에 대한 부분은 필자도 공부가 좀 필요하기에 뒤로 미루고 먼저 로그와 배송정보를 보내는 과정을 확인해보자.

우선 토픽을 먼저 생성해 두었다.

 

이번 포스팅에서는 카프카를 사용해서 생성된 배송 정보를 이메일로 보낼 수 있도록 구성하는 것이 목표이다.

 

토픽도 만들었으니 코드로 들어가 보도록 하겠다.

외부 설정과 설정 파일은 infra에 위치하도록 하겠다.

 

카프카와 메일을 사용하기 위해서 yml 파일부터 작성하도록 하겠다.

kafka:
  producer:
    bootstrap-servers: localhost:9092 # Kafka 클러스터에 대한 초기 연결에 사용할 호스트 : 포트 목록
  #      key-serializer: org.apache.kafka.common.serialization.StringSerializer
  #      value-serializer: org.apache.kafka.common.serialization.StringSerializer

  consumer:
    bootstrap-servers: localhost:9092
    group-id: consumer_group_myProject
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

카프카는 이전에 카프카 예제에서 사용한 것을 그대로 사용할 것이다.

spring:
  mail:
    host: smtp.naver.com
    username: gkswlgns7653@naver.com
    password: 
    properties:
      mail:
        smtp:
          auth: true
          starttls:
            enable: true
          ssl:
            enable: true

 

위 username 은 메일을 보낼 발신자이다. 본 포스팅에서는 수신과 발신을 같은 메일주소를 사용하도록 하겠다.

password 부분이 있는데 이 부분은 필자는 네이버를 사용하고 있다.

password는 네이버 아이디의 비밀번호가 아닌 로그인 후 본인의 보안설정으로 이동 후 2단계 인증 관리에 들어가 준다.

그 후 애플리케이션 비밀번호 관리에서 생성해서 사용하면 된다.

 

위에서 종류를 선택하고 비밀번호를 생성하면 생성된 비밀번호를 password 부분에 기입하면 된다.

 

네이버 메일을 사용하기 위해서 네이버 페이지에 로그인을 한 후 메일로 이동해서 환경설정에 들어가 준다.

 

위와 같이 설정을 한 후 저장을 해야 메일을 보낼 수 있다.

 

외부 설정 파일을 다 설정했다면 gradle에 의존성을 추가하자.

implementation 'org.springframework.boot:spring-boot-starter-mail'
implementation 'org.springframework.kafka:spring-kafka'

 

의존성을 추가했다면 이제 설정 파일을 구성해 보도록 하자.

package com.project.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfig {
    private final Environment env;


    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                env.getProperty("kafka.producer.bootstrap-servers"));

        //키와 값을 직렬화하는 클래스로 StringSerializer를 사용한다.
        //Kafka 프로듀서가 메시지를 브로커에 전송할 때 문자열로 직렬화하도록 설정한다.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
                , StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
                , StringSerializer.class);
        return props;
    }


    //ProducerFactory 인터페이스의 구현체를 반환한다.
    //이 팩토리는 Kafka 프로듀서를 생성하는 데 사용된다.
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(this.producerConfig());
    }

    //KafkaTemplate은 메시지를 Kafka 토픽으로 전송하는 데 사용된다.
    //이 템플릿은 Kafka 프로듀서의 상위 수준 API로, 프로듀서의 여러 설정과 기능을 간단하게 사용할 수 있게 해준다.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}

 

package com.project.config;

import org.apache.kafka.common.serialization.StringDeserializer;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;


import java.util.HashMap;
import java.util.Map;


@EnableKafka //리스너를 사용하기 위한 필수적인 애노테이션
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    private final Environment env;

    @Bean
    public Map<String, Object> consumerConfig() {
        // Kafka Consumer 설정을 담고 있는 맵을 생성한다.
        Map<String, Object> props = new HashMap<>();

        // Kafka 클러스터에 연결할 서버 정보다. 여러 서버를 ,로 구분하여 지정할 수 있다.
        // 예: "localhost:9092,localhost:9093"
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("kafka.consumer.bootstrap-servers"));

        // Consumer 그룹 ID이다. 같은 그룹에 속한 Consumer들은 동일한 토픽의 메시지를 분산해서 처리한다.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("kafka.consumer.group-id"));

        // Consumer가 읽기 시작할 오프셋을 지정한다. 옵션은 "earliest", "latest", "none" 중 하나다. yml에 earliest로 설정했다.
        // "earliest"는 가장 오래된 메시지부터 읽고, "latest"는 가장 최근 메시지부터 읽는다.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("kafka.consumer.auto-offset-reset"));

        // 메시지 키의 역직렬화 클래스다. Kafka에서는 메시지를 전송할 때 바이트 배열로 전송하므로
        // 이를 다시 Java 객체로 변환하는 과정이 필요하다. 이 예제에서는 문자열로 역직렬한다.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 메시지 값의 역직렬화 클래스다. 키와 마찬가지로, 값을 바이트 배열에서 문자열로 변환한다.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;
    }
    //DefaultKafkaConsumerFactory는 Consumer 인스턴스를 생성하는 팩토리다.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory는 Kafka 메시지 리스너 컨테이너를 관리하는 팩토리다.
        // 여러 개의 Kafka 리스너를 병렬로 실행할 수 있도록 설정할 수 있다.
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // 리스너 컨테이너가 메시지를 소비할 수 있도록 Consumer 팩토리를 설정한다.
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

package com.project.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.JavaMailSenderImpl;

import java.util.Properties;

@Configuration
public class MailConfig {

    @Value("${spring.mail.host}")
    private String host;
    @Value("${spring.mail.username}")
    private String username;
    @Value("${spring.mail.password}")
    private String password;

    public JavaMailSender sendSimpleMessage(String to, String subject, String text) {
        JavaMailSenderImpl javaMailSender = new JavaMailSenderImpl();

        javaMailSender.setHost(host);
        // 네이버 아이디
        javaMailSender.setUsername(username);
        // 네이버 비밀번호
        javaMailSender.setPassword(password);
        javaMailSender.setPort(465);
        // 메일 인증서버 가져오기
        javaMailSender.setJavaMailProperties(getMailProperties());
        javaMailSender.setDefaultEncoding("UTF-8");

        return javaMailSender;
    }
    // 메일 인증서버 정보 가져오기
    private Properties getMailProperties() {
        Properties properties = new Properties();
        properties.setProperty("mail.transport.protocol", "smtp"); // 프로토콜 설정
        properties.setProperty("mail.smtp.auth", "true"); // smtp 인증
        properties.setProperty("mail.smtp.starttls.enable", "true"); // smtp strattles 사용
        properties.setProperty("mail.debug", "true"); // 디버그 사용
        properties.setProperty("mail.smtp.ssl.trust", "smtp.naver.com"); // ssl 인증 서버 (smtp 서버명)
        properties.setProperty("mail.smtp.ssl.enable", "true"); // ssl 사용

        return properties;
    }
}

 

카프카 프로듀서와 컨슈머 설정은 이전과 동일하다. 바뀐 점은 yml에서 컨슈머 그룹의 아이디가 바뀐 것뿐이다.

메일에 대한 config도 설정했다. 각 코드 옆에 주석이 있으니 설명을 주석을 확인하면 될 것 같다.

 

설정을 마쳤다면 프로듀서로 토픽에 주문정보인 메시지를 발행해서 파티션에 저장하게 만들고 카프카 리스너를 통해서 컨슈머가 소비하고 소비하게 된 부분을 메일로 보낼 수 있도록 구성하자.

 

필자는 결제 시스템으로 카카오를 사용한다.

 

기존 서비스에 위와 같이 결제 시 정보를 웹에서 확인하도록 만들었는데 이 부분을 포함해서 결제 정보와 배송정보를 함께 이메일로 전송하게끔 만들 것이다.

이를 위해서 dto가 하나 필요했다. 결제 시 반환 값을 가지는 객체와 주문한 유저의 배송 정보를 가지는 객체를 직렬화해서 카프카 메시지로 저장해야 하기 때문에 각 객체를 따로 저장하면 메시지가 2개가 되니 필요한 정보를 가지는 dto를 만들었다.

package com.project.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BuyInfoDto {
    private String zipcode;
    private String city;
    private String street;
    private LocalDateTime orderTime;
    private String quantity;
    private String totalAmount;
    private String item_name;
}

 

위와 같은 dto를 만들어서 이에 대한 정보를 메일로 보내도록 하겠다.

 

우선 중개인 안 토픽에 메시지를 저장해하니 프로듀서를 만들어주도록 하겠다.

package com.project.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.project.Delivery;
import com.project.dto.BuyInfoDto;
import com.project.pay.KakaoPayApprovalV0;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public void serialization_SendMessage(Delivery delivery, KakaoPayApprovalV0 kakaoPayApprovalV0, int total_price) throws JsonProcessingException {

        String totalPrice = Integer.toString(total_price);
        BuyInfoDto buyInfoDto = new BuyInfoDto(
                delivery.getAddress().getZipcode(),
                delivery.getAddress().getCity(),
                delivery.getAddress().getStreet(),
                delivery.getOrderTime(),
                kakaoPayApprovalV0.getQuantity().toString(),
                totalPrice,
                kakaoPayApprovalV0.getItem_name()
        );
        objectMapper.registerModule(new JavaTimeModule());
        String message = objectMapper.writeValueAsString(buyInfoDto);

        kafkaTemplate.send("application-delivery",message);
    }
}

 

필요한 데이터를 파라미터로 받아서 직렬화하고 kafkaTemplate을 통해서 메시지를 발행하게 만들었다.

objectMapper.registerModule(new JavaTimeModule());

 

위 코드는 필자가 코딩하면서 발생했던 오류를 해결한 부분이다. 짧게 다룰 것이다.

아래의 에러 메시지를 확인하게 되었고 이는 Java 8 LocalDateTime을 지원하지 않는다고 한다.

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type java.time.LocalDateTime not supported by default: add Module "com.fasterxml.jackson.datatype
" to enable handling (through reference chain:

 

찾아본 결과 LocalDateTime 필드를 ObjectMapper로 역직렬화와 직렬화 시에 생기는 에러이다.

기본 설정으로 Java 8의 날짜/시간 타입을 정확히 처리하지 못하는 이유는 해당 타입들은 표준 Java 라이브러리 일부가 아니라 Java 8에 처음 도입된 JSR 310 날짜/시간 API의 일부이기 때문이다.

implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'

위 의존성을 추가하고 위 registerModule을 추가하면 정상 동작이 된다.

 

다시 본론으로 돌아와서 발행한 메시지이제 소비하는 부분을 구성해야 한다.

public interface MailService {
    //메일 발송
    void sendSimpleMessage(String to,String messageText)throws Exception;
}

메일 서비스를 제공할 인터페이스를 만들었다. 추후 메일을 통한 인증이나 이런 것에 대해서도 이번에 메일을 공부하며 배웠기 때문에 여러 기능을 추가하게 되면 좋을 것 같아서 인터페이스로 만들었다.

 

package com.project.service;

import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import jakarta.mail.internet.MimeMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;



@Service
@RequiredArgsConstructor
@Slf4j
@Transactional
public class KafkaConsumerService implements MailService{
    // MailConfig에서 등록해둔 Bean을 autowired하여 사용하기
    private final JavaMailSender emailSender;
    @Value("${spring.mail.username}")
    private String username;

    @KafkaListener(topics = "application-delivery", groupId = "consumer_group_myProject")
    public void listen(@Payload String message) {
        log.info("소비하는 메시지:{}", message);
        try {
            // 실제 사용자 이메일을 여기에 전달해야 한다. 지금 예제는 본인 이메일로 고정
            sendSimpleMessage(username, message);
        } catch (Exception e) {
            log.error("메일 발송에 실패 했습니다.", e);
        }
    }
    // 메일 발송
    // sendSimpleMessage 의 매개변수 to는 이메일 주소가 되고,
    // MimeMessage 객체 안에 내가 전송할 메일의 내용을 담는다
    // bean으로 등록해둔 javaMail 객체를 사용하여 이메일을 발송한다
    @Override
    public void sendSimpleMessage(String to, String messageText) throws Exception {
        MimeMessage message = createMessage(to, messageText);
        try {
            //메일로 보내주는 메소드
            emailSender.send(message);
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalArgumentException();
        }
    }


    private MimeMessage createMessage(String to, String messageText) throws MessagingException {
        log.info("메일받을 사용자 : " + to);
        MimeMessage message = emailSender.createMimeMessage();
        message.addRecipients(Message.RecipientType.TO, to);
        // 이메일 제목
        message.setSubject("배송 정보입니다.");
        message.setText(messageText,"utf-8","html");
        message.setFrom(username);

        log.info("생성된 message:{}", messageText);

        return message;
    }

}

 

위는 카프카 컨슈머로 구현한 구현체이다. 어떤 기능을 하는지에 대해서는 주석도 달아 놓았다.

web 모듈에서도 의존성을 가져야 하기 때문에 아래와 같이 gradle에 추가했다.

implementation 'org.springframework.boot:spring-boot-starter-mail'
implementation 'org.springframework.kafka:spring-kafka'

 

이제 실행을 해보도록 하겠다.

실제 구매하고 이런 흐름은 생략하고 결제 시 로그랑 실제 메일이 오는지, 토픽에 메시지가 저장이 잘 되는지 확인을 해보고 포스팅을 마치도록 하겠다.

 

아래는 실행돼서 남긴 로그이다. 

 

 

위는 실제 도착한 메일이다. 메일 폼이 json 형태여서 마음에 안 들기도 하지만 이는 아래와 같이 html 코드를 넣어서도 만들 수 있다.

정확한 정보는 아니지만 필자의 생각으로는 옛날 스프링에서 서블릿을 사용할 때 아래와 같지는 않지만 동적인 html을 구성할 때 코드 안에서 html 코드를 포함해서 코드를 짜기도 했는데 스프링 mvc 패턴과 함께 viewResolver를 통해서 뷰를 편하게 구성하듯 이 부분도 그런 방법이 있을 것 같다고 생각했다. 메일을 보내는 부분은 처음 다뤄보는 주제이기 때문에 아직 부족하지만 차차 더 공부해보도록 하겠다.

 String msgg = "";
        msgg += "<h1>안녕하세요</h1>";
        msgg += "<h1>...</h1>";
        msgg += "<br>";
        msgg += "<br>";
        msgg += "<div align='center' style='border:1px solid black'>";
        msgg += "<h3 style='color:blue'>쇼핑몰 입니다</h3>";
        msgg += "<div style='font-size:130%'>";
        msgg += "원하는 필드 추가"
        msgg += "</div>";
        message.setText(msgg, "utf-8", "html");

 

 

 

포스팅은 여기까지이다.

이렇게 구성하면서 메일을 보내는 부분도 다뤄보게 되고 흥미로웠다. 사실 카프카를 사용하면서까지 메일을 보낼 필요는 없는 것 같다고 생각했지만 카프카로 실행하니 편한 부분도 많았다. 메일의 정보를 가지고 있지 않아도 되고 카프카를 통해 리스너로 결제에 반응형으로 제공할 수 있게 되었고 지금은 json 형태로 메일을 보내지만 만약 다른 서비스와 연계된 파이프라인이 생긴다면 카프카를 통해 데이터를 주고받을 수 있을 것 같다 예를 들어 택배사에 데이터를 보내고 받는다던지 여러 연계된 서비스를 할 수 있을 것 같고 처음 카프카를 다룰 땐 이게 그렇게 유용하게 사용이 되나? 싶었던 부분이 이번 공부로 인해서 확 와닿았다. 이를 잘 활용하면 간단한 채팅 같은 부분도 구현할 수 있을 것이다. 중개란 표현이 적절하구나 라는 생각이 들었다.

이번 포스팅에서 여러 개의 카프카를 다뤄보겠다고 말했지만 하나를 다뤘다. 다음 포스팅에서는 kafka에서 사용하는 connect라는 것이 있다. 이는 소스를 통해서 DB와 카프카를 연동해서 사용하는 것이다. 이 부분에 대해서는 아직 다뤄본 적은 없지만 한 번은 둘러보았기 때문에 DB와 카프카가 어떤 동작을 할 수 있을지 필자의 프로젝트에서 간단하게 구현해 볼 것이다. 다음 포스팅은 카프카 커넥트에 대해서 알아보고 그다음 포스팅에 구현을 해보도록 하겠다.

 

728x90