import * as Nats from "nats.ws";
import { CtorsUnion, ctorsUnion } from "ctors-union";
import { Dispatch, EffectManager } from "@typescript-tea/core";
import { exhaustiveCheck } from "ts-exhaustive-check";
import { Publish, NatsCmd, Rpc } from "./nats-cmd";
import {
  ConnectionStatus,
  ListenConnectionStatus,
  ListenSubjects,
  NatsSub
} from "./nats-sub";
import { home } from "./home";

export type NatsConnectionInfo = {
  readonly url: string;
  // readonly userId: string;
  // readonly sessionId: string;
  readonly fetchNatsJwt: () => Promise<string>;
  readonly natsJwtExpirationSeconds: number;
};

export function createEffectManager(conn: NatsConnectionInfo): EffectManager {
  return {
    home,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    mapCmd: mapCmd as any,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    mapSub: mapSub as any,
    setup: () => () => undefined,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onEffects: onEffects(conn) as any,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    onSelfAction: onSelfAction as any
  };
}

// -- COMMANDS

export function mapCmd<A1, A2>(
  actionMapper: (a: A1) => A2,
  cmd: NatsCmd<A1>
): NatsCmd<A2> {
  switch (cmd.type) {
    case "Publish": {
      return cmd;
    }
    case "Rpc": {
      return {
        ...cmd,
        gotResponse: (r, i) => actionMapper(cmd.gotResponse(r, i))
      };
    }
    default: {
      return exhaustiveCheck(cmd, true);
    }
  }
}

// -- SUBSCRIPTIONS

export function mapSub<A1, A2>(
  actionMapper: (a: A1) => A2,
  sub: NatsSub<A1>
): NatsSub<A2> {
  switch (sub.type) {
    case "ListenConnectionStatus": {
      return { ...sub, changed: m => actionMapper(sub.changed(m)) };
    }
    case "ListenSubjects": {
      return { ...sub, gotMsg: m => actionMapper(sub.gotMsg(m)) };
    }
    default: {
      return exhaustiveCheck(sub, true);
    }
  }
}

// -- MANAGER

// const clientId = Uuid.v4();

const onEffects =
  (conn: NatsConnectionInfo) =>
  <ActionApp>(
    _dispatchApp: Dispatch<ActionApp>,
    dispatchSelf: Dispatch<SelfAction>,
    cmds: ReadonlyArray<NatsCmd<ActionApp>>,
    subs: ReadonlyArray<NatsSub<ActionApp>>,
    state: SelfState<ActionApp> = init(conn, dispatchSelf)
  ): SelfState<ActionApp> => {
    // Handle connection
    if (state.type === "SelfStateConnecting") {
      // Buffer the effects while connecting
      return {
        ...state,
        subs: subs,
        bufferedCmds: [...state.bufferedCmds, ...cmds]
      };
    }

    // We are connected, handle the effects
    const handleEffectsPromise = handleEffects(dispatchSelf, cmds, subs, state);

    return { ...state, subs, handleEffectsPromise };
  };

async function handleEffects<ActionApp>(
  dispatchSelf: Dispatch<SelfAction>,
  cmds: ReadonlyArray<NatsCmd<ActionApp>>,
  subs: ReadonlyArray<NatsSub<ActionApp>>,
  state: SelfStateConnected<ActionApp>
): Promise<HandleEffectsState<ActionApp>> {
  // Make sure we handle effects in sequence (serially) because the rabbitmq package cannot handle
  // new calls being made while other calls are awaited
  // We chain all publish calls to a single promise, and store it in state
  // so the chain continues even if more calls to onEffects() happens
  const handleEffectsPromise = state.handleEffectsPromise.then(
    async (handleEffectsState: HandleEffectsState<ActionApp>) => {
      // Handle commands
      for (const cmd of cmds) {
        switch (cmd.type) {
          case "Publish": {
            await executePublish(
              cmd,
              state.connection,
              dispatchSelf /* , state.userId, state.sessionId */
            );
            break;
          }
          case "Rpc": {
            await executeRpc(
              cmd,
              state.connection,
              dispatchSelf /* , state.userId, state.sessionId */
            );
            break;
          }
          default:
            exhaustiveCheck(cmd, true);
        }
      }

      // Handle ListenExchangeRoutes subscriptions
      const newSubscribers = await handleListenSubjectsSubs(
        state,
        handleEffectsState,
        subs,
        dispatchSelf
      );

      // Return new state for the promise
      return { subscribers: newSubscribers };
    }
  );
  return handleEffectsPromise;
}

function notifyConnectionStatusSubs<ActionApp>(
  subs: ReadonlyArray<NatsSub<ActionApp>>,
  status: ConnectionStatus
): void {
  const statusSubs = subs.filter(
    s => s.type === "ListenConnectionStatus"
  ) as ReadonlyArray<ListenConnectionStatus<ActionApp>>;
  for (const sub of statusSubs) {
    sub.changed(status);
  }
}

async function handleListenSubjectsSubs<ActionApp>(
  state: SelfStateConnected<ActionApp>,
  handleEffectsState: HandleEffectsState<ActionApp>,
  subs: ReadonlyArray<NatsSub<ActionApp>>,
  dispatchSelf: Dispatch<SelfAction>
): Promise<SubscribedSubjectListMap<ActionApp>> {
  const consumeSubs = subs.filter(
    s => s.type === "ListenSubjects"
  ) as ReadonlyArray<ListenSubjects<ActionApp, unknown>>;
  const groupedExchangeRouteSubs = groupByExchangeRoute(consumeSubs);
  const newSubscribers: {
    // tslint:disable-next-line: readonly-keyword
    [exchangeRoute: string]: SubscribedSubjectList<ActionApp>;
  } = {};
  for (const [subjectList, subs] of Object.entries(groupedExchangeRouteSubs)) {
    const existingSubscribedConsumer =
      handleEffectsState.subscribers[subjectList];
    if (existingSubscribedConsumer !== undefined) {
      // Re-use the AMQPConsumer object, but use new subs
      newSubscribers[subjectList] = {
        // queueName: existingSubscribedConsumer.queueName,
        natsSubscriptions: existingSubscribedConsumer.natsSubscriptions,
        subs
      };
    } else {
      // Create new AMQPConsumer for subs
      // const queueName = `client_${clientId}_listen_routes_${Uuid.v4()}`;
      const natsSubscriptions = await setupListenSubjectsSubscriber(
        state.connection,
        subjectList,
        // subs[0]!.exchange,
        subs[0]!.subjects,
        // queueName,
        dispatchSelf
      );
      // newSubscribers[exchangeRoutesKey] = { queueName, subscribers, subs };
      newSubscribers[subjectList] = { natsSubscriptions, subs };
    }
  }
  // Teardown unused consumers
  for (const [subjectList, subscribedSubjectList] of Object.entries(
    handleEffectsState.subscribers
  )) {
    // If this exchangeroute is no longer listened by any sub then tear it down
    if (newSubscribers[subjectList] === undefined) {
      await teardownSubscribers(subscribedSubjectList.natsSubscriptions);
    }
  }
  return newSubscribers;
}

function groupByExchangeRoute<ActionApp>(
  consumes: ReadonlyArray<ListenSubjects<ActionApp, unknown>>
): {
  readonly [consumerKey: string]: ReadonlyArray<
    ListenSubjects<ActionApp, unknown>
  >;
} {
  const built: {
    // tslint:disable-next-line: readonly-keyword
    [consumerKey: string]: Array<ListenSubjects<ActionApp, unknown>>;
  } = {};
  for (const c of consumes) {
    const consumerKey = buildSubjectsKey(c.subjects);
    let arr = built[consumerKey];
    if (arr === undefined) {
      arr = [];
      built[consumerKey] = arr;
    }
    built[consumerKey]!.push(c);
  }
  return built;
}

function buildSubjectsKey(subjects: ReadonlyArray<string>): string {
  return `${JSON.stringify(subjects)}`;
}

// -- STATE

type SelfState<ActionApp> =
  | SelfStateConnected<ActionApp>
  | SelfStateConnecting<ActionApp>;
type SelfStateConnected<ActionApp> = {
  readonly type: "SelfStateConnected";
  readonly connection: Nats.NatsConnection;
  // To access this state a chained promise is needed, and that chained promise
  // should replace the current promise to guarantee correct ordering
  readonly handleEffectsPromise: Promise<HandleEffectsState<ActionApp>>;
  readonly subs: ReadonlyArray<NatsSub<ActionApp>>;
  // readonly userId: string;
  // readonly sessionId: string;
};
type SelfStateConnecting<ActionApp> = {
  readonly type: "SelfStateConnecting";
  readonly subs: ReadonlyArray<NatsSub<ActionApp>>;
  readonly bufferedCmds: ReadonlyArray<NatsCmd<ActionApp>>;
  // readonly userId: string;
  // readonly sessionId: string;
};
type HandleEffectsState<ActionApp> = {
  // readonly expectedRpcReplies: ExpectedRpcReplies<ActionApp>;
  readonly subscribers: SubscribedSubjectListMap<ActionApp>;
};
// type ExpectedRpcReplies<ActionApp> = { readonly [correlationId: string]: Rpc<ActionApp, unknown, unknown> };
type SubscribedSubjectListMap<ActionApp> = {
  readonly [subjectList: string]: SubscribedSubjectList<ActionApp>;
};
type SubscribedSubjectList<ActionApp> = {
  // readonly queueName: string;
  readonly natsSubscriptions: ReadonlyArray<Nats.Subscription>;
  readonly subs: ReadonlyArray<ListenSubjects<ActionApp, unknown>>;
};
export const SelfAction = ctorsUnion({
  Connected: (conn: Nats.NatsConnection) => ({ conn }),
  Disconnected: (event: CloseEvent | Error) => ({ event }),
  GotRpcReply: (rpc: Rpc<unknown, unknown, unknown>, reply: Nats.Msg) => ({
    rpc,
    reply
  }),
  GotSubscriberMsg: (exchangeRouteKey: string, msg: Nats.Msg) => ({
    exchangeRouteKey,
    msg
  }),
  GotNatsJwt: (jwt: string) => ({ jwt })
});
export type SelfAction = CtorsUnion<typeof SelfAction>;

// Reason for "global":
// In the NATS authenticator we need to
// close over the jwt so it can get the
// fresh one every time.
let natsJwt: string = "";

function init<ActionApp>(
  conn: NatsConnectionInfo,
  dispatchSelf: Dispatch<SelfAction>
): SelfState<ActionApp> {
  const { url /* , sessionId, userId */ } = conn;

  setInterval(
    async () => {
      const freshJwt = await conn.fetchNatsJwt();
      dispatchSelf(SelfAction.GotNatsJwt(freshJwt));
    },
    // Refresh 60 seconds before expiration.
    (conn.natsJwtExpirationSeconds - 60) * 1000
  );

  conn.fetchNatsJwt().then(freshJwt => {
    natsJwt = freshJwt;
    return Nats.connect({
      servers: [url],
      authenticator: Nats.jwtAuthenticator(() => {
        // console.log("jwtAuthenticatorCallack", freshJwt);
        return natsJwt;
      }),
      ignoreAuthErrorAbort: true
    }).then(conn => {
      dispatchSelf(SelfAction.Connected(conn));
    });
  });

  return {
    type: "SelfStateConnecting",
    bufferedCmds: [],
    subs: [] /* , sessionId, userId  */
  };
}

export function onSelfAction<ActionApp>(
  dispatchApp: Dispatch<ActionApp>,
  dispatchSelf: Dispatch<SelfAction>,
  action: SelfAction,
  state: SelfState<ActionApp>
): SelfState<ActionApp> {
  // log.jonas("------> onSelfAction", action, state);
  switch (action.type) {
    case "Connected": {
      console.log("Connected to nats");
      if (state.type === "SelfStateConnecting") {
        const newState: SelfStateConnected<ActionApp> = {
          type: "SelfStateConnected",
          subs: state.subs,
          // sessionId: state.sessionId,
          // userId: state.userId,
          connection: action.conn,
          handleEffectsPromise: Promise.resolve({
            subscribers: {},
            expectedRpcReplies: {}
          })
        };
        notifyConnectionStatusSubs(
          newState.subs,
          ConnectionStatus.ConnectionStatusonnected()
        );
        if (state.bufferedCmds.length > 0 || state.subs.length > 0) {
          const handleEffectsPromise = handleEffects(
            dispatchSelf,
            state.bufferedCmds,
            state.subs,
            newState
          );
          return { ...newState, handleEffectsPromise };
        }
        return newState;
      }
      return state;
    }
    case "Disconnected": {
      // // If we were connected when we got disconnected, then re-connect
      return state;
    }
    case "GotRpcReply": {
      if (state.type === "SelfStateConnected") {
        const { reply } = action;
        // If the reply was an exception, just rethrow it
        if (reply.headers?.get("type") === "Exception") {
          const errorMsg = reply.data
            ? new TextDecoder().decode(reply.data)
            : "The body of the erorr message was empty";
          throw new Error(
            `An NATS reply was of type Exception. The original exception was: ${JSON.stringify(
              errorMsg
            )}`
          );
        }
        // In order to access the state of handle effects we need to chain the promise
        // This guarantees that the state does not go out of sync
        const handleEffectsPromise = state.handleEffectsPromise.then(
          async (
            handleEffectsState: HandleEffectsState<ActionApp>
          ): Promise<HandleEffectsState<ActionApp>> => {
            const decoded = action.rpc.responseDecoder(reply.data);
            dispatchApp(
              action.rpc.gotResponse(
                decoded,
                action.rpc.correlationId
              ) as ActionApp
            );
            return handleEffectsState;
          }
        );
        return { ...state, handleEffectsPromise };
      }
      return state;
    }
    case "GotSubscriberMsg": {
      if (state.type === "SelfStateConnected") {
        const { exchangeRouteKey, msg } = action;
        // In order to access the state of handle effects we need to chain the promise
        // This guarantees that the state does not go out of sync
        const handleEffectsPromise = state.handleEffectsPromise.then(
          async (
            handleEffectsState: HandleEffectsState<ActionApp>
          ): Promise<HandleEffectsState<ActionApp>> => {
            const subscribedConsumer =
              handleEffectsState.subscribers[exchangeRouteKey];
            if (subscribedConsumer !== undefined) {
              const decode = subscribedConsumer.subs[0]!.decode;
              const decodedMsg = decode(msg.data);
              // Notify all subs
              for (const s of subscribedConsumer.subs) {
                dispatchApp(s.gotMsg(decodedMsg));
              }
            }
            return handleEffectsState;
          }
        );
        return { ...state, handleEffectsPromise };
      }
      return state;
    }
    case "GotNatsJwt": {
      if (state.type !== "SelfStateConnected") {
        return state;
      }
      natsJwt = action.jwt;
      return state;
    }
    default:
      return exhaustiveCheck(action, true);
  }
}

async function executePublish(
  publish: Publish<unknown>,
  connection: Nats.NatsConnection,
  _dispatchSelf: Dispatch<SelfAction>
  // userId: string,
  // sessionId: string
): Promise<void> {
  try {
    const { subject, msg, msgEncoder, headers } = publish;
    const serializedRequest = msgEncoder(msg);
    connection.publish(`${subject}`, serializedRequest, {
      headers: toNatsHeaders(headers)
    });
    // connection.publish(`${subject}.${userId}.${sessionId}`, serializedRequest, {
    //   headers: toNatsHeaders(headers),
    // });
  } catch (err) {
    console.log("Error", err);
  }
}

async function executeRpc<ActionApp>(
  rpc: Rpc<ActionApp, unknown, unknown>,
  connection: Nats.NatsConnection,
  dispatchSelf: Dispatch<SelfAction>
  // userId: string,
  // sessionId: string
): Promise<void> {
  try {
    const { subject, request, requestEncoder, headers } = rpc;
    const serializedRequest = requestEncoder(request);
    const response = await connection.request(`${subject}`, serializedRequest, {
      timeout: 20000,
      headers: toNatsHeaders(headers)
    });
    // const response = await connection.request(
    //   `${subject}.${userId}.${sessionId}`,
    //   serializedRequest,
    //   {
    //     timeout: 20000,
    //     headers: toNatsHeaders(headers)
    //   }
    // );
    dispatchSelf(SelfAction.GotRpcReply(rpc, response));
  } catch (err) {
    console.error("Error", err);
  }
}

function toNatsHeaders(headers: Record<string, string>): Nats.MsgHdrs {
  const h = Nats.headers();
  for (const [k, v] of Object.entries(headers)) {
    h.append(k, v);
  }
  return h;
}

export async function setupListenSubjectsSubscriber(
  connection: Nats.NatsConnection,
  exchangeRoutesKey: string,
  subjects: ReadonlyArray<string>,
  dispatchSelf: Dispatch<SelfAction>
): Promise<ReadonlyArray<Nats.Subscription>> {
  const natsSubs: Array<Nats.Subscription> = [];
  for (const s of subjects) {
    const natsSub = connection.subscribe(s, {
      callback: (err, msg) => {
        if (err) {
          throw new Error(`Nats error: ${err.message}`);
        }
        dispatchSelf(SelfAction.GotSubscriberMsg(exchangeRoutesKey, msg));
      }
    });
    natsSubs.push(natsSub);
  }
  return natsSubs;
}

export async function teardownSubscribers(
  natsSubscriptions: ReadonlyArray<Nats.Subscription>
): Promise<void> {
  for (const ns of natsSubscriptions) {
    await ns.drain();
  }
  // tslint:disable-next-line: max-file-line-count
}
