import { Injectable } from '@angular/core';
import {HttpClient} from '@angular/common/http';
import { concat } from 'rxjs/internal/observable/concat';
import {concatMap, delay, map, shareReplay, tap} from 'rxjs/operators';
import {Observable} from 'rxjs/internal/Observable';
import {environment} from '../../environments/environment';
import * as socketIo from 'socket.io-client';
import {MarketPair} from '../interfaces/market-pair';
import {of} from 'rxjs/internal/observable/of';

/**
 * Exposes market details as an observable stream: the market list fetched
 * over HTTP, followed by every `SubscribeToSummaryDeltas` socket event.
 * Each emitted entry is merged with the previously emitted entry for the
 * same `MarketName`.
 */
@Injectable()
export class LandingService {

  private _marketApi = `${environment.host}api/v1/settings/markets`;
  private socket: any;

  /** Entries of the most recent emission, keyed by `MarketName`. */
  private _lastMarketsDetails = new Map<string, any>();

  /**
   * HTTP market list first, then socket events, each entry merged with the
   * previously stored entry of the same market. The latest emission is
   * replayed to late subscribers.
   *
   * `this.socket` is assigned in the constructor body, after this field
   * initializer has run; that is fine because the socket is only read when
   * the socket observable is actually subscribed to.
   */
  marketsDetails$ = concat(
    this.getMarketList(),
    this.SubscribeToSummaryDeltas(),
  ).pipe(
    map(
      (markets) => markets.map(
        (v) => this._mergeMarketField(this._lastMarketsDetails.get(v.MarketName), v),
      )
    ),
    // NOTE(review): the map is replaced wholesale, so markets absent from the
    // current emission lose their stored state — confirm every socket event
    // carries all markets.
    tap((markets) => this._lastMarketsDetails = new Map<string, any>(markets.map((v) => [v.MarketName, v]))),
    shareReplay(1)
  );

  /**
   * Merges the previously stored entry of a market with the current one.
   *
   * @param last    Previously stored entry for the market, if any.
   * @param current Entry that has just arrived.
   * @returns An object holding every key of both entries, plus the
   *          recomputed `lastUp`, `lastDown` and `BaseVolumeUSDT` fields.
   */
  private _mergeMarketField = (last: any, current: any) => {
    const previous = last || {};
    const mergedKeys = new Set([...Object.keys(previous), ...Object.keys(current)]);

    const merged: Record<string, any> = {};
    for (const key of Array.from(mergedKeys)) {
      // Any falsy current value (undefined, null, 0, '') falls back to the
      // previous value.
      // NOTE(review): confirm that a genuine 0 should not override the previous value.
      merged[key] = current[key] || previous[key];
    }

    // Without a previous entry the direction is derived from `prevPrice`;
    // otherwise an unchanged price keeps the previous direction flag.
    merged.lastUp = last
      ? (+last.Last < +current.Last || (+last.lastUp && +last.Last === +current.Last))
      : +current.Last > +current.prevPrice;
    merged.lastDown = last
      ? (+last.Last > +current.Last || (+last.lastDown && +last.Last === +current.Last))
      : +current.Last < +current.prevPrice;
    // When the first segment of the market name is USDT, the 24h base volume
    // is used as the USDT volume.
    merged.BaseVolumeUSDT = current.MarketName.split('-')[0] === 'USDT'
      ? current.BaseVolume24
      : current.BaseVolumeUSDT;

    return merged;
  }

  constructor(private _httpClient: HttpClient) {
    this.socket = socketIo(environment.host);
  }

  /**
   * Requests the market list from the markets settings endpoint.
   *
   * @returns Observable of the HTTP GET response body.
   */
  getMarketList(): Observable<MarketPair[]> {
    return this._httpClient.get<MarketPair[]>(this._marketApi);
  }

  /**
   * Streams the payload of every `SubscribeToSummaryDeltas` socket event.
   *
   * The operators are applied to the Observable itself: `socket.on()` returns
   * the socket rather than an Observable, so piping its result threw on
   * subscription and the delay was never applied.
   *
   * @returns Observable that emits each event payload 1000 ms after the
   *          previous one was processed, in arrival order. Unsubscribing
   *          removes the socket listener. Emits nothing if no socket exists.
   */
  SubscribeToSummaryDeltas(): Observable<any> {
    const eventName = 'SubscribeToSummaryDeltas';

    return new Observable<any>(observer => {
      const socket = this.socket;
      const onDelta = (data: any) => observer.next(data);

      socket?.on(eventName, onDelta);

      // Teardown: detach exactly the listener registered above.
      return () => {
        socket?.off(eventName, onDelta);
      };
    }).pipe(
      // 1000 ms (1 second) delay for each message
      concatMap(data => of(data).pipe(delay(1000)))
    );
  }
}
