import partition from "lodash/partition";
import uniqBy from "lodash/uniqBy";
import { delay, put, select, takeEvery } from "redux-saga/effects";

import {
  addScansFromSocket,
  clearSocketBuffer,
  processSocketMessage,
} from "actions";

import { PROCESS_SOCKET_MESSAGE } from "constants/actionTypes";
import { deduplicatePusherMessageTypes, PusherTypes } from "constants/socket";

import { getLivestockSaleId } from "lib/navigation";
import { channelEvents } from "lib/socket";

import {
  getIsSaleWatcher,
  getSaleLotById,
  getSaleyardScanSaleLotById,
  selectCurrentDeploymentIds,
} from "selectors";
// Filter new consignments and sale lots, based on whether we're currently
// viewing the matching sale.
// Filter the scans based on whether they're part of a visible deployment sale
// or sale lot.

// patches and deletes will only update based on id.
// properties are system wide, so always allow
// businesses and livestock sales are per deployment, so don't need filtering.

/**
 * Picks the scans out of a scan socket message that are relevant to the
 * Livestock Sale the user is currently viewing.
 *
 * Expected shape of `message.payload.object`:
 * {'sale_lot_id': sale_lot.cbid, 'EIDs': [ { deployment_sale_id: ..., ...}, ... ] }
 *
 * @param {object} message - Socket message; only `message.payload.object` is read.
 * @param {object} state - State snapshot passed to the selectors.
 * @returns {Array} Every scan in the message, a filtered subset, or [].
 */
const filterScansPayload = (message, state) => {
  const { EIDs: incomingScans, sale_lot_id: lotId } = message.payload.object;

  // Away from a Livestock Sale page, none of these scans are for this user.
  const activeSaleId = getLivestockSaleId();
  if (!activeSaleId) {
    return [];
  }

  // A Sale Lot we already hold - in either store (type: "auction" |
  // "saleyard_scan") - means the scans belong to one of the currently
  // available Deployment Sales, so the whole batch is accepted.
  const isKnownSaleLot = id =>
    getSaleLotById(id)(state) || getSaleyardScanSaleLotById(id)(state);
  if (lotId && isKnownSaleLot(lotId)) {
    return incomingScans;
  }

  // No Sale Lot given, or we don't have it **yet**: fall back to matching each
  // scan on the current livestock sale, with a deployment that is either
  // blank or one of ours.
  const myDeploymentIds = selectCurrentDeploymentIds(state);
  return incomingScans.filter(scan => {
    if (scan.livestock_sale_id !== activeSaleId) {
      return false;
    }
    return !scan.deployment_id || myDeploymentIds.includes(scan.deployment_id);
  });
};

// Message types that are still processed when there is no current livestock
// sale id (the user is not on a Livestock Sale page). onProcessSocketMessage
// drops every other message type in that situation.
const OUT_OF_SALE_PUSHER_WHITELIST = [
  PusherTypes.REFRESH_BILLING_DOCUMENTS,
  PusherTypes.REFRESH_DEFAULT_VENDOR_SPLITS,
  PusherTypes.REFRESH_EMAILS,
  PusherTypes.REFRESH_BUSINESSES_V2,
  PusherTypes.REFRESH_INTEGRATION_CREDENTIALS,
  PusherTypes.REFRESH_INTEREST_SETTINGS,
  PusherTypes.REFRESH_LEDGER_ACCOUNTS,
  PusherTypes.REFRESH_MASTER_LEDGER_ACCOUNTS,
  PusherTypes.REFRESH_MASTER_RULE_BOOKS,
  PusherTypes.REFRESH_MASTER_RULES,
  PusherTypes.REFRESH_PAYMENTS,
  PusherTypes.REFRESH_REPORT_JOBS,
  PusherTypes.REFRESH_RULE_BOOKS,
  PusherTypes.REFRESH_RULES,
  PusherTypes.REFRESH_SALES,
  PusherTypes.REFRESH_TRADING_TERMS,
];

/**
 * Handles one message from the pusher socket: decides whether it applies to
 * this client and, if so, dispatches the action registered for its message
 * type in `channelEvents`.
 *
 * Nothing is dispatched when:
 *  - no livestock sale is open and the message type is not in
 *    OUT_OF_SALE_PUSHER_WHITELIST;
 *  - `channelEvents` has no entry for the message type;
 *  - the entry sets `filterOnLivestockSaleId` and the livestock sale id found
 *    in the payload differs from the current one;
 *  - the entry has no action creator; or
 *  - the entry's `mutatePayload` returns null.
 *
 * @param {object} action - Action carrying the socket `message`
 *   (`{ message_type, payload }`).
 */
function* onProcessSocketMessage(action) {
  const { message } = action;
  const { message_type, payload } = message;

  const currentLiveStockSaleId = getLivestockSaleId();
  if (
    !currentLiveStockSaleId &&
    !OUT_OF_SALE_PUSHER_WHITELIST.includes(message_type)
  ) {
    return;
  }

  // Ignore message types this client has no handler for; indexing
  // channelEvents with an unknown type would otherwise throw a TypeError from
  // inside the saga.
  const channelEvent = channelEvents[message_type];
  if (!channelEvent) {
    return;
  }

  if (channelEvent.filterOnLivestockSaleId) {
    // Check for either livestocksale_id or livestock_sale_id as it can vary
    // on different object types. (FIXME one day)
    const messageLivestockSaleId =
      payload?.object?.livestocksale_id ||
      payload?.object?.livestock_sale_id ||
      payload?.object?.livestocksale ||
      payload?.livestock_sale_id ||
      payload?.object?.livestock_sale;
    if (currentLiveStockSaleId !== messageLivestockSaleId) {
      // bail if we have a non matching livestock sale id.
      return;
    }
  }

  // Some entries deliberately carry no action; there is nothing to dispatch.
  if (typeof channelEvent.action !== "function") {
    return;
  }

  let actionPayload = payload;
  if (typeof channelEvent.mutatePayload === "function") {
    const state = yield select();
    actionPayload = channelEvent.mutatePayload(payload, state);
    // mutatePayload signals "drop this message" by returning null.
    if (actionPayload === null) {
      return;
    }
  }
  yield put(channelEvent.action(actionPayload));
}

/**
 * Endless loop that drains the socket message buffer in batches.
 *
 * Pusher messages are not handled the instant they arrive; they sit in
 * `state.socket.buffer` until the next pass of this loop. On each pass with a
 * non-empty buffer:
 *  1. message types listed in `deduplicatePusherMessageTypes` are collapsed to
 *     one message per type and dispatched;
 *  2. every other message except "create_scans" is dispatched in order;
 *  3. the scans of all "create_scans" messages are filtered, joined into one
 *     list and dispatched as a single addScansFromSocket action (skipped when
 *     no scans survive filtering);
 *  4. the buffer is cleared.
 *
 * The loop then waits 500ms when `getIsSaleWatcher` is truthy for the state
 * read at the start of the pass, otherwise 5000ms.
 */
export function* scheduleCommitSocketBuffer() {
  for (;;) {
    const snapshot = yield select();
    const { buffer } = snapshot.socket;

    if (buffer.length > 0) {
      const [dedupableEvents, remainingEvents] = partition(
        buffer,
        ({ message_type }) =>
          deduplicatePusherMessageTypes.includes(message_type),
      );

      for (const refreshEvent of uniqBy(dedupableEvents, "message_type")) {
        yield put(processSocketMessage(refreshEvent));
      }

      // Scan messages are held back so they can be merged into one action.
      const [scanEvents, regularEvents] = partition(
        remainingEvents,
        ({ message_type }) => message_type === "create_scans",
      );

      for (const regularEvent of regularEvents) {
        yield put(processSocketMessage(regularEvent));
      }

      const mergedScans = scanEvents.flatMap(scanEvent =>
        filterScansPayload(scanEvent, snapshot),
      );
      if (mergedScans.length) {
        yield put(addScansFromSocket(mergedScans));
      }

      yield put(clearSocketBuffer());
    }

    yield delay(getIsSaleWatcher(snapshot) ? 500 : 5000);
  }
}

/**
 * Entry saga for socket handling: registers onProcessSocketMessage for every
 * PROCESS_SOCKET_MESSAGE action, then yields the buffer-flushing loop.
 */
function* rootSaga() {
  const messageWatcher = takeEvery(
    PROCESS_SOCKET_MESSAGE,
    onProcessSocketMessage,
  );
  yield messageWatcher;

  const bufferFlushLoop = scheduleCommitSocketBuffer();
  yield bufferFlushLoop;
}

export default rootSaga;
