'use strict';

var uuid = require('uuid');
var isDOMUnloaded = require('../../../helpers/is_dom_unloaded.js');
var Message = require('./rumor_message.js');
var OTHelpers = require('@opentok/ot-helpers');
var logging = require('../../logging.js');
var NativeSocket = require('./native_socket.js');
var PluginSocket = require('./plugin_socket.js');
var RumorMessageTypes = require('./rumor_message_types.js');
var RumorMessage = require('./rumor_message.js');
var SocketError = require('./socket_error.js');
var ws = require('ws');

var WEB_SOCKET_KEEP_ALIVE_INTERVAL = 5000;

// Magic Connectivity Timeout Constant: We wait 2*the keep alive interval,
// on the third keep alive we trigger the timeout if we haven't received the
// server pong.
var WEB_SOCKET_CONNECTIVITY_TIMEOUT = 2 * WEB_SOCKET_KEEP_ALIVE_INTERVAL - 100;
var WEB_SOCKET_CONNECTIVITY_TIMEOUT_NO_RECONNECT = 10 * WEB_SOCKET_KEEP_ALIVE_INTERVAL - 100;

// https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#Close_codes
// http://docs.oracle.com/javaee/7/api/javax/websocket/CloseReason.CloseCodes.html
var wsCloseErrorCodes = {
  1002: 'The endpoint is terminating the connection due to a protocol error. ' +
    '(CLOSE_PROTOCOL_ERROR)',
  1003: 'The connection is being terminated because the endpoint received data of ' +
    'a type it cannot accept (for example, a text-only endpoint received binary data). ' +
    '(CLOSE_UNSUPPORTED)',
  1004: 'The endpoint is terminating the connection because a data frame was received ' +
    'that is too large. (CLOSE_TOO_LARGE)',
  1005: 'Indicates that no status code was provided even though one was expected. ' +
  '(CLOSE_NO_STATUS)',
  1006: 'Used to indicate that a connection was closed abnormally (that is, with no ' +
    'close frame being sent) when a status code is expected. (CLOSE_ABNORMAL)',
  1007: 'Indicates that an endpoint is terminating the connection because it has received ' +
    'data within a message that was not consistent with the type of the message (e.g., ' +
    'non-UTF-8 [RFC3629] data within a text message)',
  1008: 'Indicates that an endpoint is terminating the connection because it has received a ' +
    'message that violates its policy.  This is a generic status code that can be returned ' +
    'when there is no other more suitable status code (e.g., 1003 or 1009) or if there is a ' +
    'need to hide specific details about the policy',
  1009: 'Indicates that an endpoint is terminating the connection because it has received a ' +
    'message that is too big for it to process',
  1011: 'Indicates that a server is terminating the connection because it encountered an ' +
    'unexpected condition that prevented it from fulfilling the request',

  // .... codes in the 4000-4999 range are available for use by applications.
  4001: 'Connectivity loss was detected as it was too long since the socket received the ' +
    'last PONG message',
  4010: 'Timed out while waiting for the Rumor socket to connect.',
  4020: 'Error code unavailable.',
  4030: 'Exception was thrown during Rumor connection, possibly because of a blocked port.'
};

var errorMap = {
  CLOSE_PROTOCOL_ERROR: 1002,
  CLOSE_UNSUPPORTED: 1003,
  CLOSE_TOO_LARGE: 1004,
  CLOSE_NO_STATUS: 1005,
  CLOSE_ABNORMAL: 1006,
  CLOSE_UNEXPECTED_CONDITION: 1011,
  CLOSE_TIMEOUT: 4010,
  CLOSE_FALLBACK_CODE: 4020,
  CLOSE_CONNECT_EXCEPTION: 4030
};

// The TheWebSocket bit is purely to make testing simpler, it defaults to WebSocket
// so in normal operation you would omit it.
var RumorSocket = function(options) {
  var webSocket,
      onOpen,
      onError,
      onClose,
      onReconnecting,
      onReconnectAttempt,
      onReconnectFailure,
      onReconnected,
      onMessage,
      connectCallback,
      connectTimeout,
      reconnectTimeout,
      reconnectThreshold,
      lastMessageTimestamp,         // The timestamp of the last message received
      keepAliveTimer;               // Timer for the connectivity checks
  var rumorSocket = this;
  var reconnectAttempts = 0;
  var states = ['disconnected', 'error', 'connected', 'connecting', 'disconnecting',
        'reconnecting'];
  var socketID = uuid();
  var receivedTransactionIDs = [];
  var messagingURL = options.messagingURL;
  var notifyDisconnectAddress = options.notifyDisconnectAddress;
  var connectionId = options.connectionId;
  var enableReconnection = options.enableReconnection;
  var TheWebSocket = options.TheWebSocket || ws;
  var everConnected = false;
  var connectivityTimeout = enableReconnection ? WEB_SOCKET_CONNECTIVITY_TIMEOUT :
    WEB_SOCKET_CONNECTIVITY_TIMEOUT_NO_RECONNECT;

  // pendingMessages is pre-populated to connect to rumor by registering our connection id
  // and the app server aQddress to notify if we disconnect, this will be sent on the first
  // successful connection.
  var pendingMessages = [RumorMessage.Connect(connectionId, notifyDisconnectAddress)];

  //// Private API
  var stateChanged = function(newState) {
    switch (newState) {
      case 'disconnected':
      case 'error':
        reconnectThreshold = undefined;
        webSocket = null;
        if (onClose) {
          var error;
          if (hasLostConnectivity()) {
            error = new Error(wsCloseErrorCodes[4001]);
            error.code = 4001;
          }
          onClose(error);
        }
        break;
      case 'reconnecting':
        if (onReconnecting && everConnected) {
          onReconnecting();
        }

        if (rumorSocket.is('disconnected')) {
          // This means some called disconnect in an onReconnecting
          // handler, in that case we don't want to continue trying
          // to reconnect.
          return;
        }
        reconnectAttempts = 0;
        reconnectThreshold = Date.now() + RumorSocket.RECONNECT_TIMEOUT;
        break;
      case 'connected':
        if (onReconnected && everConnected) {
          onReconnected();
        } else {
          everConnected = true;
        }

        reconnectThreshold = undefined;
        break;
      default:
    }
  };

  var setState = OTHelpers.statable(rumorSocket, states, 'disconnected', stateChanged);

  var validateCallback = function validateCallback(name, callback) {
    if (callback === null || !OTHelpers.isFunction(callback)) {
      throw new Error('The RumorSocket ' + name +
        ' callback must be a valid function or null');
    }
  };

  var connectCallbackOnce = function() {
    if (!connectCallback) {
      return;
    }
    var args = Array.prototype.slice.call(arguments);
    var callback = connectCallback;
    connectCallback = void 0;
    callback.apply(void 0, args);
  };

  var raiseError = function raiseError(code, extraDetail) {
    code = code || errorMap.CLOSE_FALLBACK_CODE;

    var messageFromCode = wsCloseErrorCodes[code] || 'No message available from code.';
    var message = messageFromCode + (extraDetail ? ' ' + extraDetail : '');

    logging.error('RumorSocket: ' + message);

    var socketError = new SocketError(code, message);

    if (connectTimeout) {
      clearTimeout(connectTimeout);
    }

    if (rumorSocket.is('reconnecting') && onReconnectFailure) {
      onReconnectFailure(socketError);
    }

    if (rumorSocket.isNot('reconnecting', 'disconnecting', 'disconnected')) {
      if (enableReconnection && code !== errorMap.CLOSE_UNEXPECTED_CONDITION) {
        logging.debug('Initial connectivity loss detected at ' + new Date());
        setState('reconnecting');
      } else {
        logging.debug('Connectivity loss detected at ' + new Date());
      }
    }

    if (rumorSocket.is('disconnected', 'disconnecting')) {
      // This probably means someone one called disconnect in an onReconnecting
      // handler, in that case we don't want to continue trying
      // to reconnect.
      return;
    }

    if (!enableReconnection || Date.now() >= reconnectThreshold ||
      code === errorMap.CLOSE_UNEXPECTED_CONDITION) {
      if (code === errorMap.CLOSE_UNEXPECTED_CONDITION) {
        logging.debug('Connectivity not restored because the server said to stop connecting.');
      } if (!enableReconnection) {
        logging.debug('Reconnections are disabled, will not attempt to reconnect.');
      } else {
        logging.debug('Connectivity not restored within ' +
          RumorSocket.RECONNECT_TIMEOUT + 'ms, we have disconnected.');
      }
      setState('error');

      if (connectCallback) {
        connectCallbackOnce(socketError, void 0);
      } else if (onError) {
        onError(socketError);
      }
    } else {
      // @TODO Log this?
      logging.debug('Scheduling reconnection in ' + RumorSocket.RECONNECT_RETRY + 'ms. ' +
        'Remaining time: ' + (reconnectThreshold - Date.now()) + 'ms.');
      reconnectTimeout = setTimeout(connect, RumorSocket.RECONNECT_RETRY);
      reconnectAttempts++;
      if (onReconnectAttempt) {
        onReconnectAttempt();
      }
    }
  };

  var hasLostConnectivity = function hasLostConnectivity() {
    if (!lastMessageTimestamp) { return false; }

    return (OTHelpers.now() - lastMessageTimestamp) >= connectivityTimeout;
  };

  var sendKeepAlive = function() {
    if (!rumorSocket.is('connected')) { return; }

    if (hasLostConnectivity()) {
      webSocketDisconnected({ code: 4001 });
      webSocket.close(false, true);
    } else {
      webSocket.send(Message.Ping());
      keepAliveTimer = setTimeout(sendKeepAlive, WEB_SOCKET_KEEP_ALIVE_INTERVAL);
    }
  };

  var sendAck = function(msg) {
    webSocket.send(Message.Status(msg.toAddress, {
      'TRANSACTION-ID': msg.headers['TRANSACTION-ID'],
      'X-TB-FROM-ADDRESS': connectionId
    }));
  };

  //// Private Event Handlers
  var webSocketConnected = function webSocketConnected() {
    if (connectTimeout) { clearTimeout(connectTimeout); }
    if (rumorSocket.isNot('connecting', 'reconnecting')) {
      logging.debug('webSocketConnected reached in state other than connecting (' +
        rumorSocket.currentState + ')');
      return;
    }

    logging.debug('Sending ' + pendingMessages.length + ' pending messages');
    pendingMessages.map(function(message) {
      webSocket.send(message);
    });

    pendingMessages = pendingMessages.filter(function(message) {
      return message.type !== RumorMessageTypes.CONNECT;
    });

    setState('connected');

    connectCallbackOnce(void 0, connectionId);

    if (onOpen) {
      onOpen(connectionId);
    }

    keepAliveTimer = setTimeout(function() {
      lastMessageTimestamp = OTHelpers.now();
      sendKeepAlive();
    }, WEB_SOCKET_KEEP_ALIVE_INTERVAL);
  };

  var webSocketConnectTimedOut = function webSocketConnectTimedOut() {
    var webSocketWas = webSocket;
    raiseError(errorMap.CLOSE_TIMEOUT);
    // This will prevent a socket eventually connecting
    // But call it _after_ the error just in case any of
    // the callbacks fire synchronously, breaking the error
    // handling code.
    try {
      webSocketWas.close();
    } catch (err) {
      logging.debug('webSocket.close() raised an exception: ' + (err.message || err));
    }
  };

  var webSocketError = function webSocketError() {};

  var webSocketDisconnected = function webSocketDisconnected(closeEvent) {
    logging.debug('OT.Rumor.Socket: webSocketDisconnected (code: ' + closeEvent.code + ')');

    if (connectTimeout) { clearTimeout(connectTimeout); }
    if (keepAliveTimer) { clearTimeout(keepAliveTimer); }

    if (isDOMUnloaded()) {
      // Sometimes we receive the web socket close event after
      // the DOM has already been partially or fully unloaded
      // if that's the case here then it's not really safe, or
      // desirable, to continue.
      return;
    }

    if (rumorSocket.isNot('disconnecting') && closeEvent.code !== 1000 && closeEvent.code !== 1001) {
      if (closeEvent.code) {
        raiseError(closeEvent.code);
      } else {
        raiseError(
          errorMap.CLOSE_FALLBACK_CODE,
          closeEvent.reason || closeEvent.message
        );
      }
    }

    if (rumorSocket.isNot('error', 'reconnecting')) {
      setState('disconnected');
    }
  };

  var webSocketReceivedMessage = function webSocketReceivedMessage(msg) {
    lastMessageTimestamp = OTHelpers.now();

    if (msg.type !== RumorMessageTypes.PONG) {
      logging.debug('OT.Rumor.Socket webSocketReceivedMessage: ' +
        JSON.stringify(msg, null, 2));
      if (msg.transactionId) {
        // remove pending message
        pendingMessages = pendingMessages.filter(function(pendingMessage) {
          if (pendingMessage.transactionId === msg.transactionId) {
            logging.debug('Marking', msg.transactionId, ' as received');
          }
          return pendingMessage.transactionId !== msg.transactionId;
        });
      }

      if (msg.transactionId && msg.type !== RumorMessageTypes.STATUS) {
        // 1) ack it!
        sendAck(msg);

        // Have we seen this transaction before?
        if (receivedTransactionIDs.indexOf(msg.transactionId) >= 0) {
          // We've handled this transactionId before, but the ACK
          // must have been lost. That's ok, we've told the server
          // so we can just ignore this message now.
          return;
        }

        receivedTransactionIDs.push(msg.transactionId);
      }

      if (onMessage) {
        onMessage(msg);
      }
    }
  };

  var connect = function() {
    if (rumorSocket.is('connecting', 'connected')) {
      logging.error('Rumor.Socket cannot connect when it is already connecting or connected.');
      return;
    }

    if (!rumorSocket.is('reconnecting')) {
      setState('connecting');
    } else {
      logging.debug('Attempting reconnection...');
    }

    var attempt = uuid();

    var events = {
      onOpen: webSocketConnected,
      onClose: webSocketDisconnected,
      onError: webSocketError,
      onMessage: webSocketReceivedMessage
    };

    var fullMessagingURL = messagingURL;
    if (enableReconnection) {
      fullMessagingURL = [
        messagingURL,
        messagingURL.indexOf('?') >= 0 ? '&' : '?',
        'socketId=' + socketID,
        rumorSocket.is('reconnecting') ? '&reconnect=true' : '',
        '&attempt=' + attempt
      ].join('');
    }

    try {
      if (TheWebSocket != null) {
        webSocket = new NativeSocket(TheWebSocket, fullMessagingURL, events);
      } else {
        webSocket = new PluginSocket(fullMessagingURL, events);
      }

      connectTimeout = setTimeout(webSocketConnectTimedOut, RumorSocket.CONNECT_TIMEOUT);
    } catch (e) {
      logging.error(e);

      raiseError(errorMap.CLOSE_CONNECT_EXCEPTION);
    }
  };

  //// Public API

  rumorSocket.publish = function(topics, message, headers, retryAfterReconnect) {
    var rumorMessage = Message.Publish(topics, message, headers);
    if (retryAfterReconnect) {
      pendingMessages.push(rumorMessage);
    }
    if (rumorSocket.is('connected')) {
      webSocket.send(rumorMessage);
    }
  };

  rumorSocket.subscribe = function(topics) {
    webSocket.send(Message.Subscribe(topics));
  };

  rumorSocket.unsubscribe = function(topics) {
    webSocket.send(Message.Unsubscribe(topics));
  };

  rumorSocket.connect = function(complete) {
    if (rumorSocket.is('connecting', 'connected')) {
      if (typeof complete === 'function') {
        complete(new SocketError(null,
            'Rumor.Socket cannot connect when it is already connecting or connected.'));
      } else {
        logging.error('Rumor.Socket cannot connect when it is already connecting or connected.');
      }
    } else {
      connectCallback = complete;
      connect();
    }
  };

  rumorSocket.disconnect = function(drainSocketBuffer) {
    if (connectTimeout) { clearTimeout(connectTimeout); }
    if (keepAliveTimer) { clearTimeout(keepAliveTimer); }
    if (reconnectTimeout) { clearTimeout(reconnectTimeout); }

    if (!webSocket) {
      if (rumorSocket.isNot('error')) { setState('disconnected'); }
      return;
    }

    if (webSocket.isClosed()) {
      if (rumorSocket.isNot('error')) { setState('disconnected'); }
    } else {

      if (rumorSocket.is('connected')) {
        // Look! We are nice to the rumor server ;-)
        webSocket.send(Message.Disconnect());
      }

      setState('disconnecting');

      // Wait until the socket is ready to close
      webSocket.close(drainSocketBuffer);
    }
  };

  rumorSocket.reconnectRetriesCount = function() {
    return reconnectAttempts;
  };

  rumorSocket.messageQueueSize = function() {
    return pendingMessages.length;
  };

  OTHelpers.defineProperties(rumorSocket, {
    id: {
      get: function() { return connectionId; }
    },

    socketID: {
      get: function() { return socketID; }
    },

    onOpen: {
      set: function(callback) {
        validateCallback('onOpen', callback);
        onOpen = callback;
      },

      get: function() { return onOpen; }
    },

    onError: {
      set: function(callback) {
        validateCallback('onError', callback);
        onError = callback;
      },

      get: function() { return onError; }
    },

    onClose: {
      set: function(callback) {
        validateCallback('onClose', callback);
        onClose = callback;
      },

      get: function() { return onClose; }
    },

    onMessage: {
      set: function(callback) {
        validateCallback('onMessage', callback);
        onMessage = callback;
      },

      get: function() { return onMessage; }
    },

    onReconnecting: {
      set: function(callback) {
        validateCallback('onReconnecting', callback);
        onReconnecting = callback;
      },

      get: function() { return onReconnecting; }
    },

    onReconnectAttempt: {
      set: function(callback) {
        validateCallback('onReconnectAttempt', callback);
        onReconnectAttempt = callback;
      },

      get: function() { return onReconnectAttempt; }
    },

    onReconnectFailure: {
      set: function(callback) {
        validateCallback('onReconnectFailure', callback);
        onReconnectFailure = callback;
      },

      get: function() { return onReconnectFailure; }
    },

    onReconnected: {
      set: function(callback) {
        validateCallback('reconnected', callback);
        onReconnected = callback;
      },

      get: function() {
        return onReconnected;
      }
    }
  });
};

// The number of ms to wait for the websocket to connect
RumorSocket.CONNECT_TIMEOUT = 15000;
RumorSocket.RECONNECT_TIMEOUT = 60000;
RumorSocket.RECONNECT_RETRY = 500;

module.exports = RumorSocket;
