자바/모던 자바 인 액션

[모던 자바 인 액션] 7장. 병렬 데이터 처리와 성능

Rudtjs 2022. 10. 24. 08:49

이번에는 데이터 병렬 처리, 병렬 스트림의 성능, 포크/조인 프레임워크를 알아보겠습니다.

 

 7.1 병렬 스트림

병렬 스트림이란? 각각의 스트림에서 처리할 수 있도록 스트림 요소를 여러 개로 분할한 스트림이다.

 

예시로 숫자 n을 입력 받아서 1부터 n까지의 숫자의 합계를 구현해주는 메서드를 만들어보자.

// JAVA 반복문
public long sum(long n) {
	long result = 0;
	for (long i = 1; i <= n; i++) {
		result += i;
	}
	return result;
}


// 스트림
public long sequentialSum(long n) {
  return Stream.iterate(1L, i -> i + 1)
    .limit(n)
    .reduce(0L, Long::sum);
  }

만약 여기서 n이 커진다면 병렬로 어떻게 처리해야 좋을까?

 

 

7.1.1 순차 스트림을 병렬 스트림으로 변환하기

순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리된다.

public long sequentialSum(long n) {
  return Stream.iterate(1L, i -> i + 1)
    .limit(n)
    .parallel() 
    .reduce(0L, Long::sum);
  }

이 코드가 전과 다른점은 여러 개로 분할되고 연산을 수행한 뒤 다시 리듀싱 연산으로 합쳐져 결과를 도출한다.

만약 반대로 병렬이 아닌 순차로 바꿔주고 싶다면 sequential를 이용해주면 됩니다.

 

7.1.2 스트림 성능 측정

성능 측정을 위하여 자바 마이크로벤치마크 하니스(JMH) 라이브러리를 통해 벤치마크를 구현해보자.

벤치마크: 오브젝트에 대해 일반적으로 수많은 표준 테스트와 시도를 수행함으로써 오브젝트의 상대적인 성능 측정을 목적으로 컴퓨터 프로그램을 실행하는 행위이다.
 -위키백과-
@BenchmarkMode(Mode.AverageTime) //벤치마크 대상 메서드를 실행하는데 걸린 평균 시간 측정
@OutputTimeUtil(TimeUnit.MILLISECONDS) //벤치마크 결과를 ms 단위로 출력
@Fork(2, jvmArgs = {"-Xms4G", "-Xmx4G"}) //4GB의 힙 공간을 제공한 환경에서 2번의 벤치마크를 수행
public class ParallelStreamBenchmark {
  private static final long N = 10_000_000L;
  
  @Benchmark
  public long sequentialSum() {
    return Stream.iterate(1L, i -> i + 1).limit(N)
      .reduce(0L, Long::sum);
    }
    
    @TearDown(Level.Invocation) //매 벤치마크 실행한 후에 가비지 컬렉터 동작 시도
    public void tearDown() {
      System.gc();
    }
  }

전통적인 for 루프를 사용해 반복하는 방법이 더 저수준으로 동작할 뿐 아니라 기본값을 박싱 하거나 언박싱할 필요가 없으므로 더 빠를 것이라 예상할 수 있다.

@Benchmark
public long iterativeSum() {
  long result = 0;
  for (long i = 1L; i <= N; i++) {
    result += i;
  }
  return result;
}

벤치마크 결과도 역시 순차적 스트림을 사용할 때보다 4배 이상 빠르게 측정되었다.

 

하지만 병렬스트림의 벤치마크 결과는 순차 스트림에 비해 5배나 느린 결과가 나왔다. 어떤 문제가 있을까?

  • 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.
  • 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.

이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 iterate 연산을 청크로 분할하기가 어렵다.

스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만, 결국 순차처리 방식과 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가한 것이다.

 

병렬 처리를 사용할때는 내부적으로 어떤 일이 일어나는지 확인하고 사용하자.

 

더 특화된 메서드 사용

멀티코어 프로세서를 활용해서 효과적으로 병렬 연산을 실행하려면 어떻게 해야 할까?

 

LongStream.rangeClosed의 장점

  • 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.
  • 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다. 예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20 범위의 숫자로 분할한다.
  @Benchmark
  public long parallelRangedSum() {
    return LongStream.rangeClosed(1, N)
      .parallel()
      .reduce(0L, Long::sum);
    }

벤치마크 결과 순차로 수행했을 때 보다 병렬로 수행했을때 더 높은 성능을 보였고

올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘한다는 사실을 알 수 있었다.

 

7.1.3 병렬 스트림의 올바른 사용법

병렬 스트림을 사용하면서 발생하는 가장 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문이다.

public long sideEffectSum(long n) {
  Accumlator accumulator = new Accumulator();
  LongStream.rangeClosed(1, n).forEach(accumulator::add);
  return acculator.total;
}

public class Accumulator {
  public long total = 0;
  public void add(long value) { total += value; }
}

위 코드를 병렬로 실행하게 되면 total 값에 접근할 때마다  문제가 일어난다.

public long sideEffectParallelSum(long n) {
  Accumulator accumulaor = new Accumulator();
  LongStream.rangeClosed(1, n).parallel().forEach(accumultor::add);
  return accumulator.total;
}

메서드의 성능뿐만 아니라 결괏값도 올바르게 나오지 않는다.

병렬 스트림이 올바르게 동작하기 위해서는 공유된 가변 상태를 피해야 한다.

 

7.1.4 병렬 스트림 효과적으로 사용하기

  • 확신이 서지 않을 때는 순차 스트림과 병렬 스트림 구현 시의 성능을 직접 측정한다. 병렬 처리 한다고 무조권 좋은 것이 아니다.
  • 자동 박싱언박싱성능을 크게 저하시킬 수 있는 요소이므로 주의해서 사용해야 하며, 기본형 특화 스트림(IntStream, LongStream, DoubleStream)을  사용하는 것이 좋다.
  • limit이나 findFirst처럼 요소의 순서에 의존하는 연산은 병렬 스트림에서 성능이 더 떨어진다. 요소의 순서가 상관없다면 unordered를 호출해서 비 정렬된 스트림을 얻은 후 limit을 호출하는 것이 더 효율적이다.
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하자. 요소 수가 많고 요소 당 연산 비용이 높다면 병렬 스트림으로 성능을 개선할 여지가 있다.
  • 병렬화 과정의 부가 비용을 상쇄하지 못할 정도의 소량의 데이터에서는 병렬 스트림이 도움되지 않는다.
  • 스트림을 구성하는 자료구조가 적절한지 확인한다. ArrayList는 요소를 탐색하지 않고도 분할할 수 있지만 LinkedList는 모든 요소를 탐색해야 분할할 수 있다.
  • 스트림의 특성과 파이프라인 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다. SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 병렬 처리가 가능하다. 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 병렬 처리가 어려워진다.
  • 최종 연산의 병합 과정 비용이 비싸다면 병렬 스트림으로 얻은 이익이 상쇄될 수 있다.

 

7.2 포크/조인 프레임워크

포크/조인 프레임워크는 병렬화할 수 있는 작어블 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되어있다.

 

7.2.1 Recursive Task 활용

스레드 풀을 이용하기 위해서는 RecursiveTask<R>의 서브클래스를 만들어야 한다. 

RecursiveTask를 정의하기 위해서는 추상 메서드 compute를 먼저 구현해야 한다.

protected abstract R compute();

compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생상하는 알고리즘이다.

if(태스크가 충분히 작거나 더이상 분할할 수 없으면) {
  순차적으로 태스크 계산
} else {
  태스크를두 서브태스크로 분할
  태스크가 다시 서브태스크로 분할되도록 메시지를 재귀적으로 호출
  모든 서브태스크의 연산이 왑료될때까지 대기
  각 서브태스크의 결과를 합침
}

 

7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법

  • join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출하지 않으면, 각각의 서브태스크가 다른 서브태스크를 기다리는 일이 발생할 수 있다.
  • RecursiveTask 내에서는 compute나 fork 메서드를 사용하며, 순차코드에서 병렬 계산을 시작할때만 ForkJoinPool의 invoke 메서드를 사용해야 한다.
  • 포크/조인 프레임워크를 이용한 병렬 계산은 디버깅하기 어렵다.
  • 포크/조인 프레임워크를 사용한다고 속도가 무조권 빠르다는 생각은 버려야 한다. 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다.