import { BehaviorSubject, combineLatest, Observable, Subscription } from 'rxjs';
import { take } from 'rxjs/operators';

/**
 * A BehaviorSubject whose `next` only emits when the new value is not strictly equal (`!==`)
 * to the current value.
 * Used to reduce overhead of function calls with the same value.
 *
 * The comparison is by identity: a new object or array instance always counts as a change,
 * and so does `NaN` (because `NaN !== NaN`). Use `emitAgain` to force a re-emission of the
 * current value.
 */
export class ValueSubject<T> extends BehaviorSubject<T> {
  /** Subscription to the lifetime observable registered through `until`, if any. */
  private cleanupSubscription?: Subscription;

  /** Subscription to the input subjects when this instance was built by `ValueSubject.combineLatest`. */
  private sourceSubscription?: Subscription;

  /**
   * @param initialValue Initial Value of this subject.
   * @param cleanupWhen any observable which defines the lifetime of this subject
   * todo in a future refactoring we should make cleanupWhen obligatory to spot potential memory leaks!
   */
  constructor(initialValue: T, cleanupWhen?: Observable<unknown>) {
    super(initialValue);
    if (cleanupWhen) {
      this.until(cleanupWhen);
    }
  }

  /**
   * Creates a subject holding `transformer(value1, value2)` for the current values of the two
   * inputs and updates it whenever the combination of the inputs emits.
   *
   * The subscription to the inputs is released as soon as the returned subject is completed,
   * errored or unsubscribed. Pass `cleanupWhen` (or call `until` on the result) to bind that
   * lifetime to another observable.
   *
   * @param input1 first input subject
   * @param input2 second input subject
   * @param transformer maps the pair of input values to the value of the returned subject
   * @param cleanupWhen optional observable which defines the lifetime of the returned subject
   * @returns the derived subject
   */
  static combineLatest<A, B, C>(
    input1: BehaviorSubject<A>,
    input2: BehaviorSubject<B>,
    transformer: (value1: A, value2: B) => C,
    cleanupWhen?: Observable<unknown>
  ): ValueSubject<C> {
    const subject = new ValueSubject<C>(transformer(input1.getValue(), input2.getValue()));
    // Keep the handle so the inputs stop feeding (and referencing) the subject once it is done.
    subject.sourceSubscription = combineLatest([input1, input2]).subscribe(([v1, v2]) =>
      subject.next(transformer(v1, v2))
    );
    // Registered after the source subscription exists, so that a cleanupWhen which emits
    // synchronously still releases the subscription to the inputs.
    if (cleanupWhen) {
      subject.until(cleanupWhen);
    }
    return subject;
  }

  /**
   * Emits `value` only if it is not strictly equal to the current value.
   *
   * @param value candidate for the next value
   */
  next(value: T): void {
    if (value !== this.getValue()) {
      super.next(value);
    }
  }

  /**
   * Completes this subject on the first emission of `cleanupWhen`.
   * A lifetime observable registered earlier is replaced.
   *
   * @param cleanupWhen any observable which defines the lifetime of this subject
   * @returns this subject, to allow chaining
   */
  until(cleanupWhen: Observable<unknown>): ValueSubject<T> {
    this.cleanupSubscription?.unsubscribe();
    this.cleanupSubscription = cleanupWhen.pipe(take(1)).subscribe(() => {
      this.complete();
    });
    return this;
  }

  /** Emits the current value again, bypassing the equality check of `next`. */
  public emitAgain(): void {
    super.next(this.getValue());
  }

  /** Completes the subject and releases the subscriptions it holds. */
  complete(): void {
    this.releaseSubscriptions();
    super.complete();
  }

  /**
   * Errors the subject and releases the subscriptions it holds.
   *
   * @param err the error passed on to the observers
   */
  error(err: unknown): void {
    this.releaseSubscriptions();
    super.error(err);
  }

  /** Closes the subject and releases the subscriptions it holds. */
  unsubscribe(): void {
    this.releaseSubscriptions();
    super.unsubscribe();
  }

  /** Tears down the lifetime subscription and the subscription to the combined inputs. */
  private releaseSubscriptions(): void {
    this.cleanupSubscription?.unsubscribe();
    this.cleanupSubscription = undefined;
    this.sourceSubscription?.unsubscribe();
    this.sourceSubscription = undefined;
  }
}
