옵저버블

커뮤니티 그룹 초안 보고서,

이번 버전:
https://wicg.github.io/observable/
편집자:
Dominic Farolino (Google)
참여하기:
GitHub WICG/observable (새 이슈, 열려 있는 이슈)
커밋:
GitHub spec.bs 커밋
테스트 스위트:
https://wpt.fyi/results/dom/observable/

요약

Observable API는 비동기 이벤트 스트림을 다루는 조합 가능하고 사용하기 쉬운 방법을 제공합니다.

이 문서의 상태

이 명세는 웹 플랫폼 인큐베이터 커뮤니티 그룹에서 발행하였습니다. W3C 표준이 아니며, W3C 표준 트랙에 있지 않습니다. W3C 커뮤니티 기여자 라이선스 계약 (CLA)에 따라 제한적 옵트아웃과 기타 조건이 적용됩니다. W3C 커뮤니티 및 비즈니스 그룹에 대해 더 알아보세요.

1. 소개

이 섹션은 규범적이지 않습니다.

2. 핵심 인프라

2.1. Subscriber 인터페이스

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // Subscriber가 생성된 이후부터,
  // complete()/error()가 호출되거나, subscriber가 구독 해제되기 전까지 true 입니다.
  // complete()/error() 내부에서는 이 속성이 true입니다.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

Subscriberordered set내부 옵저버를 가지며, 처음에는 비어 있습니다.

Subscriberteardown 콜백을 가지며, 이것은 listVoidFunction 들로, 처음에는 비어 있습니다.

Subscribersubscription controller를 가지며, AbortController 입니다.

Subscriberactive 불리언을 가지며, 처음에는 true입니다.

참고: 이 변수는 Subscriberclose된 이후에는 자신이 소유한 콜백을 절대로 호출하지 않도록 관리하기 위한 것입니다.

active getter 단계는 thisactive 불리언을 반환합니다.

signal getter 단계는 thissubscription controllersignal을 반환합니다.

next(value) 메서드 단계:
  1. thisactive가 false라면, 반환합니다.

  2. thisrelevant global objectWindow 객체이고, 그 associated Documentfully active가 아니라면, 반환합니다.

  3. internal observers copythis내부 옵저버의 복사본으로 둡니다.

    참고: 내부 옵저버 리스트를 복사해 순회하는 이유는, 아래 내부 옵저버next 단계가 추가 구독을 유발할 경우, 순회 중 리스트가 변형되지 않도록 하기 위함입니다.

  4. internal observers copy의 각 observer에 대해:

    1. observernext 단계value로 실행합니다.

      단언: 예외가 발생하지 않아야 합니다.

      참고: 예외가 발생하지 않는 이유는, 내부 옵저버의 next 단계가 스크립트 제공 콜백의 래퍼인 경우 process observer 단계가 콜백 호출 시 예외를 잡아 글로벌에 보고하기 때문입니다.

      next 단계가 명세 알고리즘인 경우, 해당 단계 내부에서만 예외를 처리하여 이 단언을 만족합니다.

error(error) 메서드 단계:
  1. thisactive가 false이면, 예외를 보고하고 errorthisrelevant global object를 함께 반환합니다.

  2. thisrelevant global objectWindow 객체이고, 그 associated Documentfully active가 아니라면, 반환합니다.

  3. close this.

  4. internal observers copythis내부 옵저버의 복사본으로 둡니다.

  5. internal observers copy의 각 observer에 대해:

    1. observererror 단계error로 실행합니다.

      단언: 예외가 발생하지 않아야 합니다.

      참고: 자세한 설명은 next() 문서를 참고하세요.

complete() 메서드 단계:
  1. thisactive가 false라면, 반환합니다.

  2. thisrelevant global objectWindow 객체이고, 그 associated Documentfully active가 아니라면, 반환합니다.

  3. close this.

  4. internal observers copythis내부 옵저버의 복사본으로 둡니다.

  5. internal observers copy의 각 observer에 대해:

    1. observercomplete 단계를 실행합니다.

      단언: 예외가 발생하지 않아야 합니다.

      참고: 자세한 설명은 next() 문서를 참고하세요.

addTeardown(teardown) 메서드 단계:
  1. thisrelevant global objectWindow 객체이고, 그 associated Documentfully active가 아니라면, 반환합니다.

  2. thisactive가 true라면, append teardownthisteardown 콜백 리스트에 추가합니다.

  3. 그렇지 않으면, invoke teardown을 «»와 "report"로 호출합니다.

구독을 종료하기Subscriber subscriber와 선택적 any reason을 받아 다음 단계를 실행합니다:
  1. subscriberactive가 false라면, 반환합니다.

    이 단계는 재진입 호출을 방지합니다. "producer-initiated" 구독 해제의 경우 예시:

    const outerController = new AbortController();
    const observable = new Observable(subscriber => {
      subscriber.addTeardown(() => {
        // 2.) 이 teardown은 "Close" 알고리즘이 실행 중일 때 내부에서 실행됩니다.
        //     다운스트림 시그널을 abort하면 abort 알고리즘이 실행되는데, 그 중 하나가 현재 실행 중인 "Close" 알고리즘입니다.
        outerController.abort();
      });
    
      // 1.) 이것은 즉시 "Close" 알고리즘을 실행하며,
      //     subscriber.active를 false로 설정합니다.
      subscriber.complete();
    });
    
    observable.subscribe({}, {signal: outerController.signal});
    
  2. subscriberactive 불리언을 false로 설정합니다.

  3. Signal abort subscribersubscription controllerreason이 있다면 함께 전달합니다.

  4. subscriberteardown 콜백들을 역순으로 순회합니다:

    1. subscriberrelevant global objectWindow 객체이고, 그 associated Documentfully active가 아니라면, 이 단계를 중단합니다.

      참고:teardown이 위의 Document 를 비활성화시킬 수 있으므로, 이 단계는 반복적으로 실행될 수 있습니다.

    2. invoke teardown을 «»와 "report"로 호출합니다.

2.2. Observable 인터페이스

// SubscribeCallback은 Observable "생성자"의 코드가 위치하는 곳입니다.
// subscribe()가 호출될 때 실행되어 새로운 구독을 설정합니다.
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Mapper와 반환 타입만 다르며, 이 콜백은 시퀀스의 각 요소를 방문하는 용도로만 사용됩니다.
callback Visitor = undefined (any value, unsigned long long index);

// 이 콜백은 `any`를 반환하며, 반드시 `Observable`로 변환되어야 합니다. 변환 규칙은 Observable 변환 방식에 따릅니다.
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // value가 다음 중 하나일 때 네이티브 Observable을 만듭니다:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observable을 반환하는 연산자들. 명세의 "연산자" 섹션 참고.
  //
  // takeUntil()은 promise, iterable, async iterable, 다른 observable을 소비할 수 있습니다.
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promise를 반환하는 연산자들.
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

Observablesubscribe callback을 가지며, SubscribeCallback 또는 Subscriber를 받는 단계 집합입니다.

Observableweak subscriber를 가지며, Subscriber에 대한 약한 참조 혹은 null입니다. 초기값은 null입니다.

참고: 이 타입들의 "union"은 Observable이 자바스크립트에서 생성될 때(SubscribeCallback으로 항상 생성), 그리고 네이티브로 생성된 Observable 객체(그 subscribe callback이 자바스크립트 콜백이 아닌 임의의 네이티브 단계일 수 있음)를 모두 지원하기 위함입니다. when() 의 반환값이 후자의 예입니다.

new Observable(callback) 생성자 단계:
  1. thissubscribe callbackcallback으로 설정합니다.

    참고: 이 콜백은 이후 subscribe() 가 호출될 때 실행됩니다.

subscribe(observer, options) 메서드 단계:
  1. Observable에 구독this에 대해, observeroptions를 이용하여 수행합니다.

2.2.1. 지원 개념

기본 에러 알고리즘any error를 받아 다음 단계를 실행하는 알고리즘입니다:
  1. 예외를 보고합니다. error현재 realmglobal object와 함께 보고합니다.

참고: 이 기본 알고리즘을 분리한 이유는, 명세의 모든 장소에서 네이티브로 Observable에 구독을 할 때 (subscribe() 메서드를 통하지 않고 명세 단계에서 직접 구독할 때), 중복적으로 해당 단계를 정의하지 않기 위함입니다.

내부 옵저버란 다음 구조체(struct)로, 다음 항목(item)을 가집니다:

next 단계

any 타입의 파라미터 하나를 받는 알고리즘입니다. 초기에는 아무 작업도 하지 않습니다.

error 단계

any 타입의 파라미터 하나를 받는 알고리즘입니다. 초기값은 기본 에러 알고리즘입니다.

complete 단계

파라미터가 없는 알고리즘입니다. 초기에는 아무 작업도 하지 않습니다.

내부 옵저버 구조체next, error, 그리고 complete 콜백 함수를 미러링합니다. 자바스크립트가 Observablesubscribe() 메서드로 구독할 경우, 이 알고리즘 "단계"는 스크립트가 제공한 콜백 함수 호출의 래퍼에 불과합니다.

하지만 명세 내부에서(사용자 스크립트가 아니라) Observable에 구독할 경우, 이 "단계"는 ObserverUnion 에 포장된 Web IDL 콜백 함수가 아닌 임의의 명세 알고리즘이 됩니다. 예시는 § 2.3.3 Promise 반환 연산자를 참고하세요.

Observable로 변환 알고리즘은 any value를 받아 다음 단계를 실행합니다:

참고: 이 알고리즘은 Web IDL의 from() 메서드에서 분리하였습니다. 명세 단계에서 Web IDL 바인딩을 거치지 않고 값을 Observable로 변환할 수 있게 하기 위함입니다.

  1. Type(value)가 Object가 아니라면, TypeErrorTypeError로 throw합니다.

    참고: 원시 타입이 iterable로 강제로 변환되는 것을 방지합니다(예: String). 자세한 토론은 WICG/observable#125 참고.

  2. Observable에서 변환: valuespecific typeObservable 이라면, value를 반환합니다.

  3. async iterable에서 변환: asyncIteratorMethod? GetMethod(value, %Symbol.asyncIterator%)로 설정합니다.

    참고: async iterator 프로토콜 지원 여부만 확인하기 위해 GetMethod를 사용합니다. 구현되지 않은 경우 예외를 던지지 않도록 하기 위함입니다. GetIterator는 (a) 프로토콜이 구현되지 않았거나, (b) 프로토콜이 구현되어도 호출 불가 또는 getter에서 예외 발생 시 모두 예외를 던집니다. GetMethod는 후자의 경우에만 throw합니다.

  4. asyncIteratorMethod가 undefined 또는 null이면, iterable에서 변환 단계로 이동합니다.

  5. nextAlgorithm을 다음 단계로 둡니다. Subscriber subscriberIterator Record iteratorRecord를 받음:

    1. subscribersubscription controllersignalaborted라면 반환합니다.

    2. nextPromisePromise 또는 undefined로 둡니다. 초기값은 undefined입니다.

    3. nextCompletionIteratorNext(iteratorRecord)로 둡니다.

      참고: async iterator의 경우 next()가 Promise/thenable을 반환하므로, 값을 얻기 위해 Promise로 감싸고 반응합니다.

    4. nextCompletionthrow completion이면:

      1. 단언: iteratorRecord의 [[Done]]이 true입니다.

      2. nextPromisenextRecord의 [[Value]]로 reject된 promise로 둡니다.

    5. 그 밖에 nextRecordnormal completion이면, nextPromisenextRecord의 [[Value]]로 resolve된 promise로 설정합니다.

      참고: nextRecord의 [[Value]]가 이미 Promise가 아닐 경우를 대비합니다.

    6. nextPromise에 반응합니다:

      • nextPromise가 iteratorResult로 fulfilled되면:

        1. Type(iteratorResult) 가 Object가 아니면, subscribererror() 메서드를 TypeError와 함께 실행하고, 단계를 중단합니다.

        2. doneIteratorComplete(iteratorResult)로 둡니다.

        3. donethrow completion이면, subscribererror() 메서드를 done의 [[Value]]로 실행하고 단계를 중단합니다.

        4. done의 [[Value]]가 true이면, subscribercomplete() 를 실행하고 단계를 중단합니다.

        5. valueIteratorValue(iteratorResult)로 둡니다.

        6. valuethrow completion이면, subscribererror() 메서드를 value의 [[Value]]로 실행하고 단계를 중단합니다.

        7. subscribernext()value의 [[Value]]로 실행합니다.

        8. nextAlgorithmsubscriberiteratorRecord로 실행합니다.

      • nextPromise가 r로 reject되면, subscribererror() 메서드를 r로 실행합니다.

  6. new Observable 을 반환합니다. subscribe callback은 Subscriber subscriber를 받아서 다음을 수행:

    1. subscribersubscription controllersignalaborted라면 반환합니다.

    2. iteratorRecordCompletionGetIterator(value, async)로 둡니다.

      참고: value%Symbol.asyncIterator% 메서드 getter를 재호출하는 효과가 있습니다. extreme corner case지만 테스트 기대치와 일치합니다. 토론은 issue#127 참고.

    3. iteratorRecordCompletionthrow completion이면, subscribererror() 메서드를 iteratorRecordCompletion의 [[Value]]로 실행하고 단계를 중단합니다.

      참고: 이 경우 async iterable이 Observable로 변환되는 과정에서만 구독과 동시에 동기적으로 error() 메서드가 실행됩니다. 그 밖의 경우에는 promise reject 래퍼에 의해 비동기적으로(마이크로태스크 타이밍) observer로 전파됩니다. 이 동기 에러 전파는 for-await-of 루프의 동작과 일치합니다.

    4. iteratorRecord! iteratorRecordCompletion으로 둡니다.

    5. 단언: iteratorRecordIterator Record임을 확인합니다.

    6. subscribersubscription controllersignalaborted라면 반환합니다.

    7. 아래 abort 알고리즘을 추가합니다. subscribersubscription controllersignal에 추가:

      1. AsyncIteratorClose(iteratorRecord, NormalCompletion(subscribersubscription controllerabort reason))을 실행합니다.

    8. nextAlgorithmsubscriberiteratorRecord로 실행합니다.

  7. iterable에서 변환: iteratorMethod? GetMethod(value, %Symbol.iterator%)로 둡니다.

  8. iteratorMethod가 undefined이면 Promise에서 변환 단계로 이동합니다.

    그 밖의 경우, new Observable 을 반환합니다. subscribe callback은 Subscriber subscriber를 받아 다음을 수행:

    1. subscribersubscription controllersignalaborted라면 반환합니다.

    2. iteratorRecordCompletionGetIterator(value, sync)로 둡니다.

    3. iteratorRecordCompletionthrow completion이면, subscribererror() 메서드를 iteratorRecordCompletion의 [[Value]]로 실행하고 단계를 중단합니다.

    4. iteratorRecord! iteratorRecordCompletion으로 둡니다.

    5. subscribersubscription controllersignalaborted라면 반환합니다.

    6. 아래 abort 알고리즘을 추가합니다. subscribersubscription controllersignal에 추가:

      1. IteratorClose(iteratorRecord, NormalCompletion(UNUSED))를 실행합니다.

    7. 반복(while) true:

      1. nextIteratorStepValue(iteratorRecord)로 둡니다.

      2. nextthrow completion이면, subscribererror() 메서드를 next의 [[Value]]로 실행하고 break합니다.

      3. next!로 변환합니다.

      4. next가 done이면:

        1. 단언: iteratorRecord의 [[Done]]이 true입니다.

        2. subscribercomplete() 를 실행합니다.

        3. return

      5. subscribernext()next로 실행합니다.

      6. subscribersubscription controllersignalaborted라면 break합니다.

  9. Promise에서 변환: IsPromise(value)가 true이면:

    1. new Observable 을 반환합니다. subscribe callback은 Subscriber subscriber를 받아서 다음을 수행:

      1. value에 반응합니다:

        1. value가 v로 fulfilled되면:

          1. subscribernext() 메서드를 v로 실행합니다.

          2. subscribercomplete() 메서드를 실행합니다.

        2. value가 r로 reject되면, subscribererror() 메서드를 r로 실행합니다.

  10. TypeError를 throw합니다. TypeError로 throw합니다.

테스트
Observable에 구독하기(Observable)ObserverUnion-또는-내부 옵저버 observer, 그리고 SubscribeOptions options가 주어졌을 때, 다음 단계를 실행합니다:

참고: 이 알고리즘은 Web IDL subscribe() 메서드와 분리되어 있습니다. 명세 텍스트가 Observable에 구독할 수 있도록 하기 위함이며, JavaScript에서 속성이 변경될 수 있는 객체에 대해 "내부" 텍스트는 Web IDL 바인딩을 거치지 않아야 합니다. 유사한 맥락은 w3c/IntersectionObserver#464에서 볼 수 있습니다. § 2.3.3 Promise 반환 연산자에서 사용 예시를 확인할 수 있습니다.

  1. this관련 전역 객체Window 객체이고, 해당 연결된 문서완전히 활성이 아니라면, 반환합니다.

  2. internal observer를 새로운 내부 옵저버로 둡니다.

  3. observer를 다음과 같이 처리합니다:

    1. 만약 observerObservableSubscriptionCallback이라면
      internal observernext 단계를 다음 단계로 설정합니다. any value를 받습니다:
      1. observer를 «value» 및 "report"로 호출합니다.

      만약 observerSubscriptionObserver라면
      1. observernext존재하면, internal observernext 단계를 다음 단계로 설정합니다. any value를 받습니다:

        1. observernext 을 «value» 및 "report"로 호출합니다.

      2. observererror존재하면, internal observererror 단계를 다음 단계로 설정합니다. any error를 받습니다:

        1. observererror 을 «error» 및 "report"로 호출합니다.

      3. observercomplete존재하면, internal observercomplete 단계를 다음 단계로 설정합니다:

        1. observercomplete 을 «» 및 "report"로 호출합니다.

      만약 observer내부 옵저버라면
      internal observerobserver로 설정합니다.
  4. 단언: internal observererror 단계기본 error 알고리즘이거나, 제공된 error 콜백 함수를 호출하는 알고리즘이어야 합니다.

  5. thisweak subscriber가 null이 아니고 thisweak subscriberactive가 true라면:

    1. subscriberthisweak subscriber로 둡니다.

    2. internal observersubscriber내부 옵저버에 추가합니다.

    3. optionssignal존재하면:

      1. optionssignalaborted 상태라면, internal observersubscriber내부 옵저버에서 제거합니다.

      2. 그렇지 않으면, 다음 abort 알고리즘optionssignal에 추가합니다:

        1. subscriberactive 가 false라면, 이 단계를 중단합니다.

        2. internal observersubscriber내부 옵저버에서 제거합니다.

        3. subscriber내부 옵저버비어 있다면, subscriber를 종료합니다. 종료 사유는 optionssignalabort reason입니다.

    4. 반환합니다.

  6. subscriber새로운 Subscriber로 둡니다.

  7. internal observersubscriber내부 옵저버에 추가합니다.

  8. thisweak subscribersubscriber로 설정합니다.

  9. optionssignal존재하면:

    1. optionssignalaborted 상태라면 subscriber를 종료합니다. 종료 사유는 optionssignalabort reason입니다.

    2. 그렇지 않으면, 다음 abort 알고리즘optionssignal에 추가합니다:

      1. subscriberactive가 false라면, 이 단계를 중단합니다.

      2. internal observersubscriber내부 옵저버에서 제거합니다.

      3. subscriber내부 옵저버비어 있다면, subscriber를 종료합니다. 종료 사유는 optionssignalabort reason입니다.

  10. thissubscribe 콜백SubscribeCallback이라면, «subscriber»를 "rethrow"와 함께 호출합니다.

    예외 E가 발생하면, subscribererror() 메서드를 E로 호출합니다.

  11. 그렇지 않으면, thissubscribe 콜백subscriber를 인자로 주어 단계대로 실행합니다.

테스트

2.3. 연산자

현재는 https://github.com/wicg/observable#operators 를 참고하세요.

2.3.1. from()

from(value) 메서드 단계:
  1. valueObservable로 변환한 결과를 반환합니다. 예외가 발생하면 다시 throw합니다.

2.3.2. Observable 반환 연산자

takeUntil(value) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. notifier value를 Observable로 변환한 결과로 둡니다.

  3. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 아래 과정을 수행합니다:

    이 메서드는 Observable 두 개에 구독하는 과정을 포함합니다: (1) notifier, (2) sourceObservable. 아래 두 상황에서는 둘 다 구독을 해제합니다:
    1. notifier가 값을 내보내기 시작(next 또는 error). 이 경우, 더 이상 필요 없으므로 notifier 구독을 해제하고, sourceObservable도 더 이상 값을 내보낼 필요가 없으므로 구독을 해제합니다. 즉, observable의 구독을 끝내는 행위입니다.

    2. sourceObservableerror() 또는 complete()를 호출. 이 경우, notifier도 더 이상 들을 필요 없으므로 구독을 해제합니다. sourceObservable은 자체적으로 종료되므로 추가 해제는 필요 없습니다.

    1. notifierObserver를 새로운 내부 옵저버로 두고, 다음과 같이 초기화합니다:

      next 단계

      subscribercomplete()를 실행합니다.

      참고: 이 동작으로 sourceObservable의 구독도 해제됩니다. subscribe는 "outer" subscribersubscription controllersignal을 사용하며, complete()가 호출되면 signal이 abort되어 sourceObservable 구독이 해제됩니다.

      error 단계

      subscribercomplete()를 실행합니다.

      참고: complete 단계는 지정하지 않습니다. notifier Observable이 스스로 complete될 경우, 반환된 observablesubscriber를 별도로 complete할 필요가 없습니다. observablesourceObservable을 계속 미러링합니다.

    2. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    3. notifiernotifierObserveroptions로 구독합니다.

    4. subscriberactive가 false라면, 반환합니다.

      참고: notifier가 동기적으로 값을 emit하면 sourceObservable의 subscribe 콜백이 호출되지 않습니다. 만약 notifier가 동기적으로 complete만 하면(값 없이), subscriber의 active는 true이고, sourceObservable을 구독하게 됩니다. 이 경우 observablesourceObservable을 계속 미러링합니다.

    5. sourceObserver를 새로운 내부 옵저버로 두고, 다음과 같이 초기화합니다:

      next 단계

      전달받은 valuesubscribernext()를 실행합니다.

      error 단계

      전달받은 errorsubscribererror()를 실행합니다.

      complete 단계

      subscribercomplete()를 실행합니다.

      참고: sourceObserver는 대부분 단순 미러링 역할을 하며, sourceObservable에서 발생하는 모든 이벤트를 전달합니다. 단, sourceObservable이 먼저 종료되면 notifier 구독을 해제할 수 있습니다.

    6. sourceObservablesourceObserveroptions로 구독합니다.

  4. observable을 반환합니다.

테스트
map(mapper) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 아래의 절차를 수행합니다:

    1. idxunsigned long long 값으로 초기값 0으로 둡니다.

    2. sourceObserver를 새로운 내부 옵저버로 둡니다. 초기화는 아래와 같습니다:

      next 단계
      1. mapper를 «넘겨받은 value, idx»와 "rethrow"로 호출합니다. 반환된 값을 mappedValue로 둡니다.

        만약 예외 E가 발생했다면, subscribererror()E와 함께 실행하고 이 단계를 중단합니다.

      2. idx를 증가시킵니다.

      3. subscribernext()mappedValue로 실행합니다.

      error 단계

      넘겨받은 errorsubscribererror()를 실행합니다.

      complete 단계

      subscribercomplete()를 실행합니다.

    3. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    4. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

테스트
filter(predicate) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 아래의 절차를 수행합니다:

    1. idxunsigned long long 값으로 초기값 0으로 둡니다.

    2. sourceObserver를 새로운 내부 옵저버로 둡니다. 초기화는 아래와 같습니다:

      next 단계
      1. predicate를 «넘겨받은 value, idx»와 "rethrow"로 호출합니다. 반환값을 matches로 둡니다.

        만약 예외 E가 발생했다면, subscribererror()E와 함께 실행하고 이 단계를 중단합니다.

      2. idxidx + 1로 설정합니다.

      3. matches가 true라면 subscribernext()value로 실행합니다.

      error 단계

      넘겨받은 errorsubscribererror()를 실행합니다.

      complete 단계

      subscribercomplete()를 실행합니다.

    3. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    4. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

테스트
take(amount) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 아래의 절차를 수행합니다:

    1. remainingamount로 둡니다.

    2. remaining이 0이면, subscribercomplete()를 실행하고 이 단계를 중단합니다.

    3. sourceObserver를 새로운 내부 옵저버로 둡니다. 초기화는 아래와 같습니다:

      next 단계
      1. 넘겨받은 valuesubscribernext()를 실행합니다.

      2. remaining을 감소시킵니다.

      3. remaining이 0이면 subscribercomplete()를 실행합니다.

      error 단계

      넘겨받은 errorsubscribererror()를 실행합니다.

      complete 단계

      subscribercomplete()를 실행합니다.

    4. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    5. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

테스트
drop(amount) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 아래의 절차를 수행합니다:

    1. remainingamount로 둡니다.

    2. sourceObserver를 새로운 내부 옵저버로 둡니다. 초기화는 아래와 같습니다:

      next 단계
      1. remaining이 0보다 크면 remaining을 감소시키고 이 단계를 중단합니다.

      2. 단언: remaining은 0이어야 합니다.

      3. 넘겨받은 valuesubscribernext()를 실행합니다.

      error 단계

      넘겨받은 errorsubscribererror()를 실행합니다.

      complete 단계

      subscribercomplete()를 실행합니다.

    3. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    4. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

테스트
flatMap(mapper) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 아래의 절차를 수행합니다:

    1. idxunsigned long long 값으로 초기값 0으로 둡니다.

    2. outerSubscriptionHasCompletedboolean 값으로 초기값 false로 둡니다.

    3. queue를 새로운 list로 둡니다. any 값들을 담으며, 처음에는 비어 있습니다.

      참고: queuesourceObservable에서 방출된 Observable을 저장하기 위한 것입니다. observable이 이전에 방출된 Observable에 구독 중일 때, 아직 소진되지 않은 경우에 사용됩니다.

    4. activeInnerSubscriptionboolean 값으로 초기값 false로 둡니다.

    5. sourceObserver를 새로운 내부 옵저버로 둡니다. 초기화는 아래와 같습니다:

      next 단계
      1. activeInnerSubscription이 true이면:

        1. queuevalue를 추가합니다.

          참고:value는 현재 구독 중인 Observable이 소진될 때 처리됩니다. activeInnerSubscription이 true임을 나타냅니다.

      2. 그렇지 않으면:

        1. activeInnerSubscription을 true로 설정합니다.

        2. flatmap process next value 단계value, subscriber, mapper, 그리고 다음 변수들의 참조와 함께 실행합니다: queue, activeInnerSubscription, outerSubscriptionHasCompleted, idx.

          참고: 이 flatmap process next value 단계value에서 유도된 Observable에 구독하고, 그 옵저버블의 구독이 비활성화될 때까지 값을 계속 처리합니다. "inner" Observable이 complete되면, 처리 단계는 queue의 다음 값으로 재귀적으로 호출됩니다.

          만약 queue에 값이 없다면, 처리 단계는 종료되고, activeInnerSubscription해제하여 이후에 방출되는 값이 올바르게 처리될 수 있도록 합니다.

      error 단계

      넘겨받은 errorsubscribererror()를 실행합니다.

      complete 단계
      1. outerSubscriptionHasCompleted을 true로 설정합니다.

        참고: activeInnerSubscription이 true면 아래 단계에서 subscriber를 complete하지 않습니다. 이 경우 flatmap process next value 단계가 "inner" 구독이 비활성화되고 queue비어 있을 때 subscriber를 complete할 책임을 집니다.

      2. activeInnerSubscription이 false이고 queue비어 있으면, subscribercomplete()를 실행합니다.

    6. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    7. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

flatmap process next value 단계, any value, Subscriber subscriber, Mapper mapper, 그리고 다음 값들의 참조: list anyqueue, boolean activeInnerSubscription, boolean outerSubscriptionHasCompleted, unsigned long long idx:
  1. mappedResultmapper를 «value, idx»와 "rethrow"로 호출한 결과로 둡니다.

    예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

  2. idxidx + 1로 설정합니다.

  3. innerObservablefrom()mappedResult를 인자로 호출한 결과로 둡니다.

    예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

    from()를 직접 호출하는 대신, 내부 알고리즘을 사용해 예외를 처리해야 합니다. 예외를 subscriber로 전달해야 하기 때문입니다.

  4. innerObserver를 새로운 내부 옵저버로 초기화합니다:

    next 단계

    넘겨받은 valuesubscribernext()를 실행합니다.

    error 단계

    넘겨받은 errorsubscribererror()를 실행합니다.

    complete 단계
    1. queue가 비어 있지 않으면:

      1. nextValuequeue의 첫 번째 항목으로 두고, queue에서 제거합니다.

      2. flatmap process next value 단계nextValue, subscriber, mapper, 그리고 참조 queue, activeInnerSubscription로 실행합니다.

    2. 그렇지 않으면:

      1. activeInnerSubscription을 false로 설정합니다.

        참고: activeInnerSubscription이 참조이므로, "outer" Observable에서 이후로 방출되는 값들이 올바르게 처리됩니다.

      2. outerSubscriptionHasCompleted이 true이면 subscribercomplete()를 실행합니다.

        참고: "outer" Observable이 이미 complete 되었으나, 아직 subscriber를 complete하지 않은 상태입니다. 왜냐하면 대기 중인 "inner" Observable이 있었기 때문이며, 이제 모두 소진되었습니다.

  5. innerOptions를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

  6. innerObservableinnerObserverinnerOptions로 구독합니다.

switchMap(mapper) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 아래의 절차를 수행합니다:

    1. idxunsigned long long 값으로 초기값 0으로 둡니다.

    2. outerSubscriptionHasCompletedboolean 값으로 초기값 false로 둡니다.

    3. activeInnerAbortControllerAbortController-또는-null로 두고, 초기값은 null입니다.

      참고: AbortController는 아래 next 단계에서 새로 할당되고, switchmap process next value 단계에서 null로 할당됩니다. 이 변수는 현재 "inner" 구독이 활성화되어 있는지 표시하는 마커입니다. 아래 complete 단계는 이를 기반으로 sourceObservable이 complete될 때 subscriber를 즉시 complete할지 결정합니다. 만약 활성 inner subscription이 있다면, subscriber의 complete는 inner가 끝날 때까지 대기합니다.

    4. sourceObserver를 새로운 내부 옵저버로 초기화합니다:

      next 단계
      1. activeInnerAbortController가 null이 아니면 activeInnerAbortController를 abort합니다.

        참고: 이 동작은 "inner" Observable 구독을 해제합니다. 즉, sourceObservable에서 마지막으로 내보낸 값을 기반으로 구독된 inner Observable을 해제한 뒤, 바로 새로 내보낼 값을 기반으로 inner Observable을 구독합니다.

      2. activeInnerAbortController새로운 AbortController로 할당합니다.

      3. switchmap process next value 단계value, subscriber, mapper, 그리고 참조: activeInnerAbortController, outerSubscriptionHasCompleted, idx로 실행합니다.

        참고: switchmap process next value 단계value에서 유도된 Observable에 구독하고, 그 옵저버블의 구독이 비활성화될 때까지 값을 처리합니다. (1) 에러 또는 complete로 인해 구독이 종료되거나, (2) activeInnerAbortControllerabort되면, sourceObservable이 새로운 값을 내보내서 inner를 교체한 경우입니다.

      error 단계

      넘겨받은 errorsubscribererror()를 실행합니다.

      complete 단계
      1. outerSubscriptionHasCompleted를 true로 설정합니다.

        참고: activeInnerAbortController가 null이 아니면 subscriber를 즉시 complete하지 않습니다. 대신, switchmap process next value 단계가 inner 구독이 끝날 때 subscriber를 complete합니다.

      2. activeInnerAbortController가 null이면 subscribercomplete()를 실행합니다.

    5. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    6. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

switchmap process next value 단계, any value, Subscriber subscriber, Mapper mapper, 그리고 다음 값들의 참조: AbortController activeInnerAbortController, boolean outerSubscriptionHasCompleted, unsigned long long idx는 다음 단계를 실행합니다:
  1. mappedResultmapper를 «value, idx»와 "rethrow"로 호출한 결과로 둡니다.

    예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

  2. idxidx + 1로 설정합니다.

  3. innerObservablefrom()mappedResult를 인자로 호출한 결과로 둡니다.

    예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

  4. innerObserver를 새로운 내부 옵저버로 초기화합니다:

    next 단계

    넘겨받은 valuesubscribernext()를 실행합니다.

    error 단계

    넘겨받은 errorsubscribererror()를 실행합니다.

    참고: 여기서 activeInnerAbortController를 null로 설정할 필요는 없습니다. switchMap() 단계는 inner 구독이 cancel된 것을 신호로 사용할 수 있습니다. subscribererror()를 실행하면 "outer" Observable 구독도 해제되어, 더 이상 값을 내보내지 않습니다.

    complete 단계
    1. outerSubscriptionHasCompleted가 true면 subscribercomplete()를 실행합니다.

    2. 그렇지 않으면 activeInnerAbortController를 null로 설정합니다.

      참고: 이 변수는 참조이므로, switchMap complete 단계에 inner 구독이 비활성화된 것을 신호합니다.

  5. innerOptions를 새로운 SubscribeOptions로 두고, signal종속 abort signal 생성의 결과입니다. 리스트 «activeInnerAbortControllersignal, subscribersubscription controllersignal», AbortSignal, 그리고 current realm을 사용합니다.

  6. innerObservableinnerObserverinnerOptions로 구독합니다.

inspect(inspectorUnion) 메서드 단계:
  1. subscribe callbackVoidFunction-또는-null로 두고, 초기값은 null입니다.

  2. next callbackObservableSubscriptionCallback-또는-null로 두고, 초기값은 null입니다.

  3. error callbackObservableSubscriptionCallback-또는-null로 두고, 초기값은 null입니다.

  4. complete callbackVoidFunction-또는-null로 두고, 초기값은 null입니다.

  5. abort callbackObservableInspectorAbortHandler-또는-null로 두고, 초기값은 null입니다.

  6. inspectorUnion을 다음과 같이 처리합니다:

    inspectorUnionObservableSubscriptionCallback이면
    1. next callbackinspectorUnion으로 설정합니다.

    inspectorUnionObservableInspector이면
    1. subscribe 가 존재하면 subscribe callback에 할당합니다.

    2. next 가 존재하면 next callback에 할당합니다.

    3. error 가 존재하면 error callback에 할당합니다.

    4. complete 가 존재하면 complete callback에 할당합니다.

    5. abort 가 존재하면 abort callback에 할당합니다.

  7. sourceObservablethis로 둡니다.

  8. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 수행합니다:

    1. subscribe callback이 null이 아니면 invoke합니다. 인자는 «»와 "rethrow"입니다.

      예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

      참고: 이 결과로 sourceObservable에는 구독하지 않습니다.

    2. abort callback이 null이 아니면 다음 abort 알고리즘subscribersubscription controllersignal에 추가합니다:

      1. abort callback을 «subscribersubscription controllersignalabort reason»와 "report"로 호출합니다.

    3. sourceObserver를 새로운 내부 옵저버로 초기화합니다:

      next 단계
      1. next callback이 null이 아니면 invoke합니다. 인자는 «넘겨받은 value»와 "rethrow"입니다.

        예외 E가 발생하면:

        1. abort callbacksubscribersubscription controllersignal에서 제거합니다.

          참고: 이 단계는 중요합니다. abort callback은 소비자 주도 해지(unsubscribe)만을 위한 것이므로, 프로듀서가 subscribererror() 또는 complete()로 해지할 때는 실행하지 않아야 합니다.

          Chromium 구현과 같으나, 원래 전달된 SubscribeOptionssignal에 참조를 유지하고, abort될 때만 abort callback을 실행하는 방식도 고려해야 합니다. 결과는 유사하나, 추가 검토 필요.

        2. subscribererror()E로 실행하고 이 단계를 중단합니다.

      2. subscribernext() 를 넘겨받은 value로 실행합니다.

      error 단계
      1. abort callbacksubscribersubscription controllersignal에서 제거합니다.

      2. error callback이 null이 아니면 invoke합니다. 인자는 «넘겨받은 error»와 "rethrow"입니다.

        예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

      3. subscribererror()를 넘겨받은 error로 실행합니다.

      complete 단계
      1. abort callbacksubscribersubscription controllersignal에서 제거합니다.

      2. complete callback이 null이 아니면 invoke합니다. 인자는 «»와 "rethrow"입니다.

        예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

      3. subscribercomplete()를 실행합니다.

    4. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    5. sourceObservablesourceObserver, options로 구독합니다.

  9. observable을 반환합니다.

테스트
catch(callback) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 수행합니다:

    1. sourceObserver를 새로운 내부 옵저버로 초기화합니다:

      next 단계

      subscribernext()를 넘겨받은 value로 실행합니다.

      error 단계
      1. callback을 «넘겨받은 error»와 "rethrow"로 호출합니다. 반환값을 result로 둡니다.

        예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

      2. innerObservablefrom()result를 인자로 호출한 결과로 둡니다.

        예외 E가 발생하면, subscribererror()E로 실행하고 이 단계를 중단합니다.

        from()를 직접 호출하지 말고, 내부 알고리즘을 사용해 예외를 적절히 전달해야 합니다. 예외를 subscriber로 파이프해야 하기 때문입니다.

      3. innerObserver를 새로운 내부 옵저버로 초기화합니다:

        next 단계

        subscribernext()를 넘겨받은 value로 실행합니다.

        error 단계

        subscribererror()를 넘겨받은 error로 실행합니다.

        complete 단계

        subscribercomplete()를 실행합니다.

      4. innerOptions를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

      5. innerObservableinnerObserver, innerOptions로 구독합니다.

        참고: innerObservable에 구독하기 전에 sourceObservable을 해지(unsubscribe)하지 않아도 되고, sourceObservable이 계속 값을 내보낼 걱정도 없습니다. 이 모든 것이 sourceObservableerror 단계 내부에서 발생하기 때문입니다. 즉, sourceObservable은 이미 구독이 종료되어 더 이상 값을 내보내지 않으므로, 안전하게 innerObservable로 값을 전환할 수 있습니다.

      complete 단계

      subscribercomplete()를 실행합니다.

    2. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    3. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

finally(callback) 메서드 단계:
  1. sourceObservablethis로 둡니다.

  2. observable새로운 Observable로 둡니다. subscribe 콜백은 Subscriber subscriber를 받아 다음을 수행합니다:

    1. subscriberaddTeardown() 메서드에 callback을 인자로 실행합니다.

    2. sourceObserver를 새로운 내부 옵저버로 초기화합니다:

      next 단계

      subscribernext()를 넘겨받은 value로 실행합니다.

      error 단계
      1. subscribererror() 를 넘겨받은 error로 실행합니다.

      complete 단계
      1. subscribercomplete() 를 실행합니다.

    3. options를 새로운 SubscribeOptions로 두고, signalsubscribersubscription controllersignal입니다.

    4. sourceObservablesourceObserver, options로 구독합니다.

  3. observable을 반환합니다.

2.3.3. Promise 반환 연산자

toArray(options) 메서드 단계:
  1. p새로운 promise로 둡니다.

  2. optionssignal 이 null이 아니라면:

    1. optionssignalaborted 상태라면:

      1. Reject poptionssignalabort reason으로 reject합니다.

      2. p를 반환합니다.

    2. 다음 abort 알고리즘optionssignal에 추가합니다:

      1. Reject poptionssignalabort reason으로 거부합니다.

      참고: 여기서는 p를 reject만 하면 됩니다. this Observable에 대한 구독도 자동으로 종료됩니다. 왜냐하면 "내부" Subscriber가 optionssignalabort될 때 close되기 때문입니다.

  3. values를 새로운 list로 둡니다.

  4. observer를 새로운 내부 옵저버로 초기화합니다:

    next steps

    Append 넘겨받은 valuevalues에 추가합니다.

    error steps

    Reject p를 넘겨받은 error로 reject합니다.

    complete steps

    Resolve pvalues로 resolve합니다.

  5. thisobserveroptions로 구독합니다.

  6. p를 반환합니다.

테스트
forEach(callback, options) 메서드 단계:
  1. p새로운 promise로 둡니다.

  2. visitor callback controller새로운 AbortController로 둡니다.

  3. internal options를 새로운 SubscribeOptions로 둡니다. signal종속 abort signal 생성의 결과입니다. 리스트 «visitor callback controllersignal, optionssignal (null이 아니면)», AbortSignal, 그리고 current realm을 사용합니다.

    많은 사소한 internal observers는 통과자(pass-through)로 동작하며, 그들이 나타내는 Observable에 대한 구독을 제어하지 않습니다; 즉, 구독이 종료되면 그들의 error stepscomplete steps가 호출되고, 그들의 next steps는 단순히 전달된 값의 어떤 버전을 체인에 전달합니다.

    그러나 이 연산자에서는 아래의 observernext stepscallback이 예외를 던질 경우 실제로 이(this)에 대한 기저 구독을 abort할 책임이 있습니다. 그런 경우, 우리가 "Subscribe to an Observable"에 전달하는 SubscribeOptionssignaldependent signal로서 optionssignal에서 파생되어야 하며, 아래의 next steps가 접근할 수 있고 필요 시 signal abort를 발생시킬 수 있는 AbortSignal을 가진 AbortController이어야 합니다.

  4. internal optionssignalaborted 상태라면:

    1. pinternal optionssignalabort reason으로 reject합니다.

    2. p를 반환합니다.

  5. 다음 abort 알고리즘internal optionssignal에 추가합니다:

    1. Reject pinternal optionssignalabort reason으로 거부합니다.

      Note: p의 거부가 internal optionssignal에 연결되어 있고, optionssignal에 연결되어 있지 않다는 것은, optionssignalabort 이벤트가 발생하는 동안 마이크로태스크(microtasks)대기열에 추가된다면, p의 reject 핸들러가 실행되기 전에 해당 마이크로태스크들이 실행된다는 의미입니다.

  6. idxunsigned long long 값으로 초기값 0으로 둡니다.

  7. observer를 새로운 내부 옵저버로 초기화합니다:

    다음 단계
    1. 호출 callback을 «전달된 value, idx» 및 "rethrow"와 함께 호출합니다.

      만약 예외 E가 던져진 경우, 그러면 거부 pE로 하고, signal abortvisitor callback controller에 대해 E로 abort합니다.

    2. idx를 증가시킵니다.

    오류 단계

    거부 p를 전달된 error로 거부합니다.

    완료 단계

    해결 pundefined로 해결합니다.

  8. thisobserver, internal options로 구독합니다.

  9. p를 반환합니다.

테스트
every(predicate, options) 메서드 단계:
  1. p새로운 promise로 둡니다.

  2. controller새로운 AbortController로 둡니다.

  3. internal options를 새로운 SubscribeOptions로 둡니다. signal종속 abort signal 생성의 결과입니다. 리스트 «controllersignal, optionssignal (null이 아니면)», AbortSignal, 그리고 current realm을 사용합니다.

  4. internal optionssignalaborted 상태라면:

    1. 거부 pinternal optionssignal중단 이유으로 거부합니다.

    2. p를 반환합니다.

  5. 다음 abort 알고리즘internal optionssignal에 추가합니다:

    1. 거부 pinternal optionssignal중단 이유로 거부합니다.

  6. idxunsigned long long 값으로 초기값 0으로 둡니다.

  7. observer를 새로운 내부 옵저버로 초기화합니다:

    next 단계
    1. predicate를 «넘겨받은 value, idx»와 "rethrow"로 호출합니다. 반환값을 passed로 둡니다.

      예외 E가 발생하면, p를 E로 reject하고, controller를 E로 abort합니다.

    2. idxidx + 1로 설정합니다.

    3. passed가 false이면, p를 false로 resolve하고 controller를 abort합니다.

    error 단계

    거부 p를 전달된 error로 거부합니다.

    complete 단계

    해결 p를 true로 해결합니다.

  8. thisobserver, internal options로 구독합니다.

  9. p를 반환합니다.

first(options) 메서드 단계:
  1. p새로운 promise로 둡니다.

  2. controller새로운 AbortController로 둡니다.

  3. internal options를 새로운 SubscribeOptions로 둡니다. signal종속 abort signal 생성의 결과입니다. 리스트 «controllersignal, optionssignal (null이 아니면)», AbortSignal, 그리고 current realm을 사용합니다.

  4. internal optionssignalaborted 상태라면:

    1. 거부 pinternal optionssignal중단 이유으로 거부합니다.

    2. p를 반환합니다.

  5. 다음 abort 알고리즘internal optionssignal에 추가합니다:

    1. 거부 pinternal optionssignal중단 이유로 거부합니다.

  6. internal observer를 새로운 내부 옵저버로 초기화합니다:

    다음 단계
    1. 해결 p를 전달된 value로 해결합니다.

    2. 시그널 중단 controller.

    오류 단계

    거부 p를 전달된 error로 거부합니다.

    완료 단계

    거부 p를 새로운 RangeError로 거부합니다.

    Note: 이는 소스 Observable가 단일 값을 방출하기 전에 완료될 때만 도달합니다.

  7. thisinternal observerinternal options로 구독합니다.

  8. p를 반환합니다.

last(options) 메서드 단계:
  1. Let p 새로운 프라미스.

  2. 만약 options’s signal 이 null이 아닌 경우:

    1. 만약 options’s signal중단된(aborted) 경우, 그러면:

      1. 거부 poptions’s signal중단 이유로 거부합니다.

      2. p를 반환합니다.

    2. 다음의 abort 알고리즘을 추가options’s signal에 추가합니다:

      1. 거부 poptions’s signal중단 이유로 거부합니다.

  3. lastValueany-or-null로, 초기값은 null로 둡니다.

  4. hasLastValueboolean으로, 초기값은 false로 둡니다.

  5. observer를 새로운 internal observer로 두고, 다음과 같이 초기화합니다:

    다음 단계
    1. hasLastValue를 true로 설정합니다.

    2. lastValue를 전달된 value로 설정합니다.

    오류 단계

    거부 p를 전달된 error로 거부합니다.

    완료 단계
    1. 만약 hasLastValue가 true이면, 해결 plastValue로 합니다.

      1. 그렇지 않으면, 거부 p를 새로운 RangeError로 거부합니다.

        Note: first()의 주석을 참조하세요.

  6. 구독(Subscribe)this에 대해 주어진 observeroptions로 수행합니다.

  7. p를 반환합니다.

find(predicate, options) 메서드의 단계는 다음과 같습니다:
  1. p새로운 promise로 설정합니다.

  2. controller새로운 AbortController로 설정합니다.

  3. internal optionsSubscribeOptions의 새로운 인스턴스로 설정하고, 이 객체의 signal의존적인 abort 신호를 생성의 결과로 설정합니다. 이 작업은 «controllersignal, optionssignal (null이 아닌 경우)» 목록을 사용하여 AbortSignal현재 realm을 통해 수행됩니다.

  4. 만약 internal optionssignalaborted 상태라면:

    1. Reject pinternal optionssignalabort 이유로 설정합니다.

    2. p를 반환합니다.

  5. 다음 abort 알고리즘을 추가합니다. internal optionssignal에 다음 내용을 추가합니다:

    1. Reject pinternal optionssignalabort 이유로 설정합니다.

  6. idxunsigned long long으로 설정하고, 초기값은 0으로 합니다.

  7. observerinternal observer의 새 인스턴스로 설정하며, 다음과 같이 초기화합니다:

    next steps
    1. Invoke predicate를 «전달된 value, idx»와 함께 호출하고, "rethrow"로 설정합니다. 그리고 반환된 값을 passed로 설정합니다.

      만약 예외 E가 발생하면, reject pE로 설정하고, signal abort controllerE로 설정합니다.

    2. idxidx + 1로 설정합니다.

    3. 만약 passed가 true라면, resolve pvalue로 설정하고, signal abort controller를 설정합니다.

    error steps

    Reject p를 전달된 error로 설정합니다.

    complete steps

    Resolve pundefined로 설정합니다.

  8. Subscribethisobserverinternal options를 제공하여 호출합니다.

  9. p를 반환합니다.

reduce(reducer, initialValue, options) 메서드 단계:
  1. p새로운 promise로 둡니다.

  2. controller새로운 AbortController로 둡니다.

  3. internal options를 새로운 SubscribeOptions로 둡니다. signal종속 abort signal 생성의 결과입니다. 리스트 «controllersignal, optionssignal (null이 아니면)», AbortSignal, 그리고 current realm을 사용합니다.

  4. internal optionssignalaborted 상태라면:

    1. 거부 pinternal optionssignalabort 이유로 설정합니다.

    2. p를 반환합니다.

  5. 다음 abort 알고리즘internal optionssignal에 추가합니다:

    1. 거부 pinternal optionssignalabort 이유로 설정합니다.

  6. idxunsigned long long 값으로 초기값 0으로 둡니다.

  7. accumulatorinitialValue가 주어지면 그 값으로, 아니면 초기화되지 않은 상태로 둡니다.

  8. observer를 새로운 내부 옵저버로 초기화합니다:

    next 단계
    1. accumulator가 초기화되지 않았다면 (initialValue가 전달되지 않은 경우), accumulator를 넘겨받은 value로 설정하고, idxidx + 1로 하고 이 단계를 중단합니다.

      참고: reducerthis가 처음 내보내는 currentValue로 호출되지 않습니다. 두 번째 값이 나올 때 reducer가 호출되며, 첫 번째 값은 accumulator로 사용됩니다.

    2. reducer를 «accumulator (accumulator로), 넘겨받은 value (currentValue로), idx (index로)»와 "rethrow"로 호출합니다. 반환값을 result로 둡니다.

      예외 E가 발생하면, p를 E로 reject하고, controller를 E로 abort합니다.

    3. idxidx + 1로 설정합니다.

    4. accumulatorresult로 설정합니다.

    error 단계

    거부 p를 전달된 error로 설정합니다.

    완료 단계
    1. 만약 accumulator가 "unset"이 아니라면, resolve paccumulator로 설정합니다.

      그렇지 않다면, reject pTypeError로 설정합니다.

  9. 구독 thisobserverinternal options와 함께 호출합니다.

  10. p를 반환합니다.

3. EventTarget 통합

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};
when(type, options) 메서드 단계:
  1. this관련 전역 객체Window 객체이고, 연결된 문서완전히 활성이 아니라면, 반환합니다.

  2. event targetthis로 둡니다.

  3. observable새로운 Observable로 두고, 아래와 같이 초기화합니다:

    subscribe 콜백

    Subscriber subscriber를 받아 아래 단계를 실행하는 알고리즘:

    1. event target이 null이면 이 단계를 중단합니다.

      참고: event target이 구독 시점에 GC될 수 있음을 포착하기 위함입니다.

    2. subscribersubscription controllersignalaborted 상태라면, 이 단계를 중단합니다.

    3. 다음 이벤트 리스너 추가event target과 아래 이벤트 리스너로 수행합니다:

      type

      type

      callback

      Web IDL EventListener 인스턴스를 새로 생성하여, Event 타입의 인자 하나를 받는 함수 참조로 둡니다. 이 함수는 observable event listener invoke algorithmsubscriber, event로 실행합니다.

      capture

      optionscapture

      passive

      optionspassive존재하면 그 값, 아니면 null

      once

      false

      signal

      subscribersubscription controllersignal

      참고: 이벤트 리스너subscription controllersignalaborted될 때 정리되도록 보장합니다. 엔진의 소유 모델과 관계없이 정리됩니다.

  4. observable을 반환합니다.

observable event listener invoke algorithmSubscriber subscriberEvent event를 받아 아래 단계를 실행합니다:
  1. subscribernext()event로 실행합니다.

테스트

4. 보안 및 프라이버시 관련 사항

이 내용은 explainer에서 본 명세로 상향되고 있으며, 그동안 아래 자료를 참고할 수 있습니다:

5. 감사의 글

Observable API의 많은 디자인에 도움을 준 Ben Lesh에게 특별히 감사드립니다. 그리고 오랜 기간 유저랜드 Observable 코드를 관리하며 웹 플랫폼에 공헌할 수 있도록 해주신 점에 깊이 감사드립니다.

색인

이 명세에서 정의한 용어

참조로 정의된 용어

참고 문헌

규범적 참고 문헌

[DOM]
Anne van Kesteren. DOM 표준. Living Standard. URL: https://dom.spec.whatwg.org/
[ECMASCRIPT]
ECMAScript 언어 명세. URL: https://tc39.es/ecma262/multipage/
[HTML]
Anne van Kesteren; et al. HTML 표준. Living Standard. URL: https://html.spec.whatwg.org/multipage/
[INFRA]
Anne van Kesteren; Domenic Denicola. Infra 표준. Living Standard. URL: https://infra.spec.whatwg.org/
[WEBIDL]
Edgar Chen; Timothy Gu. Web IDL 표준. Living Standard. URL: https://webidl.spec.whatwg.org/

IDL 색인

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // True after the Subscriber is created, up until either
  // complete()/error() are invoked, or the subscriber unsubscribes. Inside
  // complete()/error(), this attribute is true.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

// SubscribeCallback is where the Observable "creator's" code lives. It's
// called when subscribe() is called, to set up a new subscription.
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Differs from Mapper only in return type, since this callback is exclusively
// used to visit each element in a sequence, not transform it.
callback Visitor = undefined (any value, unsigned long long index);

// This callback returns an `any` that must convert into an `Observable`, via
// the `Observable` conversion semantics.
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // Constructs a native Observable from value if it's any of the following:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observable-returning operators. See "Operators" section in the spec.
  //
  // takeUntil() can consume promises, iterables, async iterables, and other
  // observables.
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promise-returning operators.
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};

이슈 색인

from()을 직접 호출하면 안 됩니다. 대신, 예외를 적절히 처리할 수 있는 내부 알고리즘을 호출해야 합니다. 이렇게 하면 예외를 subscriber로 전달할 수 있습니다.
이 내용은 Chromium의 구현과 일치하지만, 원래 전달받은 SubscribeOptionssignal 참조를 유지하고, 그것이 abort될 때 abort callback만 호출하는 방식을 고려해볼 수 있습니다. 결과는 아마 동일하겠지만, 추가 검증이 필요합니다.
from()을 직접 호출하면 안 됩니다. 대신, 예외를 적절히 처리할 수 있는 내부 알고리즘을 호출해야 합니다. 이렇게 하면 예외를 subscriber로 전달할 수 있습니다.