Dart의 비동기 프로그래밍에서 Stream은 데이터의 연속된 흐름을 처리하는 데 중요한 역할을 한다. 비동기적으로 발생하는 데이터 이벤트를 처리하기 위해서는 Stream을 적절히 다루어야 하며, Dart는 이를 효과적으로 관리할 수 있는 다양한 기능을 제공한다. 이 섹션에서는 Stream의 기본 개념부터 이벤트 처리 방법, 그리고 더 나아가서 Stream을 활용한 복잡한 비동기 데이터 흐름 처리까지 설명한다.

Stream의 기본 개념

Stream은 Dart에서 일련의 비동기 이벤트를 전달하는 객체이다. 여기서 이벤트는 데이터를 나타낼 수 있으며, 오류나 작업 완료와 같은 상황도 포함될 수 있다. Stream은 여러 개의 이벤트를 순차적으로 전달하며, 각 이벤트는 비동기적으로 처리된다.

Dart의 Stream은 두 가지 유형으로 나눌 수 있다.

  1. Single subscription Stream: 한 번에 하나의 리스너만 등록할 수 있는 Stream이다. 주로 UI 이벤트나 사용자 상호작용과 같은 상황에서 사용된다.

  2. Broadcast Stream: 여러 리스너가 동시에 동일한 Stream을 청취할 수 있는 Stream이다. 서버의 메시지를 여러 클라이언트에게 동시에 전달해야 하는 상황에서 주로 사용된다.

Stream의 생성과 사용

Dart에서 Stream을 생성하는 방법은 다양한다. 가장 기본적인 방법은 StreamController를 사용하는 것이다. StreamControllerStream과 이를 제어할 수 있는 기능을 제공한다.

StreamController<int> controller = StreamController<int>();
Stream<int> stream = controller.stream;

위 코드에서 controller.stream을 통해 Stream 객체가 생성되었다. 이후 controller를 통해 이벤트를 추가하거나 오류를 전달할 수 있다.

Stream의 주요 메소드

dart stream.listen((event) { print('New event: $event'); });

dart stream.listen((event) { print('New event: $event'); }).pause(); // 스트림을 일시정지

Stream과 이벤트 처리의 흐름

Stream은 이벤트 기반의 데이터를 다룰 때 매우 유용하다. 예를 들어, 사용자가 특정 동작을 수행할 때마다 이벤트가 발생하는 경우, 이러한 이벤트를 Stream을 통해 처리할 수 있다. 이벤트는 보통 다음과 같이 세 가지 유형으로 구분된다.

각 이벤트는 Stream 구독자의 콜백 함수에서 처리된다. Stream의 구독자는 보통 다음과 같은 구조를 갖는다:

stream.listen(
  (data) {
    // 데이터 이벤트 처리
  },
  onError: (error) {
    // 오류 이벤트 처리
  },
  onDone: () {
    // 완료 이벤트 처리
  },
);

이처럼 Stream 구독자는 data를 처리하는 메인 콜백과 오류 및 완료 이벤트를 처리하는 추가적인 콜백을 갖는다.

Stream에서의 비동기 이벤트 흐름

Stream동기적 또는 비동기적으로 이벤트를 전달할 수 있다. Dart에서 비동기 스트림을 사용하는 경우 await for 구문을 통해 간편하게 비동기 데이터를 처리할 수 있다. 이 구문은 Stream에서 데이터를 비동기적으로 가져오고 이를 순차적으로 처리한다.

await for (var event in stream) {
  print(event);
}

await for는 비동기 데이터를 순차적으로 처리할 때 매우 유용하며, 이벤트가 발생하는 순서대로 데이터를 처리할 수 있게 한다.

StreamController와 StreamTransformer

Stream을 생성할 때 일반적으로 사용하는 객체가 StreamController이다. StreamControllerStream을 제어할 수 있는 기능을 제공한다. 이를 통해 데이터 이벤트를 추가하거나, 오류를 발생시키거나, 스트림을 종료할 수 있다. StreamController는 단순한 데이터 이벤트 흐름뿐만 아니라 복잡한 비동기 작업을 처리하는 데 중요한 역할을 한다.

StreamController의 주요 메소드

dart controller.add(10); // 새로운 데이터 10을 추가

dart controller.addError('오류 발생'); // 오류 이벤트 발생

dart controller.close(); // 스트림 종료

StreamController는 Dart에서 스트림을 다룰 때 기본적으로 많이 사용되는 컨트롤러로, 데이터를 제어하는 역할을 담당한다. 또한, StreamController를 활용하면 사용자 정의 스트림을 쉽게 생성하고 제어할 수 있다.

StreamTransformer

StreamTransformer는 입력 스트림을 출력 스트림으로 변환하는 기능을 제공한다. 이를 통해 스트림의 데이터를 원하는 형식으로 변환하거나 중간에서 데이터를 필터링하는 등의 작업을 할 수 있다.

StreamTransformer<int, String> transformer = StreamTransformer.fromHandlers(
  handleData: (int data, EventSink<String> sink) {
    sink.add('변환된 데이터: $data');
  }
);

Stream<String> transformedStream = stream.transform(transformer);
transformedStream.listen((event) {
  print(event);  // "변환된 데이터: ..."
});

위 예제에서는 StreamTransformer를 통해 정수형 데이터를 문자열로 변환하고 있다. 이처럼 StreamTransformer는 입력 스트림에서 데이터를 가져와서 특정 규칙에 따라 변환한 후 새로운 스트림으로 전달할 수 있다.

Stream과 이벤트 루프

Dart에서 Stream은 이벤트 루프를 기반으로 작동한다. 이벤트 루프는 Dart의 비동기 작업을 처리하는 기본 메커니즘으로, Stream의 이벤트 또한 이 이벤트 루프에 의해 관리된다. Dart의 이벤트 루프는 각 이벤트를 큐(queue)에 저장하고, 비동기적으로 이를 처리한다.

마이크로태스크 큐와 이벤트 큐

이벤트 루프에는 두 가지 큐가 존재한다:

  1. 마이크로태스크 큐(Microtask Queue): 우선적으로 처리되어야 하는 짧은 작업이 저장된다. 보통 Future.microtask()로 생성된 작업이 여기에 저장된다.

  2. 이벤트 큐(Event Queue): 비동기 작업이 이 큐에 저장되며, 일반적인 FutureStream 이벤트가 이 큐에 저장된다.

Stream의 이벤트는 보통 이벤트 큐에서 처리되며, 각 이벤트는 순차적으로 처리된다. 이벤트가 처리되는 동안 새로운 이벤트가 큐에 추가되면, 이벤트 루프는 이를 다음 차례로 처리한다.

이러한 Dart의 이벤트 기반 아키텍처 덕분에 Stream은 비동기 데이터 흐름을 효율적으로 처리할 수 있다.

Broadcast Stream

기본적으로 Stream은 한 번에 하나의 구독자만 허용하는 single subscription 방식이다. 하지만 여러 구독자가 동일한 Stream을 청취해야 하는 경우에는 Broadcast Stream을 사용해야 한다.

Broadcast Stream은 여러 구독자를 동시에 허용하며, 주로 이벤트가 여러 대상에게 동시에 전달되어야 할 때 사용된다. 예를 들어, 서버에서 클라이언트로 메시지를 전송할 때 동일한 이벤트가 여러 클라이언트에게 동시에 전달되어야 한다면 Broadcast Stream이 유용하다.

Stream<int> broadcastStream = stream.asBroadcastStream();
broadcastStream.listen((event) {
  print('첫 번째 구독자: $event');
});
broadcastStream.listen((event) {
  print('두 번째 구독자: $event');
});

asBroadcastStream() 메소드를 사용하여 일반 스트림을 Broadcast Stream으로 변환할 수 있으며, 여러 구독자를 동시에 처리할 수 있다.

Stream의 지연 평가와 lazy loading

Stream은 Dart에서 지연 평가(lazy evaluation)를 사용하여 비동기 데이터를 처리할 수 있다. 지연 평가는 필요할 때만 데이터를 처리하여 자원을 절약하는 방식이다. Dart의 Stream은 데이터를 즉시 소비하지 않고, 구독자가 Stream을 구독할 때 이벤트를 처리한다.

이 방식은 특히 대용량 데이터를 다루거나 시간이 지남에 따라 점진적으로 데이터를 처리할 때 유용하다. Stream이 지연 로딩 방식으로 동작하기 때문에 구독이 이루어질 때까지는 아무런 데이터도 처리하지 않는다.

Stream<int> stream = Stream.fromIterable([1, 2, 3, 4, 5]);

await for (var value in stream) {
  print(value);  // 구독 시점에서만 데이터가 처리됨
}

위의 예제에서는 StreamfromIterable로부터 데이터를 제공하지만, 실제로 구독자가 데이터를 처리할 때까지 지연되며, await for를 통해 구독이 시작되면 데이터가 순차적으로 처리된다.

Stream에서 데이터 필터링

Stream에서 데이터를 필터링할 수 있는 다양한 메소드가 제공된다. 이를 통해 스트림에서 특정 조건을 만족하는 데이터만 처리할 수 있다.

dart stream.where((event) => event.isEven).listen((event) { print('짝수만 처리: $event'); });

dart stream.map((event) => event * 2).listen((event) { print('데이터 변환 후: $event'); });

dart stream.skip(2).listen((event) { print('처음 2개 데이터 건너뜀: $event'); });

이러한 필터링 기능을 통해 Stream에서 불필요한 데이터를 제외하고 필요한 데이터만 처리할 수 있다.

Stream을 통한 데이터 병합과 분리

여러 개의 Stream을 하나로 병합하거나, 하나의 Stream을 여러 개로 분리하는 작업도 가능한다. 이 작업은 복잡한 비동기 작업을 처리할 때 유용하다.

Stream 병합

Dart에서 제공하는 Stream 관련 유틸리티 함수는 여러 개의 Stream을 하나로 병합할 수 있는 기능을 제공한다. 예를 들어, Stream 두 개를 하나로 결합하여 동시에 두 Stream에서 발생하는 데이터를 모두 처리할 수 있다.

Stream<int> stream1 = Stream.fromIterable([1, 2, 3]);
Stream<int> stream2 = Stream.fromIterable([4, 5, 6]);

Stream<int> mergedStream = Stream.concat([stream1, stream2]);
mergedStream.listen((event) {
  print('병합된 스트림 데이터: $event');
});

위 코드에서 Stream.concat()을 사용하여 두 Stream의 데이터를 병합하여 하나의 Stream으로 처리하고 있다.

Stream 분리

하나의 Stream을 여러 개로 분리하는 것도 가능한다. Dart에서는 Stream을 특정 조건에 따라 여러 하위 Stream으로 나누어 처리할 수 있다.

Stream<int> evenStream = stream.where((event) => event.isEven);
Stream<int> oddStream = stream.where((event) => event.isOdd);

evenStream.listen((event) {
  print('짝수 스트림: $event');
});

oddStream.listen((event) {
  print('홀수 스트림: $event');
});

이처럼 Stream을 원하는 조건에 따라 분리하여 각각의 데이터 흐름을 별도로 처리할 수 있다.

Stream의 Buffering

Stream버퍼링(buffering)은 일정 시간 동안 발생한 이벤트를 모아서 한꺼번에 처리하는 방식이다. 버퍼링은 네트워크 요청이나 대량의 데이터를 처리할 때 유용하다.

Dart에서는 RxDart와 같은 라이브러리를 사용하여 Stream의 버퍼링 기능을 구현할 수 있다.

stream.bufferTime(Duration(seconds: 2)).listen((bufferedEvents) {
  print('2초 동안의 이벤트: $bufferedEvents');
});

위 코드는 2초 동안 발생한 모든 이벤트를 모아 한꺼번에 처리하는 예제이다. 이 방식은 네트워크나 I/O 작업에서 대량의 이벤트를 처리할 때 매우 유용하다.