import { Injectable } from '@angular/core';
import { AbstractRestService } from '../abstract-rest.service';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, of, ReplaySubject, Subject } from 'rxjs';
import { Delta } from './delta/delta';
import { UUID } from '../../util/math/uuid';
import { DeltaQueue } from './delta-queue';
import { Message, WebSocketService } from '../websocket/web-socket.service';
import { filter, first, map, share, switchMap, takeUntil } from 'rxjs/operators';

import { DeltaResponseDto } from './delta-response.dto';
import { DeltaLike, DeltaParser } from '../../util/delta-parser';
import { RetransmissionDto } from '../../dto/retransmission.dto';
import { DeltaReceiverRegistry } from './target/delta-receiver';
import { ToastrService } from 'ngx-toastr';
import { ErrorCodes } from '../../util/error-codes';
import { LimitedSet } from '../../util/limited-set';
import { LimitedMap } from '../../util/limited-map';

/**
 * Applies deltas locally, publishes the client's own deltas to the backend via HTTP
 * and consumes deltas arriving over the websocket.
 *
 * Own deltas are applied optimistically and remembered in `unconfirmedDeltas` until the
 * backend either echoes them back (confirmation) or rejects them (see `failIncomingDelta`).
 */
@Injectable()
export class DeltaService extends AbstractRestService {
  /** When true, outgoing deltas are looped back into `applyIncomingDelta` instead of being posted. */
  private static readonly LOCAL_ONLY = false;

  /** Guards `activate()` so that its subscriptions are only set up once. */
  private activated = false;
  /** Own deltas that were applied locally but not yet confirmed (presumably capped at 200 entries — confirm in LimitedMap). */
  private readonly unconfirmedDeltas = new LimitedMap<UUID, Delta>(200);
  /**
   * Ids of deltas that were already applied/confirmed; used to skip duplicates.
   * NOTE(review): this set grows without bound; `LimitedSet` is imported but unused — confirm whether it was intended here.
   */
  private readonly confirmedIds = new Set<UUID>();
  private readonly queue = new DeltaQueue();
  /** Highest delta serial id seen so far; sent to the backend when requesting a retransmission. */
  private lastKnownSerialId = 0;

  /**
   * A delta topic may be added again right after it was removed (e.g. on page navigation).
   * To not miss any deltas in the meantime, a removed topic is only dropped from the
   * topic mapping at first. After this many milliseconds the mapping is checked again and
   * the websocket subscription is torn down only if the topic has not been re-added.
   */
  public static readonly REMOVAL_GRACE_PERIOD = 1000 * 1;

  /**
   * Grace period in milliseconds.
   * NOTE(review): not referenced in this class (the removal logic uses REMOVAL_GRACE_PERIOD);
   * kept because it is public and may be used elsewhere.
   */
  public static readonly UNSUB_GRACE_PERIOD = 1000;

  /**
   * Emits `[deltaId, confirmedDelta]` on confirmation and `[deltaId, response]` on rejection.
   * NOTE(review): the ReplaySubject has no buffer limit, so every response is retained and
   * replayed to each new subscriber — consider bounding it.
   */
  private readonly deltaResponses$ = new ReplaySubject<[UUID, Delta | DeltaResponseDto]>();

  private readonly _onError$ = new Subject<{ delta: Delta; response?: DeltaResponseDto; exception?: Error }>();

  /** Websocket delta streams per topic that are currently considered active. */
  private readonly activeLegacySubscriptions = new Map<string, Observable<Message<DeltaLike>>>();

  /** Emits a topic name when the websocket stream of that topic has to be terminated. */
  private readonly topicDestroy$ = new Subject<string>();

  public constructor(
    protected readonly http: HttpClient,
    private readonly ws: WebSocketService,
    private readonly receiverRegistry: DeltaReceiverRegistry,
    private readonly toastrService: ToastrService
  ) {
    super(http);
  }

  /**
   * Starts the service: fetches the current delta serial id, installs the reconnection
   * handling and starts following topic additions/removals of the receiver registry.
   * Subsequent calls are no-ops.
   */
  public activate(): void {
    if (this.activated) {
      return;
    }
    this.http.get<number>(this.getUrl('/deltas/dsid/current')).subscribe((res) => {
      console.log('Latest known delta serial id is ' + res);
      this.updateLastKnownDelta(res);
    });
    this.activated = true;

    // Catch reconnection: after the connection was closed, wait for the next successful
    // connect and then ask the backend for everything that was missed.
    this.ws.connectionClosed$
      .pipe(
        switchMap(() => of(void 0).pipe(takeUntil(this.ws.connectionClosed$))), // only one should be active at a time
        switchMap(() => this.ws.connected$.pipe(first())),
        switchMap(() => this.attemptReconnection())
      )
      .subscribe();

    this.receiverRegistry.addedTopics$.subscribe((topics) => {
      topics.forEach((topic) => {
        if (this.activeLegacySubscriptions.has(topic)) {
          return;
        }
        console.log(`%cTopic '${topic}' may now receive from ws...`, 'background: green');
        // The stream ends as soon as topicDestroy$ emits this very topic.
        const topicDeltas$ = this.ws
          .subscribeToDeltas(topic)
          .pipe(takeUntil(this.topicDestroy$.pipe(filter((t) => t === topic))));
        this.activeLegacySubscriptions.set(topic, topicDeltas$);
        topicDeltas$.subscribe((msg) => {
          const delta = DeltaParser.parseDelta(msg.content);
          this.applyIncomingDelta(delta);
        });
      });
    });

    this.receiverRegistry.removedTopics$.subscribe((topics) => {
      topics.forEach((topic) => {
        if (!this.activeLegacySubscriptions.has(topic)) {
          return;
        }
        console.log(
          `%cRequested removal of ${topic}! Checking again in ${DeltaService.REMOVAL_GRACE_PERIOD}ms`,
          'background: orange'
        );
        // Only drop the mapping now; the websocket stream stays alive during the grace period.
        this.activeLegacySubscriptions.delete(topic);
        setTimeout(() => {
          const newSubscription = this.activeLegacySubscriptions.get(topic);
          if (!newSubscription) {
            console.log(
              `%cTopic ${topic} has not been re-added in ${DeltaService.REMOVAL_GRACE_PERIOD}ms, deleting now!`,
              'background: red'
            );
            this.receiverRegistry.unload(topic);
            this.topicDestroy$.next(topic);
          }
        }, DeltaService.REMOVAL_GRACE_PERIOD);
      });
    });
  }

  /**
   * Stream of delta failures: backend rejections (with `response`) and local application
   * errors (with `exception`).
   */
  public onError$(): Observable<{ delta: Delta; response?: DeltaResponseDto; exception?: Error }> {
    return this._onError$.asObservable();
  }

  /**
   * Initializes the given delta, applies it locally when it is the client's own delta and
   * publishes it when it is standalone and the local application did not fail.
   * No-op deltas are returned untouched (after `init()`).
   *
   * @param delta the delta to apply and send
   * @returns the very same delta instance
   */
  public applyOutgoingDelta<T extends Delta>(delta: T): T {
    delta.init();

    if (delta.wouldBeNoop()) {
      return delta;
    }

    let mayPublish = true;
    // Apply first, then send
    if (delta.isMyOwnDelta()) {
      this.unconfirmedDeltas.set(delta.getDeltaId(), delta);
      mayPublish = this.applyLocally(delta);
    }

    if (mayPublish && delta.isStandalone()) {
      if (DeltaService.LOCAL_ONLY) {
        // Simulate the round trip through the backend with a serialized copy.
        const jsonCopy = DeltaParser.parseDelta(JSON.parse(JSON.stringify(delta)));
        jsonCopy.setReceivedTime(jsonCopy.getSentTime());
        this.applyIncomingDelta(jsonCopy);
      } else {
        this.publish(delta);
      }
    }
    return delta;
  }

  /**
   * Handles a delta that arrived from the backend. Foreign standalone deltas are applied
   * locally; everything else is treated as the confirmation of one of the client's own deltas.
   * No-op deltas and already confirmed delta ids are ignored.
   *
   * @param delta the parsed incoming delta
   * @param retransmit when true, the last known serial id is not advanced for foreign deltas
   */
  public applyIncomingDelta(delta: Delta, retransmit = false): void {
    if (delta.getType() === 'noop' || delta.wouldBeNoop()) {
      return;
    }
    // Dont apply/confirm deltas twice
    if (this.confirmedIds.has(delta.getDeltaId())) {
      return;
    }

    if (!delta.isMyOwnDelta() && delta.isStandalone()) {
      // someone elses delta
      console.log('Locally apply incoming delta ' + delta.toString() + ` (RTT = ${delta.getRTT()}ms)`);
      this.applyLocally(delta);
      if (!retransmit) {
        delta.flatten().forEach((d) => {
          this.updateLastKnownDelta(d.getSerialId());
        });
      }
      this.confirmedIds.add(delta.getDeltaId());
    } else {
      // our own delta coming back
      this.confirmDelta(delta);
    }
  }

  /**
   * Handles the rejection of one of the client's own deltas: informs the user, reverts the
   * local change (via the undo delta, or `onFailure()` when there is none) and notifies
   * `onError$` as well as everybody waiting for the response.
   *
   * @param response the unsuccessful response belonging to the delta
   * @throws Error when no unconfirmed delta with the id of the response is known
   */
  public failIncomingDelta(response: DeltaResponseDto): void {
    const delta = this.unconfirmedDeltas.get(response.deltaId.toString());
    // Check for an unknown delta before anything below dereferences it.
    if (!delta) {
      const msg = `Incoming failing delta ${response.deltaId} does not exist! (Code ${ErrorCodes.D_100})`;
      this.toastrService.error(msg, 'Schwerwiegender Fehler');
      throw new Error(msg);
    }

    const extraInfo: string[] = [];
    if (response.message instanceof HttpErrorResponse && response.message.status === 405) {
      extraInfo.push(`Confirm there is a delta with type '${delta.getType()}' present in the backend`);
    }

    // For HTTP errors show the error's own message instead of the stringified error object.
    const reason = (response.message as HttpErrorResponse)?.message ?? response.message;

    console.group(`${delta.toString()} has been rejected:`);
    console.error(`Message: ${JSON.stringify(reason, null, 2)}`);
    extraInfo.forEach((info) => console.log(info));
    console.groupEnd();

    // Skip empty parts so that the toast does not end with a dangling separator.
    const toastText = [reason, ...extraInfo]
      .filter((part) => part !== undefined && part !== null && part !== '')
      .join(', ');
    this.toastrService.error(toastText, delta.getErrorMessage(), {});

    const undoDelta = delta.getUndoDelta();
    if (undoDelta) {
      this.applyLocally(undoDelta);
    } else {
      delta.onFailure();
    }
    delta.setAppliedLocally(false);
    this._onError$.next({
      delta: delta,
      response: response,
    });
    this.deltaResponses$.next([delta.getDeltaId(), response]);
  }

  /**
   * Waits for the first response (confirmation or rejection) of the given delta id and
   * invokes the matching callback exactly once.
   *
   * @param deltaId id of the delta to wait for
   * @param successCallback called with the confirmed delta
   * @param errorCallback called with the failing response
   */
  public waitForResponse(
    deltaId: UUID,
    successCallback: (delta) => void,
    errorCallback: (DeltaResponseDto) => void
  ): void {
    // NOTE(review): both callback parameters are untyped; `DeltaResponseDto` above is a
    // parameter name, not a type. Left as is to keep existing callers compiling.
    this.deltaResponses$
      .pipe(
        filter(([id]) => id === deltaId),
        first(),
        share()
      )
      .subscribe(([, val]) => {
        if (val instanceof Delta) {
          successCallback(val);
        } else {
          errorCallback(val);
        }
      });
  }

  /**
   * Observable variant of `waitForResponse`: emits the first response for the given delta id.
   * A confirmation is mapped to a synthetic successful response without message.
   *
   * @param deltaId id of the delta to wait for
   */
  public waitForResponse$(deltaId: UUID): Observable<DeltaResponseDto> {
    return this.deltaResponses$.pipe(
      filter(([id]) => id === deltaId),
      first(),
      share(),
      map(([, res]) => {
        if (res instanceof Delta) {
          return {
            deltaId: res.getDeltaId(),
            successful: true,
            message: undefined,
          };
        }
        return res;
      })
    );
  }

  /**
   * Applies the delta locally.
   * On an exception the user is informed, the delta is reverted (undo delta or `onFailure()`,
   * only for the outermost call) and the failure is published on `onError$`.
   *
   * @param delta the delta to apply
   * @param depth recursion depth; values above 0 prevent a failing undo delta from being undone again
   * @returns true when application was successful, false otherwise
   */
  private applyLocally(delta: Delta, depth = 0): boolean {
    try {
      delta.apply();
      delta.setAppliedLocally(true);
      return true;
    } catch (exception) {
      console.error(exception);
      this.toastrService.error('Lokale Änderung konnte nicht übernommen werden', delta.getErrorMessage());
      const undoDelta = delta.getUndoDelta();
      if (depth === 0) {
        if (undoDelta) {
          this.applyLocally(undoDelta, depth + 1);
        } else {
          delta.onFailure();
        }
      }
      delta.setAppliedLocally(false);
      this._onError$.next({
        delta: delta,
        exception: exception,
      });
      return false;
    }
  }

  /**
   * Marks one of the client's own deltas as confirmed by the backend and publishes the
   * confirmation on `deltaResponses$` (also when no matching unconfirmed delta is known).
   *
   * @param delta the delta as it came back from the backend
   */
  private confirmDelta(delta: Delta): void {
    console.log('Confirming my own incoming delta ' + delta.toString() + ` (RTT = ${delta.getRTT()}ms)`);
    const deltaId = delta.getDeltaId();
    const existingDelta = this.unconfirmedDeltas.get(deltaId);
    if (existingDelta) {
      existingDelta.onConfirmation(delta);
      this.unconfirmedDeltas.delete(deltaId);
      this.confirmedIds.add(deltaId);
      delta.flatten().forEach((d) => {
        this.updateLastKnownDelta(d.getSerialId());
      });
    }
    this.deltaResponses$.next([deltaId, delta]);
  }

  /**
   * Enqueues the delta and posts every delta handed out by the queue to the backend.
   * Unsuccessful responses and HTTP errors are routed to `failIncomingDelta`.
   *
   * @param delta the delta to send
   */
  private publish(delta: Delta): void {
    delta.prePublish();
    this.queue.insert(delta);

    console.log('Publishing delta ' + delta.toString());
    this.queue.submit((deltas) => {
      deltas.forEach((queued) => {
        queued.setSentTime(Date.now());
        this.http.post<DeltaResponseDto>(this.getUrl('/deltas/'), queued).subscribe({
          next: (response) => {
            if (!response.successful) {
              this.failIncomingDelta(response);
            }
          },
          error: (err) => {
            // Wrap transport errors into a response so both failure paths are handled alike.
            this.failIncomingDelta({
              deltaId: queued.getDeltaId(),
              message: err,
              successful: false,
            });
          },
        });
      });
    });
  }

  /**
   * Requests all deltas since the last known serial id for all registered topics and
   * applies them as incoming deltas.
   *
   * @returns an observable that emits once immediately; it does not wait for the request
   */
  private attemptReconnection(): Observable<void> {
    console.log('Trying to reconnect');
    console.log('Last known delta serial id', this.lastKnownSerialId);

    const dto = <RetransmissionDto>{
      lastKnownSerialId: this.lastKnownSerialId,
      topics: this.receiverRegistry.getAllTopics().map((topic) => '/deltas' + topic),
    };
    this.http.post<DeltaLike[]>(this.getUrl('/deltas/since'), dto).subscribe((rawDeltas) => {
      rawDeltas.forEach((rawDelta) => {
        const delta = DeltaParser.parseDelta(rawDelta);
        this.applyIncomingDelta(delta);
      });
    });
    return of(undefined);
  }

  private getLastKnownSerialId(): number {
    return this.lastKnownSerialId;
  }

  /**
   * Raises the last known delta serial id; smaller or equal ids are ignored.
   *
   * @param serialId serial id of a delta that has been seen
   */
  public updateLastKnownDelta(serialId: number): void {
    if (this.lastKnownSerialId < serialId) {
      this.lastKnownSerialId = serialId;
    }
  }
}
