1. Kafka를 사용하고자 하는 producer, consumer MSA 둘 다 pom.xml에 kafka 의존성을 추가해야 한다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. Producer 부분
1. @EnableKafka 어노테이션 설정
2. producerFactory() 생성 / 설정 값 저장하는 파일로 보임, key, value 에 StringSerializer 적용
3. KafkaTemplate 인스턴스 사용하여 메시지 보내는 것으로 보임
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducer 클래스 파일에서 kafkaTemplate 인스턴스를 생성자 주입으로 받은 다음,
ObjectMapper 인스턴스로 producerFactory에서 직렬화 클래스로 지정한 type으로 보낼 값을 변환한 후,
kafkaTemplate.send(TOPIC, VALUE); 값으로 전송한다.
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString ="";
try{
jsonInString = mapper.writeValueAsString(orderDto);
}catch(JsonProcessingException ex){
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice : " + orderDto);
return orderDto;
}
}
아래처럼 Controller에서 kafkaProducer.send("example-catalog-topic", orderDto); 처럼 메시지 송신 한다.
//http://127.0.0.1:0/order-service/{user_id}/orders/
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
/* jpa */
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createdOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
3. Consumer 부분
1. kafka
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
@KafkaListener(topics = "####") 에 지정된 메시지로 카프카 메시지가 날라오면 해당 annotation이 붙은 메스드의 String 파라메터값에 값이 들어가도록 구현되어 있는듯 하다.
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository repository) {
this.repository = repository;
}
@KafkaListener(topics = "example-catalog-topic")
public void update(String kafkaMessage){
log.info("Kafka Message: -> " + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try{
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
}catch (JsonProcessingException ex){
ex.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
if(entity!=null){
entity.setStock(entity.getStock() - (Integer)map.get("qty"));
repository.save(entity);
}
}
}
'MSA > MSA관련기술' 카테고리의 다른 글
API Gateway Service (0) | 2022.07.04 |
---|---|
유레카 클라이언트 설정 (0) | 2022.07.03 |
Spring Cloud Netflix Eureka 서버 설정 (0) | 2022.07.03 |
MSA 아키텍처 1 (0) | 2022.07.03 |
웹서버 사용하는 이유, 프록시 서버 (0) | 2022.07.03 |
댓글