import { Injectable } from '@angular/core';
import { Subject, Observable, Subscription } from 'rxjs';

/** Options accepted by the request queue's `init()` call. */
export interface IRequestQueueOptions {
  /**
   * Number of slots used to hold running request subscriptions, i.e. the
   * upper bound on requests subscribed at the same time. The service's
   * built-in default is 10.
   */
  maxConcurrentRequests?: number;
}

/** Payload emitted on the queue's `onComplete` stream when a request emits a value. */
interface OnCompleteI {
  /** The request observable that produced the value. */
  request: Observable<any>;
  /** The value emitted by the request. */
  response: any;
  /** Index of the running-requests slot the request occupied. */
  index: number;
}

/** Payload emitted on the queue's `onError` stream when a request errors. */
interface OnErrorI {
  /** The request observable that failed. */
  request: Observable<any>;
  /** The error raised by the request. */
  error: any;
  /** Index of the running-requests slot the request occupied. */
  index: number;
}

/**
 * Runs a list of request observables while keeping at most
 * `maxConcurrentRequests` of them subscribed at the same time.
 *
 * Usage: call `init()` with the requests, subscribe to `onComplete`,
 * `onError` and `onFinish`, then call `start()`. Each request is considered
 * done as soon as it emits its first value, errors, or completes without
 * emitting; its slot is then handed to the next queued request. `onFinish`
 * emits `true` once the queue is drained and no slot is occupied.
 */
@Injectable({
  providedIn: 'root',
})
export class WorksRequestQueueService {
  /** Fallback used when `maxConcurrentRequests` is missing or not a positive integer. */
  private readonly defaultMaxConcurrentRequests = 10;

  /** Requests that have not been started yet, in the order they will be served. */
  private dataStore: { requests: Array<Observable<any>> } = { requests: [] };
  private options: IRequestQueueOptions = {
    maxConcurrentRequests: 10,
  };

  /**
   * One slot per allowed concurrent request. A slot holds the subscription of
   * the request currently running in it, or a falsy value when it is free.
   * Initialised eagerly so `start()`/`stop()` are safe before `init()`.
   */
  private runningRequests: Array<Subscription | null> = this.createSlots();
  private onCompleteSubject = new Subject<OnCompleteI>();
  private onErrorSubject = new Subject<OnErrorI>();
  private onFinishSubject = new Subject<boolean>();
  private onCompleteSubscription: Subscription | null = null;
  private onErrorSubscription: Subscription | null = null;

  constructor() {}

  /**
   * Prepares the queue for a new run.
   *
   * Any run still in progress is cancelled: its in-flight requests are
   * unsubscribed and its pending requests are dropped.
   *
   * @param requests Requests to run; the array is copied, not retained.
   * @param options Overrides merged on top of the options already in effect
   *   (options given to earlier `init()` calls persist).
   */
  init(requests: Array<Observable<any>>, options?: IRequestQueueOptions): void {
    this.options = {
      ...this.options,
      ...options,
    };

    this.reset();

    this.dataStore.requests = [...requests];
  }

  /**
   * Starts serving the queue, filling every free slot with a queued request.
   *
   * Calling it again while a run is in progress is safe: internal handlers
   * are re-registered exactly once and occupied slots are left untouched.
   * If there is nothing queued and nothing running, `onFinish` emits
   * immediately.
   */
  start(): void {
    // Drop handlers from a previous start() so each event is processed once.
    this.unsubscribeInternalHandlers();

    const handleEnded = (event: { index: number }): void => this.onRequestEnded(event);
    this.onCompleteSubscription = this.onComplete.subscribe(handleEnded);
    this.onErrorSubscription = this.onError.subscribe(handleEnded);

    if (!this.dataStore.requests.length) {
      // Nothing to serve: report completion instead of leaving callers waiting.
      this.emitFinished();
      return;
    }

    for (let i = 0; i < this.runningRequests.length; i += 1) {
      if (!this.dataStore.requests.length) {
        break;
      }

      if (this.runningRequests[i]) {
        // Slot already holds a live request (start() called twice).
        continue;
      }

      this.handleNewRequest(this.dataStore.requests.shift(), i);
    }
  }

  /** Cancels all running requests, drops the pending ones and detaches internal handlers. */
  stop(): void {
    this.reset();
  }

  /**
   * Frees the slot of a request that just ended and either starts the next
   * queued request in that slot or checks whether the whole run is finished.
   */
  private onRequestEnded(event: { index: number }): void {
    this.removeHandledRequest(event.index);

    if (!this.dataStore.requests.length) {
      this.emitFinished();
      return;
    }

    this.handleNewRequest(this.dataStore.requests.shift(), event.index);
  }

  /**
   * Subscribes to `request` and records its subscription in slot `index`.
   *
   * Only the first outcome of a request counts (first value, error, or
   * completion without a value). A request that completes without emitting
   * frees its slot without an `onComplete` notification.
   */
  private handleNewRequest(request: Observable<any> | undefined, index: number): void {
    if (!request) {
      return;
    }

    // Set as soon as the request has produced its outcome. A request may do so
    // synchronously inside subscribe(), i.e. before we hold its subscription.
    let settled = false;

    const subscription = request.subscribe({
      next: (response) => {
        if (settled) {
          return;
        }
        settled = true;
        this.onCompleteSubject.next({ request, response, index });
      },
      error: (error) => {
        if (settled) {
          return;
        }
        settled = true;
        this.onErrorSubject.next({ request, error, index });
      },
      complete: () => {
        if (settled) {
          return;
        }
        settled = true;
        this.onRequestEnded({ index });
      },
    });

    if (settled) {
      // The request ended during subscribe(): the slot has already been freed
      // and may now belong to a successor, so it must not be overwritten.
      subscription.unsubscribe();
      return;
    }

    this.runningRequests[index] = subscription;
  }

  /**
   * Unsubscribes whatever occupies slot `index` and marks the slot free.
   *
   * @returns The same `index` that was passed in.
   */
  private removeHandledRequest(index: number): number {
    const subscription = this.runningRequests[index];

    if (subscription) {
      subscription.unsubscribe();
    }

    this.runningRequests[index] = null;

    return index;
  }

  /** Emits `true` on `onFinish`, but only when no slot is occupied. */
  private emitFinished(): void {
    if (this.runningRequestsIsEmpty()) {
      this.onFinishSubject.next(true);
    }
  }

  /** Tells whether every slot is free. */
  private runningRequestsIsEmpty(): boolean {
    return this.runningRequests.every((element) => !element);
  }

  /**
   * Returns the service to an idle state: in-flight requests are
   * unsubscribed, slots are rebuilt for the current options, the pending
   * queue is cleared and internal handlers are detached.
   */
  private reset(): void {
    for (const subscription of this.runningRequests) {
      if (subscription) {
        subscription.unsubscribe();
      }
    }

    this.runningRequests = this.createSlots();
    this.dataStore.requests = [];

    this.unsubscribeInternalHandlers();
  }

  /** Detaches the service's own listeners on the complete/error streams. */
  private unsubscribeInternalHandlers(): void {
    if (this.onCompleteSubscription) {
      this.onCompleteSubscription.unsubscribe();
      this.onCompleteSubscription = null;
    }

    if (this.onErrorSubscription) {
      this.onErrorSubscription.unsubscribe();
      this.onErrorSubscription = null;
    }
  }

  /**
   * Builds an array of free slots sized by `maxConcurrentRequests`, falling
   * back to the default when the option is not a positive integer (which
   * would otherwise yield a wrongly sized array or a RangeError).
   */
  private createSlots(): Array<Subscription | null> {
    const requested = this.options.maxConcurrentRequests;
    const size =
      typeof requested === 'number' && Number.isInteger(requested) && requested > 0
        ? requested
        : this.defaultMaxConcurrentRequests;

    return new Array<Subscription | null>(size).fill(null);
  }

  /** Emits once per request that produced a value. */
  get onComplete(): Observable<OnCompleteI> {
    return this.onCompleteSubject.asObservable();
  }

  /** Emits once per request that failed. */
  get onError(): Observable<OnErrorI> {
    return this.onErrorSubject.asObservable();
  }

  /** Emits `true` when the queue is drained and no request is running. */
  get onFinish(): Observable<boolean> {
    return this.onFinishSubject.asObservable();
  }
}
