본문 바로가기

API

스프링 스케줄러와 배치 #2 - Spring Batch

개인 학습 용 포스팅 

 

# 스케줄러만으로도 약속된 로직을 실행시킬 수 있는데 굳이 배치를 사용하는 이유 ?

  1.  대용량 데이터 처리에 최적화
  2. 데이터를 청크 단위로 처리하거나 병렬 실행, 재시작, 실패 처리 같은 시나리오를 통한 로직 구현 가능
  3. 세밀한 트랜잭션
  4. 로직 실행 시, 실패한 지점부터 다시 시작할 수 있도록 설계 가능
  5. 리더-프로세서-라이터 구조로  데이터 소스에서 읽고, 가공하고, 저장하는 역할과 구조가 분리되어 있어서 유지보수 및 확장성 용이

 

 Scheduler + Transaction 의 한계

@Scheduled 메서드 내에서 트랜잭션을 걸면 전체 로직이 하나의 트랜잭션으로 처리
   → 간단한 작업에는 지장이 없지만, 복잡한 작업이나 다량의 데이터 처리시 문제가 될 수 있음.

  • 전체 롤백의 위험 → 데이터 처리 중 90% 성공했더라도 마지막에 에러 발생 ▷ 전체 롤백
  • 중간 상태 저장 어려움 → 실패 시 처음부터 재실행
  • 대용량 처리 시 성능 저하 → 수천 건, 수만 건 데이터 한 번에 처리시 메모리/성능 이슈 발생

✅ Spring Batch - Transaction 을 쓰면 좋은 이유

Spring Batch는 작업을 Step → Chunk 단위로 나누고, 각 단계에 별도 트랜잭션을 부여할 수 있어 세밀한 실패 복구가 가능

If ) 1,000건 데이터를 100건씩 나누어 처리한다고 가정

  • 1~100건 처리 성공 후 커밋
  • 101~200 처리 중 오류 발생 시, 1~100은 유지되고 101부터 재시작 가능
  • 실패한 청크만 재시도하는 fail-safe 구조
  • JobRepository를 통한 실행 상태 추적 가능
    (이해가 용이하게 여기서 chuk 는 단위 데이터 건수를 생갹하면 편하다 !)

🔍 정리

  Scheduler + Transaction Batch + Transaction
트랜잭션 단위 전체 작업 1개 Chunk 단위
실패 시 동작 전체 롤백 실패 청크만 재시도
중간 저장 불가능 가능 (JobRepository)
대용량 처리 성능 이슈 발생 가능 효율적 처리 가능

 

# Spring Batch 관련 용어 정리

 

  • Job: 하나의 배치 실행 단위로 여러 Step을 포함함
  • Step: Job 내에서 실행되는 개별 처리 단계
  • Chunk: 데이터를 일정 개수씩 묶어 읽고, 처리하고, 저장하는 단위
  • ItemReader: 외부에서 데이터를 읽어오는 컴포넌트
  • ItemProcessor: 읽은 데이터를 가공하거나 필터링하는 로직
  • ItemWriter: 처리된 데이터를 최종 목적지(DB, 파일 등)에 저장
  • JobRepository: Job/Step의 실행 상태 및 이력을 저장하는 저장소
  • JobLauncher: Job을 실제로 실행시키는 트리거 역할 수행
  • JobParameters: Job 실행 시 전달하는 파라미터 (재실행 구분에도 사용됨)
  • ExecutionContext: 실행 중 상태 정보나 중간 결과를 저장하는 공간
  • JobInstance: 동일 Job 정의에 대해 고유 파라미터로 실행된 "논리적 실행 단위"
  • JobExecution: 하나의 JobInstance 실행에 대한 실제 실행 정보
  • StepExecution: 하나의 Step 실행에 대한 실행 정보
  • SkipPolicy: 오류가 발생했을 때 해당 항목을 건너뛸지 여부를 결정하는 정책
  • RetryPolicy: 오류 발생 시 재시도 횟수, 조건 등을 설정하는 정책
  • CompositeItemProcessor: 여러 Processor를 체이닝해서 순차 처리할 수 있게 해주는 구조
  • Tasklet: Chunk 방식이 아닌 단일 작업 로직을 직접 작성할 때 사용하는 Step 방식
  • Decider (JobExecutionDecider): 조건 분기에 따라 다음 Step을 동적으로 결정하는 로직
  • Partitioning: 데이터를 분할해 여러 Thread나 서버에서 병렬 처리하기 위한 구조
  • MultithreadedStep: Step 내부를 다중 쓰레드로 병렬 처리하도록 구성
  • ThrottleLimit: 동시에 실행되는 Thread의 최대 수를 제한하는 설정
  • Flow / FlowJob: Step들을 논리적 흐름(분기, 병합 등)으로 구성할 수 있는 구조
  • Listener: Job 또는 Step 전/후처리에 사용되는 이벤트 후킹 인터페이스
  • ExitStatus: Step 또는 Job이 종료된 결과 상태를 나타내는 값 (예: COMPLETED, FAILED 등)

# 다양한 케이스 별 Bean 등록 

// ✅ 1. Tasklet 기반 Step
@Bean
public Step taskletStep() {
    return stepBuilderFactory.get("taskletStep")
        .tasklet((contribution, chunkContext) -> {
            // 단일 처리용 기본 Step입니다.
            // 로그 출력, 파일 삭제 등 단순한 작업에 적합합니다.
            System.out.println("Tasklet 실행");
            return RepeatStatus.FINISHED; // 작업 완료 표시
        })
        .build();
}

 

// ✅ 2. Chunk 기반 Step (Reader → Processor → Writer)
@Bean
public Step chunkStep() {
    return stepBuilderFactory.get("chunkStep")
        .<String, String>chunk(3) // 3개씩 트랜잭션으로 묶어 처리
        .reader(() -> "데이터") // 간단 Reader
        .processor(item -> item + " 처리됨") // 처리 과정
        .writer(items -> items.forEach(System.out::println)) // 출력
        .build();
}

 

// ✅ 3. Retry + Skip 처리 포함 Chunk Step
@Bean
public Step retrySkipStep() {
    return stepBuilderFactory.get("retrySkipStep")
        .<String, String>chunk(5) // 트랜잭션 단위
        .reader(() -> {
            throw new IllegalArgumentException("읽기 중 예외 발생");
        })
        .processor(item -> item)
        .writer(System.out::println)
        .faultTolerant() // 예외를 허용하고 처리
        .retry(IllegalArgumentException.class) // 재시도 대상
        .retryLimit(2) // 최대 2회 재시도
        .skip(IllegalArgumentException.class) // 건너뛰기 대상
        .skipLimit(3) // 최대 3건까지 skip 가능
        .build();
}

 

// ✅ 4. Step 간 순차 실행 Job
@Bean
public Job sequentialJob() {
    return jobBuilderFactory.get("sequentialJob")
        .start(step1()) // 첫 번째 Step
        .next(step2()) // 다음 Step 순차 실행
        .build();
}

 

// ✅ 5. Step 분기 – JobExecutionDecider 사용
@Configuration // 이 클래스는 Spring 설정 클래스임을 명시
@RequiredArgsConstructor // 생성자 주입을 자동으로 생성 (final 필드 주입)
@EnableBatchProcessing // Spring Batch 기능을 활성화 (JobBuilderFactory, StepBuilderFactory 등 자동 주입)

public class ConditionalJobConfig {

    private final JobBuilderFactory jobBuilderFactory; // Job 빌더 생성용 팩토리
    private final StepBuilderFactory stepBuilderFactory; // Step 빌더 생성용 팩토리

    @Bean
    public Job conditionalJob() {
        return jobBuilderFactory.get("conditionalJob") // Job 이름 지정
            .start(initStep()) // 첫 번째 Step: 초기 작업
            .next(decider()) // Decider 실행: 분기 조건 판단
            .from(decider()).on("ODD").to(oddStep()) // Decider 결과가 "ODD"면 oddStep으로 이동
            .from(decider()).on("EVEN").to(evenStep()) // Decider 결과가 "EVEN"이면 evenStep으로 이동
            .end() // Job 흐름 종료
            .build(); // Job 빌드 완료
    }

    @Bean
    public Step initStep() {
        return stepBuilderFactory.get("initStep") // Step 이름: initStep
            .tasklet((contribution, chunkContext) -> { // 단일 태스크 작업 정의 (Tasklet)
                System.out.println("✅ [initStep] 초기 작업 실행"); // 로그 출력
                return RepeatStatus.FINISHED; // Step 정상 완료 상태 반환
            })
            .build(); // Step 빌드 완료
    }

    @Bean
    public JobExecutionDecider decider() {
        return (jobExecution, stepExecution) -> { // Job 및 Step 실행 정보 기반으로 판단 가능
            long current = System.currentTimeMillis(); // 현재 시각 밀리초
            String status = (current % 2 == 0) ? "EVEN" : "ODD"; // 짝/홀 여부 판단
            System.out.println("🧭 [decider] 현재 시각 기준 분기 결과: " + status); // 결과 출력
            return new FlowExecutionStatus(status); // Flow 분기 결과 반환
        };
    }

    @Bean
    public Step oddStep() {
        return stepBuilderFactory.get("oddStep") // Step 이름: oddStep
            .tasklet((contribution, chunkContext) -> {
                System.out.println("🔴 [oddStep] ODD 조건일 때 실행됨");
                return RepeatStatus.FINISHED; // Step 완료
            })
            .build();
    }

    @Bean
    public Step evenStep() {
        return stepBuilderFactory.get("evenStep") // Step 이름: evenStep
            .tasklet((contribution, chunkContext) -> {
                System.out.println("🔵 [evenStep] EVEN 조건일 때 실행됨");
                return RepeatStatus.FINISHED; // Step 완료
            })
            .build();
    }
}

 

// ✅ 6. 예외 발생 시 커스텀 ExitStatus 반환
@Bean
public Step customFailStep() {
    return stepBuilderFactory.get("customFailStep")
        .tasklet((contribution, chunkContext) -> {
            // 예외 발생 시 ExitStatus를 사용자 정의로 설정
            contribution.setExitStatus(new ExitStatus("CUSTOM_FAIL"));
            throw new RuntimeException("강제 실패");
        })
        .build();
}

 

// ✅ 7. JobParameter 사용 – 동적 파라미터 처리
@Bean
public Step parameterStep() {
    return stepBuilderFactory.get("parameterStep")
        .tasklet((contribution, chunkContext) -> {
            // Job 실행 시 전달된 파라미터 사용 예시
            String value = chunkContext.getStepContext()
                .getJobParameters().get("myParam").toString();
            System.out.println("입력된 파라미터: " + value);
            return RepeatStatus.FINISHED;
        })
        .build();
}
// ✅ 8. Multi-threaded Step – 병렬 처리
@Bean
public Step multithreadedStep() {
    return stepBuilderFactory.get("multithreadedStep")
        .<Integer, Integer>chunk(5)
        .reader(myReader()) // 사용자 정의 reader
        .writer(myWriter()) // 사용자 정의 writer
        .taskExecutor(new SimpleAsyncTaskExecutor()) // 멀티스레드 실행
        .throttleLimit(4) // 최대 4개 스레드 동시 처리
        .build();
}

 

// ✅ 9. StepScope Bean 사용 – 런타임 파라미터 적용
@Bean
@StepScope // 런타임 시 주입이 필요한 경우 사용
public FlatFileItemReader<User> userReader(
    @Value("#{jobParameters['filePath']}") String filePath) {
    // 실행 시 JobParameter로 전달된 파일 경로를 사용하여 CSV 파일을 읽음
    return new FlatFileItemReaderBuilder<User>()
        .name("userReader")
        .resource(new FileSystemResource(filePath))
        .targetType(User.class)
        .delimited()
        .names("id", "name")
        .build();
}

 

위 배치들을 @configuration 클래스에 작성   파일 예 ex ) BatchConfiguration

@Configuration
public class TaskletStepConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public Job myJob() {
        return jobBuilderFactory.get("myJob")
            .start(myStep())
            .build();
    }

    @Bean
    public Step myStep() {
        return stepBuilderFactory.get("myStep")
            .tasklet((contribution, chunkContext) -> {
                System.out.println(">>> 배치 Step 실행됨");
                return RepeatStatus.FINISHED;
            })
            .build();
    }
}

 


@Bean 으로 등록된 job 을 실행시키는 방법 ! 

/**
 * ✅ Spring Batch Job 실행 예제
 *
 * 이 클래스는 우리가 @Bean으로 등록해둔 Job을 실제로 실행시키는 역할을 합니다.
 * 아래 방식으로 Job을 수동 실행할 수 있습니다.
 *
 * ✔ 구성 설명:
 * - @SpringBootApplication: 스프링 부트 애플리케이션 진입점입니다.
 * - @EnableBatchProcessing: 배치 관련 설정을 활성화합니다.
 * - CommandLineRunner: Spring Boot 실행 후 run() 메서드 자동 실행
 *
 * ✔ 실행 흐름:
 * 1. JobLauncher를 통해 Job을 실행합니다.
 * 2. JobParameters에 run.id 같은 유니크 값을 포함하면 중복 실행 가능
 * 3. 실행 결과는 JobExecution으로 확인 가능합니다.
 */

@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication implements CommandLineRunner {

    private final JobLauncher jobLauncher;
    private final Job myJob; // 실행할 Job. ex: simpleTaskletJob()

    public BatchApplication(JobLauncher jobLauncher, Job myJob) {
        this.jobLauncher = jobLauncher;
        this.myJob = myJob;
    }

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JobParameters parameters = new JobParametersBuilder()
            .addLong("run.id", System.currentTimeMillis()) // 중복 실행 방지용 파라미터
            .toJobParameters();

        JobExecution execution = jobLauncher.run(myJob, parameters);

        System.out.println("Job 실행 상태: " + execution.getStatus());
    }
}

 

# application.yml 에 설정을 해주면 위 소스의 run 부분 아래 처럼 생략 가능

# application.yml 설정으로 Job 자동 실행도 가능
spring:
  batch:
    job:
      names: simpleTaskletJob
      
           
 # spring.batch.job.names=jobA,jobB로 설정하면, JobA → JobB 순서로 동기적으로 실행됩니다.
 # 이 순서를 제어하는 주체는 Spring Boot 내부의 JobLauncherCommandLineRunner입니다.
 # 따라서 JobA 실행 중 실패하거나 강제 실패 처리된 경우, JobB는 실행되지 않습니다.
@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

 


# configutaion 클래스에서 custom 으로 job, step, reader, writer 일련 과정 눈으로 보기

- 1 : 시나리오 없이 그냥 정리 (생략된 설명 많음)

/**
 * ✅ CustomReaderWriterConfig.java
 *
 * 설명:
 * - 사용자 정의 ItemReader와 ItemWriter를 사용하는 Chunk 기반 배치 구성입니다.
 * - Reader는 Queue에서 데이터를 읽고,
 * - Writer는 콘솔에 결과를 출력합니다.
 *
 * 실행 방법:
 * 1. 이 클래스를 @Configuration으로 등록
 * 2. Application 실행 시 Job을 등록하거나 jobLauncher.run()으로 실행
 * 3. run.id 등의 JobParameter 포함 필수 (중복 실행 방지)
 */

@Configuration
public class CustomReaderWriterConfig {

    @Autowired private JobBuilderFactory jobBuilderFactory;
    @Autowired private StepBuilderFactory stepBuilderFactory;

    // 내부 Queue - 데이터를 사용자 정의 Reader가 읽음
    private final Queue<String> queue = new LinkedList<>(List.of("사과", "바나나", "딸기", "망고"));

    /**
     * ✅ 전체 배치 Job 정의
     */
    @Bean
    public Job customRWJob() {
        return jobBuilderFactory.get("customRWJob")
                .start(customStep())
                .build();
    }

    /**
     * ✅ 사용자 정의 Reader + Writer 포함 Chunk Step 구성
     */
    @Bean
    public Step customStep() {
        return stepBuilderFactory.get("customStep")
            .<String, String>chunk(2) // 2개씩 트랜잭션 처리
            .reader(customItemReader()) // 사용자 정의 Reader
            .writer(customItemWriter()) // 사용자 정의 Writer
            .build();
    }

    /**
     * ✅ 사용자 정의 Reader
     *
     * 설명:
     * - Queue에서 데이터를 하나씩 꺼내는 로직을 직접 구현
     * - 더 이상 남은 데이터가 없으면 null을 리턴하여 종료됨
     */
    @Bean
    public ItemReader<String> customItemReader() {
        return new ItemReader<>() {
            @Override
            public String read() throws Exception {
                return queue.poll(); // Queue에서 하나 꺼냄 (없으면 null)
            }
        };
    }

    /**
     * ✅ 사용자 정의 Writer
     *
     * 설명:
     * - Item 목록을 받아 하나씩 콘솔에 출력
     * - DB 저장, 파일 출력, REST 전송 등으로 대체 가능
     */
    @Bean
    public ItemWriter<String> customItemWriter() {
        return new ItemWriter<>() {
            @Override
            public void write(List<? extends String> items) throws Exception {
                for (String item : items) {
                    System.out.println(">> 처리 완료: " + item);
                }
            }
        };
    }
}

 

- 2 : 더미 데이터 사용 시 (생략된 설명 많음)

/**
 * ✅ 유저 리스트 n건씩 조회해서 콘솔에 출력하는 배치 예제
 *
 * 설명:
 * - 이 예제는 chunk 기반의 Step으로, 유저 데이터를 5건씩 묶어 읽고 출력합니다.
 * - 사용자 정의 Reader는 리스트에서 유저를 순차적으로 꺼냅니다.
 * - Writer는 읽어온 유저 목록을 한 번에 콘솔에 출력합니다.
 * - 실제 환경에서는 DB → JPA → Writer 방식으로 확장 가능합니다.
 *
 * 실행 방식:
 * 1. @SpringBootApplication + @EnableBatchProcessing 구성 필요
 * 2. 이 Job을 포함한 Application을 실행하거나 jobLauncher로 수동 실행 가능
 * 3. chunk(5)로 설정되어 있으므로 5건마다 트랜잭션 커밋
 */

@Configuration
public class UserListPrintBatch {

    @Autowired private JobBuilderFactory jobBuilderFactory;
    @Autowired private StepBuilderFactory stepBuilderFactory;

    // 샘플 유저 데이터
    private final List<String> users = List.of("홍길동", "김철수", "이영희", "박민지", "최하늘", "정하나", "오태양");

    private final Queue<String> userQueue = new LinkedList<>(users);

    @Bean
    public Job userPrintJob() {
        return jobBuilderFactory.get("userPrintJob")
                .start(userPrintStep())
                .build();
    }

    @Bean
    public Step userPrintStep() {
        return stepBuilderFactory.get("userPrintStep")
                .<String, String>chunk(5) // 5명씩 묶어서 처리
                .reader(() -> userQueue.poll()) // 큐에서 한 명씩 꺼냄, 없으면 null → 종료
                .writer(items -> {
                    System.out.println("===== 유저 그룹 출력 =====");
                    items.forEach(user -> System.out.println("> " + user));
                    System.out.println("=========================");
                })
                .build();
    }
}

 

- 3 종합 : 실데이터 사용 시  ★

/**
 * ✅ 실제 DB에서 유저 목록을 조회 → DTO로 변환 → 콘솔에 출력하는 Spring Batch 구성
 *
 * 설명:
 * - JpaPagingItemReader를 사용해 DB에서 User 엔티티 데이터를 읽어옵니다.
 * - Processor에서 이메일을 마스킹하는 등의 DTO 변환 작업을 수행합니다.
 * - Writer에서는 변환된 UserDto 객체를 콘솔에 출력합니다.
 *
 * 요구 전제:
 * - 엔티티: User (id, name, email 등)
 * - DTO 클래스: UserDto (id, name, maskedEmail)
 * - EntityManagerFactory 등 JPA 설정 완료 필요
 */
@Configuration // 스프링 설정 클래스임을 의미함
public class UserPrintDbBatch {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Bean // Spring Batch Job 구성
    public Job userPrintFromDbJob() {
        return jobBuilderFactory.get("userPrintFromDbJob") // Job 이름 지정
                .start(userPrintFromDbStep())              // 시작 Step 지정
                .build();                                  // Job 빌드 완료
    }

    /**
     * ✅ chunk 기반 Step 구성 (User → UserDto)
     *
     * - Reader: User 엔티티 조회
     * - Processor: 이메일 마스킹 포함한 UserDto 변환
     * - Writer: UserDto 콘솔 출력
     * - 재시도, Skip 정책, 리스너 통한 예외 로깅까지 포함
     */
    @Bean // Step 구성
    public Step userPrintFromDbStep() {
        return stepBuilderFactory.get("userPrintFromDbStep")  // Step 이름 지정
                .<User, UserDto>chunk(5)                       // 5건 단위로 chunk 트랜잭션 커밋
                .reader(jpaUserReader())                       // JpaPagingItemReader를 사용하여 DB에서 User 읽기
                .processor(userProcessor())                   // Processor로 User → UserDto 변환
                .writer(userDtoWriter())                      // 변환된 UserDto를 출력하는 Writer

                // ✅ 예외 발생 시 재시도 및 Skip 정책 설정
                .faultTolerant()
                .retry(TransientDataAccessException.class)     // ✅ 재시도 대상: 일시적 DB 장애 등 복구 가능한 예외
                .retryLimit(3)                                 // 최대 3회까지 재시도
                .skip(ValidationException.class)               // ✅ 스킵 대상: 유효성 검증 실패 등 데이터 오류
                .skipLimit(10)                                 // 최대 10건까지 skip 허용

                // ✅ Skip 로그 출력 리스너
                .listener(new SkipListener<User, UserDto>() {
                    @Override
                    public void onSkipInRead(Throwable t) {
                        System.err.println("⛔ [읽기 Skip] 사유: " + t.getClass().getSimpleName()
                                           + " - " + t.getMessage());
                    }

                    @Override
                    public void onSkipInWrite(UserDto item, Throwable t) {
                        System.err.println("⛔ [쓰기 Skip] 대상: " + item
                                           + ", 사유: " + t.getClass().getSimpleName()
                                           + " - " + t.getMessage());
                    }

                    @Override
                    public void onSkipInProcess(User item, Throwable t) {
                        System.err.println("⛔ [변환 Skip] 유저 ID: " + item.getId()
                                           + ", 이름: " + item.getName()
                                           + ", 사유: " + t.getClass().getSimpleName()
                                           + " - " + t.getMessage());
                    }
                })

                // ✅ Processor 예외 발생 시 개별 로그 출력용 리스너 추가
                .listener(loggingProcessListener())

                .build(); // Step 빌드 완료
    }

    /**
     * ✅ DB에서 유저 목록을 조회하는 Reader
     */
    @Bean // Reader 구성
    public JpaPagingItemReader<User> jpaUserReader() {
        return new JpaPagingItemReaderBuilder<User>()
                .name("jpaUserReader")                         // Reader 이름 (메트릭, 디버깅 등에서 사용)
                .entityManagerFactory(entityManagerFactory)    // JPA EntityManagerFactory 연결
                .queryString("SELECT u FROM User u ORDER BY u.id ASC") // JPQL 쿼리 정의
                .pageSize(5)                                   // 5건 단위로 페이징 조회
                .build();                                      // Reader 빌드 완료
    }

    /**
     * ✅ 커스텀 Processor: User → UserDto 변환
     *
     * - 이메일 주소 도메인 부분을 마스킹 처리
     * - 엔티티의 민감한 정보를 숨겨서 Writer로 넘길 수 있음
     * - 이메일 없을 경우 예외 발생시켜 skip 처리
     */
    @Bean // Processor 구성
    public ItemProcessor<User, UserDto> userProcessor() {
        return user -> {
            if (user.getEmail() == null || !user.getEmail().contains("@")) {
                // ❗ 유효하지 않은 이메일은 유효성 검증 예외로 throw → skip 대상
                throw new ValidationException("유효하지 않은 이메일 형식입니다. ID: " + user.getId());
            }

            String maskedEmail = user.getEmail().replaceAll("@.*", "@****"); // 이메일 도메인 마스킹
            return new UserDto(
                user.getId(),
                user.getName(),
                maskedEmail
            );
        };
    }

    /**
     * ✅ Processor 예외 발생 시 로그 출력 리스너
     */
    @Bean
    public ItemProcessListener<User, UserDto> loggingProcessListener() {
        return new ItemProcessListener<User, UserDto>() {
            @Override
            public void beforeProcess(User item) { /* 처리 전 작업 생략 */ }

            @Override
            public void afterProcess(User item, UserDto result) { /* 처리 후 작업 생략 */ }

            @Override
            public void onProcessError(User item, Exception e) {
                System.err.println("❌ [Processor 오류] 유저 ID: " + item.getId()
                                 + ", 이름: " + item.getName()
                                 + ", 예외 종류: " + e.getClass().getSimpleName()
                                 + ", 메시지: " + e.getMessage());
            }
        };
    }

    /**
     * ✅ Writer: 변환된 UserDto 객체 출력
     */
    @Bean // Writer 구성
    public ItemWriter<UserDto> userDtoWriter() {
        return dtos -> {
            System.out.println("📄 변환된 유저 정보 출력 ====");
            for (UserDto dto : dtos) {
                System.out.println(
                    "ID: " + dto.getId() +
                    ", 이름: " + dto.getName() +
                    ", 이메일: " + dto.getMaskedEmail()
                );
            }
            System.out.println("===========================");
        };
    }
}

# 트랜잭션

트랜잭션은 어디서 걸릴까?

구분트랜잭션 범위커밋 시점
Chunk Step chunk(n) 단위 n건 처리 후 커밋
Tasklet Step Step 전체 Tasklet 로직 완료 시 커밋
Job 없음 (컨트롤러 역할) 각 Step의 결과만 기록됨
/**
 * ✅ Tasklet 기반 Step 예제
 *
 * 설명:
 * - Tasklet은 단일 작업 실행에 적합한 Step 구성입니다.
 * - 예를 들어 로그 출력, 파일 삭제, 초기화 작업 등에 자주 사용됩니다.
 * - 이 방식은 반복 구조가 없고, 내부 로직이 한 번만 실행됩니다.
 * - 트랜잭션은 Step 전체에 대해 단일하게 적용됩니다.
 *
 * 실행 흐름:
 * - Step 시작 → tasklet() 내부 코드 실행 → RepeatStatus 반환 → 종료
 */

@Bean
public Step taskletStep() {
    return stepBuilderFactory.get("taskletStep")
        .tasklet((contribution, chunkContext) -> {
            System.out.println(">> Tasklet 실행: 단일 로그 출력");
            return RepeatStatus.FINISHED;
        })
        .build();
}




/**
 * ✅ Chunk 기반 Step 예제
 *
 * 설명:
 * - Chunk 기반 구조는 대용량 데이터를 일정 크기(chunk)로 분할하여 반복 처리할 때 적합합니다.
 * - Reader → Processor → Writer 흐름으로 구성되며, 이 흐름이 chunk 단위로 반복됩니다.
 * - 트랜잭션은 chunk(n)마다 커밋되며, 예외 처리 시 해당 chunk만 rollback됩니다.
 * - retry(), skip(), faultTolerant() 같은 세부 설정이 가능해 예외 복원력도 우수합니다.
 *
 * 실행 흐름:
 * - Step 시작 → Reader에서 n개 아이템 읽기 → Processor로 변환 → Writer로 쓰기 → 커밋 → 반복
 *
 * 사용 예시:
 * - CSV → DB 저장, DB → 파일 출력, 마이그레이션, 정제 등 반복 처리 작업
 */

@Bean
public Step chunkStep() {
    return stepBuilderFactory.get("chunkStep")
        .<String, String>chunk(3) // 3개 단위로 트랜잭션 커밋
        .reader(() -> "데이터") // 간단한 Reader 구현
        .processor(item -> item + " 가공") // 가공 처리
        .writer(items -> items.forEach(System.out::println)) // 출력
        .build();
}

 


# read - processor - write 데이터 넘어가는 과정

- Spring Batch의 chunk 기반 처리 흐름에서는 Reader → Processor → Writer 순서로 데이터가 자동으로 넘어간다.

- 해당 동작은 Spring Batch 내부에서 잘 정의된 ItemStream 기반 처리 메커니즘에 따라 진행

1. Reader 가 아이템 1개를 읽는다
2. Processor 가 있으면 변환하고, 없으면 생략
3. Writer 는 지정된 chunk 크기만큼 모인 아이템을 한 번에 처리

If)  chunk(5)를 설정했다면:

  • Reader가 5개 아이템을 순차적으로 읽고
  • Processor를 거친 결과를 모아
  • Writer에게 한 번에 List<? extends T> 형태로 전달

💡 자동 전달이 가능한 이유

Spring Batch 내부의 SimpleChunkProcessor가 흐름 관리

  1. Reader가 반복 호출되어 하나씩 데이터를 읽고
  2. Processor를 통과 후
  3. List에 누적시켜
  4. Chunk 크기만큼 모이면 Writer에 전달

따라서 Reader/Processor/Writer만 정의하면, 중간의 데이터 전달 및 실행 순서는 Spring Batch가 자동으로 관리

 

# 소스 예시

// Reader: 유저 1명씩 반환
public User read() {
    return jdbcTemplate.queryForObject("SELECT ...");
}

// Processor (옵션)
public UserDto process(User user) {
    return new UserDto(user.getName());
}

// Writer: 5명씩 List<UserDto> 전달받음
public void write(List<? extends UserDto> items) {
    for (UserDto dto : items) {
        System.out.println(dto.getName());
    }
}

여기서 write() 메서드가 호출될 때마다 내부에 5건이 자동으로 담겨 있음

 

  • users는 우리가 메서드 호출로 넘기는 값이 아니라   Spring Batch가 Reader → (Processor →) Writer로 연결하면서 전달
  • chunk마다 리스트로 모은 데이터를 자동으로 Writer에 넣어서 호출해주는 구조.

 users라는 이름은 단지 람다 함수의 매개변수뿐이고 Spring Batch가 자동으로 전달해주는 List 형태의 데이터어떤 변수명으로 받을지는 개발자가 자유롭게 정할 수 있음