본문 바로가기
MSA/MSA관련기술

kafka 데이터 동기화 방법, connect

by 문자메일 2022. 12. 4.

RestTemplate나 FeignClient 통하여 MicroService간 통신함.

 

 

 

 

Kafka 통하여 통신함 (Producer/Consumer)

Kafka Topic & Listener 이용하는 방법. (그런데 이건 서비스 여러개 켰을 때, 토픽 1개 저장했을 때 하나의 서비스에서만 db에 업데이트 되는 것 확인하였음.. 하나의 서비스의 토픽 listener가 consume하고 사라져서 그런가..)

아래와 같은 listener를 만들어놓으면, 본인이 구독하는 topic 정보가 업데이트 되면 listener가 알아서 실행되는 방법

 

아래와 같은 DTO 객체를 직렬화해서 topic 이름 붙여서 kafka 클러스터로 보내면, consumer에서는 해당 topic이름으로 보내진 data를 읽어서 사용 가능하다.

{
  "productId": "CATALOG-002",
  "qty": 30,
  "unitPrice": 1500,
  "totalPrice": 45000,
  "orderId": "c49d3db8-f907-40a4-9142-8fa78c0265f6",
  "userId": "fdccff6d-9be2-446c-85c8-e5791276b183"
}

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try{
            jsonInString = mapper.writeValueAsString(orderDto);
        }catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the  Order microservice : " + orderDto);

        return orderDto;
    }
}
@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }

    // 이 토픽에 데이터가 전달이 되면 그 데이터 값을 가져와서 이 메서드가 실행이 된다.
    @KafkaListener(topics = "example-catalog-topic")
    public void update(String kafkaMessage){
        log.info("Kafka Message: -> " + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try{
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        }catch (JsonProcessingException ex){
            ex.printStackTrace();
        }

        CatalogEntity entity = repository.findByProductId((String)map.get("productId"));
        if(entity!=null){
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            repository.save(entity);
        }
    }
}

 

Kafka Connect

현재까지 파악된 바로는 kafka connect를 사용해서 동기화 하는 방법으로는 아래 2가지가 있음.

1. 특정 Table에 Source Connect 걸었을 때, 그 Table에 변경사항 생기면 event가 발생하는 것 처럼 아래와 같은 json파일이 자동으로 생성되어 해당 Topic으로 저장됨.

그러면 Sink Connect 걸린 DB에서 해당되는 테이블에 정보를 자동으로 업데이트 진행함.

2. 특정 table에 source Connect 거는 것이 아니라, 특정 api 호출 시 아래와같은 json 스키마를 생성하여 바로 kafka connect 서버로 전송하여 topic 저장함. 그러면 sink connect 걸린 db가 자신의 topic이라면 data를 찾아가서 업데이트 하는 방식

 

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "field": "order_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "product_id"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "qty"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "unit_price"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "total_price"
      }
    ],
    "optional": false,
    "name": "orders"
  },
  "payload": {
    "order_id": "1736abdd-1f95-4457-9341-1943100fcc60",
    "user_id": "fdccff6d-9be2-446c-85c8-e5791276b183",
    "product_id": "CATALOG-002",
    "qty": 30,
    "unit_price": 1500,
    "total_price": 45000
  }
}

댓글