Back-end/Java

Java Stream Custom Collection 구현하기

HOONY_612 2024. 1. 29. 14:18
반응형

 

 

이번 주제는 자바 8부터 지원하는 Custom Stream 구현하기 관한 주제입니다.

 

소개

자바 8이 등장하며 많은 자바 개발자들이 Stream을 사용한다.

Stream의 강점은 간결하고 가독성이 좋은 코드를 만들 수 있고 Immutable한 동작을 보장한다.

복잡한 코드 파이프라인을 구성하는 경우 어렵게 코드를 구성해야한다.

이러한 문제를 해결할 수 있는 방법이 Custom stream이다.

 

Collector Interface

Interface Collector<T,A,R> 

 

- T: 변환 및 집계가 필요한 입력 타입

- A: 중간 결과(변형이 가능한) 타입

- R: 결과에 대한 리턴 타입

 

인터페이스 제공 메서드

- supplier(): 새로운 결과를 생성함. 

- accumulator(): 변형된 데이터를 모으는 역할.(T -> A에 저장)

- combiner(): 2개의 작업이 병렬 실행 시 결과를 1개로 모으는 역할.

- finisher(): 마지막 변형을 담당하는 역할.

- characteristics(): 3개의 타입을 가지고있고 최적화에 사용함.

 

1. Data 및 DataAggregator 만들기

 

InputData를 넣고 갯수, 평균, 합계를 도출하고 싶다.

 

<입력>

@Data
@AllArgsConstructor
public class InputData {
    private String name;
    private long numOfCoin;
}

 

<원하는 출력>

@Data
public class InputDataAggregation {

    private long numOfData;
    private long average;
    private long sum;
}

 

2. Collector 구현하기

public class InputDataAggregationCollector implements Collector<InputData, InputDataAggregation, InputDataAggregation> {
	...
}

 

2-1. supplier 구현하기

어떤 타입의 객체를 중간 결과 출력으로 사용할지 정의한다.

@Override
public Supplier<InputDataAggregation> supplier() {
    return InputDataAggregation::new;
}

 

2-2. accumulator 구현하기

실제 연산 작업을 수행한다.

aggregation(InputDataAggregation) 객체가 누적돼서 들어오고 변환할 데이터가 들어온다.

예를 들어서 4개의 리스트에 대해서 Collector를 사용한다면 4번의 호출이 발생한다.

 

    @Override
    public BiConsumer<InputDataAggregation, InputData> accumulator() {
        return (aggregation, data) -> {
            long prevNumOfData = aggregation.getNumOfData();
            aggregation.setNumOfData(++prevNumOfData);

            long prevSum = aggregation.getSum();
            aggregation.setSum(prevSum + data.getNumOfCoin());

            aggregation.setAverage(aggregation.getSum() / aggregation.getNumOfData());
        };
    }

 

2-3. combiner 구현하기

aggregation이 병렬적으로 수행되는 경우가 있다.

이럴 땐 스레드 2개에서 수행되어진 Aggregation 객체를 모아서 하나로 처리해줘야한다.

예를 들어 4개의 리스트에 대해서 Collector사용하는 경우 호출이 안될 수도 있고 1번 또는 2번도 될 수 있다.

agg1과 agg2 데이터들을 agg1으로 집계하여 agg1을 리턴하는 방식이다.

@Override
public BinaryOperator<InputDataAggregation> combiner() {
    return (agg1, agg2) -> {
        long agg1NumOfData = agg1.getNumOfData();
        agg1.setNumOfData(++agg1NumOfData);

        long agg1Sum = agg1.getSum();
        long agg2Sum = agg2.getSum();
        agg1.setSum(agg1Sum + agg2Sum);

        long aggSum = agg1.getSum();
        long aggNumOfData = agg1.getNumOfData();
        agg1.setAverage(aggSum / aggNumOfData);

        return agg1;
    };
}

 

2-4. finisher 구현하기

반환해야하는 타입에 대해서 설정해준다.

@Override
public Function<InputDataAggregation, InputDataAggregation> finisher() {
    return (inputDataAggregation) -> inputDataAggregation;
}

 

2-5. characteristics 구현하기

3가지 타입으로 Collector 타입을 지정해 줄 수 있다.

CONCURRENT: accumulator를 동시(멀티스레드)에 호출할 수 있음

UNORDER: 결과가 순서에 영향을 받지 않음

IDENTITY_FINISH: finisher함수가 어떤 값이 와도 자기 자신이 되는 함수라는 것

@Override
public Set<Characteristics> characteristics() {
    return Set.of(Characteristics.IDENTITY_FINISH);
}

 

결과

출력: InputDataAggregation(numOfData=4, average=2, sum=10)

public class Monitors {

    public static void main(String[] args) {
        List<InputData> inputDataList = List.of(
                new InputData("bob", 1),
                new InputData("cock", 2),
                new InputData("disel", 3),
                new InputData("martini", 4)
        );

        InputDataAggregation aggregation = inputDataList
                .stream()
                .collect(new InputDataAggregationCollector());

        System.out.println(aggregation.toString());
    }
}
반응형