import { Auth } from 'aws-amplify'
import _ from 'lodash'
import { encode } from 'base-64'
import { print } from 'graphql/language/printer'
import { v4 as uuidv4 } from 'uuid'

// Manage AWS AppSync GraphQL subscriptions over a single WebSocket.
// Spec: https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html
//
// Lifecycle:
// - The constructor starts connecting immediately. `connection` is a promise that
//   resolves once AppSync acknowledges the connection (`connection_ack`).
// - An unexpected close schedules a reconnect after `reconnectDelayMs`; once the new
//   connection is acknowledged, previously active subscriptions are registered again.
// - `suspendConnection` / `resumeConnection` close and restore the socket while keeping
//   the subscription list. `terminateConnection` closes the socket for good.
export default class RealTimeClient {
  // When true, lifecycle and protocol events are written with console.log.
  static logToConsole = false

  // Delay, in milliseconds, between a socket closing and the next connection attempt.
  static reconnectDelayMs = 3000

  // wssEndpoint: the AppSync real-time endpoint (wss://...appsync-realtime-api.../graphql).
  constructor(wssEndpoint) {
    this.wssEndpoint = wssEndpoint

    // Subscriptions waiting for `start_ack`, and subscriptions that received it.
    this.subscriptionsInProgress = []
    this.subscriptions = []

    this.ws = null
    this.connected = false
    this.reconnecting = false
    this.resubscribeOnReconnect = false

    this._suspendedConnection = false
    this._terminatedConnection = false

    // resolve/reject of the `connection` promise while it is still unsettled.
    this._connectionSettlers = null

    // Optional callbacks, assigned by the owner of the client.
    this.onConnect = null
    this.onDisconnect = null

    // Every piece of state is initialised before the first attempt starts.
    this._openConnection()
  }

  // Disconnect the WebSocket but retain all the subscription configuration so we can
  // restore the connection and all subscriptions later.
  suspendConnection = () => {
    if (RealTimeClient.logToConsole)
      console.log('[realtime] suspending connection')
    this._suspendedConnection = true
    // The socket does not exist until credentials have been loaded.
    this.ws?.close()
  }

  // Resume a suspended connection and reconnect existing subscriptions.
  resumeConnection = () => {
    if (RealTimeClient.logToConsole)
      console.log('[realtime] resuming connection and resubscribing')
    this._suspendedConnection = false
    this._reconnect()
  }

  // Close the connection permanently: no reconnect is attempted afterwards.
  terminateConnection = () => {
    if (RealTimeClient.logToConsole)
      console.log('[realtime] terminating connection')
    this._terminatedConnection = true
    // The socket does not exist until credentials have been loaded.
    this.ws?.close()
  }

  // Subscribe to a new GraphQL subscription with the supplied query, variables, headers,
  // and a callback for when data is received. Returns the Subscription, whose
  // `unsubscribe()` ends it.
  subscribe = ({ query, variables, headers, onData }) => {
    const subscription = new Subscription(this, {
      query,
      variables,
      headers,
      onData,
    })
    this._addSubscriptionInProgress(subscription)
    subscription._register()
    return subscription
  }

  // Returns an object containing current auth credentials for use when connecting and
  // when registering subscriptions. Returns an empty object if it can't auth.
  // Entries in `headers` are merged last and therefore override the defaults.
  _authorizationObject = async ({ headers = {} } = {}) => {
    let session
    try {
      session = await Auth.currentSession()
    } catch (err) {
      if (RealTimeClient.logToConsole)
        console.log(`[realtime] unable to get access token from Cognito:`, err)
      return {}
    }

    // host must be set to the GraphQL endpoint's hostname, not the wss hostname
    const host = this.wssEndpoint
      .replace('wss://', '')
      .replace('/graphql', '')
      .replace('appsync-realtime-api', 'appsync-api')

    const auth = {
      Authorization: session.accessToken.jwtToken,
      host,
      ...headers,
    }

    return auth
  }

  // Start a connection attempt. While the previous `connection` promise is still
  // unsettled it is reused, so callers already awaiting it are released by whichever
  // attempt finally succeeds instead of waiting forever on an abandoned promise.
  _openConnection = () => {
    if (!this._connectionSettlers) {
      this.connection = new Promise((resolve, reject) => {
        this._connectionSettlers = { resolve, reject }
      })
    }

    const settlers = this._connectionSettlers
    const finishWith = (settle) => (value) => {
      // Only forget the settlers if a newer promise has not replaced them.
      if (this._connectionSettlers === settlers) this._connectionSettlers = null
      settle(value)
    }
    const resolve = finishWith(settlers.resolve)
    const reject = finishWith(settlers.reject)

    this._connect(resolve, reject).catch((err) => {
      if (RealTimeClient.logToConsole)
        console.log('[realtime] connection attempt failed:', err)
      this.reconnecting = false
      reject(err)
    })
  }

  // Detach the handlers of a socket that is being replaced and make sure it is closed,
  // so two sockets never deliver messages for the same client at once.
  _discardSocket = (socket) => {
    if (!socket) return

    socket.onopen = null
    socket.onerror = null
    socket.onclose = null
    socket.onmessage = null
    if (socket.readyState !== WebSocket.CLOSED) socket.close()
  }

  // Open the WebSocket, perform the `connection_init` handshake and route every
  // incoming frame. `resolveConnection` is called on `connection_ack`.
  _connect = async (resolveConnection, _rejectConnection) => {
    if (RealTimeClient.logToConsole)
      console.log('[realtime] initiating WebSocket connection')

    const authObject = await this._authorizationObject()

    // The client may have been suspended or terminated while credentials were loading;
    // opening a socket now would undo that request.
    if (this._suspendedConnection || this._terminatedConnection) {
      if (RealTimeClient.logToConsole)
        console.log('[realtime] connection attempt abandoned')
      this.reconnecting = false
      return
    }

    this._discardSocket(this.ws)

    const payload = {}

    this.ws = new WebSocket(
      `${this.wssEndpoint}?header=${encode(
        JSON.stringify(authObject)
      )}&payload=${encode(JSON.stringify(payload))}`,
      'graphql-ws'
    )

    this.ws.onopen = (_event) => {
      this.reconnecting = false
      const init = { type: 'connection_init' }
      this.ws.send(JSON.stringify(init))
    }

    this.ws.onerror = (event) => {
      if (RealTimeClient.logToConsole)
        console.log(`[realtime] WebSocket error:`, event)
    }

    this.ws.onclose = () => {
      if (RealTimeClient.logToConsole) console.log('[realtime] disconnected')

      // Allow the next _reconnect() call to schedule a fresh attempt.
      this.reconnecting = false

      // Suspended connections mean we want to retain the state of all subscriptions
      // so they can be resumed later.
      if (this._suspendedConnection) return

      // Terminated connections mean we do not want to retain the state of all subscriptions
      // and we do not want to reconnect.
      if (this._terminatedConnection) return

      this._onDisconnected()
      this._reconnect()
    }

    this.ws.onmessage = (message) => {
      let wsData
      try {
        wsData = JSON.parse(message.data)
      } catch (err) {
        // Log and bail out if the message isn't JSON
        if (RealTimeClient.logToConsole)
          console.log('[realtime] failed parsing message:', message)
        return
      }

      if (wsData.type === 'connection_ack') {
        // The connection was successful and we can now begin subscribing
        // to things.
        this._onConnected()
        resolveConnection()

        if (this.resubscribeOnReconnect) {
          this.resubscribeOnReconnect = false
          this._resubscribe()
        }

        return
      }

      if (wsData.type === 'ka') {
        // ka = periodic keep-alive ping, do nothing
        return
      }

      if (wsData.type === 'start_ack') {
        // AppSync confirms a subscription `start` message:
        // { "type": "start_ack", "id": "eEXAMPLE-cf23-1234-5678-152EXAMPLE69" }
        this._succeedSubscriptionInProgress(wsData.id)
        return
      }

      if (wsData.type === 'complete') {
        // AppSync confirms a subscription `stop` message; id is our subscription ID:
        // { "type": "complete", "id": "eEXAMPLE-cf23-1234-5678-152EXAMPLE69" }
        this._removeSubscription(wsData.id)
        return
      }

      if (wsData.type === 'data') {
        // A mutation matched a subscription:
        // {
        //   "type": "data",
        //   "id": "ee849ef0-cf23-4cb8-9fcb-152ae4fd1e69",
        //   "payload": { "data": { "onCreateMessage": { ... } } }
        // }
        const subscription = _.find(
          this.subscriptions,
          (sub) => sub.id === wsData.id
        )
        if (subscription) {
          if (RealTimeClient.logToConsole)
            console.log(
              `[realtime] data received for subscription ${subscription.id}`,
              wsData.payload
            )
          if (typeof subscription.onData === 'function') {
            subscription.onData(wsData.payload)
          }
        }

        return
      }

      if (wsData.type === 'error') {
        // Sent when connection init or a subscription registration fails, or when
        // the server terminates a subscription:
        // {
        //   "type": "error",
        //   "id": "ee849ef0-cf23-4cb8-9fcb-152ae4fd1e69",   (subscription errors only)
        //   "payload": {
        //     "errors": [
        //       { "errorType": "LimitExceededError", "message": "Rate limit exceeded" }
        //     ]
        //   }
        // }
        if (RealTimeClient.logToConsole)
          console.log('[realtime] AppSync error:', message)

        // A registration that was rejected will never receive start_ack, so stop
        // tracking it as in progress.
        const { id, payload } = wsData
        const isPending =
          id != null &&
          this.subscriptionsInProgress.some((sub) => sub.id === id)
        if (isPending) {
          const reason = payload?.errors?.[0]?.message ?? 'unknown error'
          this._failSubscriptionInProgress(id, reason)
        }
      }
    }
  }

  // Register every previously active subscription again on the new socket.
  // Subscriptions that are already in progress stay tracked: their own registration
  // is still pending and their start_ack must find them in the list.
  _resubscribe = () => {
    const previouslyActive = this.subscriptions
    this.subscriptions = []
    this.subscriptionsInProgress.push(...previouslyActive)

    // Fire-and-forget on purpose: each registration handles its own failure.
    previouslyActive.forEach(async (subscription) => {
      try {
        await subscription._register()
      } catch (err) {
        if (RealTimeClient.logToConsole)
          console.log('[realtime] unable to resubscribe', err)
      }
    })
  }

  // Track a subscription whose registration has not been acknowledged yet.
  _addSubscriptionInProgress = (subscription) => {
    if (RealTimeClient.logToConsole)
      console.log(
        `[realtime] subscription to ${subscription.id} is in progress`
      )

    this.subscriptionsInProgress.push(subscription)
  }

  // Move the subscription with this id from "in progress" to active.
  _succeedSubscriptionInProgress = (subscriptionId) => {
    if (RealTimeClient.logToConsole)
      console.log(`[realtime] subscription to ${subscriptionId} succeeded`)

    const succeeded = _.remove(this.subscriptionsInProgress, {
      id: subscriptionId,
    })
    _.each(succeeded, (i) => {
      this.subscriptions.push(i)
      i._onSubscribed()
    })
  }

  // Drop the in-progress subscription with this id and notify it.
  _failSubscriptionInProgress = (subscriptionId, reason) => {
    if (RealTimeClient.logToConsole)
      console.log(
        `[realtime] unable to subscribe to ${subscriptionId}: ${reason}`
      )

    const failed = _.remove(this.subscriptionsInProgress, {
      id: subscriptionId,
    })
    _.each(failed, (i) => i._onUnsubscribed())
  }

  // Drop the active subscription with this id and notify it.
  _removeSubscription = (subscriptionId) => {
    if (RealTimeClient.logToConsole)
      console.log(`[realtime] unsubscribed from ${subscriptionId}`)

    const removed = _.remove(this.subscriptions, { id: subscriptionId })
    _.each(removed, (i) => i._onUnsubscribed())
  }

  // Start a new connection attempt and request a resubscribe once it is acknowledged.
  _reconnectAndSubscribeUntilSuccessful = () => {
    if (RealTimeClient.logToConsole)
      console.log('[realtime] attempting reconnect and resubscribe')

    this.resubscribeOnReconnect = true
    this._openConnection()
  }

  _onConnected = () => {
    this.connected = true

    if (typeof this.onConnect === 'function') this.onConnect()
  }

  _onDisconnected = () => {
    this.connected = false

    if (typeof this.onDisconnect === 'function') this.onDisconnect()
  }

  // Schedule a single reconnect attempt after `reconnectDelayMs`.
  _reconnect = () => {
    if (this.reconnecting) return

    if (RealTimeClient.logToConsole)
      console.log('[realtime] reconnect attempt in 3 seconds...')

    this.reconnecting = true
    setTimeout(() => {
      // The client may have been suspended or terminated during the delay.
      if (this._suspendedConnection || this._terminatedConnection) {
        this.reconnecting = false
        return
      }
      this._reconnectAndSubscribeUntilSuccessful()
    }, RealTimeClient.reconnectDelayMs)
  }
}

// A single GraphQL subscription owned by a RealTimeClient. Instances are created by
// `RealTimeClient.subscribe`; call `unsubscribe()` to end the subscription.
class Subscription {
  // client: the RealTimeClient whose socket carries this subscription.
  // query: GraphQL document, printed with graphql's `print` when registering.
  // variables: variables sent along with the query.
  // headers: extra entries merged into the authorization object for this subscription.
  // onData: called by the client with the payload of each `data` frame.
  constructor(client, { query, variables, headers = {}, onData }) {
    this.client = client
    this.query = query
    this.variables = variables
    this.headers = headers
    this.onData = onData
    this.subscribed = false
    this.id = uuidv4()

    // Set once unsubscribe() was called, so a registration that is still waiting
    // does not start the subscription afterwards.
    this._unsubscribeRequested = false
  }

  // End the subscription.
  unsubscribe = () => {
    this._unregister()
  }

  // Send the `start` message for this subscription once the client is connected.
  _register = async () => {
    try {
      await this.client.connection
    } catch (err) {
      // The client looks subscriptions up by id, so pass the id and not the instance.
      this.client._failSubscriptionInProgress(this.id, 'not connected')
      return
    }
    if (this._unsubscribeRequested) return

    const authObject = await this.client._authorizationObject({
      headers: this.headers,
    })
    if (this._unsubscribeRequested) return

    const registration = {
      type: 'start',
      id: this.id,
      payload: {
        data: JSON.stringify({
          query: print(this.query),
          variables: this.variables,
        }),
        extensions: {
          authorization: authObject,
        },
      },
    }
    this.client.ws.send(JSON.stringify(registration))
  }

  // Send the `stop` message. When the socket is not open the subscription is removed
  // from the client locally instead, so it is not registered again on reconnect.
  _unregister = () => {
    this._unsubscribeRequested = true

    // The client has no socket until its first connection attempt has loaded credentials.
    const ws = this.client.ws
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      if (RealTimeClient.logToConsole)
        console.log(
          '[realtime] unable to unregister, websocket state is',
          ws?.readyState
        )
      this.client._failSubscriptionInProgress(this.id, 'unsubscribed')
      this.client._removeSubscription(this.id)
      return
    }

    const unregister = {
      type: 'stop',
      id: this.id,
    }
    ws.send(JSON.stringify(unregister))
  }

  _onSubscribed = () => {
    // Internal hook called when the subscription is registered
    this.subscribed = true
  }

  _onUnsubscribed = () => {
    // Internal hook called when the subscription is unregistered
    this.subscribed = false
  }
}
