import { Injectable } from '@angular/core';
import { interval, Observable } from 'rxjs';
import { RxStomp, RxStompState } from '@stomp/rx-stomp';
import { bufferCount, distinctUntilChanged, filter, map, share, startWith, switchMap, tap } from 'rxjs/operators';
import { AbstractCommunicationService } from '../abstract-communication.service';
import { environment } from '../../../environments/environment';
import { AuthenticationService } from '../../auth/authentication.service';
import * as SockJS from 'sockjs-client/dist/sockjs';
import { ValueSubject } from '../../util/reactive/value-subject';
import { Message as StompMessage } from '@stomp/stompjs';
import { DeltaLike } from '../../util/delta-parser';
import { HttpClient } from '@angular/common/http';
import { VersionController } from '../version-controller.service';
import { GRAFITI_BUILD_VERSION } from '../../util/version';
import { requestReload } from '../../util/request-reload';

export interface Message<T> extends StompMessage {
  content?: T;
}
const PING_INTERVAL = 10000;
const PING_TOPIC = '/ping';
/**
 * Determines the size of the sliding window. The sliding window is used
 * to store the last N ping values and calculate the average ping of
 * those values.
 */
const PING_SLIDING_WINDOW_SIZE = 5;

/**
 * Represents a service to interact with the server's web socket
 * connection.
 */
@Injectable()
export class WebSocketService extends AbstractCommunicationService {
  private activated = false;

  /* Stores the last {@link PING_SLIDING_WINDOW_SIZE N} ping values
   * to be able to calculate an average and therefore having less
   * noise in the ping.
   */
  private slidingPingWindow: number[] = [];

  private readonly ping$ = new ValueSubject<number>(null);

  /**
   * Whether we can handle ping messages by being subscribed to it.
   */
  private canHandlePing = false;

  public connectionClosed$: Observable<void>;
  public connected$: Observable<void>;
  public isConnected$: Observable<boolean>;

  private readonly disconnectTime$ = new ValueSubject<Date>(null);

  private readonly stomp: RxStomp;

  private readonly legacySubscriptions = new Map<string, Observable<Message<unknown>>>();

  /**
   * Creates a new WebSocketService.
   * Initializes state observables.
   *
   * @param authService DI-injected AuthenticationService
   * @param versionController
   */
  public constructor(
    private readonly authService: AuthenticationService,
    private readonly versionController: VersionController
  ) {
    super(environment.webSocketEndpoint);
    const brokerURL = this.getBaseUrl().replace(/\/\/api/, '/api');
    this.stomp = new RxStomp();
    this.stomp.configure({
      webSocketFactory: () =>
        new SockJS(brokerURL, undefined, {
          // todo: keep track of https://github.com/sockjs/sockjs-client/issues/549
          // 'eventsource' is probably the best fallback option for websocket, unfortunately it does not work with our cookie auth at the moment.
          // there is an open ticket for enabling cookie auth with this transport, so maybe we should just double-check that from time to time.
          transports: ['websocket', /*'eventsource',*/ 'xhr-streaming', 'xhr-polling'],
        }),
      logRawCommunication: true,

      // Headers
      // Typical keys: login, passcode, host
      connectHeaders: {},
      connectionTimeout: 2500,

      brokerURL,

      splitLargeFrames: true,

      // How often to heartbeat?
      // Interval in milliseconds, set to 0 to disable
      heartbeatIncoming: 10000, // Typical value 0 - disabled
      heartbeatOutgoing: 10000, // Typical value 20000 - every 20 seconds

      // Wait in milliseconds before attempting auto reconnect
      // Set to 0 to disable
      // Typical value 500 (500 milli seconds)
      reconnectDelay: 2000,
    });

    this.authService.currentUser$.subscribe((user) => {
      if (!user) {
        console.log('Deactivating Websocket...');
        this.stomp.deactivate();
      } else {
        console.log('Activating Websocket...');
        this.tryReconnect();
      }
    });

    this.connectionClosed$ = this.stomp.connectionState$.pipe(
      bufferCount(3, 1),
      filter((states) => {
        return (
          states.length === 3 && // On Completion it is only one value
          states.indexOf(RxStompState.OPEN) !== -1 && // The connection was already open ...
          states[2] === RxStompState.CLOSED
        ); // ... and is now closed
      }),
      tap(() => console.log('Websocket Connection closed')),
      map(() => void 0),
      share()
    );

    this.isConnected$ = this.stomp.connected$.pipe(map(() => true));
    this.connected$ = this.stomp.connected$.pipe(
      tap(() => console.log('Websocket connected!')),
      map(() => void 0),
      share()
    );

    // Resend pending packages
    this.stomp.connected$.subscribe(this.reconnected.bind(this));

    this.stomp.connectionState$
      .pipe(
        map((s) => s !== RxStompState.OPEN),
        distinctUntilChanged(),
        filter((s) => s)
      )
      .subscribe(() => {
        this.disconnectTime$.next(new Date());
      });

    this.subscribeToPings();

    // ping the server if we didn't send a change within the last 10 seconds
    this.authService.currentUser$
      .pipe(
        switchMap(() => interval(PING_INTERVAL).pipe(startWith(0))),
        filter(() => this.isConnected()) // don't send if not connected
      )
      .subscribe({
        next: () => this.sendPing(),
        error: () => {
          console.error('ping error');
        },
        complete: () => {
          console.error('ping complete');
        },
      });

    this.authService.sessionLocked$.subscribe(async (locked: boolean) => {
      if (locked) {
        await this.setConnectionStatus(false);
      }
      if (!this.isConnected() && !locked && this.authService.getCurrentUser()) {
        await this.setConnectionStatus(true);
      }
    });

    this.ping$.subscribe((ping) => {});
    this.connected$.subscribe(() => {
      this.versionController.getVersion$().subscribe((version) => {
        console.log('Using application version: ' + version);
        if (version !== GRAFITI_BUILD_VERSION) {
          requestReload('Die Anwendung muss für ein Update neugestartet werden');
        }
      });
    });
  }

  /**
   * Checks if the StompService is connected to the server.
   */
  public isConnected(): boolean {
    return this.stomp.connectionState$.getValue() === RxStompState.OPEN;
  }

  /**
   * Trys to reconnect the frontend with the server by reactivating the StompService.
   */
  public async tryReconnect(): Promise<void> {
    if (this.stomp.connectionState$.getValue() === RxStompState.OPEN) {
      await this.stomp.deactivate();
    }
    this.stomp.activate();
  }

  /**
   * Activates or deactivates the StompService depending on the current connection status.
   * @param connected if the StompService should be active
   */
  public async setConnectionStatus(connected: boolean): Promise<void> {
    if (connected) {
      await this.stomp.activate();
    } else {
      await this.stomp.deactivate();
    }
  }

  /**
   * Returns an Observable emitting the current ping.
   */
  public getPing$(): Observable<number> {
    return this.ping$;
  }

  /**
   * Returns an Observable emitting the time the STOMP broker disconnected.
   */
  public getDisconnectTime$(): Observable<Date> {
    return this.disconnectTime$;
  }

  /**
   * Whether the web socket connection is closed or about to close.
   */
  public isClosed(): boolean {
    /*
     * WORKAROUND
     * We use the underlying web socket state directly, because RxStompService's
     * state does not hold the CLOSING state, even though the underlying web socket
     * is in the CLOSING state.
     */
    const state = this.stomp.connectionState$.immediate();
    return state === WebSocket.CLOSED || state === WebSocket.CLOSING;
  }

  public subscribeLegacy<T>(topic: string): Observable<Message<T>> {
    if (!this.legacySubscriptions.has(topic)) {
      this.legacySubscriptions.set(
        topic,
        this.stomp.watch(topic).pipe(
          map((message) => ({ ...message, content: JSON.parse(message.body) } as Message<T>)),
          share()
        )
      );
    }
    return this.legacySubscriptions.get(topic) as Observable<Message<T>>;
  }

  public subscribeToDeltas(topic: string): Observable<Message<DeltaLike>> {
    return this.subscribeLegacy<DeltaLike>('/user/topic/deltas' + topic);
  }

  private subscribeToPings(): void {
    this.subscribeLegacy<{ timestamp: number }>('/user/topic' + PING_TOPIC).subscribe((message) => {
      const timestamp = message?.content?.timestamp;
      if (timestamp) {
        this.handlePing(timestamp);
      }
    });

    // Now that we are subscribed, we can handle ping messages. Therefore we can start sending pings.
    this.canHandlePing = true;
  }

  private handlePing(timestamp: number, computationTime?: number): void {
    let ping: number;
    if (computationTime) {
      ping = Date.now() - timestamp - computationTime;
    } else {
      ping = Date.now() - timestamp;
    }

    this.disconnectTime$.next(null);
    const newLength = this.slidingPingWindow.push(ping);
    if (newLength > PING_SLIDING_WINDOW_SIZE) {
      this.slidingPingWindow.shift();
    }
    const newPing = this.slidingPingWindow.reduce((acc, curr) => acc + curr) / this.slidingPingWindow.length;
    this.ping$.next(Math.round(newPing));
  }

  private sendPing(): void {
    const timestamp = JSON.stringify({ timestamp: Date.now() });
    this.stomp.publish({ destination: '/app' + PING_TOPIC, body: timestamp, retryIfDisconnected: false });
  }

  /**
   * Clears the state of the web-socket service.
   * Needed for example after the user logs out.
   */
  public clearState(): void {
    this.legacySubscriptions.clear();
  }

  private reconnected(): void {
    // when reconnecting we want to send the first ping message right away if we cannot determine the ping via changes
    if (this.canHandlePing) {
      this.sendPing();
    }
    // We want to get a proper ping representation when reconnecting. Therefore, we need to delete all old ping values.
    this.slidingPingWindow = [];
    //this.validateServerVersion$.next(null);
  }
}
