import {
  asyncScheduler,
  BehaviorSubject,
  concat,
  interval,
  Observable,
  OperatorFunction,
  pipe,
  UnaryFunction
} from "rxjs";
import { debounce, distinctUntilChanged, filter, first, take, tap, throttleTime } from "rxjs/operators";
import { isEmptyObject } from "./json-utils";
import { objectsEqual } from "./object-utils";

/**
 * Mirrors an Observable into a newly created BehaviorSubject, can be used via the `convertToSubject` angular pipe.
 *
 * The source is subscribed to immediately. Every emission, error and completion is forwarded
 * to the returned subject. The subscription handle is discarded, so this function offers no
 * way to unsubscribe from the source afterwards.
 *
 * @param observable Source whose notifications are forwarded
 * @param initValue Value the subject holds until the source emits
 * @returns The BehaviorSubject that mirrors the source
 */
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
  const mirror = new BehaviorSubject<T>(initValue);
  void observable.subscribe({
    next: (emitted: T) => mirror.next(emitted),
    error: (failure: unknown) => mirror.error(failure),
    complete: () => mirror.complete()
  });
  return mirror;
}

/**
 * Filters null and undefined values while indicating to TypeScript that these values are no longer possible.
 *
 * Array emissions are checked element by element: the whole array is dropped as soon as one
 * element fails the check, otherwise the array is forwarded unchanged.
 *
 * @param options Set `filterEmptyObjects` to `true` to additionally drop values (or arrays
 * containing elements) for which `isEmptyObject` returns true
 * @returns An operator that only lets values through that pass the check
 */
export function filterNullish<T>(options?: {
  filterEmptyObjects?: boolean;
}): UnaryFunction<Observable<T | null | undefined>, Observable<T>> {
  const rejectEmptyObjects = options?.filterEmptyObjects === true;

  // Single-value check; the empty-object test only runs when it was requested.
  const isPresent = (candidate: any): boolean => {
    if (candidate === null || candidate === undefined) return false;
    return !(rejectEmptyObjects && isEmptyObject(candidate));
  };

  // Emission-level check: arrays pass only if every element passes.
  const shouldForward = (emission: unknown): boolean => {
    if (!(emission instanceof Array)) return isPresent(emission);
    for (const item of emission) {
      if (!isPresent(item)) return false;
    }
    return true;
  };

  return pipe(filter((emission) => shouldForward(emission)) as OperatorFunction<T | null | undefined, T>);
}

/**
 * Removes null and undefined values recursively from an object.
 *
 * The object is modified in place and the same reference is returned. Properties are removed
 * with `delete`, so when an array is processed its removed elements leave holes and the
 * array length stays the same.
 *
 * @param obj Object to clean up; `null` and `undefined` are returned untouched
 * @param options Optional extra removals: empty strings, values for which `isEmptyObject`
 * returns true, and empty arrays. Each one only applies when set to `true`
 * @returns The same `obj` reference after the cleanup
 */
export function removeNullish<T>(
  obj: T,
  options: { removeEmptyStrings?: boolean; removeEmptyObjects?: boolean; removeEmptyArrays?: boolean } = {
    removeEmptyStrings: false
  }
): T {
  if (obj === null || obj === undefined) return obj;

  const dropEmptyStrings = options?.removeEmptyStrings === true;
  const dropEmptyObjects = options?.removeEmptyObjects === true;
  const dropEmptyArrays = options?.removeEmptyArrays === true;

  for (const key in obj) {
    // Clean nested structures first so they are judged in their final state.
    if (typeof obj[key] === "object") obj[key] = removeNullish(obj[key], options);

    const current = obj[key];
    const isRemovableArray = dropEmptyArrays && Array.isArray(current) && (current as unknown[]).length === 0;
    const isRemovable =
      isRemovableArray ||
      current === null ||
      current === undefined ||
      (dropEmptyStrings && (current as unknown) === "") ||
      (dropEmptyObjects && isEmptyObject(obj[key]));

    if (isRemovable) delete obj[key];
  }

  return obj;
}

/**
 * Will resolve when the expected value is emitted by the observable.
 *
 * Values are compared with strict equality (`===`), so objects only match by reference.
 * The observable is subscribed to once per call and the subscription ends with the first match.
 *
 * @param observable The observable
 * @param expectedValue Defaults to the boolean value `true` (also when `undefined` is passed)
 * @returns Resolves when the expected value was emitted at least once. Rejects when the
 * observable errors, or completes without ever emitting the expected value
 */
export function waitForExpectedValue<T = any>(observable: Observable<T>, expectedValue?: T): Promise<void> {
  const target: T = expectedValue === undefined ? (true as any) : expectedValue;
  return new Promise<void>((resolve, reject) => {
    void observable
      .pipe(
        // `first` completes after the first match and errors if the source completes without one.
        first((val) => val === target),
        tap(() => resolve())
      )
      // Without an error handler the promise would stay pending forever and the error
      // would be reported as unhandled by RxJS.
      .subscribe({ error: (error: unknown) => reject(error) });
  });
}

/**
 * Will only forward values that have changed compared to the previously forwarded one.
 * Equality is decided by `objectsEqual` instead of the default comparison of
 * `distinctUntilChanged` (presumably a deep comparison that can include objects, functions
 * and references; confirm against `object-utils`).
 *
 * NOTE(review): the cast narrows the output type to `T`, but nothing in this operator
 * filters `null` or `undefined`; such values are still forwarded at runtime.
 */
export function distinctUntilChangedObj<T>(): UnaryFunction<Observable<T | null | undefined>, Observable<T>> {
  const skipUnchanged = distinctUntilChanged<T>((previous, current) => objectsEqual(previous, current));
  return pipe(skipUnchanged as OperatorFunction<T | null | undefined, T>);
}

/**
 * Will debounce, but not first: the first value of each subscription is forwarded
 * immediately, every later value is debounced by `ms` milliseconds.
 *
 * The source is subscribed to exactly once per subscriber. Subscribing to it a second time
 * for the debounced part would re-run cold sources and would make replaying sources
 * (e.g. a BehaviorSubject) emit their first value twice.
 *
 * @param ms Debounce time in milliseconds applied to every value after the first
 * @returns An operator function to be used in `pipe`
 */
export function debounceAllButFirst(ms: number) {
  return function <T>(source$: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
      // Tracked per subscription, so every subscriber gets its own undebounced first value.
      let isFirstValue = true;
      return source$
        .pipe(
          debounce(() => {
            if (!isFirstValue) return interval(ms);
            isFirstValue = false;
            // A notifier that fires synchronously makes `debounce` release the value right away.
            return new Observable<void>((notifier) => {
              notifier.next();
              notifier.complete();
            });
          })
        )
        .subscribe(subscriber);
    });
  };
}

/**
 * Will throttle an observable, e.g. an input, forwarding both the leading and the trailing
 * value of each throttle window.
 *
 * @param ms Length of the throttle window in milliseconds
 * @param config With `distinctValues` enabled (the default), consecutive duplicate values
 * are suppressed after throttling
 * @returns An operator function to be used in `pipe`
 */
export function throttleInput(ms: number, config: { distinctValues: boolean } = { distinctValues: true }) {
  return <T>(source$: Observable<T>): Observable<T> => {
    const throttled$ = source$.pipe(throttleTime(ms, asyncScheduler, { leading: true, trailing: true }));
    return config.distinctValues ? throttled$.pipe(distinctUntilChanged()) : throttled$;
  };
}
