Java Stream Custom Collection 구현하기
이번 주제는 자바 8부터 지원하는 Custom Stream 구현하기 관한 주제입니다.
소개
자바 8이 등장하며 많은 자바 개발자들이 Stream을 사용한다.
Stream의 강점은 간결하고 가독성이 좋은 코드를 만들 수 있고 Immutable한 동작을 보장한다.
복잡한 코드 파이프라인을 구성하는 경우 어렵게 코드를 구성해야한다.
이러한 문제를 해결할 수 있는 방법이 Custom stream이다.
Collector Interface
Interface Collector<T,A,R>
- T: 변환 및 집계가 필요한 입력 타입
- A: 중간 결과(변형이 가능한) 타입
- R: 결과에 대한 리턴 타입
인터페이스 제공 메서드
- supplier(): 새로운 결과를 생성함.
- accumulator(): 변형된 데이터를 모으는 역할.(T -> A에 저장)
- combiner(): 2개의 작업이 병렬 실행 시 결과를 1개로 모으는 역할.
- finisher(): 마지막 변형을 담당하는 역할.
- characteristics(): 3개의 타입을 가지고있고 최적화에 사용함.
1. Data 및 DataAggregator 만들기
InputData를 넣고 갯수, 평균, 합계를 도출하고 싶다.
<입력>
@Data
@AllArgsConstructor
public class InputData {
private String name;
private long numOfCoin;
}
<원하는 출력>
@Data
public class InputDataAggregation {
private long numOfData;
private long average;
private long sum;
}
2. Collector 구현하기
public class InputDataAggregationCollector implements Collector<InputData, InputDataAggregation, InputDataAggregation> {
...
}
2-1. supplier 구현하기
어떤 타입의 객체를 중간 결과 출력으로 사용할지 정의한다.
@Override
public Supplier<InputDataAggregation> supplier() {
return InputDataAggregation::new;
}
2-2. accumulator 구현하기
실제 연산 작업을 수행한다.
aggregation(InputDataAggregation) 객체가 누적돼서 들어오고 변환할 데이터가 들어온다.
예를 들어서 4개의 리스트에 대해서 Collector를 사용한다면 4번의 호출이 발생한다.
@Override
public BiConsumer<InputDataAggregation, InputData> accumulator() {
return (aggregation, data) -> {
long prevNumOfData = aggregation.getNumOfData();
aggregation.setNumOfData(++prevNumOfData);
long prevSum = aggregation.getSum();
aggregation.setSum(prevSum + data.getNumOfCoin());
aggregation.setAverage(aggregation.getSum() / aggregation.getNumOfData());
};
}
2-3. combiner 구현하기
aggregation이 병렬적으로 수행되는 경우가 있다.
이럴 땐 스레드 2개에서 수행되어진 Aggregation 객체를 모아서 하나로 처리해줘야한다.
예를 들어 4개의 리스트에 대해서 Collector사용하는 경우 호출이 안될 수도 있고 1번 또는 2번도 될 수 있다.
agg1과 agg2 데이터들을 agg1으로 집계하여 agg1을 리턴하는 방식이다.
@Override
public BinaryOperator<InputDataAggregation> combiner() {
return (agg1, agg2) -> {
long agg1NumOfData = agg1.getNumOfData();
agg1.setNumOfData(++agg1NumOfData);
long agg1Sum = agg1.getSum();
long agg2Sum = agg2.getSum();
agg1.setSum(agg1Sum + agg2Sum);
long aggSum = agg1.getSum();
long aggNumOfData = agg1.getNumOfData();
agg1.setAverage(aggSum / aggNumOfData);
return agg1;
};
}
2-4. finisher 구현하기
반환해야하는 타입에 대해서 설정해준다.
@Override
public Function<InputDataAggregation, InputDataAggregation> finisher() {
return (inputDataAggregation) -> inputDataAggregation;
}
2-5. characteristics 구현하기
3가지 타입으로 Collector 타입을 지정해 줄 수 있다.
CONCURRENT: accumulator를 동시(멀티스레드)에 호출할 수 있음
UNORDER: 결과가 순서에 영향을 받지 않음
IDENTITY_FINISH: finisher함수가 어떤 값이 와도 자기 자신이 되는 함수라는 것
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.IDENTITY_FINISH);
}
결과
출력: InputDataAggregation(numOfData=4, average=2, sum=10)
public class Monitors {
public static void main(String[] args) {
List<InputData> inputDataList = List.of(
new InputData("bob", 1),
new InputData("cock", 2),
new InputData("disel", 3),
new InputData("martini", 4)
);
InputDataAggregation aggregation = inputDataList
.stream()
.collect(new InputDataAggregationCollector());
System.out.println(aggregation.toString());
}
}