import { RpcError } from '@protobuf-ts/runtime-rpc';
import { SWServerClient } from '@sparx/api/apis/sparx/messaging/server/v1/server.client';

function sleep(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

/**
 * Payload of messages received from the stream.
 */
type Payload = {
  id: string;
};

export type Handler = {
  /**
   * Unique ID for the handler. This is used for de-duplication when registering a handler multiple
   * times, for example, in a useEffect.
   */
  id: string;

  /**
   * Handler function that will be called with the message payload.
   */
  callback: (payload: Payload) => void;
};

/**
 * List of supported message types.
 */
export type MessageType = 'sparxweb.LogoutResponse';

type HandlerMap = Record<string, Record<string, Handler>>;

/**
 * Manages server streaming calls and calling a set of handlers for each message type received from
 * the stream.
 */
export default class ServerSteamingManager {
  private ackBackoffDelays = [100, 100, 100, 1000, 1000, 1000, 5000];

  private sessionId: string | undefined;
  private aborter: AbortController | undefined;
  private handlers: HandlerMap;
  private swserverClient: SWServerClient;

  constructor(swserverClient: SWServerClient) {
    this.handlers = {};
    this.swserverClient = swserverClient;
  }

  /**
   * Starts the server streaming call using the provided session id.
   * Any previously running streams will be cancelled to ensure only one connection is active at
   * any time.
   */
  public startWithSessionId(sessionId: string): void {
    if (sessionId !== this.sessionId) {
      this.sessionId = sessionId;
      this.run();
    }
  }

  /**
   * Registers a handler for a given message type. Handlers are keyed by their id so if addHandler
   * is called multiple times with the same handler, there will only be one instance called when
   * the message type is received. This means that subsequent calls with the same handler will
   * overwrite and previously registered handler with the same id.
   */
  public addHandler(messageType: string, handler: Handler) {
    this.handlers[messageType] = {
      ...this.handlers[messageType],
      [handler.id]: handler,
    };
  }

  /**
   * Handles starting a new server streaming call and calling registered handlers for each message
   * that is received. Any previous stream calls will be cancelled before starting the new one.
   */
  private async run(): Promise<void> {
    // Kill any previously running streams
    this.aborter?.abort();

    // We can't continue if we have no session id
    if (!this.sessionId) {
      return;
    }

    this.aborter = new AbortController();

    while (this.aborter?.signal.aborted === false) {
      const stream = this.swserverClient.serverMessageStreaming(
        {
          sessionId: this.sessionId,
        },
        { abort: this.aborter?.signal },
      );

      // Errors are thrown in the status, trailers, and headers promises. This
      // catches those errors and does nothing with them as we handle the errors
      // when looping over the responses.
      const noop = () => {
        return;
      };
      stream.status.catch(noop);
      stream.trailers.catch(noop);
      stream.headers.catch(noop);

      try {
        // Start a new stream
        for await (const response of stream.responses) {
          for (const msg of response.messages) {
            const payload: Payload = JSON.parse(msg.payload);

            const handlers = this.handlers[msg.type];
            if (handlers && Object.keys(handlers).length > 0) {
              for (const handler of Object.values(handlers)) {
                handler.callback(payload);
              }
            }

            this.ackServerMessage(payload.id);
          }
        }
      } catch (e) {
        if (e instanceof RpcError) {
          // sleep here to let Safari correctly kill this js when reloading
          await sleep(1);

          switch (e.code) {
            case 'CANCELLED':
              continue;
            case 'INTERNAL':
              // This is the best I can find to catch timeouts. Not ideal.
              // The timeout seems to be coming from somewhere between the browser and the server.
              // SWServer has a context timeout of 90 minutes but the stream receives a 524 response
              // after 1.7 minutes.
              if (e.message === 'Failed to fetch') {
                continue;
              }
              console.error(e);
              break;
            case 'UNAUTHENTICATED':
              console.error(e);
              this.aborter?.abort();
              break;
            default:
              console.error(e);
          }
        } else {
          console.error(e);
        }
      }
    }
  }

  /**
   * Ensures a server message is acknowledged by retrying the request if it fails
   */
  private async ackServerMessage(messageId: string): Promise<void> {
    let attempt = 0;

    // eslint-disable-next-line no-constant-condition
    while (true) {
      try {
        await this.swserverClient.ackServerMessage({ messageId }).response;
        break;
      } catch (e) {
        console.error(
          `ServerStreamingProvider: failed to ack server message with id ${messageId}: ${e}`,
        );

        await new Promise(resolve => setTimeout(resolve, this.getBackOffDelay(attempt)));
        attempt++;
      }
    }
  }

  /**
   * Gets the back off delay based on the current attempt.
   */
  private getBackOffDelay(attempt: number): number {
    return attempt >= this.ackBackoffDelays.length
      ? this.ackBackoffDelays[this.ackBackoffDelays.length - 1]
      : this.ackBackoffDelays[attempt];
  }
}
