Kafka는 Consumer가 자신이 읽은 위치의 offset을 기록하도록 commit을 하도록 한다.
이때 자동 커밋과 수동 커밋으로 나뉜다.
자동 커밋
자동 커밋은 특정 주기마다 커밋을 자동으로 하는 방식이다. 하지만 자동 커밋은 컨슈머 그룹의 리벨런싱이 발생할 때 중복 또는 유실이 발생한다.
1. 메시지 유실
자동 커밋의 주기가 1초라고 가정하였을 때, 메시지를 poll 하고 처리하던 컨슈머가 메시지를 처리하는데 오래 걸려 1초를 초과한 후에 장애가 발생할 경우, 이미 커밋을 하였기 때문에 해당 메시지는 더 이상 처리할 수 없다. 따라서 메시지 유실이 발생한다.
2. 메시지 중복
자동 커밋의 주기가 5초라고 가정하였을 때, 메시지를 poll 하고 컨슈머가 5초 이내에 메시지를 처리하였다. 하지만 어떠한 이유로 해당 컨슈머가 죽게 되면, 해당 메시지의 대한 커밋은 처리하지 못한 상태가 된다. 따라서 다른 컨슈머가 메시지를 중복하여 처리한다.
수동 커밋
수동 커밋은 Consumer 코드에서 명시적으로 커밋하는 방식이다. 수동 커밋은 커밋 위치에 따라 메시지 중복 또는 유실이 발생한다.
1. 메시지 유실
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
consumer.commitAsync();
... // 메시지 처리 코드
위와 같이 메시지를 poll을 하고, 바로 커밋을 하게 되면 offset은 기록되어 이 메시지는 더 이상 중복되어 처리되지 않는다. 하지만 메시지를 처리하는 코드에서 에러가 발생할 경우, 해당 메시지와 함께 가져온 메시지들은 처리하지 못하고 유실이 발생한다.
2. 메시지 중복
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
... // 메시지 처리 코드
consumer.commitAsync();
위 코드는 메시지 유실을 방지하기 위해 컨슈머가 메시지를 poll을 하고, 바로 커밋을 하는 것이 아니라 메시지를 처리하는 코드를 선행한 후에 커밋을 하였다. 하지만 메시지 처리하는 코드에서 에러가 발생할 경우 커밋을 하지 못한다. 이 경우 다시 메시지를 가져오기 때문에 이미 처리한 메시지들은 중복이 발생하게 된다. 이와 같이 메시지가 중복은 발생할 수 있어도 최소 한 번은 전달되는 방식을 at least once(최소 한 번)이라고 한다.
메시지 중복 방지
메시지 유실은 수동 커밋을 사용한다면 방지가 가능하다. 하지만 수동 커밋을 사용한다고 하더라도 메시지 중복이 발생한다. 메시지 중복을 막을 수 있는 방법은 무엇일까?
kafka에서는 exactly-once를 지원한다. 하지만 이 기능은 producer-kafka(broker)-consumer의 파이프라인 동안에 오직 한 번만 전달되는 의미이다. 즉 producer 쪽에 발생할 수 있는 메시지 유실에 더 집중한 기능이다. 이 기능을 사용한다고 해도 수동 커밋에 의한 메시지 중복을 피할 수 있는 것은 아니다. 따라서 컨슈머의 중복처리는 따로 로직을 작성해야한다.
1. 메시지를 DB에 insert만 하는 경우
컨슈머의 역할이 로깅과 같은 메시지를 DB에 저장하는 작업이라면 중복은 큰 문제가 되지 않는다. PK와 같은 식별자를 통해 DB 자체에서 검증할 수 있으므로 해당 예외만 처리하여 구현하면 된다.
try {
dao.insert(log);
} catch (DataIntegrityViolationException ex) {
logger.error("Log insert error. {}", log, ex);
}
2. 메시지를 DB에 insert 후에 다음 메시지는 update 하는 경우
상태와 같은 로그성 메시지일 경우, 처음 들어온 메시지는 insert를 해야 할 것이고 이후에 들어온 동일한 식별자는 update를 해야 하는 경우 아래와 같이 코드를 작성할 수 있다.
try {
dao.insert(log);
} catch (DataIntegrityViolationException ex) {
logger.error("Log insert error. {}", log, ex);
dao.update(log);
}
하지만 위 코드는 파티션이 2개 이상이고, 라운드 로빈 방식으로 파티션에 나눠 보내면 문제가 발생한다. 최종 상태가 먼저 insert 된 후, 초기 상태가 이후에 처리되어 update 될 수 있기 때문이다. 또한 메시지 유실이 발생할 경우 각각의 처리하는 컨슈머가 다르므로 뒤죽박죽 처리될 것이다.
따라서 상태를 반영해야 하는 메시지일 경우 카프카는 파티션에 대해서는 메시지 순서를 보장하므로 동일한 식별자인 메시지에 대해서는 동일한 파티션으로만 보내야 한다. 이렇게 하면 메시지 유실이 발생한다고 하여도 메시지의 순서는 보장되므로 최종 상태도 정상 반영된다.
다만 위와 같이 코드를 작성할 경우 DB에 항상 먼저 insert를 한 후에 문제가 발생하면 update를 처리하므로 DB에 부하가 심하다. 따라서 상태와 같은 항목을 조건으로 하여 아래와 같이 작성하는 게 성능상으로 좋다.
public void process() {
if(TransactionProcessStatusEnum.PROGRESS == status) {
insert(log);
} else {
int result = dao.update(log);
if (result <= 0) {
insert(log);
}
}
}
public void insert(Log log) {
try {
dao.insert(log);
} catch (DataIntegrityViolationException ex) {
logger.error("Log insert error. {}", log, ex);
dao.update(log);
}
}
3. 온라인(즉시적인) 메시지 처리일 경우