Dart의 비동기 프로그래밍에서 Stream
은 데이터의 연속된 흐름을 처리하는 데 중요한 역할을 한다. 비동기적으로 발생하는 데이터 이벤트를 처리하기 위해서는 Stream
을 적절히 다루어야 하며, Dart는 이를 효과적으로 관리할 수 있는 다양한 기능을 제공한다. 이 섹션에서는 Stream
의 기본 개념부터 이벤트 처리 방법, 그리고 더 나아가서 Stream을 활용한 복잡한 비동기 데이터 흐름 처리까지 설명한다.
Stream의 기본 개념
Stream
은 Dart에서 일련의 비동기 이벤트를 전달하는 객체이다. 여기서 이벤트는 데이터를 나타낼 수 있으며, 오류나 작업 완료와 같은 상황도 포함될 수 있다. Stream
은 여러 개의 이벤트를 순차적으로 전달하며, 각 이벤트는 비동기적으로 처리된다.
Dart의 Stream
은 두 가지 유형으로 나눌 수 있다.
-
Single subscription Stream: 한 번에 하나의 리스너만 등록할 수 있는
Stream
이다. 주로 UI 이벤트나 사용자 상호작용과 같은 상황에서 사용된다. -
Broadcast Stream: 여러 리스너가 동시에 동일한
Stream
을 청취할 수 있는Stream
이다. 서버의 메시지를 여러 클라이언트에게 동시에 전달해야 하는 상황에서 주로 사용된다.
Stream의 생성과 사용
Dart에서 Stream
을 생성하는 방법은 다양한다. 가장 기본적인 방법은 StreamController
를 사용하는 것이다. StreamController
는 Stream
과 이를 제어할 수 있는 기능을 제공한다.
StreamController<int> controller = StreamController<int>();
Stream<int> stream = controller.stream;
위 코드에서 controller.stream
을 통해 Stream
객체가 생성되었다. 이후 controller
를 통해 이벤트를 추가하거나 오류를 전달할 수 있다.
Stream의 주요 메소드
- listen():
Stream
을 구독하고 이벤트를 처리한다.
dart
stream.listen((event) {
print('New event: $event');
});
- pause()와 resume():
Stream
을 일시정지하거나 다시 재개한다.
dart
stream.listen((event) {
print('New event: $event');
}).pause(); // 스트림을 일시정지
- cancel():
Stream
의 구독을 취소한다.
Stream과 이벤트 처리의 흐름
Stream
은 이벤트 기반의 데이터를 다룰 때 매우 유용하다. 예를 들어, 사용자가 특정 동작을 수행할 때마다 이벤트가 발생하는 경우, 이러한 이벤트를 Stream
을 통해 처리할 수 있다. 이벤트는 보통 다음과 같이 세 가지 유형으로 구분된다.
- 데이터 이벤트: 새로운 데이터가 들어왔을 때 발생한다.
- 오류 이벤트: 비동기 작업 중 오류가 발생했을 때 발생한다.
- 완료 이벤트: 스트림이 더 이상 이벤트를 전송하지 않을 때 발생한다.
각 이벤트는 Stream
구독자의 콜백 함수에서 처리된다. Stream
의 구독자는 보통 다음과 같은 구조를 갖는다:
stream.listen(
(data) {
// 데이터 이벤트 처리
},
onError: (error) {
// 오류 이벤트 처리
},
onDone: () {
// 완료 이벤트 처리
},
);
이처럼 Stream
구독자는 data
를 처리하는 메인 콜백과 오류 및 완료 이벤트를 처리하는 추가적인 콜백을 갖는다.
Stream에서의 비동기 이벤트 흐름
Stream
은 동기적 또는 비동기적으로 이벤트를 전달할 수 있다. Dart에서 비동기 스트림을 사용하는 경우 await for
구문을 통해 간편하게 비동기 데이터를 처리할 수 있다. 이 구문은 Stream
에서 데이터를 비동기적으로 가져오고 이를 순차적으로 처리한다.
await for (var event in stream) {
print(event);
}
await for
는 비동기 데이터를 순차적으로 처리할 때 매우 유용하며, 이벤트가 발생하는 순서대로 데이터를 처리할 수 있게 한다.
StreamController와 StreamTransformer
Stream
을 생성할 때 일반적으로 사용하는 객체가 StreamController
이다. StreamController
는 Stream
을 제어할 수 있는 기능을 제공한다. 이를 통해 데이터 이벤트를 추가하거나, 오류를 발생시키거나, 스트림을 종료할 수 있다. StreamController
는 단순한 데이터 이벤트 흐름뿐만 아니라 복잡한 비동기 작업을 처리하는 데 중요한 역할을 한다.
StreamController의 주요 메소드
- add(): 새로운 데이터를 스트림에 추가한다.
dart
controller.add(10); // 새로운 데이터 10을 추가
- addError(): 오류 이벤트를 스트림에 추가한다.
dart
controller.addError('오류 발생'); // 오류 이벤트 발생
- close(): 스트림을 더 이상 사용할 수 없도록 종료한다.
dart
controller.close(); // 스트림 종료
StreamController
는 Dart에서 스트림을 다룰 때 기본적으로 많이 사용되는 컨트롤러로, 데이터를 제어하는 역할을 담당한다. 또한, StreamController
를 활용하면 사용자 정의 스트림을 쉽게 생성하고 제어할 수 있다.
StreamTransformer
StreamTransformer
는 입력 스트림을 출력 스트림으로 변환하는 기능을 제공한다. 이를 통해 스트림의 데이터를 원하는 형식으로 변환하거나 중간에서 데이터를 필터링하는 등의 작업을 할 수 있다.
StreamTransformer<int, String> transformer = StreamTransformer.fromHandlers(
handleData: (int data, EventSink<String> sink) {
sink.add('변환된 데이터: $data');
}
);
Stream<String> transformedStream = stream.transform(transformer);
transformedStream.listen((event) {
print(event); // "변환된 데이터: ..."
});
위 예제에서는 StreamTransformer
를 통해 정수형 데이터를 문자열로 변환하고 있다. 이처럼 StreamTransformer
는 입력 스트림에서 데이터를 가져와서 특정 규칙에 따라 변환한 후 새로운 스트림으로 전달할 수 있다.
Stream과 이벤트 루프
Dart에서 Stream
은 이벤트 루프를 기반으로 작동한다. 이벤트 루프는 Dart의 비동기 작업을 처리하는 기본 메커니즘으로, Stream
의 이벤트 또한 이 이벤트 루프에 의해 관리된다. Dart의 이벤트 루프는 각 이벤트를 큐(queue)에 저장하고, 비동기적으로 이를 처리한다.
마이크로태스크 큐와 이벤트 큐
이벤트 루프에는 두 가지 큐가 존재한다:
-
마이크로태스크 큐(Microtask Queue): 우선적으로 처리되어야 하는 짧은 작업이 저장된다. 보통
Future.microtask()
로 생성된 작업이 여기에 저장된다. -
이벤트 큐(Event Queue): 비동기 작업이 이 큐에 저장되며, 일반적인
Future
와Stream
이벤트가 이 큐에 저장된다.
Stream
의 이벤트는 보통 이벤트 큐에서 처리되며, 각 이벤트는 순차적으로 처리된다. 이벤트가 처리되는 동안 새로운 이벤트가 큐에 추가되면, 이벤트 루프는 이를 다음 차례로 처리한다.
이러한 Dart의 이벤트 기반 아키텍처 덕분에 Stream
은 비동기 데이터 흐름을 효율적으로 처리할 수 있다.
Broadcast Stream
기본적으로 Stream
은 한 번에 하나의 구독자만 허용하는 single subscription 방식이다. 하지만 여러 구독자가 동일한 Stream
을 청취해야 하는 경우에는 Broadcast Stream을 사용해야 한다.
Broadcast Stream
은 여러 구독자를 동시에 허용하며, 주로 이벤트가 여러 대상에게 동시에 전달되어야 할 때 사용된다. 예를 들어, 서버에서 클라이언트로 메시지를 전송할 때 동일한 이벤트가 여러 클라이언트에게 동시에 전달되어야 한다면 Broadcast Stream
이 유용하다.
Stream<int> broadcastStream = stream.asBroadcastStream();
broadcastStream.listen((event) {
print('첫 번째 구독자: $event');
});
broadcastStream.listen((event) {
print('두 번째 구독자: $event');
});
asBroadcastStream()
메소드를 사용하여 일반 스트림을 Broadcast Stream
으로 변환할 수 있으며, 여러 구독자를 동시에 처리할 수 있다.
Stream의 지연 평가와 lazy loading
Stream
은 Dart에서 지연 평가(lazy evaluation)를 사용하여 비동기 데이터를 처리할 수 있다. 지연 평가는 필요할 때만 데이터를 처리하여 자원을 절약하는 방식이다. Dart의 Stream
은 데이터를 즉시 소비하지 않고, 구독자가 Stream
을 구독할 때 이벤트를 처리한다.
이 방식은 특히 대용량 데이터를 다루거나 시간이 지남에 따라 점진적으로 데이터를 처리할 때 유용하다. Stream
이 지연 로딩 방식으로 동작하기 때문에 구독이 이루어질 때까지는 아무런 데이터도 처리하지 않는다.
Stream<int> stream = Stream.fromIterable([1, 2, 3, 4, 5]);
await for (var value in stream) {
print(value); // 구독 시점에서만 데이터가 처리됨
}
위의 예제에서는 Stream
이 fromIterable
로부터 데이터를 제공하지만, 실제로 구독자가 데이터를 처리할 때까지 지연되며, await for
를 통해 구독이 시작되면 데이터가 순차적으로 처리된다.
Stream에서 데이터 필터링
Stream
에서 데이터를 필터링할 수 있는 다양한 메소드가 제공된다. 이를 통해 스트림에서 특정 조건을 만족하는 데이터만 처리할 수 있다.
- where(): 특정 조건에 맞는 데이터만 필터링한다.
dart
stream.where((event) => event.isEven).listen((event) {
print('짝수만 처리: $event');
});
- map(): 데이터를 변환한다.
dart
stream.map((event) => event * 2).listen((event) {
print('데이터 변환 후: $event');
});
- skip(): 처음의 n개의 데이터를 건너뛰고 그 이후의 데이터를 처리한다.
dart
stream.skip(2).listen((event) {
print('처음 2개 데이터 건너뜀: $event');
});
이러한 필터링 기능을 통해 Stream
에서 불필요한 데이터를 제외하고 필요한 데이터만 처리할 수 있다.
Stream을 통한 데이터 병합과 분리
여러 개의 Stream
을 하나로 병합하거나, 하나의 Stream
을 여러 개로 분리하는 작업도 가능한다. 이 작업은 복잡한 비동기 작업을 처리할 때 유용하다.
Stream 병합
Dart에서 제공하는 Stream
관련 유틸리티 함수는 여러 개의 Stream
을 하나로 병합할 수 있는 기능을 제공한다. 예를 들어, Stream
두 개를 하나로 결합하여 동시에 두 Stream
에서 발생하는 데이터를 모두 처리할 수 있다.
Stream<int> stream1 = Stream.fromIterable([1, 2, 3]);
Stream<int> stream2 = Stream.fromIterable([4, 5, 6]);
Stream<int> mergedStream = Stream.concat([stream1, stream2]);
mergedStream.listen((event) {
print('병합된 스트림 데이터: $event');
});
위 코드에서 Stream.concat()
을 사용하여 두 Stream
의 데이터를 병합하여 하나의 Stream
으로 처리하고 있다.
Stream 분리
하나의 Stream
을 여러 개로 분리하는 것도 가능한다. Dart에서는 Stream
을 특정 조건에 따라 여러 하위 Stream
으로 나누어 처리할 수 있다.
Stream<int> evenStream = stream.where((event) => event.isEven);
Stream<int> oddStream = stream.where((event) => event.isOdd);
evenStream.listen((event) {
print('짝수 스트림: $event');
});
oddStream.listen((event) {
print('홀수 스트림: $event');
});
이처럼 Stream
을 원하는 조건에 따라 분리하여 각각의 데이터 흐름을 별도로 처리할 수 있다.
Stream의 Buffering
Stream
의 버퍼링(buffering)은 일정 시간 동안 발생한 이벤트를 모아서 한꺼번에 처리하는 방식이다. 버퍼링은 네트워크 요청이나 대량의 데이터를 처리할 때 유용하다.
Dart에서는 RxDart와 같은 라이브러리를 사용하여 Stream
의 버퍼링 기능을 구현할 수 있다.
stream.bufferTime(Duration(seconds: 2)).listen((bufferedEvents) {
print('2초 동안의 이벤트: $bufferedEvents');
});
위 코드는 2초 동안 발생한 모든 이벤트를 모아 한꺼번에 처리하는 예제이다. 이 방식은 네트워크나 I/O 작업에서 대량의 이벤트를 처리할 때 매우 유용하다.