import { DateTime } from 'luxon';

import { successfulLogin } from './slices/AuthSlice';
import { selectInstrumentsSymbols } from './selectors';
import * as balanceActions from './slices/BalanceSlice';
import * as instrumentsActions from './slices/InstrumentsSlice';
import * as tradingActions from './slices/TradingSlice';
import * as historyActions from './slices/HistorySlice';
import {
  streamsStateChanged,
  streamDataPushed,
  dataSubscriptionsChanged,
  socketStateChanged,
  selectNewSid,
  INSTRUMENTS_SUBSCRIPTION,
  BALANCE_SUBSCRIPTION,
  LIGHT_TICKERS_SUBSCRIPTION,
  INSTRUMENT_LIVE_TRADES_SUBSCRIPTION,
  INSTRUMENT_ORDER_BOOK_SUBSCRIPTION,
  INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION,
  INSTRUMENT_ORDERS_HISTORY,
  ACTIVE_ORDERS_SUBSCRIPTION,
  ORDERS_HISTORY_SUBSCRIPTION,
  TRADES_HISTORY_SUBSCRIPTION,
  TRADING_SESSION_SUBSCRIPTION,
  MARKET_DATA_SESSION_SUBSCRIPTION,
  PLACE_ORDER_SUBSCRIPTION,
  CANCEL_ORDER_SUBSCRIPTION,
} from './slices/SocketsSlice';


/**
 * @TODO handle rest socket actions
 * @link https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
 * close
 * error
 */

/**
 * Trading (WS)
 * Trading endpoint: private endpoint which can be accessed only for authenticated users.
 */
const TRADING_WS = 'TRADING_WS';

/**
 * Market Data (WS)
 * Market data endpoint: open for the public without any authentication for some method and also include some private methods that require authentication via createSession upfront.
 */
const MARKET_DATA_WS = 'MARKET_DATA_WS';

const sockets = {
  [TRADING_WS]: {
    url: process.env.REACT_APP_TRADING_SERVER_WS_ENDPOINT,
    instance: null,
  },
  [MARKET_DATA_WS]: {
    url: process.env.REACT_APP_MD_SERVER_WS_ENDPOINT,
    instance: null,
  },
};

const timestamps = {
  // now - 1 day (default)
  'last_day': DateTime.now().minus({ days: 1 }).toUnixInteger(),
  'last_week': DateTime.now().minus({ days: 7 }).toUnixInteger(),
  'last_30_days': DateTime.now().minus({ days: 30 }).toUnixInteger(),
};

/**
 * Unsubscribe to the not required streams
 * @param {WebSocket} socket
 * @param {callback} handler
 * @param {string} q
 * @param {integer} sid stream Id is a unique id per websocket connection that can be used only once in parallel, for bounded streams (such as placeOrder) you can use the same sid
 */
const subscribeToStream = (socket, payload) => {
  const { q, sid, d = {}} = payload;
  if (!sid) throw new Error(`Invalid SID for WebSocket stream ${sid}`);

  socket.send(JSON.stringify({ q, sid, d }));
};

/**
 * Unsubscribe to the not required streams
 * @param {WebSocket} socket
 * @param {string} q
 * @param {integer} sid stream Id is a unique id per websocket connection that can be used only once in parallel, for bounded streams (such as placeOrder) you can use the same sid
 */
const unsubscribeFromStream = (socket, payload) => {
  const { q, sid } = payload;
  if (!sid) {
    throw new Error('Cannot unsubscribe stream due missing SID');
  }
  if (socket instanceof WebSocket === false || socket.readyState !== 1) {
    throw new Error(`Cannot unsubscribe SID ${sid} because socket not ready yet`);
  }
  socket.send(JSON.stringify({ q, sid, sig: 3 }));
};

/**
 * Trading (WS)
 * Trading endpoint: private endpoint which can be accessed only for authenticated users.
 * @param {callback} dispatch Redux dispatch
 */
const getWebsocket = (socketName, dispatch) => {
  if (!sockets.hasOwnProperty(socketName)) throw new Error(`Invalid socket name: ${socketName}`);

  const { instance, url } = sockets[socketName];
  if (instance) return instance;

  const ws = new WebSocket(url);
  sockets[socketName].instance = ws;
  // when ready
  ws.addEventListener('open', () => {
    dispatch(socketStateChanged({ readyState: 1, error: null, sessionCreated: false, socketName }));
  });

  // handle pushed responses
  ws.addEventListener('message', (e) => {
    const data = JSON.parse(e.data);
    dispatch(streamDataPushed(data));
  });

  return ws;
};

const teardownWebsocket = (socketName) => {
  // logout, close sockets
  if (sockets[socketName].instance) {
    sockets[socketName].instance.close();
    sockets[socketName].instance = null;
  }
};

const fetchInstruments = async (token, dispatch) => {
  let data = null;
  dispatch(instrumentsActions.setLoading());

  try {
    /** @const string MetaData (Rest) */
    const metaDataEndpoint = process.env.REACT_APP_METADATA_ENDPOINT;
    const response = await fetch(`${metaDataEndpoint}instruments`, {
      method: 'GET',
      headers: {
        Authorization: `Bearer ${token}`,
      },
    });
    data = await response.json();
    if (!response.ok) {
      const exception = new Error(data?.message);
      exception.name = data?.code;
      throw exception;
    }

    dispatch(instrumentsActions.setData({ data }));
  } catch ({ name, message }) {
    dispatch(instrumentsActions.setError({
      data: {
        code: name,
        message: message,
      },
    }));

    return null;
  }

  return data;
};

const getTradingSessionPayload = (token = null) => {
  return { socketName: TRADING_WS, q: 'v1/broker.oms/createSession', d: { token }, subType: TRADING_SESSION_SUBSCRIPTION };
};

const getMarketDataSessionPayload = (token = null) => {
  return { socketName: MARKET_DATA_WS, q: 'v1/broker.oms/createSession', d: { token }, subType: MARKET_DATA_SESSION_SUBSCRIPTION };
};

/**
 * Using the instrument list retrieved in point 1, need to permanently subscribe to light ticker stream to get prices.
 * @param {string[]} symbols
 * @param {integer} interval
 */
const getLightTickersRequestPayload = (symbols = null, interval = 1000) => {
  return { socketName: MARKET_DATA_WS, q: 'v1/exchange.marketdata/lightTickers',  d: { symbols, interval }, subType: LIGHT_TICKERS_SUBSCRIPTION };
};

/**
 * Get balances per asset.
 * Can be send after created session in Trading (WS)
 */
const getBalanceRequestPayload = () => {
  return { socketName: TRADING_WS, q: 'v1/broker.account/balance',  d: { }, subType: BALANCE_SUBSCRIPTION };
};

/**
 *
 * @param {string} symbol
 * @param {integer} limit
 */
const getTradesRequestPayload = (symbol, limit = 1000) => {
  return { socketName: MARKET_DATA_WS, q: 'v1/exchange.marketdata/liveTrades',  d: { symbol, limit }, subType: INSTRUMENT_LIVE_TRADES_SUBSCRIPTION };
};

const getOrderBookRequestPayload = (symbol, levels = 5, interval = 1000, decimals = 2) => {
  return { socketName: MARKET_DATA_WS, q: 'v1/exchange.marketdata/partialOrderBook', d: { symbol, levels, interval, decimals }, subType: INSTRUMENT_ORDER_BOOK_SUBSCRIPTION };
};

const getActiveOrdersRequestPayload = (symbol) => {
  return { socketName: TRADING_WS, q: 'v1/broker.account/orders',  d: { instrument: symbol }, subType: INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION };
};

const getOrdersHistoryRequestPayload = (status = 'Active', dateFrom = null, limit = 50, offset = 0) => {
  return { socketName: MARKET_DATA_WS, q: 'v1/broker.account/ordersHistory',  d: { status, dateFrom, limit, offset } };
};

const getTradesHistoryPayload = (dateFrom = null, limit = 50, offset = 0) => {
  return { socketName: MARKET_DATA_WS, q: 'v1/broker.account/tradesHistory',  d: { dateFrom, limit, offset } };
};

const getPlaceOrderRequestPayload = (payload = {}) => {
  return { socketName: TRADING_WS, q: 'v1/broker.oms/placeOrder',  d: payload };
};

const getCancelOrderRequestPayload = (payload = {}) => {
  return { socketName: TRADING_WS, q: 'v1/broker.oms/cancelOrder',  d: payload };
};

const createNewStreamFromSubscription = (subType) => {
  switch (subType) {
    case INSTRUMENTS_SUBSCRIPTION:
      // ignore as it's rest http request
      return { subType };
    case TRADING_SESSION_SUBSCRIPTION:
      return { ...getTradingSessionPayload() };
    case MARKET_DATA_SESSION_SUBSCRIPTION:
      return { ...getMarketDataSessionPayload() };
    case BALANCE_SUBSCRIPTION:
      return { ...getBalanceRequestPayload(), outputReceiverAction: balanceActions.streamDataPushed.toString() };
    case LIGHT_TICKERS_SUBSCRIPTION:
      return { ...getLightTickersRequestPayload(), outputReceiverAction: instrumentsActions.streamDataPushed.toString() };
    case INSTRUMENT_LIVE_TRADES_SUBSCRIPTION:
      return { ...getTradesRequestPayload(), sliceDataKey: 'trading.liveTrades' };
    case INSTRUMENT_ORDER_BOOK_SUBSCRIPTION:
      return { ...getOrderBookRequestPayload(), sliceDataKey: 'trading.orderBook' };
    case INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION:
      return { ...getActiveOrdersRequestPayload(), sliceDataKey: 'trading.activeOrders' };
    case INSTRUMENT_ORDERS_HISTORY:
      return { ...getOrdersHistoryRequestPayload('Inactive'), sliceDataKey: 'trading.ordersHistory', subType };
    case ACTIVE_ORDERS_SUBSCRIPTION:
      return { ...getOrdersHistoryRequestPayload('Active'), sliceDataKey: 'history.activeOrders', subType };
    case ORDERS_HISTORY_SUBSCRIPTION:
      return { ...getOrdersHistoryRequestPayload('Inactive'), sliceDataKey: 'history.ordersHistory', subType };
    case TRADES_HISTORY_SUBSCRIPTION:
      return { ...getTradesHistoryPayload(), sliceDataKey: 'history.tradesHistory', subType };
    default:
      // do nothing
  }
};

const validateRequestPayload = (
  request,
  params = {},
  socketReady = false,
  sessionCreated = false,
  fullCheck = true
) => {
  const { subType, d = {}, sig = null, sid = 0 } = request;

  let extraPayload = { d: { ...d } };
  switch (subType) {
    case INSTRUMENTS_SUBSCRIPTION:
      throw new Error('Not a stream request');
    case TRADING_SESSION_SUBSCRIPTION:
    case MARKET_DATA_SESSION_SUBSCRIPTION:
      // token param required
      const token = params?.token || request?.d.token;
      if (!token) throw new Error('Token param missed or invalid');
      extraPayload.d.token = token;
      break;
    case BALANCE_SUBSCRIPTION:
      break;
    case ACTIVE_ORDERS_SUBSCRIPTION:
      extraPayload.d = { ...extraPayload.d, ...params.activeOrdersFilters };
      break;
    case ORDERS_HISTORY_SUBSCRIPTION:
      extraPayload.d = { ...extraPayload.d, ...params.ordersHistoryFilters };
      if (timestamps.hasOwnProperty(extraPayload.d.dateFrom)) {
        extraPayload.d.dateFrom = timestamps[extraPayload.d.dateFrom];
      }
      break;
    case TRADES_HISTORY_SUBSCRIPTION:
      extraPayload.d = { ...params.tradesHistoryFilters };
      if (timestamps.hasOwnProperty(extraPayload.d.dateFrom)) {
        extraPayload.d.dateFrom = timestamps[extraPayload.d.dateFrom];
      }
      break;
    case LIGHT_TICKERS_SUBSCRIPTION:
      // symbols non empty array required
      const symbols = params?.symbols || request?.d.symbols;
      if (!Array.isArray(symbols) || symbols.length < 1) throw new Error('Symbols param missed or invalid');
      extraPayload.d.symbols = symbols;
      break;
    case INSTRUMENT_LIVE_TRADES_SUBSCRIPTION:
    case INSTRUMENT_ORDER_BOOK_SUBSCRIPTION:
      const symbol = params?.symbol || request?.d.symbol;
      if (!symbol) throw new Error('Symbol param missed or invalid');
      extraPayload.d.symbol = symbol;
      break;
    case INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION:
      const instrument = params?.instrument || request?.d.instrument;
      if (!instrument) throw new Error('Instrument param missed or invalid');
      extraPayload.d.instrument = instrument;
      break;
    case INSTRUMENT_ORDERS_HISTORY:
      const instrumentParam = params?.instrument || request?.d.instrument;
      if (!instrumentParam) throw new Error('Instrument param missed or invalid');
      extraPayload.d = { ...extraPayload.d, ...params.instrumentOrdersHistoryFilters };
      if (timestamps.hasOwnProperty(extraPayload.d.dateFrom)) {
        extraPayload.d.dateFrom = timestamps[extraPayload.d.dateFrom];
      }
      extraPayload.d.instrument = instrumentParam;
      break;
    case PLACE_ORDER_SUBSCRIPTION:
    case CANCEL_ORDER_SUBSCRIPTION:
      // don't add store params to user defined requests
      break;
    default:
      throw new Error(`Unexpected stream payload of ${subType}`);
      // do nothing
  }

  if (fullCheck) {
    if (sid < 1) throw new Error('SID is required');
    if (sig > 1) throw new Error(`Cannot push after sig ${sig}`);
    if (!socketReady) throw new Error('Socket not ready yet');

    // all requests beside session themselves require session requests finished
    if (![
      TRADING_SESSION_SUBSCRIPTION,
      MARKET_DATA_SESSION_SUBSCRIPTION,
    ].includes(subType) && sessionCreated !== true) {
      throw new Error('Session request required to continue');
    }
  }

  return { ...request, ...extraPayload };
};

const areStreamPayloadsEqual = (a, b) => {
  if (a?.subType !== b?.subType) return false;

  // deeper comparison
  switch (a?.subType) {
    case TRADING_SESSION_SUBSCRIPTION:
    case MARKET_DATA_SESSION_SUBSCRIPTION:
      // compare tokens
      return Boolean(!a?.d.token || !b?.d.token || a?.d.token === b?.d.token);
    case LIGHT_TICKERS_SUBSCRIPTION:
      // compare symbols
      return !Array.isArray(a?.d.symbols) || !Array.isArray(b?.d.symbols) || a?.d.symbols.concat().sort() === b?.d.symbols.concat().sort();
    case INSTRUMENT_LIVE_TRADES_SUBSCRIPTION:
    case INSTRUMENT_ORDER_BOOK_SUBSCRIPTION:
      return Boolean(!a?.d.symbol || !b?.d.symbol || a?.d.symbol === b?.d.symbol);
    case INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION:
      return Boolean(!a?.d.instrument || !b?.d.instrument || a?.d.instrument === b?.d.instrument);
    // user triggered action always new
    case PLACE_ORDER_SUBSCRIPTION:
    case CANCEL_ORDER_SUBSCRIPTION:
      return false;
    // data with filters
    case INSTRUMENT_ORDERS_HISTORY:
      return ['instrument', 'offset', 'limit', 'dateFrom']
        .map(prop => a?.d[prop] === b?.d[prop])
        .every(v => Boolean(v));
    case ACTIVE_ORDERS_SUBSCRIPTION:
      return ['offset', 'limit']
        .map(prop => a?.d[prop] === b?.d[prop])
        .every(v => Boolean(v));
    case ORDERS_HISTORY_SUBSCRIPTION:
      return ['offset', 'limit', 'dateFrom']
        .map(prop => Boolean(a?.d[prop] === b?.d[prop]))
        .every(v => Boolean(v));
    case TRADES_HISTORY_SUBSCRIPTION:
      return ['offset', 'limit', 'dateFrom']
        .map(prop => Boolean(a?.d[prop] === b?.d[prop]))
        .every(v => Boolean(v));
    // streams without params are always equal
    case INSTRUMENTS_SUBSCRIPTION:
    case BALANCE_SUBSCRIPTION:
    default:
      return true;
  }
};

const streamsTriggeringActions = [
  // prepare streams
  // triggers on location change
  dataSubscriptionsChanged.toString(),
  // gives missing token param
  successfulLogin.toString(),
  // gives missing symbols list param
  instrumentsActions.setData.toString(),
  // gives missing symbol param, also means location change
  instrumentsActions.setCurrentInstrumentSymbol.toString(),
  // user initiated actions
  tradingActions.orderPlaced.toString(),
  tradingActions.orderCanceled.toString(),
  // next filters
  historyActions.dataFilterChanged.toString(),
  tradingActions.dataFilterChanged.toString(),
];

const middleware = store => next => action => {
  const { getState, dispatch } = store;
  const { type } = action;

  const result = next(action);
  let newStreams = [];
  let newOpenedSids = [];
  let outdatedSids = [];
  let closedSids = [];
  let failedStreams = [];

  // prepare params for streams
  const state = getState();
  const {
    sockets,
    auth: { token },
    instruments: { currentInstrumentSymbol: symbol },
    trading: { ordersHistory: { filters } },
    history: { activeOrders, ordersHistory, tradesHistory },
  } = state;
  const { streamsQueue, dataSubscriptions: subList } = sockets;
  const relatedStream = streamsQueue.find(({ sid }) => sid && sid === action.payload?.sid);
  const symbolsList = selectInstrumentsSymbols(state);// symbols list
  const streamParams = {
    token,
    symbol,
    symbols: symbolsList,
    instrument: symbol,
    instrumentOrdersHistoryFilters: filters,
    activeOrdersFilters: activeOrders.filters,
    ordersHistoryFilters: ordersHistory.filters,
    tradesHistoryFilters: tradesHistory.filters,
  };

  const buildStreamForComparison = (subType, params) => {
    let stream = createNewStreamFromSubscription(subType);
    try {
      stream = validateRequestPayload(stream, params, false, false, false);
    } catch (e) {
      // that's normal, error doesn't matter
    }
    return stream;
  };

  if (
    [PLACE_ORDER_SUBSCRIPTION, CANCEL_ORDER_SUBSCRIPTION].includes(relatedStream?.subType)
    && action.payload?.sig === 1
  ) {
    // that's an answer to one time action
    // stream can be closed
    outdatedSids.push(relatedStream.sid);
  }

  if (
    !subList.includes(INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION)
    && relatedStream?.subType === INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION
    && ['Added', 'Rejected', 'Executed', 'Cancelled'].includes(action.payload?.d.messageType)
  ) {
    // active orders feedback push
    // stream can be closed since we not subscribed to
    outdatedSids.push(relatedStream.sid);
  }

  if (streamsTriggeringActions.includes(type)) {
    // create new requests to compare payloads
    const streamsParams = {};
    subList.forEach(subType => {
      streamsParams[subType] = buildStreamForComparison(subType, streamParams);
    });

    // close or remove old streams
    // like when we leaving page
    streamsQueue.forEach((stream) => {
      const { socketName, subType, ...payload } = stream;
      const isUserSub = [PLACE_ORDER_SUBSCRIPTION, CANCEL_ORDER_SUBSCRIPTION].includes(subType);
      // has been modified or not required anymore
      const isRequested = streamsParams?.[subType];
      const isDuplicate = areStreamPayloadsEqual(stream, streamsParams?.[subType]);
      if (!isUserSub && (!isRequested || !isDuplicate)) {
        // not required anymore, can be closed
        outdatedSids.push(payload.sid);
      }

      if (isDuplicate) {
        // remove duplicate from prepared streams
        delete streamsParams[subType];
      }

      // additionally build user request from action payload
      switch (type) {
        case tradingActions.orderPlaced.toString():
          const { payload: { timeInForceMarketOnly, expiryDate, ...orderData } } = action;
          if (orderData?.orderType === 'Market') {
            orderData.timeInForce = timeInForceMarketOnly;
            delete orderData?.price;
          } else if (orderData.timeInForce === 'GTD') {
            // type Limit
            orderData.expiryDate = expiryDate;
          }
          streamsParams[PLACE_ORDER_SUBSCRIPTION] = Object.assign(
            getPlaceOrderRequestPayload(orderData),
            { subType: PLACE_ORDER_SUBSCRIPTION }
          );
          break;
        case tradingActions.orderCanceled.toString():
          const { payload: { orderId, instrument } } = action;
          const activeOrdersOpenStream = streamsQueue.find((item) => item?.subType === INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION && item?.d?.instrument === instrument && item?.sig !== 3);
          if (!activeOrdersOpenStream) {
            streamsParams[INSTRUMENT_ACTIVE_ORDERS_SUBSCRIPTION] = Object.assign(
              getActiveOrdersRequestPayload(instrument),
              { sliceDataKey: 'trading.activeOrders' }
            );
          }
          streamsParams[CANCEL_ORDER_SUBSCRIPTION] = Object.assign(
            getCancelOrderRequestPayload({ orderId, instrument }),
            { subType: CANCEL_ORDER_SUBSCRIPTION }
          );
          break;
        default:
          // ignore
      }
    });

    newStreams = Object.keys(streamsParams).map(key => streamsParams[key]);
  }

  // close streams before opening new one
  outdatedSids.forEach(sid => {
    try {
      const { socketName, ...payload } = streamsQueue.find((item) => item?.sid && sid === item?.sid);
      const socket = getWebsocket(socketName, dispatch);
      unsubscribeFromStream(socket, payload);
    } catch (e) {
      // socket not ready or missing SID
      // means stream hasn't been opened
      // don't need to be closed
    } finally {
      closedSids.push(sid);
    }
  });

  if (subList.length === 0) {
    [TRADING_WS, MARKET_DATA_WS].forEach(socketName => teardownWebsocket(socketName));
  }

  // handle delayed sockets queue
  // generate SIDs for new streams
  // clone streams queue
  const readyStreams = streamsQueue
    .filter(({ sig = null, subType, failedReason = null }) => {
      // user interacted actions cannot be delayed
      // skip if failed already
      if ([PLACE_ORDER_SUBSCRIPTION, CANCEL_ORDER_SUBSCRIPTION].includes(subType) && failedReason) return false;

      return sig < 1;
    })
    .map((delayedStream) => {
      const {
        sid = null,
        subType,
        socketName,
        failedReason = null,
      } = delayedStream;

      if (subType === INSTRUMENTS_SUBSCRIPTION && streamParams?.token) {
        // fetchInstruments(streamParams?.token, dispatch);
        newOpenedSids.push(sid);
        return delayedStream;
      }


      try {
        const socket = getWebsocket(socketName, dispatch);
        const socketReady = socket.readyState === 1;
        const sessionCreated = getState().sockets?.[socketName].sessionCreated;
        const streamPayload = validateRequestPayload(delayedStream, streamParams, socketReady, sessionCreated, true);
        newOpenedSids.push(sid);
        return streamPayload;
      } catch (e) {
        if (failedReason !== e.message) {
          failedStreams.push({
            reason: e.message,
            sid,
          });
        }
        return null;
      }
    })
    .filter(item => item !== null);

  // push new state to reducer
  if (closedSids.length || newOpenedSids.length || newStreams.length || failedStreams.length) {
    // add new SIDs
    if (newStreams.length) {
      let nextSid = selectNewSid(state);
      newStreams = newStreams.map(original => {
        const modified = { ...original, sid: nextSid };
        nextSid++;
        return modified;
      });
    }

    dispatch(streamsStateChanged({
      created: newStreams,
      closed: closedSids,
      opened: newOpenedSids,
      failed: failedStreams,
    }));
  }

  // finally make fetch and socket.send
  readyStreams.forEach((stream => {
    const { subType, socketName } = stream;
    if (subType === INSTRUMENTS_SUBSCRIPTION && streamParams?.token) {
      fetchInstruments(streamParams?.token, dispatch);
      return;
    }

    const socket = getWebsocket(socketName, dispatch);
    subscribeToStream(socket, stream);
  }));

  return result;
};

export default middleware;
