Observable

커뮤니티 그룹 초안 보고서,

현재 버전:
https://wicg.github.io/observable/
에디터:
Dominic Farolino (Google)
참여하기:
GitHub WICG/observable (새 이슈, 오픈된 이슈)
커밋:
GitHub spec.bs 커밋
테스트 스위트:
https://wpt.fyi/results/dom/observable/

요약

Observable API는 비동기 이벤트 스트림을 다루는 조합 가능하고 인체공학적인 방법을 제공합니다.

문서 현황

이 명세는 Web Platform Incubator Community Group에서 발행되었습니다. W3C 표준이 아니며 W3C 표준 트랙에도 포함되지 않습니다. W3C 커뮤니티 기여자 라이선스 계약(CLA)에 따라 제한적 옵트아웃 및 기타 조건이 적용될 수 있으니 참고하시기 바랍니다. W3C 커뮤니티 및 비즈니스 그룹에 대해 더 알아보세요.

1. 소개

이 섹션은 비규범적입니다.

2. 핵심 인프라

2.1. Subscriber 인터페이스

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // Subscriber가 생성된 후, complete()/error()가 호출되거나 구독자가 구독을 해지할 때까지 true입니다.
  // complete()/error() 내부에서는 이 속성이 true입니다.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

Subscriber순서가 지정된 집합내부 옵저버를 가지며, 초기값은 비어있습니다.

Subscriberteardown 콜백을 가지며, 이는 리스트 형태의 VoidFunction들로 초기값은 비어있습니다.

Subscriber구독 컨트롤러를 가지며, 이는 AbortController입니다.

Subscriberactive 불리언 값을 가지며, 초기값은 true입니다.

참고: 이 값은 Subscriber가 종료된 이후에는 자신이 소유한 콜백을 절대 호출하지 않도록 보장하는 변수입니다.

active getter 단계는 thisactive 불리언 값을 반환합니다.

signal getter 단계는 this구독 컨트롤러signal을 반환합니다.

next(value) 메서드 단계:
  1. thisactive가 false이면 return합니다.

  2. this관련 글로벌 객체Window 객체이고, 그 연관된 Document완전히 활성화되어 있지 않으면 return합니다.

  3. observer에 대해 this내부 옵저버들을 순회합니다:

    1. 해당 observernext 단계value로 실행합니다.

      단언: 예외가 발생하지 않음.

      참고: 여기서는 예외가 발생할 수 없습니다. 내부 옵저버의 next 단계가 스크립트에서 제공된 콜백을 감싸는 래퍼일 경우, 옵저버 처리 단계에서 콜백 호출 시 예외를 캐치하고 글로벌에 보고하는 로직이 포함되어 있습니다.

      next 단계가 명세 알고리즘일 경우, 해당 단계가 외부로 예외를 던지지 않도록 주의하여 이 단언이 만족됩니다.

error(error) 메서드 단계:
  1. thisactive가 false이면, 예외를 보고하고 errorthis관련 글로벌 객체를 전달한 뒤 return합니다.

  2. this관련 글로벌 객체Window 객체이고, 그 연관된 Document완전히 활성화되어 있지 않으면 return합니다.

  3. 구독 종료this에 실행합니다.

  4. observer에 대해 this내부 옵저버들을 순회합니다:

    1. 해당 observererror 단계error로 실행합니다.

      단언: 예외가 발생하지 않음.

      참고: 왜 이런지에 대한 자세한 내용은 next() 설명을 참고하세요.

complete() 메서드 단계:
  1. thisactive가 false이면 return합니다.

  2. this관련 글로벌 객체Window 객체이고, 그 연관된 Document완전히 활성화되어 있지 않으면 return합니다.

  3. 구독 종료this에 실행합니다.

  4. observer에 대해 this내부 옵저버들을 순회합니다:

    1. 해당 observercomplete 단계를 실행합니다.

      단언: 예외가 발생하지 않음.

      참고: 왜 이런지에 대한 자세한 내용은 next() 설명을 참고하세요.

addTeardown(teardown) 메서드 단계:
  1. this관련 글로벌 객체Window 객체이고, 그 연관된 Document완전히 활성화되어 있지 않으면 return합니다.

  2. thisactive가 true이면, append teardownthisteardown 콜백 리스트에 추가합니다.

  3. 그 외의 경우, invoke teardown을 «»와 "report"를 인자로 실행합니다.

구독 종료Subscriber subscriber와 선택적 any reason에 대해 실행하는 단계:
  1. subscriberactive가 false이면 return합니다.

    이 단계는 재진입 호출을 방지합니다. "producer-initiated" 구독 해지의 경우 다음 예시처럼 동작할 수 있습니다:

    const outerController = new AbortController();
    const observable = new Observable(subscriber => {
      subscriber.addTeardown(() => {
        // 2.) 이 teardown은 "구독 종료" 알고리즘 내부에서 실행되며, 실행 중에 다운스트림 시그널의 abort 알고리즘을 실행합니다. 그 중 하나가 현재 실행 중인 "구독 종료" 알고리즘입니다.
        outerController.abort();
      });
    
      // 1.) 이 호출은 즉시 "구독 종료" 알고리즘을 실행하며, subscriber.active를 false로 설정합니다.
      subscriber.complete();
    });
    
    observable.subscribe({}, {signal: outerController.signal});
    
  2. subscriberactive 불리언 값을 false로 설정합니다.

  3. abort 시그널subscriber구독 컨트롤러reason이 있으면 전달합니다.

  4. teardown에 대해 subscriberteardown 콜백들을 역순으로 정렬하여 순회합니다:

    1. subscriber관련 글로벌 객체Document 객체이고, 그 연관된 Document완전히 활성화되어 있지 않으면 해당 단계들을 중단합니다.

      참고: 각 teardown에서 Document가 비활성 상태가 될 수 있기 때문에 해당 단계가 반복 실행될 수 있습니다.

    2. invoke teardown을 «»와 "report"를 인자로 실행합니다.

2.2. Observable 인터페이스

// SubscribeCallback은 Observable "생성자"의 코드가 들어있는 곳입니다.
// subscribe()가 호출될 때 호출되어 새로운 구독을 설정합니다.
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Mapper와 다른 점은 반환 타입뿐이며, 이 콜백은 시퀀스의 각 요소를 방문할 때만 사용됩니다.
callback Visitor = undefined (any value, unsigned long long index);

// 이 콜백은 `any`를 반환하며, 이는 `Observable`로 변환되어야 합니다. 변환 방식은 Observable 변환 규칙을 따릅니다.
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // value가 다음 중 하나일 경우, 네이티브 Observable을 생성합니다:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observable 반환 연산자. 연산자에 대한 자세한 내용은 "Operators" 섹션을 참조하세요.
  //
  // takeUntil()은 promise, iterable, async iterable, 다른 observable을 소비할 수 있습니다.
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promise 반환 연산자.
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

Observablesubscribe 콜백을 가지며, 이는 SubscribeCallback 또는 Subscriber를 받는 일련의 단계입니다.

Observableweak subscriber를 가지며, 이는 Subscriber의 약한 참조 또는 null이며, 초기값은 null입니다.

참고: 이 타입들의 "union"은 JavaScript로 생성된 Observable (항상 SubscribeCallback으로 생성됨)과 네이티브로 생성된 Observable (subscribe 콜백이 자바스크립트 콜백이 아니라 네이티브 단계일 수 있음)을 모두 지원하기 위한 것입니다. when()의 반환값이 후자의 예시입니다.

new Observable(callback) 생성자 단계:
  1. thissubscribe 콜백callback으로 설정합니다.

    참고: 이 콜백은 나중에 subscribe() 가 호출될 때 실행됩니다.

subscribe(observer, options) 메서드 단계:
  1. Observable 구독this에 대해 observeroptions로 실행합니다.

2.2.1. 지원 개념

기본 오류 알고리즘any error를 받아 다음 단계들을 실행하는 알고리즘입니다:
  1. 예외를 보고합니다. error현재 realm글로벌 객체를 사용합니다.

참고: 이 기본 알고리즘을 별도로 분리한 이유는, 명세에서 네이티브로 Observable에 구독할 때(즉, 명세에서 직접 구독하는 경우, subscribe() 메서드를 거치지 않는 경우) 중복으로 이 단계를 정의할 필요가 없기 때문입니다.

내부 옵저버는 다음 구조체항목을 가진 구조체입니다:

next 단계

하나의 any 타입 파라미터를 받는 알고리즘입니다. 초기에는 아무 동작도 하지 않습니다.

error 단계

하나의 any 타입 파라미터를 받는 알고리즘입니다. 초기값은 기본 오류 알고리즘입니다.

complete 단계

파라미터가 없는 알고리즘입니다. 초기에는 아무 동작도 하지 않습니다.

내부 옵저버 구조체next, error, complete 콜백 함수를 거울처럼 반영하는 데 사용됩니다. 자바스크립트를 통해 Observablesubscribe() 메서드로 구독할 때, 이러한 알고리즘 "단계"는 스크립트에서 제공된 콜백 함수 호출을 감싸는 래퍼입니다.

하지만 명세 내 프로즈(사용자 스크립트가 아님)에서 Observable에 구독할 때는, 이 "단계"들은 ObserverUnion 에 Web IDL 콜백 함수로 제공되는 것이 아니라 임의의 명세 알고리즘입니다. 예시로 § 2.3.3 Promise 반환 연산자에서 사용합니다.

Observable로 변환any value를 받아 다음 단계들을 실행합니다:

참고: 이 알고리즘은 Web IDL의 from() 메서드와 분리되어 명세 프로즈에서 Web IDL 바인딩을 거치지 않고 값을 변환할 수 있도록 합니다.

  1. Type(value)이 Object가 아니면, TypeError를 발생시킵니다.

    참고: 이는 원시 타입이 Iterable로 강제 변환되는 것을 방지합니다(예: String). 자세한 내용은 WICG/observable#125를 참고하세요.

  2. Observable에서 변환: value특정 타입Observable이면, value를 반환합니다.

  3. 비동기 iterable에서 변환: asyncIteratorMethod? GetMethod(value, %Symbol.asyncIterator%)로 설정합니다.

    참고: 여기서는 GetMethod를 사용합니다. GetIterator는 (a) 프로토콜 미구현, (b) 프로토콜은 있지만 호출 불가/게터 예외 등 두 경우 모두 예외를 발생시키지만, GetMethod는 두 번째 경우에만 예외를 발생시킵니다.

  4. asyncIteratorMethod가 undefined 또는 null이면 Iterable에서 변환 단계로 이동합니다.

  5. nextAlgorithm을 다음 단계로 설정합니다. (Subscriber subscriber, Iterator Record iteratorRecord를 받음):

    1. subscriber구독 컨트롤러signalaborted면 return합니다.

    2. nextPromisePromise 또는 undefined로 초기화합니다.

    3. nextCompletionIteratorNext(iteratorRecord)로 설정합니다.

      참고: 여기서는 IteratorNext를 사용합니다. 비동기 iterator의 next()가 Promise/thenable을 반환하는 경우를 처리하기 위함입니다.

    4. nextCompletionthrow completion이면:

      1. 단언: iteratorRecord의 [[Done]]이 true임을 확인합니다.

      2. nextPromise거부된 promise로 설정합니다.

    5. 그 외 nextRecordnormal completion이면 nextPromisenextRecord의 [[Value]]로 해결된 promise로 설정합니다.

      참고: nextRecord의 [[Value]]가 이미 Promise가 아니더라도 wrapping을 보장합니다.

    6. promise settle 후 반응:

      • nextPromiseiteratorResult로 fulfill되면:

        1. Type(iteratorResult) 이 Object가 아니면 subscribererror() 메서드를 TypeError로 실행하고 중단합니다.

        2. doneIteratorComplete(iteratorResult)로 설정합니다.

        3. donethrow completion이면 subscribererror() 메서드를 done의 [[Value]]로 실행하고 중단합니다.

        4. done의 [[Value]]가 true이면 subscribercomplete() 를 실행하고 중단합니다.

        5. valueIteratorValue(iteratorResult)로 설정합니다.

        6. valuethrow completion이면 subscribererror() 메서드를 value의 [[Value]]로 실행하고 중단합니다.

        7. subscribernext()value의 [[Value]]로 실행합니다.

        8. nextAlgorithmsubscriberiteratorRecord로 재실행합니다.

      • nextPromiser로 reject되면, subscribererror()r로 실행합니다.

  6. 새로운 Observable 을 반환합니다. 그 subscribe 콜백Subscriber subscriber를 받아 다음을 수행합니다:

    1. subscriber구독 컨트롤러signalaborted면 return합니다.

    2. iteratorRecordCompletionGetIterator(value, async)로 설정합니다.

      참고: 이는 %Symbol.asyncIterator% getter를 다시 호출할 수 있으며 corner case지만 테스트 기대치와 일치합니다. 자세한 논의는 issue#127 참고.

    3. iteratorRecordCompletionthrow completion이면 subscribererror() 메서드를 iteratorRecordCompletion의 [[Value]]로 실행하고 중단합니다.

      참고: 이 경우에만 비동기 iterable이 Observable로 변환될 때 구독 시작 시 동기적으로 오류가 보고됩니다. 그 외에는 오류가 마이크로태스크 타이밍으로 비동기적으로 옵저버에 전달됩니다. 이는 for-await-of loop와 동일한 동작입니다.

    4. iteratorRecord! iteratorRecordCompletion로 설정합니다.

    5. 단언: iteratorRecord가 Iterator Record임을 확인합니다.

    6. subscriber구독 컨트롤러signalaborted면 return합니다.

    7. 다음 abort 알고리즘을 subscriber구독 컨트롤러signal에 추가합니다:

      1. AsyncIteratorClose(iteratorRecord, NormalCompletion(subscriber구독 컨트롤러abort reason)).

    8. nextAlgorithmsubscriberiteratorRecord로 실행합니다.

  7. Iterable에서 변환: iteratorMethod? GetMethod(value, %Symbol.iterator%)로 설정합니다.

  8. iteratorMethod가 undefined면 Promise에서 변환 단계로 이동합니다.

    그 외에는 새로운 Observable 을 반환합니다. 그 subscribe 콜백Subscriber subscriber를 받아 다음을 수행합니다:

    1. subscriber구독 컨트롤러signalaborted면 return합니다.

    2. iteratorRecordCompletionGetIterator(value, sync)로 설정합니다.

    3. iteratorRecordCompletionthrow completion이면 subscribererror() 메서드를 iteratorRecordCompletion의 [[Value]]로 실행하고 중단합니다.

    4. iteratorRecord! iteratorRecordCompletion로 설정합니다.

    5. subscriber구독 컨트롤러signalaborted면 return합니다.

    6. 다음 abort 알고리즘을 subscriber구독 컨트롤러signal에 추가합니다:

      1. IteratorClose(iteratorRecord, NormalCompletion(UNUSED)).

    7. 반복 true 동안:

      1. nextIteratorStepValue(iteratorRecord)로 설정합니다.

      2. nextthrow completion이면 subscribererror() 메서드를 next의 [[Value]]로 실행하고 break합니다.

      3. next!로 변환합니다.

      4. next가 done이면:

        1. 단언: iteratorRecord의 [[Done]]이 true임을 확인합니다.

        2. subscribercomplete() 를 실행합니다.

        3. return합니다.

      5. subscribernext()next로 실행합니다.

      6. subscriber구독 컨트롤러signalabortedbreak합니다.

  9. Promise에서 변환: IsPromise(value)가 true라면:

    1. 새로운 Observable 을 반환합니다. 그 subscribe 콜백Subscriber subscriber를 받아 다음을 수행합니다:

      1. promise settle 후 반응:

        1. valuev로 fulfill되면:

          1. subscribernext()v로 실행합니다.

          2. subscribercomplete() 를 실행합니다.

        2. valuer로 reject되면 subscribererror()r로 실행합니다.

  10. TypeError를 발생시킵니다.

테스트
Observable에 구독하려면, ObserverUnion 또는 내부 옵저버 observerSubscribeOptions options가 주어졌을 때 다음 단계를 실행합니다:

참고: 이 알고리즘은 Web IDL의 subscribe() 메서드와 분리되어 명세 프로즈에서 Web IDL 바인딩을 거치지 않고 Observable에 직접 구독할 수 있습니다. w3c/IntersectionObserver#464를 참고하세요. "internal" prose는 JavaScript에서 속성이 변경될 수 있는 객체에 대해 Web IDL 바인딩을 거치지 않아야 합니다. 사용 예시는 § 2.3.3 Promise 반환 연산자에서 확인할 수 있습니다.

  1. this관련 글로벌 객체Window 객체이고, 그 연관된 Document완전히 활성화되어 있지 않으면 return합니다.

  2. internal observer를 새로운 내부 옵저버로 설정합니다.

  3. observer를 다음 방식으로 처리합니다:

    1. observerObservableSubscriptionCallback인 경우
      internal observernext 단계를 다음 단계로 설정합니다. any value를 받음:
      1. invoke observer를 «value»와 "report"로 실행합니다.

      observerSubscriptionObserver인 경우
      1. observernext존재하면, internal observernext 단계를 다음 단계로 설정합니다. any value를 받음:

        1. invoke observernext 를 «value»와 "report"로 실행합니다.

      2. observererror존재하면, internal observererror 단계를 다음 단계로 설정합니다. any error를 받음:

        1. invoke observererror 를 «error»와 "report"로 실행합니다.

      3. observercomplete존재하면, internal observercomplete 단계를 다음 단계로 설정합니다:

        1. invoke observercomplete 를 «»와 "report"로 실행합니다.

      observer내부 옵저버인 경우
      internal observerobserver로 설정합니다.
  4. 단언: internal observererror 단계기본 오류 알고리즘이거나, 제공된 error 콜백 함수를 호출하는 알고리즘임을 확인합니다.

  5. thisweak subscriber가 null이 아니고, thisweak subscriberactive가 true라면:

    1. subscriberthisweak subscriber로 설정합니다.

    2. internal observersubscriber내부 옵저버에 추가합니다.

    3. optionssignal존재하면:

      1. optionssignalaborted이면, internal observersubscriber내부 옵저버에서 제거합니다.

      2. 그 외에는 아래 abort 알고리즘optionssignal에 추가합니다:

        1. subscriberactive 가 false이면 중단합니다.

        2. internal observersubscriber내부 옵저버에서 제거합니다.

        3. subscriber내부 옵저버비어있으면, subscriber 종료optionssignalabort reason으로 실행합니다.

    4. return합니다.

  6. subscriber새로운 Subscriber로 생성합니다.

  7. internal observersubscriber내부 옵저버에 추가합니다.

  8. thisweak subscribersubscriber로 설정합니다.

  9. optionssignal존재하면:

    1. optionssignalaborted면, subscriber 종료optionssignalabort reason으로 실행합니다.

    2. 그 외에는 아래 abort 알고리즘optionssignal에 추가합니다:

      1. subscriberactive가 false이면 중단합니다.

      2. internal observersubscriber내부 옵저버에서 제거합니다.

      3. subscriber내부 옵저버비어있으면, subscriber 종료optionssignalabort reason으로 실행합니다.

  10. thissubscribe 콜백SubscribeCallback이면, invoke를 «subscriber»와 "rethrow"로 실행합니다.

    예외 E가 발생했다면, subscribererror() 메서드를 E로 실행합니다.

  11. 그 외에는 thissubscribe 콜백이 제공하는 단계를 subscriber로 실행합니다.

테스트

2.3. 연산자

현재는 https://github.com/wicg/observable#operators를 참고하세요.

2.3.1. from()

from(value) 메서드 단계:
  1. valueObservable로 변환한 결과를 반환합니다. 예외가 발생하면 다시 던집니다.

2.3.2. Observable 반환 연산자

takeUntil(value) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. notifiervalueObservable로 변환한 결과로 설정합니다.

  3. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    이 메서드는 두 개의 Observable구독하는 동작이 포함됩니다: (1) notifier, (2) sourceObservable. 다음 상황에서는 둘 다 "구독 해지"됩니다:
    1. notifier가 값을(“next” 또는 “error”) 내보내기 시작하면, notifier의 구독을 해지합니다. 더 이상 값이 필요하지 않으므로 계속 값을 생산할 필요가 없습니다. sourceObservable의 구독도 해지합니다. notifier가 값을 내보냈으므로 observable의 구독을 수동으로 종료하기 때문입니다.

    2. sourceObservableerror() 또는 complete() 를 실행하면, notifier에 더 이상 관심이 없으므로 구독을 해지합니다. sourceObservable의 구독 해지는 필요하지 않습니다(스스로 완료됨).

    1. notifierObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계

      subscribercomplete() 를 실행합니다.

      참고: 이것은 sourceObservable에 이미 구독되어 있다면 구독을 해지합니다. 왜냐하면 sourceObservable은 "외부" subscriber구독 컨트롤러signal로 구독되고, 위에서 subscribercomplete() 를 실행하면 signal이 abort되어 구독이 종료됩니다.

      error 단계

      subscribercomplete() 를 실행합니다.

      참고: notifier Observable 이 자체적으로 완료되는 경우에는 observablesubscriber를 완료할 필요가 없습니다. observablesourceObservable을 계속 미러링합니다.

    2. options를 새로운 SubscribeOptions 로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    3. notifiernotifierObserveroptions구독합니다.

    4. subscriberactive가 false면 return합니다.

      참고: notifier가 동기적으로 값을 내보내면 sourceObservable의 subscribe 콜백은 아예 실행되지 않습니다. 동기적으로 "complete"만 하면 subscriber의 active는 여전히 true이므로 sourceObservable에 구독이 진행됩니다.

    5. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계

      전달된 valuesubscribernext() 를 실행합니다.

      error 단계

      전달된 errorsubscribererror() 를 실행합니다.

      complete 단계

      subscribercomplete() 를 실행합니다.

      참고: sourceObserver는 대부분 패스스루 역할로 sourceObservable의 모든 이벤트를 미러링합니다. 단, sourceObservable이 먼저 종료되면 notifier의 구독을 해지합니다.

    6. sourceObservablesourceObserveroptions구독합니다.

  4. observable을 반환합니다.

테스트
map(mapper) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. idxunsigned long long으로 설정하며, 초기값은 0입니다.

    2. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계
      1. mapper를 «넘겨받은 value, idx»와 "rethrow"로 실행하고, 반환값을 mappedValue로 합니다.

        예외 E가 발생하면, subscribererror()E로 실행하고, 중단합니다.

      2. idx를 1 증가시킵니다.

      3. subscribernext()mappedValue로 실행합니다.

      error 단계

      subscribererror() 를 넘겨받은 error로 실행합니다.

      complete 단계

      subscribercomplete() 를 실행합니다.

    3. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    4. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

테스트
filter(predicate) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. idxunsigned long long으로 설정하며, 초기값은 0입니다.

    2. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계
      1. predicate를 «넘겨받은 value, idx»와 "rethrow"로 실행하고, 반환값을 matches로 합니다.

        예외 E가 발생하면, subscribererror()E로 실행하고, 중단합니다.

      2. idxidx + 1로 설정합니다.

      3. matches가 true이면, subscribernext()value로 실행합니다.

      error 단계

      subscribererror() 를 넘겨받은 error로 실행합니다.

      complete 단계

      subscribercomplete() 를 실행합니다.

    3. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    4. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

테스트
take(amount) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. remainingamount로 설정합니다.

    2. remaining이 0이면, subscribercomplete() 를 실행하고 중단합니다.

    3. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계
      1. 넘겨받은 valuesubscribernext() 를 실행합니다.

      2. remaining을 1 감소시킵니다.

      3. remaining이 0이면, subscribercomplete() 를 실행합니다.

      error 단계

      넘겨받은 errorsubscribererror() 를 실행합니다.

      complete 단계

      subscribercomplete() 를 실행합니다.

    4. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    5. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

테스트
drop(amount) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. remainingamount로 설정합니다.

    2. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계
      1. remaining이 > 0이면, remaining을 1 감소시키고 중단합니다.

      2. 단언: remaining이 0임을 확인합니다.

      3. 넘겨받은 valuesubscribernext() 를 실행합니다.

      error 단계

      넘겨받은 errorsubscribererror() 를 실행합니다.

      complete 단계

      subscribercomplete() 를 실행합니다.

    3. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    4. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

테스트
flatMap(mapper) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. idxunsigned long long으로 설정하며, 초기값은 0입니다.

    2. outerSubscriptionHasCompletedboolean으로 설정하며, 초기값은 false입니다.

    3. queueany 값의 리스트로 초기값은 비어있습니다.

      참고:queuesourceObservable에서 내보낸 Observable을 임시로 저장하는 데 사용됩니다. observable이 이전에 내보낸 Observable에 아직 구독 중일 경우, 새로 내보낸 Observable들은 queue에 저장됩니다.

    4. activeInnerSubscriptionboolean으로 초기값 false로 설정합니다.

    5. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계
      1. activeInnerSubscription이 true라면:

        1. queuevalue를 append합니다.

          참고:value는 현재 구독 중인 Observable이 끝난 후에 처리됩니다.

      2. 그 외의 경우:

        1. activeInnerSubscription을 true로 설정합니다.

        2. flatmap process next value stepsvalue, subscriber, mapper, 그리고 참조queue, activeInnerSubscription, outerSubscriptionHasCompleted, idx로 실행합니다.

          참고: flatmap process next value stepsvalue에서 파생된 Observable에 구독하고, 그 구독이 비활성(완료 또는 오류)될 때까지 값을 계속 처리합니다. "inner" Observable이 완료되면, 큐의 다음 값을 재귀적으로 처리합니다.

          큐에 값이 없다면, activeInnerSubscription을 false로 만들고, 이후 값이 올 때 적절하게 처리할 수 있게 합니다.

      error 단계

      넘겨받은 errorsubscribererror() 를 실행합니다.

      complete 단계
      1. outerSubscriptionHasCompleted을 true로 설정합니다.

        참고: activeInnerSubscription이 true라면 아래 단계에서 subscriber를 완료하지 않습니다. 이 경우, flatmap process next value steps가 큐가 비었을 때 subscriber를 완료합니다.

      2. activeInnerSubscription이 false이고 queue비었으면, subscribercomplete() 를 실행합니다.

    6. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    7. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

flatmap process next value stepsany value, Subscriber subscriber, Mapper mapper, 그리고 참조리스트 queue, activeInnerSubscription, outerSubscriptionHasCompleted, unsigned long long idx를 받아 실행하는 단계:
  1. mappedResultmapper를 «value, idx»와 "rethrow"로 실행한 결과로 설정합니다.

    예외 E가 발생하면 subscribererror()E로 실행하고, 중단합니다.

  2. idxidx + 1로 설정합니다.

  3. innerObservablefrom()mappedResult를 넣어 호출한 결과로 설정합니다.

    예외 E가 발생하면 subscribererror()E로 실행하고, 중단합니다.

    from()을 직접 호출하지 말고, 예외 처리를 우리가 직접 할 수 있는 내부 알고리즘을 사용해야 합니다. 예외를 subscriber에게 전달하고 싶기 때문입니다.

  4. innerObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계

    넘겨받은 valuesubscribernext() 를 실행합니다.

    error 단계

    넘겨받은 errorsubscribererror() 를 실행합니다.

    complete 단계
    1. queue가 비어있지 않으면:

      1. nextValuequeue의 첫 번째 아이템으로 설정하고, queue에서 삭제합니다.

      2. flatmap process next value stepsnextValue, subscriber, mapper, 그리고 참조queue, activeInnerSubscription로 실행합니다.

    2. 그 외의 경우:

      1. activeInnerSubscription을 false로 설정합니다.

        참고: activeInnerSubscription이 참조이므로, "outer" Observable (sourceObservable)에서 이후 내보내는 값들이 정상적으로 처리될 수 있습니다.

      2. outerSubscriptionHasCompleted이 true라면 subscribercomplete() 를 실행합니다.

        참고: "outer" Observable 이 이미 완료되었지만, 아직 대기 중인 "inner" Observable이 남아있었기 때문에 바로 subscriber를 완료하지 않았던 상황입니다. 지금 완료합니다.

  5. innerOptions를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

  6. innerObservableinnerObserverinnerOptions구독합니다.

switchMap(mapper) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. idxunsigned long long으로 설정하며, 초기값은 0입니다.

    2. outerSubscriptionHasCompletedboolean으로 초기값 false로 설정합니다.

    3. activeInnerAbortControllerAbortController 또는 null로 초기값 null로 설정합니다.

      참고:AbortController는 아래 next 단계에서 새로 생성되고, switchmap process next value steps에서 null로 변경됩니다. 현재 "inner" 구독이 활성 상태인지 여부를 나타냅니다. complete 단계에서 만약 "inner" 구독 중이면 subscriber를 즉시 완료하지 않고, "inner" 구독이 끝날 때까지 대기합니다.

    4. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계
      1. activeInnerAbortController가 null이 아니면, abort 시그널activeInnerAbortController에 보냅니다.

        참고: 이전 값에서 파생된 "inner" Observable의 구독을 해지하고, 새 값에서 파생된 Observable에 바로 구독합니다.

      2. activeInnerAbortController새로운 AbortController로 설정합니다.

      3. switchmap process next value stepsvalue, subscriber, mapper, 그리고 참조activeInnerAbortController, outerSubscriptionHasCompleted, idx로 실행합니다.

        참고: switchmap process next value stepsvalue에서 파생된 Observable에 구독하고, 구독이 비활성(완료 또는 오류)되거나 activeInnerAbortController가 abort되면 구독을 종료합니다. sourceObservable이 새 값을 내보내면 이전 "inner" 구독을 abort하고 새 값에서 파생된 "inner" 구독을 시작합니다.

      error 단계

      넘겨받은 errorsubscribererror() 를 실행합니다.

      complete 단계
      1. outerSubscriptionHasCompleted를 true로 설정합니다.

        참고: activeInnerAbortController가 null이 아니면 subscriber를 바로 완료하지 않고, "inner" 구독이 완료될 때까지 대기합니다.

      2. activeInnerAbortController가 null이면 subscribercomplete() 를 실행합니다.

    5. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    6. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

switchmap process next value stepsany value, Subscriber subscriber, Mapper mapper, 그리고 다음의 참조: AbortController activeInnerAbortController, boolean outerSubscriptionHasCompleted, unsigned long long idx에 대해 다음 단계를 실행합니다:
  1. mappedResultmapper를 «value, idx»와 "rethrow"로 실행한 결과로 설정합니다.

    예외 E가 발생하면 subscribererror()E로 실행하고, 중단합니다.

  2. idxidx + 1로 설정합니다.

  3. innerObservablefrom()mappedResult를 넣어 호출한 결과로 설정합니다.

    예외 E가 발생하면 subscribererror()E로 실행하고, 중단합니다.

  4. innerObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계

    넘겨받은 valuesubscribernext() 를 실행합니다.

    error 단계

    넘겨받은 errorsubscribererror() 를 실행합니다.

    참고: activeInnerAbortController를 여기서 null로 설정할 필요는 없습니다. subscribererror() 를 호출하면 이미 "outer" 소스 Observable의 구독이 해지되므로 더 이상 값을 푸시하지 않습니다.

    complete 단계
    1. outerSubscriptionHasCompleted이 true라면 subscribercomplete() 를 실행합니다.

    2. 그 외에는 activeInnerAbortController를 null로 설정합니다.

      참고: 이 변수는 참조이므로 switchMap complete 단계에 현재 활성 inner 구독이 없음을 알립니다.

  5. innerOptions를 새로운 SubscribeOptions로 설정하며, signal은 다음 리스트로 의존적 abort 시그널 생성 알고리즘을 사용합니다: «activeInnerAbortControllersignal, subscriber구독 컨트롤러signal», AbortSignal, 현재 realm.

  6. innerObservableinnerObserverinnerOptions구독합니다.

inspect(inspectorUnion) 메서드 단계:
  1. subscribe callbackVoidFunction 또는 null로, 초기값은 null로 설정합니다.

  2. next callbackObservableSubscriptionCallback 또는 null로, 초기값은 null로 설정합니다.

  3. error callbackObservableSubscriptionCallback 또는 null로, 초기값은 null로 설정합니다.

  4. complete callbackVoidFunction 또는 null로, 초기값은 null로 설정합니다.

  5. abort callbackObservableInspectorAbortHandler 또는 null로, 초기값은 null로 설정합니다.

  6. inspectorUnion을 다음 방식으로 처리합니다:

    inspectorUnionObservableSubscriptionCallback이면
    1. next callbackinspectorUnion으로 설정합니다.

    inspectorUnionObservableInspector이면
    1. subscribeinspectorUnion존재하면 subscribe callback에 할당합니다.

    2. nextinspectorUnion존재하면 next callback에 할당합니다.

    3. errorinspectorUnion존재하면 error callback에 할당합니다.

    4. completeinspectorUnion존재하면 complete callback에 할당합니다.

    5. abortinspectorUnion존재하면 abort callback에 할당합니다.

  7. sourceObservablethis로 설정합니다.

  8. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. subscribe callback이 null이 아니면 invoke를 «»와 "rethrow"로 실행합니다.

      예외 E가 발생하면 subscribererror()E로 실행하고, 중단합니다.

      참고: 이 결과로 sourceObservable은 구독되지 않습니다.

    2. abort callback이 null이 아니면 아래 abort 알고리즘subscriber구독 컨트롤러signal에 추가합니다:

      1. invoke abort callback을 «subscriber구독 컨트롤러signalabort reason»와 "report"로 실행합니다.

    3. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계
      1. next callback이 null이 아니면 invoke를 «넘겨받은 value»와 "rethrow"로 실행합니다.

        예외 E가 발생하면:

        1. abort callbacksubscriber구독 컨트롤러signal에서 제거합니다.

          참고: 이 단계는 중요합니다. abort callback소비자에 의해 시작된 구독 해지에만 호출되어야 합니다. 아래와 같이 프로듀서가 subscribererror() 또는 complete() 메서드를 통해 구독을 종료하는 경우에는 abort callback이 실행되지 않도록 해야 합니다.

          이는 Chromium의 구현과 동일하지만, 처음 전달된 SubscribeOptionssignal 참조를 들고 있다가 abort될 때만 abort callback을 실행해도 결과는 같으니, 확인 필요.

        2. subscribererror()E로 실행하고, 중단합니다.

      2. 넘겨받은 valuesubscribernext() 를 실행합니다.

      error 단계
      1. abort callbacksubscriber구독 컨트롤러signal에서 제거합니다.

      2. error callback이 null이 아니면 invoke를 «넘겨받은 error»와 "rethrow"로 실행합니다.

        예외 E가 발생하면, subscribererror()E로 실행하고, 중단합니다.

      3. 넘겨받은 errorsubscribererror() 를 실행합니다.

      complete 단계
      1. abort callbacksubscriber구독 컨트롤러signal에서 제거합니다.

      2. complete callback이 null이 아니면 invoke를 «»와 "rethrow"로 실행합니다.

        예외 E가 발생하면, subscribererror()E로 실행하고, 중단합니다.

      3. subscribercomplete() 를 실행합니다.

    4. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    5. sourceObservablesourceObserveroptions구독합니다.

  9. observable을 반환합니다.

테스트
catch(callback) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계

      넘겨받은 valuesubscribernext() 를 실행합니다.

      error 단계
      1. callback을 «넘겨받은 error»와 "rethrow"로 실행합니다. 반환값을 result로 합니다.

        예외 E가 발생하면, subscribererror()E로 실행하고, 중단합니다.

      2. innerObservablefrom()result를 넣어 호출한 결과로 설정합니다.

        예외 E가 발생하면, subscribererror()E로 실행하고, 중단합니다.

        from()을 직접 호출하지 말고, 예외 처리를 직접 할 수 있는 내부 알고리즘을 사용해야 합니다. 예외를 subscriber에게 전달하고 싶기 때문입니다.

      3. innerObserver를 새로운 내부 옵저버로 생성하고 다음과 같이 초기화합니다:

        next 단계

        넘겨받은 valuesubscribernext() 를 실행합니다.

        error 단계

        넘겨받은 errorsubscribererror() 를 실행합니다.

        complete 단계

        subscribercomplete() 를 실행합니다.

      4. innerOptions를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

      5. innerObservableinnerObserverinnerOptions구독합니다.

        참고: innerObservable에 안전하게 구독할 수 있습니다. sourceObservable에서 더 이상 값이 발생하지 않으므로, innerObservable로 안전하게 값을 전환할 수 있습니다.

      complete 단계

      subscribercomplete() 를 실행합니다.

    2. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    3. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

finally(callback) 메서드 단계:
  1. sourceObservablethis로 설정합니다.

  2. observable새로운 Observable로 설정합니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 실행합니다:

    1. subscriberaddTeardown()callback을 전달하여 실행합니다.

    2. sourceObserver를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

      next 단계

      넘겨받은 valuesubscribernext() 를 실행합니다.

      error 단계
      1. 넘겨받은 errorsubscribererror() 를 실행합니다.

      complete 단계
      1. subscribercomplete() 를 실행합니다.

    3. options를 새로운 SubscribeOptions로 설정하고, signalsubscriber구독 컨트롤러signal로 설정합니다.

    4. sourceObservablesourceObserveroptions구독합니다.

  3. observable을 반환합니다.

2.3.3. Promise 반환 연산자

toArray(options) 메서드 단계:
  1. p새로운 promise로 설정합니다.

  2. optionssignal 이 null이 아니면:

    1. optionssignalaborted면:

      1. poptionssignalabort reason으로 거부합니다.

      2. p를 반환합니다.

    2. 아래 abort 알고리즘optionssignal에 추가합니다:

      1. poptionssignalabort reason으로 거부합니다.

      참고: 여기서는 p만 거부하면 됩니다. Observable의 구독도 자동으로 종료됩니다. "inner" Subscriber가 close되기 때문입니다.

  3. values를 새로운 리스트로 설정합니다.

  4. observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계

    넘겨받은 valuevaluesappend합니다.

    error 단계

    p를 넘겨받은 errorreject합니다.

    complete 단계

    pvaluesresolve합니다.

  5. this에 대해 observeroptions로 구독합니다.

  6. p를 반환합니다.

테스트
forEach(callback, options) 메서드 단계:
  1. p새로운 promise로 설정합니다.

  2. visitor callback controller새로운 AbortController로 설정합니다.

  3. internal options를 새로운 SubscribeOptions로 설정합니다. signal의존적 abort signal 생성 알고리즘으로 «visitor callback controllersignal, optionssignal (null이 아니면)»을 사용합니다. AbortSignal, 현재 realm도 사용합니다.

    많은 단순 내부 옵저버는 패스스루 역할만 하며, Observable의 구독을 직접 제어하지 않습니다. 즉, error 단계complete 단계는 구독 종료 시 호출되고, next 단계는 값을 그대로 전달합니다.

    하지만 이 연산자에 대해서는 아래 observernext steps가 예외가 발생할 경우 실제로 this에 대한 기반 구독을 중단하는 역할을 합니다. 이런 경우, 우리가 "Observable에 구독" 단계에 전달하는 SubscribeOptionssignaloptionssignal에서 파생된 의존적 signal이어야 하며, 그리고 아래 next steps에서 접근할 수 있고 필요할 때 signal abort를 할 수 있는 AbortControllerAbortSignal이어야 합니다.

  4. internal optionssignalaborted면:

    1. pinternal optionssignalabort reason으로 거부합니다.

    2. p를 반환합니다.

  5. 아래 abort 알고리즘internal optionssignal에 추가합니다:

    1. pinternal optionssignalabort reason으로 거부합니다.

      참고: p의 거부와 internal optionssignal가 연결되어 있으므로, optionssignalabort 이벤트가 발생해도, p의 reject 핸들러는 그 후에 실행됩니다.

  6. idxunsigned long long으로 초기값 0으로 설정합니다.

  7. observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계
    1. callback을 «넘겨받은 value, idx»와 "rethrow"로 실행합니다.

      예외 E가 발생하면 pE로 거부하고, visitor callback controllerE로 abort 신호를 보냅니다.

    2. idx를 1 증가시킵니다.

    error 단계

    넘겨받은 errorp를 거부합니다.

    complete 단계

    pundefined로 resolve합니다.

  8. this에 대해 observerinternal options로 구독합니다.

  9. p를 반환합니다.

테스트
every(predicate, options) 메서드 단계:
  1. p새로운 promise로 설정합니다.

  2. controller새로운 AbortController로 설정합니다.

  3. internal options를 새로운 SubscribeOptions로 설정합니다. signal의존적 abort signal 생성 알고리즘으로 «controllersignal, optionssignal (null이 아니면)»을 사용합니다. AbortSignal, 현재 realm도 사용합니다.

  4. internal optionssignalaborted면:

    1. pinternal optionssignalabort reason으로 거부합니다.

    2. p를 반환합니다.

  5. 아래 abort 알고리즘internal optionssignal에 추가합니다:

    1. pinternal optionssignalabort reason으로 거부합니다.

  6. idxunsigned long long으로 초기값 0으로 설정합니다.

  7. observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계
    1. predicate를 «넘겨받은 value, idx»와 "rethrow"로 실행합니다. 반환값을 passed로 설정합니다.

      예외 E가 발생하면 pE로 거부하고, controllerE로 abort 신호를 보냅니다.

    2. idxidx + 1로 설정합니다.

    3. passed가 false이면 p를 false로 resolve하고, controller에 abort 신호를 보냅니다.

    error 단계

    넘겨받은 errorp를 거부합니다.

    complete 단계

    p를 true로 resolve합니다.

  8. this에 대해 observerinternal options로 구독합니다.

  9. p를 반환합니다.

first(options) 메서드 단계:
  1. p새로운 promise로 설정합니다.

  2. controller새로운 AbortController로 설정합니다.

  3. internal options를 새로운 SubscribeOptions로 설정합니다. signal의존적 abort signal 생성 알고리즘으로 «controllersignal, optionssignal (null이 아니면)»을 사용합니다. AbortSignal현재 realm도 사용합니다.

  4. internal optionssignalaborted면:

    1. pinternal optionssignalabort reason으로 거부합니다.

    2. p를 반환합니다.

  5. 아래 abort 알고리즘internal optionssignal에 추가합니다:

    1. pinternal optionssignalabort reason으로 거부합니다.

  6. internal observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계
    1. p를 넘겨받은 valueresolve합니다.

    2. controller에 abort 신호를 보냅니다.

    error 단계

    넘겨받은 errorp를 거부합니다.

    complete 단계

    p를 새로운 RangeError로 거부합니다.

    참고: 소스 Observable첫 번째 값을 내보내기 전에 완료될 때만 도달합니다.

  7. this에 대해 internal observerinternal options로 구독합니다.

  8. p를 반환합니다.

last(options) 메서드 단계:
  1. p새로운 promise로 설정합니다.

  2. optionssignal 이 null이 아니면:

    1. optionssignalaborted면:

      1. poptionssignalabort reason으로 거부합니다.

      2. p를 반환합니다.

    2. 아래 abort 알고리즘optionssignal에 추가합니다:

      1. poptionssignalabort reason으로 거부합니다.

  3. lastValueany 또는 null로, 초기값은 null로 설정합니다.

  4. hasLastValueboolean으로 초기값 false로 설정합니다.

  5. observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계
    1. hasLastValue를 true로 설정합니다.

    2. lastValue를 넘겨받은 value로 설정합니다.

    error 단계

    넘겨받은 errorp를 거부합니다.

    complete 단계
    1. hasLastValue가 true이면, plastValue로 resolve합니다.

      1. 그 외에는 p를 새로운 RangeError로 거부합니다.

        참고: first()의 참고를 보세요.

  6. this에 대해 observeroptions로 구독합니다.

  7. p를 반환합니다.

find(predicate, options) 메서드 단계:
  1. p새로운 promise로 설정합니다.

  2. controller새로운 AbortController로 설정합니다.

  3. internal options를 새로운 SubscribeOptions로 설정합니다. signal의존적 abort signal 생성 알고리즘으로 «controllersignal, optionssignal (null이 아니면)»을 사용합니다. AbortSignal현재 realm도 사용합니다.

  4. internal optionssignalaborted면:

    1. pinternal optionssignalabort reason으로 거부합니다.

    2. p를 반환합니다.

  5. 아래 abort 알고리즘internal optionssignal에 추가합니다:

    1. pinternal optionssignalabort reason으로 거부합니다.

  6. idxunsigned long long으로 초기값 0으로 설정합니다.

  7. observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계
    1. predicate를 «넘겨받은 value, idx»와 "rethrow"로 실행합니다. 반환값을 passed로 설정합니다.

      예외 E가 발생하면 pE로 거부하고, controllerE로 abort 신호를 보냅니다.

    2. idxidx + 1로 설정합니다.

    3. passed가 true이면, pvalue로 resolve하고, controller에 abort 신호를 보냅니다.

    error 단계

    넘겨받은 errorp를 거부합니다.

    complete 단계

    pundefined로 resolve합니다.

  8. this에 대해 observerinternal options로 구독합니다.

  9. p를 반환합니다.

some(predicate, options) 메서드 단계:
  1. p새로운 프라미스로 설정합니다.

  2. controller새로운 AbortController로 설정합니다.

  3. internal options를 새로운 SubscribeOptions로 설정합니다. signal의존적 abort signal 생성 알고리즘으로 «controllersignal, optionssignal (null이 아니면)»을 사용합니다. AbortSignal현재 realm도 사용합니다.

  4. internal optionssignalaborted라면:

    1. pinternal optionssignalabort reason으로 거부합니다.

    2. p를 반환합니다.

  5. 아래 abort 알고리즘internal optionssignal에 추가합니다:

    1. pinternal optionssignalabort reason으로 거부합니다.

  6. idxunsigned long long으로, 초기값 0으로 설정합니다.

  7. observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계
    1. predicate를 «넘겨받은 value, idx»와 "rethrow"로 실행합니다. 반환값을 passed로 설정합니다.

      예외 E가 발생하면 pE로 거부하고, controllerE로 abort 신호를 보냅니다.

    2. idxidx + 1로 설정합니다.

    3. passed가 true이면, p를 true로 resolve하고, controller에 abort 신호를 보냅니다.

    error 단계

    넘겨받은 errorp를 거부합니다.

    complete 단계

    p를 false로 resolve합니다.

  8. this에 대해 observerinternal options로 구독합니다.

  9. p를 반환합니다.

reduce(reducer, initialValue, options) 메서드 단계:
  1. p새로운 프라미스로 설정합니다.

  2. controller새로운 AbortController로 설정합니다.

  3. internal options를 새로운 SubscribeOptions로 설정합니다. signal의존적 abort signal 생성 알고리즘으로 «controllersignal, optionssignal (null이 아니면)»을 사용합니다. AbortSignal현재 realm도 사용합니다.

  4. internal optionssignalaborted라면:

    1. pinternal optionssignalabort reason으로 거부합니다.

    2. p를 반환합니다.

  5. 아래 abort 알고리즘internal optionssignal에 추가합니다:

    1. pinternal optionssignalabort reason으로 거부합니다.

  6. idxunsigned long long으로, 초기값 0으로 설정합니다.

  7. accumulatorinitialValue가 주어졌다면 그 값으로, 아니면 초기화되지 않은 값으로 설정합니다.

  8. observer를 새로운 내부 옵저버로 생성하여 다음과 같이 초기화합니다:

    next 단계
    1. accumulator가 초기화되지 않았다면(즉, initialValue가 전달되지 않은 경우), accumulator를 넘겨받은 value로 설정하고 idxidx+1로 설정한 후 이 단계를 중단합니다.

      참고: 이 경우 reducerthis가 처음 내보내는 currentValue 값을 받지 않습니다. 두 번째 값이 나오면 reducer가 호출되어, 첫 번째 값은 accumulator로, 두 번째 값은 currentValue로 전달됩니다.

    2. reducer를 «accumulator (accumulator), 넘겨받은 value (currentValue), idx (index)»와 "rethrow"로 실행합니다. 반환값을 result로 설정합니다.

      예외 E가 발생하면 pE로 거부하고, controllerE로 abort 신호를 보냅니다.

    3. idxidx + 1로 설정합니다.

    4. accumulatorresult로 설정합니다.

    error 단계

    넘겨받은 errorp를 거부합니다.

    complete 단계
    1. accumulator가 "unset"이 아니면 paccumulator로 resolve합니다.

      그 외에는 pTypeError로 거부합니다.

  9. this에 대해 observerinternal options로 구독합니다.

  10. p를 반환합니다.

3. EventTarget 통합

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};
when(type, options) 메서드 단계:
  1. this관련 글로벌 객체Window 객체이고, 그 연관된 Document완전히 활성화되어 있지 않으면 return합니다.

  2. event targetthis로 설정합니다.

  3. observable새로운 Observable로 생성합니다. 다음과 같이 초기화합니다:

    subscribe 콜백

    Subscriber subscriber를 받아 다음을 실행하는 알고리즘:

    1. event target이 null이면 중단합니다.

      참고: event target이 구독 시점에 가비지 컬렉션될 수 있음을 나타냅니다.

    2. subscriber구독 컨트롤러signalaborted면 중단합니다.

    3. 이벤트 리스너 추가 알고리즘을 event target과 다음 이벤트 리스너로 실행합니다:

      type

      type

      callback

      Web IDL EventListener 인스턴스를 새로 생성하여, 인자로 Event event 하나를 받는 함수 참조로 만듭니다. 이 함수는 observable 이벤트 리스너 실행 알고리즘subscriber, event로 실행합니다.

      capture

      optionscapture

      passive

      optionspassive 멤버가 존재하면 그 값, 아니면 null

      once

      false

      signal

      subscriber구독 컨트롤러signal

      참고: 이벤트 리스너구독 컨트롤러signalaborted될 때 정리됨을 보장합니다. 엔진의 소유권 모델과 무관하게 동작합니다.

  4. observable을 반환합니다.

observable 이벤트 리스너 실행 알고리즘Subscriber subscriberEvent event를 받아 다음 단계를 실행합니다:
  1. subscribernext() 메서드를 event로 실행합니다.

테스트

4. 보안 및 프라이버시 고려사항

이 내용은 우리 explainer에서 이 명세로 업스트림되고 있으며, 그동안 다음 자료를 참고할 수 있습니다:

5. 감사의 글

Observable API 설계에 많은 아이디어를 제공해준 Ben Lesh께 특별히 감사드립니다. 웹 플랫폼에 이 기여가 가능하게 된 것은 Ben Lesh의 오랜 기간 사용자 공간 Observable 코드 유지와 설계 경험 덕분입니다.

색인

이 명세에서 정의된 용어

참조로 정의된 용어

참고 문헌

규범적 참고문헌

[DOM]
Anne van Kesteren. DOM Standard. 현행 표준. URL: https://dom.spec.whatwg.org/
[ECMASCRIPT]
ECMAScript Language Specification. URL: https://tc39.es/ecma262/multipage/
[HTML]
Anne van Kesteren; et al. HTML Standard. 현행 표준. URL: https://html.spec.whatwg.org/multipage/
[INFRA]
Anne van Kesteren; Domenic Denicola. Infra Standard. 현행 표준. URL: https://infra.spec.whatwg.org/
[WEBIDL]
Edgar Chen; Timothy Gu. Web IDL Standard. 현행 표준. URL: https://webidl.spec.whatwg.org/

IDL 색인

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // True after the Subscriber is created, up until either
  // complete()/error() are invoked, or the subscriber unsubscribes. Inside
  // complete()/error(), this attribute is true.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

// SubscribeCallback is where the Observable "creator's" code lives. It's
// called when subscribe() is called, to set up a new subscription.
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Differs from Mapper only in return type, since this callback is exclusively
// used to visit each element in a sequence, not transform it.
callback Visitor = undefined (any value, unsigned long long index);

// This callback returns an `any` that must convert into an `Observable`, via
// the `Observable` conversion semantics.
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // Constructs a native Observable from value if it's any of the following:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observable-returning operators. See "Operators" section in the spec.
  //
  // takeUntil() can consume promises, iterables, async iterables, and other
  // observables.
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promise-returning operators.
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};

이슈 색인

from()을 직접 호출해서는 안 됩니다. 예외를 적절히 처리할 수 있도록 내부 알고리즘을 호출해야 하며, 우리는 이 예외를 subscriber에게 전달해야 합니다.
이것은 Chromium의 구현과 일치하지만, 처음 전달된 SubscribeOptionssignal 참조를 들고 있다가, 그 signal이 abort될 때만 abort callback을 호출하는 방법도 고려해볼 수 있습니다. 결과는 아마 같을 것이지만, 추가 조사가 필요합니다.
from()을 직접 호출해서는 안 됩니다. 예외를 적절히 처리할 수 있도록 내부 알고리즘을 호출해야 하며, 우리는 이 예외를 subscriber에게 전달해야 합니다.