import {interval, Observable, Observer, of, OperatorFunction, range, Subject} from 'rxjs';
import {concatMap, map, switchMap, takeUntil, takeWhile, tap, timeout} from 'rxjs/operators';

/**
 * Wrapper that pairs an Observable with a timeout, accepted by RxUtility.waitAll$(...) and
 * RxUtility.waitAllInOrder$(...) in place of a bare Observable.
 */
export interface WaitAllParam<T> {
  /**
   * Observable to wait on
   */
  source$: Observable<T>;
  /**
   * Timeout in milliseconds; passed to the RxJS `timeout` operator that is piped onto `source$`
   */
  due: number;
}

/**
 * Parameter interface for RxUtility.waitUntil$(...); describes one condition to poll
 */
export interface WaitUntilParam<T> {
  /**
   * Condition to wait until. It must be a callback so it can be re-evaluated on every poll tick;
   * a plain boolean would be captured by value and could never change.
   */
  until: () => boolean;

  /**
   * Optional actions for ticks on which `until()` returns true.
   *  - `then`: callback invoked with the subscriber of the waitUntil$ Observable and the current tick count
   *  - `next`: value this condition contributes to what waitUntil$ finally emits
   */
  ifTrue?: {
    then?: (observer: Observer<T>, tick: number) => void;
    next?: T;
  };

  /**
   * Optional actions for ticks on which `until()` returns false.
   *  - `then`: callback invoked with the subscriber of the waitUntil$ Observable and the current tick count
   *  - `next`: value this condition contributes to what waitUntil$ finally emits
   *            (replaced by `ifTrue.next` if that is later recorded for the same condition)
   */
  ifFalse?: {
    then?: (observer: Observer<T>, tick: number) => void;
    next?: T;
  };
}

/**
 * RxUtility is for common RxJS operations such as waiting for Observables to complete or for conditions to change
 * before continuing a process
 */
export class RxUtility {
  // waitUntil poll rate in milliseconds
  public static waitUntilTick = 50;
  // interval ticks in a second (computed once from the initial waitUntilTick)
  public static ticksPerSecond = 1000 / RxUtility.waitUntilTick;

  /**
   * Subscribes to every source at once and emits a single array when each source has settled.
   *
   * A source settles on its first emission, on error, or when it completes without emitting.
   * The emitted array is positionally aligned with the arguments: a slot holds the first emitted
   * value, the error object if the source failed, or `undefined` if the source completed empty.
   * A warning listing the 1-based positions of sources that errored or never emitted is logged.
   *
   * All bookkeeping is created per subscription, so the returned Observable can be subscribed to
   * more than once, and unsubscribing (or completion) releases every inner subscription.
   *
   * @param sources$ Observables, or `WaitAllParam` wrappers when a timeout is required
   * @returns Observable that emits the result array once and completes; emits `undefined` when called with no sources
   */
  static waitAll$<T extends any[]>(...sources$: (Observable<unknown> | WaitAllParam<unknown>)[]): Observable<T extends any[] ? T : never> {
    if (sources$.length === 0) {
      return of(void 0);
    }

    return new Observable(observer => {
      // Loosely typed handle so the untyped result array can be pushed to the subscriber
      const sink: Observer<any> = observer;
      const settled: boolean[] = sources$.map(() => false);
      const succeeded: boolean[] = sources$.map(() => false);
      const results: any[] = new Array(sources$.length);
      let pending = sources$.length;

      // Reports which sources did not deliver a value
      const warnAboutFailures = (): void => {
        const positions: number[] = [];
        succeeded.forEach((ok, index) => {
          if (!ok) {
            positions.push(index + 1);
          }
        });
        if (positions.length > 0) {
          console.warn(
            'RxUtility.waitAll$: ' + positions.length +
            ' source(s) errored or completed without emitting (positions: ' + positions.join(', ') + ')'
          );
        }
      };

      // Records the outcome of one source; only the first outcome per source counts
      const settle = (index: number, value: unknown, ok: boolean): void => {
        if (settled[index]) {
          return;
        }
        settled[index] = true;
        succeeded[index] = ok;
        results[index] = value;
        pending--;

        if (pending < 1) {
          warnAboutFailures();
          sink.next(results);
          sink.complete();
        }
      };

      const subscriptions = sources$.map((source$, index) =>
        RxUtilityHelpers.handleTimeout(source$).subscribe({
          next: value => settle(index, value, true),
          error: error => settle(index, error, false),
          // Without this a source that completes empty would keep the result from ever emitting
          complete: () => settle(index, undefined, false)
        })
      );

      // Teardown: runs on completion and on early unsubscribe
      return () => subscriptions.forEach(subscription => subscription.unsubscribe());
    });
  }


  /**
   * Like waitAll$ but runs the sources one after another, in argument order.
   *
   * The next source is only subscribed once the current one has settled: on its first emission,
   * on error (the error is logged and stored as that source's result), or when it completes
   * without emitting (stored as `undefined`). When the last source has settled the collected
   * results are emitted as one array and the Observable completes.
   *
   * State is created per subscription, and unsubscribing stops the chain and releases every
   * inner subscription.
   *
   * @param sources$ Observables, or `WaitAllParam` wrappers when a timeout is required
   * @returns Observable that emits the ordered result array once and completes; emits `undefined` when called with no sources
   */
  static waitAllInOrder$<T>(...sources$: (Observable<unknown> | WaitAllParam<unknown>)[]): Observable<T> {
    if (sources$.length === 0) {
      return of(void 0);
    }

    return new Observable(observer => {
      const results: any[] = [];
      const subscriptions: { unsubscribe(): void }[] = [];
      let closed = false;

      // Subscribes to the source at `index`, then recurses to the next one once it settles
      const runFrom = (index: number): void => {
        if (closed) {
          return;
        }
        if (index >= sources$.length) {
          observer.next(results as unknown as T);
          observer.complete();
          return;
        }

        let settled = false;
        const advance = (value: unknown): void => {
          // Only the first outcome of a source moves the chain forward
          if (settled) {
            return;
          }
          settled = true;
          results.push(value);
          runFrom(index + 1);
        };

        const subscription = RxUtilityHelpers.handleTimeout(sources$[index]).subscribe({
          next: value => advance(value),
          error: error => {
            console.error(error);
            advance(error);
          },
          complete: () => advance(undefined)
        });

        // A synchronous source may have finished the whole chain before subscribe() returned
        if (closed) {
          subscription.unsubscribe();
        } else {
          subscriptions.push(subscription);
        }
      };

      runFrom(0);

      return () => {
        closed = true;
        subscriptions.forEach(subscription => subscription.unsubscribe());
      };
    });
  }

  /**
   * Polls one or more conditions every `RxUtility.waitUntilTick` milliseconds and completes once
   * every condition has been observed as true at least once (a met condition stays met).
   *
   * On each tick, for each condition, the matching `ifTrue` / `ifFalse` branch is applied: its
   * `then` callback is invoked with the subscriber and the tick count, and its `next` value
   * (anything other than `undefined`, so `0`, `''` and `false` are kept) is recorded for that condition.
   *
   * When all conditions are met the Observable emits the single recorded value if there is exactly
   * one, the array of recorded values if there are several, or `undefined` if there are none, and
   * then completes. Polling also stops when the subscriber unsubscribes early.
   *
   * @param conditions conditions to poll
   */
  public static waitUntil$<T>(...conditions: WaitUntilParam<T>[]): Observable<T & T[]> {
    return new Observable(observer => {
      // Per-subscription state so the returned Observable can be subscribed to repeatedly
      const stop$ = new Subject<void>();
      const conditionsAreMet: boolean[] = conditions.map(() => false);
      const nextValues = new Map<number, T>();

      interval(RxUtility.waitUntilTick)
        .pipe(takeUntil(stop$))
        .subscribe(tick => {
          conditions.forEach((condition, i) => {
            const isMet = condition.until();
            const branch = isMet ? condition.ifTrue : condition.ifFalse;

            if (branch) {
              if (branch.then) {
                branch.then(observer, tick);
              }
              if (branch.next !== undefined) {
                nextValues.set(i, branch.next);
              }
            }
            if (isMet) {
              conditionsAreMet[i] = true;
            }
          });

          if (conditionsAreMet.every(met => met)) {
            // Stop polling before notifying the subscriber
            stop$.next();
            if (nextValues.size > 0) {
              const collected: any = [...nextValues.values()];
              observer.next(collected.length === 1 ? collected[0] : collected);
            } else {
              observer.next();
            }
            observer.complete();
          }
        });

      // Teardown: stops the interval on completion and on early unsubscribe
      return () => {
        stop$.next();
        stop$.complete();
      };
    });
  }

  /**
   * Utility for making repeated calls to a paginated endpoint.
   *
   * Pages 0 to 999 are requested one at a time. Every result emitted by `source$(page)` is paired
   * with its page number, optionally passed through `additionalOperations$`, and forwarded as long
   * as `results.length === takePerRequest * (page + 1)`. The first result that fails that check is
   * not emitted and ends the stream.
   *
   * NOTE(review): the length check grows with the page number, which presumably means `source$` is
   * expected to return the accumulated items so far rather than a single page - confirm with callers.
   *
   * @param source$                   arrow function that is given the page number that returns an Observable of an API call that uses the supplied page number
   * @param takePerRequest            number of items expected per API call
   * @param additionalOperations$     optional Observable subscribed to after each page result; the page result is re-emitted once for every value it emits
   * @returns Observable emitting the results array of each accepted page
   */
  public static getPaginatedData$<T>(source$: (page: number) => Observable<T[]>, takePerRequest: number, additionalOperations$?: Observable<any>): Observable<T[]> {
    let additionalOperationsAsOperator: OperatorFunction<{ results: any[], page: number }, { results: any[], page: number }>;

    if (!additionalOperations$) {
      // do nothing
      additionalOperationsAsOperator = tap();
    } else {
      // Needs to do operation and then pass on the results and page index
      additionalOperationsAsOperator =
        concatMap((res: { results: any[], page: number }) => additionalOperations$.pipe(map(() => res)));
    }

    return range(0, 1000)
      .pipe(
        concatMap((page) => source$(page)
          .pipe(
            switchMap(res =>
              // Pass along page and results to takeWhile
              of({results: res, page: page})
                // Pipe additional operations here instead of later so the operation runs the right number of times.
                // Placed after takeWhile it would not run for the final page result, which takeWhile rejects.
                .pipe(additionalOperationsAsOperator)
            )
          )
        ),
        // Stop as soon as the result size no longer matches the expected size for this page
        takeWhile(res => res.results.length === (takePerRequest * (res.page + 1))),
        // strip away page index and return results
        map(res => res.results)
      );
  }
}

class RxUtilityHelpers {
  /**
   * Normalizes a waitAll-style argument into a plain Observable.
   *
   * A bare Observable is returned unchanged. A `WaitAllParam` wrapper is always unwrapped to its
   * `source$`; the RxJS `timeout` operator is piped on only when `due` is set to a non-zero value,
   * so a wrapper with `due: 0` yields the source without a timeout rather than the wrapper itself.
   *
   * @param source$ Observable or `WaitAllParam` wrapper
   * @returns the Observable to subscribe to
   */
  public static handleTimeout<T>(source$: WaitAllParam<T> | Observable<unknown>): Observable<unknown> {
    const wrapped = (source$ as WaitAllParam<unknown>).source$;

    // Not a wrapper: the argument is already the Observable
    if (!wrapped) {
      return source$ as Observable<unknown>;
    }

    const due = (source$ as WaitAllParam<unknown>).due;
    return due ? wrapped.pipe(timeout(due)) : wrapped;
  }
}
