본문 바로가기
Programming/Java, Spring

Java 병렬 처리 ForkJoinPool 기본 작동 원리

by Renechoi 2024. 4. 11.

배경

업무를 하면서 ForkJoinPool을 이용한 병렬 처리를 사용해볼 일이 생겼다. 엄밀히 말하면 Stream API의 parrallelStream()이었다. 본 글은 그에 앞서 간단히 ForkJoinPool의 작동 원리를 공부하면서 정리한 글이다. 

 

작동 원리를 알아보자

ForkJoinPool은 자바 7에서 도입된 프레임워크로, 작업을 작은 단위로 분할하고, 이를 여러 스레드에 할당하여 병렬로 처리한다. 핵심 로직은 '분할 정복 알고리즘'에 기반한다. 먼저 핵심 개념 및 컴포넌트를 살펴보면 다음과 같다.

 

핵심 컴포넌트

📌 ForkJoinPool

ForkJoinTask를 실행하는데 사용되는 스레드 풀로, 알고리즘 작업의 균형을 동적으로 조정하는 역할을 한다.

 

📌 ForkJoinTask

작업의 기본 단위이다. 작업에는 결과를 반환하지 않는 작업(RecursiveAction)과 결과를 반환하는 작업(RecursiveTask)의 두 가지 형태가 있다.

 

📌 Work-Stealing Algorithm

ForkJoinPool을 사용한 알고리즘의 마스코트가 되는 큰 특징 중 하나이다. 말 그대로 스레드가 다른 작업의 작업을 “훔치”는 알고리즘이다. 그럼으로써 모든 스레드가 유휴시간 없이 계속해서 작업을 처리할 수 있도록 한다.

 

http://www.h-online.com/developer/features/The-fork-join-framework-in-Java-7-1762357.html

 

 

 

프로세스를 생각해보자. 먼저 작업이 제출된다. ForkJoinPool은 작업을 받고 다음 시퀀스 다이어그램에서처럼 작업을 Task로 분할한다. 이 작업에서 fork()가 호출된다.

 

 

Fork & Join

fork() 호출시 발생하는 일

  • 작업 분할: 작업은 작은 서브 태스크로 분할되어야 한다. 분할된 작업은 ForkJoinPool에 속한 스레드의 데큐(double-ended queue, deque)에 추가된다.
  • 비동기 작업
  • 작업처리: 각 스레드는 자신의 deque에서 작업을 가져와 처리한다. 작업 실행 중 다른 서브 태스크가 fork()를 통해 생성되면, 이 서브 태스크는 같은 deque의 앞쪽에 추가되어, 가장 최근에 추가된 작업부터 처리된다(LIFO).
  • flow: 작업 제출 → | 작업 스레드 Deque | --[오버플로우/부하 균형] → 다른 Deque로 재분배

 

 

실제 구현을 보면 ForkJoinPoolexternalPush 메소드를 호출하여 실행되는 것을 볼 수 있는데,fork() 메소드는 ForkJoinTask를 현재 스레드의 작업 큐에 추가한다. 만약 현재 스레드가 ForkJoinWorkerThread의 인스턴스인 경우, 해당 스레드의 작업 큐(workQueue)에 직접 접근하여 작업을 추가하고, 그렇지 않은 경우에는 ForkJoinPool.common을 통해 공통 풀에 작업을 제출한다.

 

어째서 ForkJoinWorkerThread 인 경우와 그렇지 않은 경우를 다르게 처리할까? 여기서 workQueue와 ForkJoinPool 이 관리하는 main 데큐이다. ForkJoinWorkerThread 가 아닌 경우는 일반 애플리케이션의 스레드에서 fork()를 호출한 경우일 것이며, 이 경우는 common으로 제출되는 것이다. ForkJoinPool은 이렇게 common 작업과 명시적인 ForkJoinPool의 작업을 나누어서 관리함으로써 스레드 안정성과 효율성을 높일 수 있었을 것이다.

 

join() 호출시 발생하는 일

  • join() 연산은 작업 완료를 동기화하기 위해 중요하다 !
  • 이전에 fork한 다른 작업의 완료를 기다리는 순서가 시작
  • 작업 훔치기(Wor-Stealing) 알고리즘 활용: 현재 스레드가 join()을 호출하고 대기 상태에 들어가면, ForkJoinPool은 다른 스레드가 유휴 상태에 빠지지 않도록 작업 훔치기 알고리즘을 활성화한다. 즉, 대기 중인 스레드가 다른 스레드의 작업 데큐(deque)에서 실행할 수 있는 태스크를 찾아 실행한다.
  • 결과 합병: 모든 서브 태스크의 실행이 완료되고, 그 결과가 준비되면, join() 메소드는 이 결과들을 합병하여 최종 결과를 생성한다.

 

join()은 내부에 awaitDone() 메서드를 통해 작업이 완료될 때까지 현재 스레드를 대기 상태로 만든다. 해당 메서드는 내부 로직이 복잡한데, 중요한 부분만 한 번 살펴보자.

awaitDone(pool, ran, interruptible, timed, nanos) {
  만약 (현재스레드가 ForkJoinWorkerThread라면) {
    풀과 작업큐 할당;
  } 아니면 {
    공용 풀 사용;
  }

  만약 (중단됨) 반환 ABNORMAL;
  만약 (작업이 완료됨) 반환 상태;

  가능하다면 작업 완료를 도움;

  동안 (작업이 완료되지 않음) {
    만약 (시간제한 있고 시간초과) 중단;
    만약 (다른 작업 도울 수 있으면) 도움;
    아니면 스레드 대기 또는 중지;
  }

  작업 상태 반환;
}

 

슈도코드를 보면, fork() 에서처럼 먼저 현재 스레드가 ForkJoinWorkerThread인지 확인하여 적절한 작업 풀을 사용하는지를 확인하고, 이후 스레드가 인터럽트되었는지 체크한다. 태스크의 완료 상태를 확인하고, 필요한 경우 ForkJoinPool의 작업 훔치기 알고리즘을 활용하여 다른 태스크의 완료를 돕는다. 이를 반복적으로 수행하며, 태스크가 완료되면 최종 상태를 반환한다.

 

https://image.slidesharecdn.com/parallel-asynchronous-programming-java-240220174521-c612f09e/75/parallelasynchronousprogrammingjavapptx-40-2048.jpg?cb=1708451651

 

그렇다면 task stealing은 언제 발생하는 것일까?

 

작업 훔치기

작업 훔치기가 발생하는 시나리오:

 

stealing 로직은 ForkJoinPool 내에서 특정 조건 하에 발생하는데, 전반적인 과정에 관여하면서 발생한다고 보아야 한다. 메커니즘의 본질은 스레드에 작업이 없으면 다른 작업의 데큐에서 태스크를 “훔쳐”실행하는 것이다. 따라서 본질적으로 리소스를 최대한 활용하고자 하는 것이 목적이다.

 

이런 점을 고려할 때, stealing 은 작업 데크가 소진되었을 때나, 작업의 규현을 맞추기 위해 재분배 할 때 발생한다.

 

실제 작동 예시 코드는 ForkJoinPoolscan 메소드에서 찾아볼 수 있었다. 이 메소드는 스레드가 자신의 작업 큐가 비어 있을 때 다른 스레드의 큐를 검사(스캔)하고, 가능한 작업을 찾아 실행한다.

 

 

코드에서 WorkQueue[] qs = queues;는 모든 스레드의 작업 큐를 포함하는 배열이다. for 루프는 이 배열을 순회하며, 다른 스레드의 큐에서 실행 가능한 작업(ForkJoinTask<?> t = a[k];)을 찾는다. 작업을 성공적으로 "훔쳤다면"(즉, WorkQueue.casSlotToNull(a, k, t)true를 반환하면), 해당 작업의 인덱스(q.base)를 업데이트하고 작업을 실행한다. 여기서 cas 연산으로 안전하게 slot을 Null로 변환하는 것을 Stealing으로 처리하는 것으로 볼 수 있다.

 

정리

요약하자면 🔥🔥

 

큰 작업이 ForkJoinPool에 제출되면, 이는 더 작은 하위 작업으로 나누어지며(Fork), 각 하위 작업은 병렬로 실행된다.

 

스레드가 작업을 완료하고 더 이상 할 일이 없을 때, 다른 스레드의 Deque에서 작업을 훔쳐서 실행(Work Stealing)한다.

 

모든 하위 작업이 완료되면, 그 결과는 최종 결과를 형성하기 위해 결합된다(Join).

 

 

 

 

자바에서 쉽게 사용하기 

 

Stream의 ParrallelStream

 

자바에서는 Stream API에서 지원하는 병렬 연산을 사용할 때 ForkJoinPool 알고리즘 기반의 병렬 처리를 제공한다. 일반 stream()을 parrallelStream()으로 바꾸는 것만으로도 쉽게사용할 수 있다. 

 

Stream API의 병렬 처리는 내부적으로 ForkJoinPool.commonPool()을 사용한다. 이 공통 풀은 JVM 당 하나만 존재하며, JVM 전반의 병렬 스트림 연산에 사용된다. 병렬 스트림 연산을 시작할 때, 스트림의 원소들은 작은 태스크로 분할되어 ForkJoinTask로 감싸진다. 이 태스크들은 ForkJoinPool에 제출되어 처리된다.

 

 

예시 코드 

 

예를 들어 엔티티를 조회하고, 간접 참조 방식으로 엔티티의 하위 항목들을 별도로 조회하여 조합하는 경우 아래와 같이 일반 stream 방식을 parrallel 방식으로 변경할 수 있다. 

 

    public Page<TicketInfoWithMesh> searchWithMesh(TicketSearchRequest searchRequest, Pageable pageable) {
        String originalCarNumber = fetch...
        Page<TicketInfoWithMesh> ticketInfoWithMeshes = ticketCrudService.search( ...// 생략
        
        ticketInfoWithMeshes.forEach(ticket -> {
             ticket.setCategoryInfo(categoryCrudService.search(ticket.getCategoryId()));
             ticket.setCarExchangeDetailsInfo(carExchangeService.fetchById(ticket.getCarExchangeDetailsId()));
        });

        return ticketInfoWithMeshes;
    }

 

👇🏻

 

   public Page<TicketInfoWithMesh> searchWithMeshParallel(TicketSearchRequest searchRequest, Pageable pageable) {
        String originalCarNumber = fetch...
        Page<TicketInfoWithMesh> ticketInfoWithMeshes = ticketCrudService.search( ...// 생략

        ticketInfoWithMeshes.getContent().parallelStream().forEach(ticket -> {
             ticket.setCategoryInfo(categoryCrudService.search(ticket.getCategoryId()));
             ticket.setCarExchangeDetailsInfo(carExchangeService.fetchById(ticket.getCarExchangeDetailsId()));
        });

        return ticketInfoWithMeshes;
    }

 

 

Stream이 ForkJoinPool을 호출하는 것을 어떻게 알 수 있을까? 

그런데 Stream AP의 병렬처리에서 JVM 이 공유하는 공통 풀의 스레드를 어떻게 사용한다는 것일까? 정말로 Stream API가 스레드 안전하게 병렬 처리를 하도록 구현이 된 게 맞을까? 문득 궁금해졌다.

 

내부 구현 코드를 따라가 보았다. 

 

다음과 같은 내부적으로 AbstractTask를 사용하는 것을 볼 수 있다.

 

 

AbstractTask 는 내부적으로 match, foreach 등의 연산을 사용할 때 확장하는 abstract 클래스인데, CountedCompleter를 확장하는 클래스이기 때문에 java.concurrnet의 forkjoinpool에서 설정된 pool size를 사용한다. 그렇다면 CountedCompleter 는 무엇일까? CountedCompleter 를 보면 package java.util.concurrent; 에 존재하는 클래스로서 forkhjointask를 확장한 abstract 클래스임을 볼 수 있다.

 

 

 

 

 

Abstracttask 의 내부 구현을 보면 Forkjoinpool에서 getCommonPoolParallelism() 를 통해 비트 연산으로 얻는 LEAF_TARGET 를 기반으로 compute() 연산을 하는 것을 볼 수 있다.

 

그렇기 때문에 자바에서 제공하는 Runtime.getRuntime().availableProcessors()) 메서드로 병렬 처리에서 사용하는 thread 개수를 알 수 있는 것이다.

 

그런데 실제로 병렬처리에서 사용하는 thread 개수는 이 개수가 아니다. 해당 값은 시스템의 물리적 CPU 코어 수 또는 논리적 프로세서 수(하이퍼스레딩이 활성화된 경우)이다. 실제 사용되는 스레드 개수는 ForkJoinPool.commonPool()를 통해 볼 수 있는데, 해당 값은 사용 가능한 스레드 수가 논리 CPU 코어 수보다 하나 적게 설정된다. -1을 하는 이유는 주로 시스템의 다른 부분과의 자원 공유를 고려하기 때문이다.

 

ForkJoinPool의 공용 풀은 JVM 내부에서 여러 부분에 의해 공유된다. 만약 모든 가능한 스레드(프로세서 수만큼)가 스트림 API에 의해 사용되면, 시스템의 다른 부분(예: UI 이벤트 처리, 기타 백그라운드 작업)이 영향을 받을 수 있을 것이다. 따라서, ForkJoinPool의 공용 풀에서 하나 적은 스레드를 사용하도록 설정하는 것은 이러한 다른 작업을 위해 최소한 하나의 프로세서(또는 스레드)를 남겨두기 위한 일종의 예방 조치인 것이다.

 

Stream API의 ForkJoin 프로세스 

 

Stream API는 병렬 처리의 시작은 ParallelStreamSupport 클래스를 활용하는 시점부터다. 병렬 스트림 연산 시, 이 클래스는 ForkJoinPool의 commonPool()을 사용하여 작업을 병렬로 분할하고 처리한다. 병렬 스트림의 각 연산은 내부적으로 Spliterator를 사용하여 데이터 소스를 분할할 수 있는 작은 단위로 나눈다. 이렇게 분할된 작업 단위는 ForkJoinTask의 구현체로 변환되어, ForkJoinPool에 제출된다.

 

Stream 패키지의 ForEachOps 클래스에서 split 연산을 하는 부분을 찾아보면 

 

 

데이터 소스를 두 개의 부분으로 나누는 것을 볼 수 있는데, 분할된 작업은 재귀적으로 다시 분할되어 데이터가 더 이상 분할할 수 없을 때까지, 즉 trySplit이 null을 반환할 때까지 계속된다. 각 분할 과정에서 생성된 작업이 ForkJoinTask의 인스턴스로 ForkJoinPool에 제출되는 것이다.

 

이렇게 병렬 스트림 연산이 시작되면, AbstractTask의 서브 클래스인 ForEachOps.ForEachTask가 생성되어, 실제 데이터 처리를 담당한다. 각 ForEachTask는 Spliterator를 사용하여 작업을 더 작은 서브 태스크로 분할하며, 재귀적으로 분할하여 ForkJoinPool에 넘겨주는 것을 볼 수 있다. 

 

 

 

단, 조심하자 

 

중요한 포인트가 있다. ForkJoinPool의 스레드는 작업이 완료될 때까지 공용 풀로 반환되지 않는다는 사실이다. 

 

병렬 처리 로직이 등장한 배경과 그 히스토리를 살펴보면, 사실 어플리케이션 수준의 병렬 연산은 CPU 집약적인 수준을 위해서 존재할 때 그 가치의 활용도나 안정성이 높다고 볼 수 있을 것 같다. 병렬 작업의 성격이 외부 시스템과의 입출력(I/O)을 포함하는 경우, 그 복잡성과 잠재적인 문제 발생 가능성은 기하급수적으로 증가한다. 예를 들어 이런 생각을 해보자. 다소 극단적으로 생각해보자. 

 

ForkJoinPool이 관리하는 12개의 스레드가 병렬로 데이터베이스 I/O 작업을 수행한다고 가정하자. 모든 스레드가 데이터베이스 연결을 성공적으로 맺었으나, 요청한 작업에 대한 데이터베이스의 응답이 지연되어 모든 스레드가 대기 상태에 빠진 상황이 발생한다면 어떻게 될까? ForkJoinPool 내 스레드들이 데이터베이스 I/O 작업의 완료를 기다리는 동안 다른 작업을 수행할 수 없게 되어, 시스템 전체가 성능 저하를 겪거나 심하면 장애로 이어질 수 있을 것이다. 

 

 ForkJoinPool 부작용 테스트로 확인한 결과

 

ForkJoinPool의 설계 원칙으로 작업이 완료될 때까지 스레드를 공용 풀로 반환하지 않는다는 사실은 분명 효율적인 스레드 활용과 빠른 작업 처리를 목적으로 할 것이다. 다만, 스레드가 외부 시스템의 응답에 종속되는 작업을 처리할 경우 예상치 못한 병목 현상을 초래할 수 있다는 점을 고려해야 한다. 

 

 

 

 

 

 

 


레퍼런스
- https://medium.com/geekculture/pitfalls-of-java-parallel-streams-731fe0c1eb5f
- https://medium.com/@reetesh043/a-deep-dive-into-javas-forkjoinpool-mechanics-556f82d160fb
- https://www.slideshare.net/slideshow/parallelasynchronousprogrammingjavapptx/266404044
- http://www.h-online.com/developer/features/The-fork-join-framework-in-Java-7-1762357.html
반응형