주녁, DevNote
article thumbnail

개요

  • Spring Batch를 이용해 반복적인 작업을 자동화한다.

목표

  • 특정 시간에 log 데이터를 읽어 DB에 저장하는 작업을 구현한다.

여정

Spring Batch 란?

대용량 일괄처리의 편의를 위해 설계된 가볍고 포괄적인 배치 프레임워크.

DI, AOP, 서비스 추상화 등 Spring 프레임워크의 3대 요소를 모두 사용할 수 있다.

언제 사용하나요?

  1. 대용량의 비즈니스 데이터를 복잡한 작업으로 처리해야하는 경우
  2. 특정한 시점에 스케쥴러를 통해 자동화된 작업이 필요한 경우
  3. (ex. 푸시알림, 월 별 리포트)
  4. 대용량 데이터의 포맷을 변경, 유효성 검사 등의 작업을 트랜잭션 안에서 처리 후 기록해야하는 경우

제약은 없나요?

  • 대용량 데이터 : 대량의 데이터를 가져오거나, 전달하거나, 계산하는 등의 처리를 할 수 있어야 한다.
  • 자동화 : 심각한 문제 해결을 제외하고는 사용자 개입 없이 실행되어야 한다.
  • 견고성 : 잘못된 데이터를 충돌/중단 없이 처리할 수 있어야 한다.
  • 신뢰성 : 무엇이 잘못되었는지를 추적할 수 있어야 한다. (로깅, 알림)
  • 성능 : 지정한 시간 안에 처리를 완료하거나 동시에 실행되는 다른 애플리케이션을 방해하지 않도록 수행되어야 함.

Spring Quartz랑 다른게 뭔가요?

Quartz는 스케줄러의 역할, 대용량 데이터 배치 처리에 대한 기능이 없다.

또한 Batch도 Quartz의 다양한 스케줄 기능을 지원하지 않기 때문에 보통은 Quartz + Batch를 조합해서 사용한다.


Spring Batch 동작 방식

Spring Batch 아키텍쳐

JobRepository

다양한 배치 수행과 관련된 수치 데이터와 잡의 상태를 유지 및 관리한다.

일반적으로 관계형 데이터베이스를 사용하며 스프링 배치 내의 대부분의 주요 컴포넌트가 공유한다.

실행된 Step, 현재 상태, 읽은 아이템 및 처리된 아이템 수 등이 모두 JobRepository에 저장된다.

Job

Job은 배치 처리 과정을 하나의 단위로 만들어 표현한 객체이고 여러 Step 인스턴스를 포함하는 컨테이너이다.

Job이 실행될 때 스프링 배치의 많은 컴포넌트는 탄력성(resiliency)을 제공하기 위해 서로 상호작용을 한다.

JobLauncher

Job을 실행하는 역할을 담당한다. Job.execute을 호출하는 역할이다.

Job의 재실행 가능 여부 검증, 잡의 실행 방법, 파라미터 유효성 검증 등을 수행한다.

스프링 부트의 환경에서는 부트가 Job을 시작하는 기능을 제공하므로, 일반적으로 직접 다룰 필요가 없는 컴포넌트다.

Job을 실행하면 해당 잡은 각 Step을 실행한다. 각 스텝이 실행되면 JobRepository는 현재 상태로 갱신된다.

Step

스프링 배치에서 가장 일반적으로 상태를 보여주는 단위이다. 각 Step은 잡을 구성하는 독립된 작업의 단위이다.

Step에는 Tasklet, Chunk 기반으로 2가지가 있다.

Tasklet

Step이 중지될 때까지 execute 메서드가 계속 반복해서 수행하고 수행할 때마다 독립적인 트랜잭션이 얻어진다. 초기화, 저장 프로시저 실행, 알림 전송과 같은 잡에서 일반적으로 사용된다.

Chunk

한 번에 하나씩 데이터(row)를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것

Chunk 단위로 트랜잭션을 수행하기 때문에 실패할 경우엔 해당 Chunk 만큼만 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다.

Chunk 기반 Step은 ItemReader, ItemProcessor, ItemWriter라는 3개의 주요 부분으로 구성될 수 있다.

 

Spring Batch 동작 순서도

ItemReader와 ItemProcessor에서 데이터는 1건씩 다뤄지고, Writer에선 Chunk 단위로 처리된다.

일반적으로 스프링 배치는 대용량 데이터를 다루는 경우가 많기 때문에 Tasklet보다 상대적으로 트랜잭션의 단위를 짧게 하여 처리할 수 있는 ItemReader, ItemProcessor, ItemWriter를 이용한 Chunk 지향 프로세싱을 이용한다.


환경 설정

// 의존성 추가
// gradle
dependencies {
    ...
    implementation 'org.springframework.boot:spring-boot-starter-batch'	
    ...
}

// pom.xml
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
@EnableBatchProcessing // 추가
@SpringBootApplication
public class SpringApplication {
    public static void main(String[] args) {
        ...
    }
}

위에서 추가한 Spring Batch 라이브러리에 org.springframework.batch.core 폴더에서

현재 사용하는 DB에 맞게 schema.sql을 찾아 테이블을 생성하도록 하자.

 

Spring Batch 예제 (Log 파일을 읽어 DB에 적재하기)

코드의 가독성을 위해 일부 코드를 생략하였기 때문에

코드 그대로 동작하지 않는 점 유의하자

로그 파일 설정을 위한 Logback 설정

<!-- logback.xml -->

<!-- 로그 적재를 위한 로그 파일 설정 추가 -->
    <appender name="QUERY" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>${LOG_PATH}/query/%d{yyyyMMdd}.log</FileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <charset>UTF-8</charset>
            <layout class="ch.qos.logback.classic.PatternLayout">
                <Pattern>[ %d{yyyy-MM-dd HH:mm:ss.SSS} ] [ %msg ] %n</Pattern>
            </layout>
        </encoder>
    </appender>

...

<logger name="QUERY-LOGGER" level="INFO">
    <appender-ref ref="QUERY"/>
</logger>

로그 적재 코드 추가

// QueryService.java

private static final org.slf4j.Logger queryLogger = LoggerFactory.getLogger("QUERY-LOGGER");


public ResponseResult insertQueryToFile(QueryForm form) {
	// Validation
	
	try {
	    clickinfoLogger.info(mapper.writeValueAsString(form)); // 로그 파일 적재
	} catch (JsonProcessingException e) {
	    throw new RuntimeException(e);
	}
}

Log File을 읽어 Service에 전달하는 Batch 클래스 작성

@RequiredArgsConstructor
@Configuration
public class QueryBatch {
		// Spring Batch Core에서 사용하는 Entity 클래스
		// EnableBatchProcessing으로 사용할 수 있게됨.
    private final JobBuilderFactory jobBuilderFactory; 
    private final StepBuilderFactory stepBuilderFactory;
    private final QueryService queryService; // 사용할 서비스
		
		// 파라미터를 꺼내올 Util 메서드
    protected static String getDateParam(ChunkContext chunkContext) {
        JobParameters jobParameters = chunkContext.getStepContext().getStepExecution().getJobParameters();
        return jobParameters.getString("dateParam");
    }

		@Bean
    public Job queryJob() {
        return jobBuilderFactory.get("QueryJob") // Job Repository에서 가져올 이름
								// 이 부분을 설정하지 않으면 Job 이름이 중복되어 완료된 작업은 Skip된다.
                .incrementer(new RunIdIncrementer()) 
                .start(cleanStep()) // Step 실행 시작, 결과에 따라 분기할 수 있다.
                    .on("COMPLETED")
                    .to(insertStep())
                    .on("*")
                    .end()
                .end()
                .build();
    }
		@Bean
    public Step cleanStep() {
        return stepBuilderFactory.get("CleanStep")
                .startLimit(RETRY_LIMIT) // 재시도 횟수
								// 실행할 작업을 정의
								// (contribution, chunkContext)은 Job을 실행할 외부에서 주입받는다.
                .tasklet((contribution, chunkContext) -> {
                    String dateParam = getDateParam(chunkContext);
                    queryService.cleanQueryWithDate(dateParam);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
		@Bean
    public Step insertStep() {
        return stepBuilderFactory.get("InsertStep")
                .startLimit(RETRY_LIMIT)
                .tasklet((contribution, chunkContext) -> {
                    String dateParam = getDateParam(chunkContext);
                    List<QueryForm> formList = LogFileReader.read(LogPath(dateParam), QueryForm.class);
                    if(formList.isEmpty()) {
                        LOG.info("No data to insert");
                        return RepeatStatus.FINISHED;
                    }
                    queryService.insertQueryFromFile(formList, dateParam);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

JPA를 이용해 독립적으로 동작하는 Batch 클래스 작성

Paging Size 와 Chunk Size를 동일하게 설정하는 것이 권장된다.

Paging Size와 Chunk Size를 동일하게 설정하면, 한 번에 처리하는 데이터 양이 일치하여 메모리 사용을 최적화할 수 있기 때문이다.

@RequiredArgsConstructor
@Configuration
public class DailyReportBatch {
    private static final int CHUNK_SIZE = 1000;
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;
    private final QueryService queryService;
    @Bean
    public Job dailyReportJob() {
        return jobBuilderFactory.get("DailyReportJob")
                .incrementer(new RunIdIncrementer())
                .start(cleanStep())
                    .on("*")
                    .to(insertStep())
                    .on("*")
                    .end()
                .end()
                .build();
    }
    @Bean
    public Step cleanStep() {
        return stepBuilderFactory.get("CleanStep")
                .startLimit(RETRY_LIMIT)
                .tasklet((contribution, chunkContext) -> {
                    String dateParam = getDateParam(chunkContext);
                    queryService.cleanDailyReportWithDate(dateParam);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
    @Bean
    public Step insertStep() {
        return stepBuilderFactory.get("InsertStep")
                .startLimit(RETRY_LIMIT)
                .<User, User>chunk(CHUNK_SIZE) // 시작, 종료 Type 명시
                .reader(reader(null))
                .processor(processor())
                .writer(writer())
                .build();
    }
    @Bean
		// jobParameter에서 전달한 매개변수 가져오기
    public JpaPagingItemReader<User> reader(@Value("#{jobParameters[date]}") String date) {
 				// paramMap으로 필요한 쿼리에 필요한 매개변수 전달
				Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("date", date);
				
				// EntityManager를 통한 Entity 읽어오기
        return new JpaPagingItemReaderBuilder<User>()
                .name("userReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(CHUNK_SIZE) // Paging Size == Chunk Size
                .queryString("SELECT u FROM User u WHERE u.date = :date") // 파라미터 바인딩
                .parameterValues(paramMap)
                .build();
    }
    @Bean
    public ItemProcessor<User, User> processor() {
        return user -> {
						// 각 Entity마다 처리할 함수 내용 정의
						// Processor 없이 Reader -> Writer도 가능함
            if(user.getLoginFailCount() > 5){ return null; } // 5회 이상 로그인 실패한 사용자는 제외
            LOG.info("User : {}", user);
            user.setName(user.getName() + "_test");
            return user;
        };
    }
    @Bean
    public JpaItemWriter<User> writer() {
				// Reader, 혹은 Processor로 부터 전달받은 내용을 저장
        return new JpaItemWriterBuilder<User>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
}

여러 Job을 순서대로 실행하기 위한 JobLauncher 작성

JobLauncher를 사용하면 Job을 순서대로 복수개 실행할 수 있다.

이 경우, Batch 클래스에 있는 @Configuration과 @Bean 어노테이션을 제거해야한다.

2번씩 실행되기 때문이다. (Batch 클래스, JobConfig 클래스 Bean 생성시 각각 실행)

 

추가로 appilcation.properties에서 실행 주기를 설정할 수 있도록 해보았다.

# 날짜 형식 : yyyyMMdd / 빈 칸인 경우 어제 날짜 기준으로 조회
# Cron 스케줄 : 새벽 0시 30분 동작
cron.dateParam=
cron.expression=0 30 0 1/1 * ? *
@RequiredArgsConstructor
@Component
public class JobConfig {
    private final JobLauncher jobLauncher;
    private final QueryBatch queryBatch;
    private final DailyReportBatch clickDailyReportBatch;

    protected static final int RETRY_LIMIT = 3;

    @Value("${cron.dateParam}")
    private String dateParam;

    protected static String yesterday() {
        return LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    }

    protected static String LogPath(String date) {
        return LOG_PATH + "/" + date + ".log";
    }

    @Scheduled(cron="${cron.expression}")
    public void runMultipleJobs() throws Exception {
        dateParam = StringUtils.isBlank(dateParam) ? yesterday() : dateParam;
				
				// Job에 전달할 파라미터 설정
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date())
                .addString("dateParam", dateParam)
                .toJobParameters();

				// 설정한 순서대로 Job이 실행됨
        jobLauncher.run(queryBatch.queryJob(), jobParameters);
        jobLauncher.run(dailyReportBatch.dailyReportJob(), jobParameters);
    }
}

 

 

Q & A

  • transactionManager', 관련 에러 - A bean with that name has already been defined in class path resource [SimpleBatchConfiguration.class]
더보기

# application.properties에 아래 내용 추가
# Spring 2.1 이상 부터 Bea

# Enable Bean Overriding
spring.main.allow-bean-definition-overriding=true


마무리

지금까지 Spring Batch의 개념과 순서도, 예제를 통한 간단한 실습을 진행했다.

Spring Batch로 할 수 있는 일은 훨씬 더 많이 있다.

Flow를 통한 Fail Over 전략, 복잡한 쿼리를 해결하는 방법, 테스트 코드 등등

필요한 부분은 다음 시간에 추가로 학습해보도록 하자

 


참고자료

스프링 배치(Spring Batch)란? 소개 및 예제 — Bonglog - 기록과 정리의 공간 (tistory.com)

스프링 배치(Spring Batch) 활용하기 — Bonglog - 기록과 정리의 공간 (tistory.com)

Spring batch의 transaction manager bean 중복 문제 해결하기 (feat Spring boot 2.1) (tistory.com)

Spring Batch란? 이해하고 사용하기(예제소스 포함) :: 히진쓰의 서버사이드 기술 블로그 (tistory.com)

'Backend' 카테고리의 다른 글

Spring - 검증(Validation)  (0) 2023.01.23
Spring - 메시지, 국제화  (0) 2023.01.21
profile

주녁, DevNote

@junwork

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!