[Spring-Kafka 경험기] Spring-Kafka JsonDeserializer 사용할 때 주의해야할 점
- spring-kafka 를 사용해서 consumer 를 구축할 때, 아래와 같이 원하는 객체(TestV1DTO)로 바로를 사용하기 위해
JsonDeserializer
를 선언할 경우 리팩토링할 때 주의해야할 점이 있습니다.
1. Config
1-1. ConsumerFactory Config
- 아래와 같이 설정하여 @Bean 을 만들어 사용.
Map<String, Object> consumerConfigs = new HashMap<>();
//.... 설정값
return new DefaultKafkaConsumerFactory<>(
consumerConfigs,
new StringDeserializer(),
new JsonDeserializer<>(TestV1DTO.class, new ObjectMapper()) //요부분!!!!
);
1-2. Consumer listener
@KafkaListener
- 아래와 같이 받아서 손쉽게 사용 가능!
@Payload TestV1DTO data
- 아래와 같이 받아서 손쉽게 사용 가능!
2. Class 리네임…
- 리팩토링을 위해
TestV1DTO
를TestV2DTO
로 변경 했습니다.
3. 변경 후 테스트!
3-1. 로컬에서 테스트
- 리팩토링한 부분에 대해서 테스트 하기 위해서 Producer 와 Consumer 를 띄운 다음, 데이터를 보내고 잘 받는걸 확인 했습니다!
3-2. 개발환경 배포…
- 단순 리팩토링이라 딱히 순서에 상관이 없을거라 생각하고 아무거나(Producer) 먼저 배포 했습니다.
- 갑자기 Consumer 에서 에러가 발생!!!!!!!
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.test.kafka.TestV2DTO]; nested exception is java.lang.ClassNotFoundException: com.test.kafka.TestV2DTO
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138)
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:342)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1030)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1250)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:741)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
- Consumer 는 아직 배포하지 않았는데,
TestV2DTO
를 못찾..? 뭔가 이상하다….- Consumer 를 먼저 띄우게 된다면…
TestV1DTO
를 못 찾는다고 나옴.
- Consumer 를 먼저 띄우게 된다면…
- 혹시 Header 에 관련 Class 값이 들어가 있는지 확인해 보았지만 없었다.
- 컨슈머에서 아래와 같이 messageHeaders 를 꺼내서 확인
@Headers MessageHeaders messageHeaders
for (Entry<String, Object> entry : messageHeaders.entrySet()) {
log.info("key : {}, value : {}", entry.getKey(), entry.getValue());
}
4. 분석
- 에러로그를 하나씩 따라가 보자…….
4-1. JsonDeserializer 의 deserialize() 부터 확인해보자
- header 에서 값을 꺼내서 Class 를 찾는걸 볼 수 있다.
- spring-kafka 2.2.5
if (this.typeMapper.getTypePrecedence().equals(TypePrecedence.TYPE_ID)) { JavaType javaType = this.typeMapper.toJavaType(headers); if (javaType != null) { deserReader = this.objectMapper.readerFor(javaType); } }
- spring-kafka 2.8.3
if (javaType == null && this.typeMapper.getTypePrecedence().equals(TypePrecedence.TYPE_ID)) { javaType = this.typeMapper.toJavaType(headers); }
4-2. org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper
String typeIdHeader = retrieveHeaderAsString(headers, getClassIdFieldName()); // getClassIdFieldName() 는 `__TypeId__`
- 해당 로직에 debug 로 보면…
- 분명 Message Header 를 확인했을 때는 없던 값이 있다.
- value 를 확인해보니 Class 가 들어있다. (value :
com.test.kafka.TestV2DTO
)
typeIdHeader
값이 없으면 null, 있으면 Class 를 찾는다.- 여기서 리팩토링으로 해당 Class 는 없어졌으니…
ClassNotFoundException
발생
- 여기서 리팩토링으로 해당 Class 는 없어졌으니…
4-3. Header 에 있는 ` __TypeId__
값은 어디서 온걸까?
- JsonSerializer 의
serialize()
를 확인해보자. - spring-kafka 2.2.5
if (this.addTypeInfo && headers != null) { this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), headers); }
- spring-kafka 2.8.3
if (this.addTypeInfo && headers != null) { this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), headers); }
this.typeMapper
로 선언된DefaultJackson2JavaTypeMapper
의fromJavaType()
를 찾아보면 여기서 해당 Header 값을 세팅하는걸 볼 수 있다… (왜 Message Header 에서 안보였는지는 좀 더 아래에 작성 했습니다.)String classIdFieldName = getClassIdFieldName(); addHeader(headers, classIdFieldName, javaType.getRawClass()); //classIdFieldName 는 `__TypeId__`
5. 해결방안
- a. 리팩토링으로 리네임하기 전 클래스를 남겨놓는다…
TestV2DTO
를 사용할 예정이므로…TestV1DTO
를 기존 패키지에 동일하게 남겨놓는다…- 배포하고 난 뒤 또 지워야한다. 귀찮다…
- b. JsonDeserializer 로 사용하던 부분을
StringDeserializer
로 변경- JsonDeserializer 를 잘 쓰고 있었으므로 변경 필요성을 못느껴서 좀 더 다른 방안을 찾아보기로함.
- c.
this.typeMapper.getTypePrecedence().equals(TypePrecedence.TYPE_ID)
를false
로 바꾸게 되면 발생 안할 것 같음!- 이 부분에 대해서 좀 더 확인해보자!
5-1. c 에 대한 세부확인
- 아래 영역에 대해서 어디서 값을 세팅하는지 확인해보자.
this.typeMapper.getTypePrecedence()
- 생성자에서 세팅하는걸 확인할 수 있다.
- spring-kafka 2.2.5, 2.8.3 같음.
this.typeMapper.setTypePrecedence(useHeadersIfPresent ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
- spring-kafka 2.2.5, 2.8.3 같음.
boolean useHeadersIfPresent
: true to use headers if present and fall back to target type if not.- 기본값은
true
… - 해당 값을
false
로 변경하게 되면TypePrecedence.INFERRED
로 동작하게 되고, JsonDeserializer 에 선언한 class(TestV2DTO.class) 를 사용하게 됩니다.
- 기본값은
JsonDeserializer<TestV2DTO> jsonDeserializer = new JsonDeserializer<>(TestV2DTO.class, KafkaStaticOptions.OBJECT_MAPPER, false);
5-2. 테스트
- 해당 값을 변경하고 우선순위에 상관없이 잘 동작하는걸 확인 할 수 있었음!!
6. 추가 확인….
- 분명
__TypeId__
라는 Header 값이 있는데, 왜 컨슘하는 listener 에서는 해당 데이터를 확인 할 수 없었을까? JsonDeserializer
의deserialize()
메소드를 확인해보면 아래와 같은 부분이 있다.removeTypeHeaders
의 기본값은true
.if (this.removeTypeHeaders) { this.typeMapper.removeHeaders(headers); }
- JsonDeserializer 에 해당 값을 false 로 세팅하게 되면 header 들을 제거하지 않음.
jsonDeserializer.setRemoveTypeHeaders(false);
- 아래와 같이 선언하여 확인해보면 Header 값들을 확인 할 수 있다.
@Header(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME) String typedId,
@Header(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME) Object contentFieldName,
@Header(AbstractJavaTypeMapper.DEFAULT_KEY_CLASSID_FIELD_NAME) Object keyFieldName,