본문 바로가기
MSA/MSA관련기술

kafka Producer / Consumer 설명 및 예제코드

by 문자메일 2022. 6. 2.

1. Kafka를 사용하고자 하는 producer, consumer MSA 둘 다 pom.xml에 kafka 의존성을 추가해야 한다.

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

 

2. Producer 부분

1. @EnableKafka 어노테이션 설정

2. producerFactory() 생성 / 설정 값 저장하는 파일로 보임, key, value 에 StringSerializer 적용

3. KafkaTemplate 인스턴스 사용하여 메시지 보내는 것으로 보임

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

KafkaProducer 클래스 파일에서 kafkaTemplate 인스턴스를 생성자 주입으로 받은 다음,

 ObjectMapper 인스턴스로 producerFactory에서 직렬화 클래스로 지정한 type으로 보낼 값을 변환한 후,

 kafkaTemplate.send(TOPIC, VALUE); 값으로 전송한다.

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString ="";
        try{
            jsonInString = mapper.writeValueAsString(orderDto);
        }catch(JsonProcessingException ex){
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the Order microservice : " + orderDto);

        return orderDto;
    }
}

 

아래처럼 Controller에서 kafkaProducer.send("example-catalog-topic", orderDto); 처럼 메시지 송신 한다.

//http://127.0.0.1:0/order-service/{user_id}/orders/
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                 @RequestBody RequestOrder orderDetails){
    ModelMapper mapper = new ModelMapper();
    mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

    /* jpa */
    OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
    orderDto.setUserId(userId);
    OrderDto createdOrder = orderService.createOrder(orderDto);

    ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

    /* send this order to the kafka */
    kafkaProducer.send("example-catalog-topic", orderDto);

    return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}

 

3. Consumer 부분

 

1. kafka

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}

 

@KafkaListener(topics = "####") 에 지정된 메시지로 카프카 메시지가 날라오면 해당 annotation이 붙은 메스드의 String 파라메터값에 값이 들어가도록 구현되어 있는듯 하다.

@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "example-catalog-topic")
    public void update(String kafkaMessage){
        log.info("Kafka Message: -> " + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try{
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        }catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
        if(entity!=null){
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            repository.save(entity);
        }
    }
}

 

'MSA > MSA관련기술' 카테고리의 다른 글

API Gateway Service  (0) 2022.07.04
유레카 클라이언트 설정  (0) 2022.07.03
Spring Cloud Netflix Eureka 서버 설정  (0) 2022.07.03
MSA 아키텍처 1  (0) 2022.07.03
웹서버 사용하는 이유, 프록시 서버  (0) 2022.07.03

댓글