import { Inject, Injectable, InjectionToken } from '@angular/core';
import { interval, Observable, Observer, Subject, Subscriber } from 'rxjs';
import { distinctUntilChanged, filter, map, share, take, takeWhile } from 'rxjs/operators';
import { TokenService } from '~/shared/services';
import { select, Store } from '@ngrx/store';
import { CurrentUserState, selectCurrentUser } from '~/store/current-user';
import { UserTokenData } from '~/shared/models';

export const WEB_SOCKET_CONFIG: InjectionToken<WebSocketConfig> = new InjectionToken<WebSocketConfig>('Web socket config');

export interface IWebsocketService {
  status: Observable<boolean>;
  on<T>(event: string): Observable<T>;
}

export interface IWsMessage<T> {
  topicId: string;
  data: T;
}

export interface WebSocketConfig {
  url: string;
  reconnectInterval?: number;
  reconnectAttempts?: number;
}

const MESSAGE_TYPEID_WELCOME = 0;
const MESSAGE_TYPEID_SUBSCRIBE = 5;
const MESSAGE_TYPEID_UNSUBSCRIBE = 6;
const MESSAGE_TYPEID_EVENT = 8;

@Injectable({
  providedIn: 'root'
})
export class WebsocketService implements IWebsocketService {
  static sessionId: string;
  // config object for ws
  private config: WebSocketConfig;
  private ws: WebSocket;
  // observable for reconnect by interval
  private reconnection$: Observable<number>;
  // helper Observable for work with message subscriptions
  private wsLastMessage$: Subject<IWsMessage<any>>;
  // tels, when reconnect is happening
  private connection$: Observer<boolean>;
  // status of connection
  status: Observable<boolean>;
  // helper for connection status
  private isConnected = false;
  // userId of logged user
  private userId: number;
  // topic ids we subscribed
  private subscriptions: string[] = [];

  constructor(
    @Inject(WEB_SOCKET_CONFIG) private wsConfig: WebSocketConfig,
    private readonly store: Store<CurrentUserState>
  ) {
    // attach default config
    this.config = {
      ...wsConfig,
      reconnectInterval: wsConfig.reconnectInterval || 5000,
      reconnectAttempts: wsConfig.reconnectAttempts || 10
    };
    this.init();
  }

  on<T>(topicId: string): Observable<T> {
    if (!this.subscriptions.includes(topicId)) {
      // subscribe only if not subscribed yet
      this.status.pipe(
        // wait for positive status
        filter(status => status),
        // take it only once
        take(1)
      )
        .subscribe(() => {
          this.subscribe(topicId);
        });
    }

    return this.wsLastMessage$.pipe(
      // filter last message by subscribed topic
      filter((message: IWsMessage<T>) => {
        return message.topicId === this.getFullTopicId(topicId);
      }),
      // take only data from message
      map((message: IWsMessage<T>) => message.data)
    );
  }

  private init() {
    // initialize message
    this.wsLastMessage$ = new Subject<IWsMessage<any>>();
    // connection status
    this.status = new Observable<boolean>((observer: Subscriber<boolean>) => {
      this.connection$ = observer;
    }).pipe(share(), distinctUntilChanged());
    // run reconnection after connection lost
    this.status
      .subscribe((isConnected: boolean) => {
        this.isConnected = isConnected;
        this.reconnect();
      });
    // take user data
    // because connection based on jwt token
    this.store.pipe(
      select(selectCurrentUser)
    )
      .subscribe((user?: UserTokenData) => {
        if (user && !this.ws) {
          // connect if user logged and not connected
          this.connect();
        }
        if (!user && this.ws) {
          // disconnect if user logged out and still connected
          this.disconnect();
        }
        // save user id
        this.userId = user?.id;
      });
  }


  send(type: number, topicId: string): void {
    // send data to server
    this.ws.send(JSON.stringify([type, this.getFullTopicId(topicId)]));
  }

  private subscribe(topicId: string) {
    // subscribe
    this.send(MESSAGE_TYPEID_SUBSCRIBE, topicId);
    // save subscription
    this.subscriptions.push(topicId);
  }

  private unsubscribe() {
    this.subscriptions.map((topicId: string) => {
      // unsubscribe
      this.send(MESSAGE_TYPEID_UNSUBSCRIBE, topicId);
    });
    // clear subscriptions
    this.subscriptions = [];
  }

  private getFullTopicId(topicId): string {
    return `/users/${this.userId}/${topicId}`;
  }

  private connect(): void {
    this.ws = new WebSocket(this.config.url + '?token=' + TokenService.getToken(), ['wamp']);
    this.ws.onopen = () => {
      // connected
      this.connection$.next(true);
    };

    this.ws.onmessage = (event: MessageEvent) => {
      // received message
      const [messageType, topicId, data]: [number, string, Object] = JSON.parse(event.data);
      // filter only real event message

      switch (messageType) {
        case MESSAGE_TYPEID_EVENT:
          this.wsLastMessage$.next({
            topicId,
            data
          });
          break;
        case MESSAGE_TYPEID_WELCOME:
          WebsocketService.sessionId = topicId;
          break;
      }
    };

    this.ws.onclose = () => {
      // clear var
      this.clearWs();
      // not connected anymore
      this.connection$.next(false);
    };

    this.ws.onerror = () => {
      // not sure is it going to call, just in case.
      this.reconnect();
    };
  }

  private disconnect() {
    // unsubscribe
    this.unsubscribe();
    // close connection
    this.ws.close();
    // clear var
    this.clearWs();
  }

  private clearWs() {
    this.ws = null;
  }

  private reconnect(): void {
    // check if we really need reconnection
    if (this.reconnection$ || this.isConnected || !this.userId || this.ws) {
      return;
    }
    // create interval with value from reconnectInterval
    this.reconnection$ = interval(this.config.reconnectInterval)
      .pipe(takeWhile((v, index) => index < this.config.reconnectAttempts && !this.ws));

    // trying to connect until connect, or don't reach retry connection limit
    this.reconnection$.subscribe(
      () => this.connect(),
      null,
      () => {
        // subject complete if reconnect attemts ending
        this.reconnection$ = null;

        if (!this.ws) {
          // if still not connected then it is not possible connect for some reason
          this.wsLastMessage$.complete();
          this.connection$.complete();
        }
      });
  }
}
