import {
  EMPTY,
  Observable,
  ReplaySubject,
  Subscription,
  distinctUntilChanged,
  filter,
  map,
  share,
  tap,
} from 'rxjs';
import { createInterest$, indicateInterest } from './distributed-interest.js';
import { createDistributedObservable$ } from './distributed-observable.js';
import {
  TokenAudience,
  TokenUpdate,
  TokenUpdateData,
  TokenUpdateMap,
  stepUpResourceId,
} from './internal.js';
import { decodeJwtPayload } from './jwt-helper.js';
import { createNonce, log, notifySubscriptionChange } from './rx-helpers.js';
import { tokenSet$ } from './token-set.js';

export function acquireToken$<A extends TokenAudience>(
  audience: A
): Observable<TokenUpdateMap[A]> {
  const resourceId = `heimdall:audience:${audience}`;
  const interestId = `in:${audience}`;

  return new Observable<TokenUpdateMap[A]>((o) => {
    const instance$ = createDistributedObservable$<TokenUpdateMap[A]>(
      resourceId,
      (o) => {
        const debugInstanceId = createNonce();
        const updateSubscription = new Subscription();

        let lastEmmited: TokenUpdateMap[A];
        indicateInterest(stepUpResourceId, `${audience} START`);

        updateSubscription.add(
          tokenSet$
            .pipe(
              map((tokenByAudience) => tokenByAudience[audience]),
              tap((token) => {
                if (lastEmmited && !token) {
                  // we're done -- token dropped.
                  o.complete();
                }
              }),
              filter(Boolean),
              distinctUntilChanged(),
              map((token): TokenUpdate => {
                const { scope, exp } = decodeJwtPayload(token);
                const tokenUpdate = { token, expiresBy: exp ? exp * 1000 : 0 };
                if (audience === 'data.ws') {
                  const dataUpdate: TokenUpdateData = {
                    ...tokenUpdate,
                    isRealTime: scope === 'realtime',
                  };
                  return dataUpdate;
                }
                return tokenUpdate;
              })
            )
            .subscribe({
              next: (n) => {
                validateTokenType(n);
                lastEmmited = n;
                o.next(n);
              },
              error: (e) => o.error(e),
            })
        );

        function validateTokenType(
          update: TokenUpdate
        ): asserts update is TokenUpdateMap[A] {
          if (audience === 'data.ws' && !('isRealTime' in update)) {
            throw new Error(`Incorrect update shape for ${audience}`);
          }
        }

        updateSubscription.add(
          createInterest$(interestId)
            .pipe(
              tap((): void => {
                if (lastEmmited) o.next(lastEmmited);
              })
            )
            .subscribe(log(`Interest on ${audience} ${debugInstanceId} noted.`))
        );

        return () => {
          updateSubscription.unsubscribe();
        };
      },
      () => {
        indicateInterest(stepUpResourceId, `${audience} END`);
        return EMPTY;
      }
    );

    const subscription = new Subscription();
    subscription.add(tokenSet$.subscribe(log(`Token set keep-alive`)));
    subscription.add(
      instance$
        .pipe(
          notifySubscriptionChange({
            on: () => indicateInterest(interestId, `${audience} CHNG`),
          }),

          distinctUntilChanged((ta, tb) => {
            return ta.token === tb.token;
          }),

          share({
            connector: () => new ReplaySubject(1),
            resetOnError: true,
            resetOnComplete: true,
            resetOnRefCountZero: true,
          })
        )
        .subscribe(o)
    );
    return subscription;
  });
}
