보다 더 나은 내일의 나를 위해

Spring Batch 코드로 알아보기 본문

spring/batch

Spring Batch 코드로 알아보기

H-SC 2022. 4. 25. 16:19

이 포스트는 공식 문서​를 참고하여 작성되었습니다.

예제 코드는 깃허브에 작성되어 있습니다.

 


Person

package com.example.batch.model;

public class Person {
    private String lastName;
    private String firstName;

    // Constructor, getter, setter, toString...
}

읽어온 데이터로 구성될 객체입니다.

 


 

BatchConfiguration

package com.example.batch.config;

// ...
public class BatchConfiguration {
    // ...

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader") // (1)
                .resource(new ClassPathResource("sample-data.csv")) // (2)
                .delimited()
                .names(new String[]{"firstName", "lastName"}) // (3)
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                    setTargetType(Person.class);
                }}) // (4)
                .build();
    }

    // ...
}

ItemReader을 생성하는 부분입니다.

FlatFileItemReader은 DB가 아닌 Resource에서 데이터를 읽어오는 구현체이다.

(1) ExecutionContext의 내부 키를 계산하는 데 사용되는 이름을 입력합니다.

 

(2) Resource 파일을 찾습니다.

여기서는 sample-data.csv 파일을 선택합니다.

기본 path는 src/main/resources 입니다.

 

(3) 형식에 맞춰 데이터를 잘라냅니다.

 

(4) 잘라낸 데이터를 객체로 생성합니다.

 

 

package com.example.batch.config;

// ...
public class BatchConfiguration {
    // ...

    @Bean
    public PersonItemProcessor processor() {/* ... */} // (1)
    
    // ...
}

ItemProcessor을 생성합니다.

직접 구현한 PersonItemProcessor을 사용하였습니다.

PersonItemProcessor는 아래에 있습니다.

processor는 ItemProcessor<I, O> 인터페이스를 상속받아서 작성해야 합니다.

(1) 구현한 처리기를 스프링 빈에 등록시킵니다.

 

 

 

package com.example.batch.config;

// ...
public class BatchConfiguration {
    // ...

    @Bean
    public JdbcBatchItemWriter<Person> write(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>() // (1)
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource) 
                .build();
    }

    // ...
}

ItemWriter를 생성합니다.

데이터베이스에 데이터를 쓰기 위해 jdbc를 사용합니다.
이를 위해 사용한 JdbcBatchItemWriter은 쿼리를 ChunkSize만큼 쌓아 한번에 DB로 전송합니다.

(1) jdbc 사용을 위한 ItemWriter입니다.

 

 

 

package com.example.batch.config;

// ...
public class BatchConfiguration {
    // ...

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step) {
        return jobBuilderFactory.get("importUserJob") // (1)
                .incrementer(new RunIdIncrementer()) // (2)
                .listener(listener) // (3)
                .flow(step) // (4)
                .end()
                .build();
    }

    // ...
}

Job을 정의합니다.

 

(1) job builder를 생성하고 해당 job repository를 초기화 합니다.

Bean에 등록되어 있는 이름과 작업의 이름이 달라도 됩니다.

 

(2) 동일 Job Parameter로 계속 실행이 될 수 있도록 run.id라는 임의의 파라미터를 추가로  사용해 매번 run.id의 값을 변경해줍니다.

 

(3) JobExecutionListener를 상속받은 listener를 등록할 수 있다.

 

(4) step에 따라 작업의 흐름을 제어할 수 있게 한다.

 

 

 

package com.example.batch.config;

// ...
public class BatchConfiguration {
    // ...

    @Bean
    public Step step(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step") // (1)
                .<Person, Person> chunk(10) // (2)
                .reader(reader()) // (3)
                .faultTolerant()
                .skip(FlatFileParseException.class)
                .skipLimit(3) // (4)
                .processor(processor()) // (5)
                .writer(writer) // (6)
                .build();
    }
    
    // ...
}

Step을 정의합니다.

 

(1) step builder를 생성하고 job repositorytransaction manager를 초기화합니다. Bean에 등록되어있는 이름과 단계 이름이 달라도 됩니다.

 

(2) chunk개수를 지정합니다.

 

(3) ItemReader를 등록합니다.

 

(4) ItemReader로 데이터를 읽을 때 FlatFileParseException 발생 시 지정한 개수만큼 스킵하고 진행합니다.

 

(5) ItemProcessor을 등록합니다.

 

(6) ItemWriter을 등록합니다.

 


 

PersonItemProcessor

package com.example.batch.lib;

// ...
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    @Override
    public Person process(Person item) throws Exception {
        // ...
    }
}

직접 작성한 ItemProcessor입니다.

ItemProcessor로 사용하기 위해 ItemProcessor<I, O>인터페이스를 상속 받아야 합니다.

오버라이딩 한 process()  함수에 처리 로직을 넣습니다.

 


 

JobCompletionNotificationListener

package com.example.batch.lib;

// ...
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    // ...
    
    @Override
    public void afterJob(JobExecution jobExecution) {
        // ...
    }
}

직접 작성한 listener입니다.

listenerJobExecutionListenter를 상속받아야 합니다.

JobExecutionListener 인터페이스에는 afterJob()beforeJob() 함수가 있습니다.

하지만 이 예제에서는 afterJob()함수만 필요하므로 JobExecutionListener 인터페이스의 구현체인 JobExecutionListenerSurpport를 상속받습니다.

'spring > batch' 카테고리의 다른 글

Spring Batch 개념잡기  (0) 2022.04.25
Comments