Spring batch chunk processing
Chunk-oriented Processing?
Spring Batch는 가장 일반적인 구현 내에서 ‘청크 지향’ 처리 스타일을 사용합니다. 청크 지향 처리는 한 번에 하나씩 데이터를 읽고 트랜잭션 경계 내에서 기록되는 ‘청크’를 생성하는 것을 말합니다. 읽은 항목 수가 커밋 간격과 같으면 ItemWriter가 전체 청크를 작성한 다음 트랜잭션이 커밋됩니다. 다음 이미지는 프로세스를 보여줍니다.
- Tasklet vs Chunk
Tasklet은 ‘한 작업 다음 다른 작업’ 시나리오에서 더 자연스럽게 느껴지지만, 청크는 페이지가 매겨진 읽기 또는 메모리에 상당한 양의 데이터를 유지하고 싶지 않은 상황을 처리하기 위한 간단한 솔루션을 제공합니다.
ItemStream?
ItemReader와 ItemWriter는 모두 개별 목적을 잘 수행하지만 다른 인터페이스를 필요로 하는 공통 관심사가 있습니다. 일반적으로 일괄 작업 범위의 일부로 판독기와 기록기는 열리고 닫히고 상태를 유지하기 위한 메커니즘이 필요합니다. ItemStream 인터페이스는 다음 예제와 같이 이러한 용도로 사용됩니다.
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
- ItemReader, ItemWriter는 ItemStream를 Implement
ItemReader?
간단한 개념이지만 ItemReader는 다양한 유형의 입력에서 데이터를 제공하는 수단입니다. 가장 일반적인 예는 다음과 같습니다.
- Flat File(.txt, .xlsx)
- XML
- Database
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
- Calling it returns one item or null if no more items are left
// MyBatisCursorItemReader
@Override
protected T doRead() throws Exception {
T next = null;
if (cursorIterator.hasNext()) {
next = cursorIterator.next();
}
return next;
}
- CursorReader
- Not thread safe
- Cursor-based
- High real-time
// MyBatisPagingItemReader
@Override
protected void doReadPage() {
Map<String, Object> parameters = new HashMap<String, Object>();
if (parameterValues != null) {
parameters.putAll(parameterValues);
}
parameters.put("_page", getPage());
parameters.put("_pagesize", getPageSize());
parameters.put("_skiprows", getPage() * getPageSize());
if (results == null) {
results = new CopyOnWriteArrayList<T>();
} else {
results.clear();
}
results.addAll(sqlSessionTemplate.<T> selectList(queryId, parameters));
}
- PagingReader
- Thread safe(Multi threading)
- Offset-based
- High performance
- 만약에 읽은 테이블의 데이터에 대해서 업데이트가 되고 하고, 페이징 방식으로 다음 청크가 진행되면 어떻게 될까?
ItemProceessor?
ItemProcessor는 간단합니다. 하나의 객체가 주어지면 이를 변환하고 다른 객체를 반환합니다. 제공된 객체는 같은 유형일 수도 있고 아닐 수도 있습니다. 요점은 비즈니스 논리가 프로세스 내에서 적용될 수 있으며 해당 논리를 만드는 것은 전적으로 개발자에게 달려 있다는 것입니다. ItemProcessor는 단계에 직접 연결할 수 있습니다. 예를 들어 ItemReader가 Foo 유형의 클래스를 제공하고 작성되기 전에 Bar 유형으로 변환해야 한다고 가정합니다. 다음 예는 변환을 수행하는 ItemProcessor를 보여줍니다.
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
- Chaining(여러 개 프로세스를 엮을 수 있음)
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
-
Validate
-
Filtering
- To filter a record, you can return null from the ItemProcessor
ItemWriter?
ItemWriter는 기능면에서 ItemReader와 유사하지만 역 연산이 있습니다. 리소스는 여전히 찾고, 열고, 닫을 필요가 있지만, ItemWriter가 읽는 것이 아니라 기록한다는 점에서 다릅니다. 데이터베이스나 큐의 경우 이러한 작업은 삽입, 업데이트 또는 보내기가 될 수 있습니다. 출력 직렬화 형식은 각 배치 작업에 따라 다릅니다.
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
// MyBatisBathItemWriter
@Override
public void write(final List<? extends T> items) {
if (!items.isEmpty()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Executing batch with " + items.size() + " items.");
}
for (T item : items) {
sqlSessionTemplate.update(statementId, item);
}
List<BatchResult> results = sqlSessionTemplate.flushStatements();
if (assertUpdates) {
if (results.size() != 1) {
throw new InvalidDataAccessResourceUsageException("Batch execution returned invalid results. " +
"Expected 1 but number of BatchResult objects returned was " + results.size());
}
int[] updateCounts = results.get(0).getUpdateCounts();
for (int i = 0; i < updateCounts.length; i++) {
int value = updateCounts[i];
if (value == 0) {
throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
+ " did not update any rows: [" + items.get(i) + "]", 1);
}
}
}
}
}