본문 바로가기
Lecture

스프링 배치 병렬처리, mock과 static mock, AssertFile을 이용한 배치 로직 테스트

by Renechoi 2023. 7. 12.

 

스프링 배치에는 4가지 방법의 병렬처리 방식이 있다. 

 

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#scalability

 

 

275개의 데이터가 있는 input.txt 파일을 읽어 output으로 변환하는 작업을 병렬로 처리해보자. 

 

데이터는 다음과 같다. 

1   데이터-1  10
2  데이터-2  20
3  데이터-3  30
4  데이터-4  40
5  데이터-5  50
6  데이터-6  60
7  데이터-7  70
8  데이터-8  80
9  데이터-9  90
10 데이터-10 100
11 데이터-11 110
12 데이터-12 120
13 데이터-13 130
14 데이터-14 140
15 데이터-15 150

// ...

 

 

먼저 전처리를 위해 dto와 mapper를 작성한다. 

 

@Data
public class AmountDto {
    private int index;
    private String name;
    private int amount;
}
public class AmountFieldSetMapper implements FieldSetMapper<AmountDto> {

    @Override
    public AmountDto mapFieldSet(FieldSet fieldSet) {
        AmountDto amount = new AmountDto();
        amount.setIndex(fieldSet.readInt(0));
        amount.setName(fieldSet.readString(1));
        amount.setAmount(fieldSet.readInt(2));
        return amount;
    }
}

 

 

단일 스레드로 진행하는 배치 로직은 다음과 같이 작성할 수 있다. 해당 폼을 기본으로 하고 멀티 스레드를 TaskExecutor를 작성해 설정할 수 있다. 

 

@Configuration
@AllArgsConstructor
public class JobConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job JobConfig(Step multiThreadStep) {
        return jobBuilderFactory.get("multiThreadStepJob")
                .incrementer(new RunIdIncrementer())
                .start(multiThreadStep)
                .build();
    }

    @JobScope
    @Bean
    public Step multiThreadStep(FlatFileItemReader<AmountDto> amountFileItemReader,
                                ItemProcessor<AmountDto, AmountDto> amountFileItemProcessor,
                                FlatFileItemWriter<AmountDto> amountFileItemWriter,
                                TaskExecutor taskExecutor
                                ) {
        return stepBuilderFactory.get("multiThreadStep")
                .<AmountDto, AmountDto>chunk(10)
                .reader(amountFileItemReader)
                .processor(amountFileItemProcessor)
                .writer(amountFileItemWriter)
                // todo -> 멀티 스레드를 받아오기 .taskExecutor(taskExecutor)
                .build();
    }


    @StepScope
    @Bean
    public FlatFileItemReader<AmountDto> amountFileItemReader() {
        return new FlatFileItemReaderBuilder<AmountDto>()
                .name("amountFileItemReader")
                .fieldSetMapper(new AmountFieldSetMapper())
                .lineTokenizer(new DelimitedLineTokenizer(DelimitedLineTokenizer.DELIMITER_TAB))
                .resource(new FileSystemResource("spring-batch-practice/data/input.txt"))
                .build();
    }

    @StepScope
    @Bean
    public ItemProcessor<AmountDto, AmountDto> amountFileItemProcessor() {
        return item -> {
            System.out.println(item + "\tThread = " + Thread.currentThread().getName());
            item.setAmount(item.getAmount() * 100);
            return item;
        };
    }

    @StepScope
    @Bean
    public FlatFileItemWriter<AmountDto> amountFileItemWriter() throws IOException {
        BeanWrapperFieldExtractor<AmountDto> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"index", "name", "amount"});
        fieldExtractor.afterPropertiesSet();

        DelimitedLineAggregator<AmountDto> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setFieldExtractor(fieldExtractor);

        String filePath = "spring-batch-practice/data/output.txt";
        new File(filePath).createNewFile();

        return new FlatFileItemWriterBuilder<AmountDto>()
                .name("amountFileItemWriter")
                .resource(new FileSystemResource(filePath))
                .lineAggregator(lineAggregator)
                .build();
    }

}

 

 

 

Multi-threaded Step 

 

 

병렬 처리를 구현하는 것은 간단하다. 4가지 방식 중 첫 번째인 Multi-threaded Step은 다음과 같이 구현할 수 있다. 

 

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("spring-batch-task-executor");
    taskExecutor.setConcurrencyLimit(4);
    return taskExecutor;
}

 

 

step에서 이 taskExecutor를 추가해준다. 

 

@JobScope
@Bean
public Step multiThreadStep(FlatFileItemReader<AmountDto> amountFileItemReader,
                            ItemProcessor<AmountDto, AmountDto> amountFileItemProcessor,
                            FlatFileItemWriter<AmountDto> amountFileItemWriter,
                            TaskExecutor taskExecutor
                            ) {
    return stepBuilderFactory.get("multiThreadStep")
            .<AmountDto, AmountDto>chunk(10)
            .reader(amountFileItemReader)
            .processor(amountFileItemProcessor)
            .writer(amountFileItemWriter)
            .taskExecutor(taskExecutor)
            .build();
}

 

작업 현황에 스레드 정보를 print해보면 다음과 같다. 

 

 

 

결과물은 어떨까? 

 

순차처리를 했을 때와 대비해 멀티 스레드를 통한 작업 결과는 뒤죽박죽인 것을 확인할 수 있다. 

 

 

 

parallel Step 

 

패러럴 스텝은 스텝 여러 개를 동시에 실행할 수 있도록 해준다. 앞서 multi-threaded step은 chunk 단위로 한다면 이것은 step 단위로 한다. 

 

단일 프로세스에서 flow를 사용해 step 자체를 병렬로 처리하는 방식이다. 

 

코드 구현은 다음과 같다. 

 

/**
 * 단일 프로세스 멀티 쓰레드에서 Flow를 사용해 스텝을 동시에 실행한다.
 */
@Configuration
@AllArgsConstructor
public class ParallelStepJobConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job parallelJob(Flow splitFlow) {
        return jobBuilderFactory.get("parallelJob")
                .incrementer(new RunIdIncrementer())
                .start(splitFlow)
                .build()
                .build();
    }

    @Bean
    public Flow splitFlow(TaskExecutor taskExecutor,
                          Flow flowAmountFileStep,
                          Flow flowAnotherStep) {
        return new FlowBuilder<SimpleFlow>("parallelFlow")
                .split(taskExecutor)
                .add(flowAmountFileStep, flowAnotherStep)
                .build();
    }

    @Bean
    public Flow flowAmountFileStep(Step amountFileStep) {
        return new FlowBuilder<SimpleFlow>("flowAmountFileStep")
                .start(amountFileStep)
                .build();
    }

    @Bean
    public Step amountFileStep(FlatFileItemReader<AmountDto> amountFileItemReader,
                                ItemProcessor<AmountDto, AmountDto> amountFileItemProcessor,
                                FlatFileItemWriter<AmountDto> amountFileItemWriter
    ) {
        return stepBuilderFactory.get("multiThreadStep")
                .<AmountDto, AmountDto>chunk(10)
                .reader(amountFileItemReader)
                .processor(amountFileItemProcessor)
                .writer(amountFileItemWriter)
                .build();
    }

    @Bean
    public Flow flowAnotherStep(Step anotherStep) {
        return new FlowBuilder<SimpleFlow>("anotherStep")
                .start(anotherStep)
                .build();
    }

    @Bean
    public Step anotherStep() {
        return stepBuilderFactory.get("anotherStep")
                .tasklet((contribution, chunkContext) -> {
                    Thread.sleep(500);
                    System.out.println("Another Step Completed. Thread = " + Thread.currentThread().getName());
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

 

 

splitFlow 메서드는 Flow Builder를 통해 Flow를 생성한다. 이때 TaskExecutor를 가져온다. 

 

스텝은 두 가지로 분기하여 구현한다.  flowAmountFileStep 메서드에서는 amountFileStep을 포함하는 Flow를 생성하고, flowAnotherStep 메서드에서는 anotherStep을 포함하는 Flow를 생성한다.

amountFileStep은 stepBuilderFactory를 통해 생성되며, 다음과 같은 기능을 수행한다

 

- FlatFileItemReader<AmountDto>를 사용하여 AmountDto를 읽어온다.
- ItemProcessor<AmountDto, AmountDto>를 사용하여 AmountDto를 처리한다.
- FlatFileItemWriter<AmountDto>를 사용하여 처리된 AmountDto를 출력한다.
- chunk(10)을 통해 10개의 항목을 한 번에 처리한다.

 


anotherStep은 stepBuilderFactory를 통해 생성되며, 다음과 같은 기능을 수행한다


- tasklet을 사용하여 비즈니스 로직을 정의한다.
- 500ms 동안 대기한 후 "Another Step Completed" 메시지와 현재 스레드의 이름을 출력한.
- RepeatStatus.FINISHED를 반환하여 스텝의 완료를 나타낸다.

 

 

이렇게 작성한 배치를 돌려보면 어떻게 될까? 

 

 

순차적으로 진행되는 것을 볼 수 있다. 

 

이때 다른 스레드에서 다른 스텝으로 설정한 another 스텝은 어떻게 되었을까? 

 

다른 스레드에서 실행되는 것을 확인할 수 있다. 

 

첫 번째는 스레드 1에서, 두 번째는 2에서 실행되되, 먼저 완료된 것은 완료되었다는 로그가 찍히고 나머지는 계속 진행되다가 최종적으로 마무리 된다. 

 

 

 

 

 

Remote Chunking 

 

원격 chunking에 대해 알아보자. 원격 chunk는 스텝을 처리할 때 여러 프로세스로 분할하여 처리한다. master 쪽에서 단일 프로세스로 처리하고 나머지 작업을 원격에게 채널을 통해 전달한다. 

 

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking

 

 

성능이 잘 나오려면 매니저 쪽에서 병목이 없이 원할하게 처리를 해야한다. 

 

여러 프로세스를 활용하기 때문에 많은 데이터를 처리할 때 고려해볼 수는 있다. 

 

현재 실습에서는 pass한다. 

 

여러 개의 프로세스에서 배치 잡을 처리한다는 정도로 이해하고 넘어간다. 

 

 

 

 

Partitioning 

 

파티셔닝은 단일 프로세스에서 마스터(매니저) 스텝과 워커 스텝을 두고 마스터가 만들어준 파티션 단위로 스텝을 병렬로 처리하는 방식이다. 

 

패러럴 스텝에서는 지정한 스텝들에 대해서만 실행할 수 있었는데, 파티셔닝은 더 많은 스텝을 양성해서 실행한다. 

 

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning

 

 

코드 구현은 다음과 같다. 

 

/**
 * 단일 프로세스에서 마스터 스텝과 워크 스텝을 두고, 마스터 스텝에서 생성해준 파티션 단위로 스텝을 병렬처리한다.
 */
@Configuration
@AllArgsConstructor
public class PartitioningJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private static final int PARTITION_SIZE = 100;

    @Bean
    public Job partitioningJob(Step masterStep) {
        return jobBuilderFactory.get("partitioningJob")
                .incrementer(new RunIdIncrementer())
                .start(masterStep)
                .build();
    }

    @JobScope
    @Bean
    public Step masterStep(Partitioner partitioner,
                           PartitionHandler partitionHandler) {
        return stepBuilderFactory.get("masterStep")
                .partitioner("anotherStep", partitioner)
                .partitionHandler(partitionHandler)
                .build();
    }

    @StepScope
    @Bean
    public Partitioner partitioner() {
        SimplePartitioner partitioner = new SimplePartitioner();
        partitioner.partition(PARTITION_SIZE);
        return partitioner;
    }

    @StepScope
    @Bean
    public TaskExecutorPartitionHandler partitionHandler(Step anotherStep,
                                                         TaskExecutor taskExecutor) {
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
        partitionHandler.setStep(anotherStep);
        partitionHandler.setGridSize(PARTITION_SIZE);
        partitionHandler.setTaskExecutor(taskExecutor);
        return partitionHandler;
    }
}

 

 

파티션 단위를 100으로 설정하였다. 

 

partitionHandler에서는 동시 실행할 스텝을 지정해준다. anotherStep을 활용한다. 

 

taskExecutor에서 limit을 4개로 주었기 때문에 동시에 실행되는 스레드를 4개씩 실행되도록 한다. 

 

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("spring-batch-task-executor");
    taskExecutor.setConcurrencyLimit(4);
    return taskExecutor;
}

 

 

 

 

지정된 스텝 말고 동적으로 스텝으로 처리해야하는 경우 파티셔닝 기법으로 처리해주도록 한다. 

 

 

 

 

 

Mock, Static Mock 테스트 

 

Player와 PlayerService를 바탕으로 테스트 코드를 작성해보자. 

@Service
public class PlayerSalaryService {

   public PlayerSalaryDto calcSalary(PlayerDto player) {
      int salary = (Year.now().getValue() - player.getBirthYear()) * 1000000;
      return PlayerSalaryDto.of(player, salary);
   }
}

 

하고자 하는 테스트는 calcSalary() 메서드이다. 

 

먼저 inline으로 mockito를 사용하기 위해 다음과 같은 dependency를 추가하자. 

 

testImplementation 'org.mockito:mockito-inline:3.8.0'

 

테스트 코드는 다음과 같다.

 

public class PlayerSalaryServiceTest {

   private PlayerSalaryService playerSalaryService;

   @BeforeEach
   public void setup() {
      playerSalaryService = new PlayerSalaryService();
   }

   @Test
   public void calcSalary() {
      // given
      PlayerDto mockPlayer = mock(PlayerDto.class);
      when(mockPlayer.getBirthYear()).thenReturn(1980);

      // when
      PlayerSalaryDto result = playerSalaryService.calcSalary(mockPlayer);

      // then
      Assertions.assertEquals(result.getSalary(), 40000000);
   }
}

 

Mocking할 클래스를 지정하여 PlayerDto를 가져온다. 

 

생년 월일을 우리가 원하는 값으로 설정해준다. 테스트 코드에서는 1980으로 설정하였다. 

 

result 값으로 40000000을 리턴하는 것을 검증한다. 

 

이렇게 할 때 테스트 코드가 통과하는 것을 볼 수 있다. 그런데 이 테스트는 완전하지 못한 테스트이다. 왜 그럴까? 

 

테스트는 항상 언제 돌렸을 때도 돌려야 하는데 이 테스트는 올해 돌리면 성공이지만 내년이면 실패일 것이다. 왜냐하면 service 클래스에서 Year.now() 메서드를 통해 올해의 연도를 기반으로 로직을 수행하기 때문이다. 

 

그렇다면 어떻게 해결해야 할까? 

 

이를 Static Method를 모킹 하는 방식으로 해결할 수 있다. 다음과 같이 StaticMock을 통해 Year.now 까지 모킹하도록 한다. 

 

@Test
public void calcSalary() {
   // given
   Year mockYear = mock(Year.class);
   when(mockYear.getValue()).thenReturn(2020);
   Mockito.mockStatic(Year.class).when(Year::now).thenReturn(mockYear);

   PlayerDto mockPlayer = mock(PlayerDto.class);
   when(mockPlayer.getBirthYear()).thenReturn(1980);

   // when
   PlayerSalaryDto result = playerSalaryService.calcSalary(mockPlayer);

   // then
   Assertions.assertEquals(result.getSalary(), 40000000);
}

 

 

통합 테스트 - AssertFile

 

 

통합 테스트는 어떻게 할까? 

 

job이 완료되면 특정 파일이 생성되었다는 것을 검증해볼 수 있다. 

 


@SpringBatchTest
@SpringBootTest
@ExtendWith(SpringExtension.class)
@ActiveProfiles("test")
@ContextConfiguration(classes = {FlatFileJobConfig.class, BatchTestConfig.class, PlayerSalaryService.class})
public class FlatFileJobConfigTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void success() throws Exception {
        // when
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // then
        Assertions.assertEquals(execution.getExitStatus(), ExitStatus.COMPLETED);
        AssertFile.assertFileEquals(new FileSystemResource("spring-batch-practice/player-salary-list.txt"),
                new FileSystemResource("spring-batch-practice/succeed-player-salary-list.txt"));
    }
}

 

이때 스프링 배치가 제공하는 AssertFile을 사용하면 손쉽게 테스트 코드를 작성할 수 있다. 

 

 

 

 


참고 자료 

- Spring Batch Reference Doc: https://docs.spring.io/spring-batch/docs/current/reference/html/

- 패스트캠퍼스: 한 번에 끝내는 Spring 완.전.판 초격차 패키지 Online.

 

 

 

 

 

반응형