Home Reference Source

src/result-rx.js

/**
 * Copyright (c) "Neo4j"
 * Neo4j Sweden AB [https://neo4j.com]
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/* eslint-disable-next-line no-unused-vars */
import { newError, Record, ResultSummary } from 'neo4j-driver-core'
import { Observable, Subject, ReplaySubject, from } from 'rxjs'
import { mergeMap, publishReplay, refCount } from 'rxjs/operators'

/**
 * Streaming lifecycle states of a reactive result.
 *
 * The numeric values are ordered: a state lower than `STREAMING` means that
 * streaming has not been started yet. The object is frozen so the values
 * cannot be changed by accident at runtime.
 */
const States = Object.freeze({
  READY: 0,
  STREAMING: 1,
  COMPLETED: 2
})

/**
 * The reactive result interface.
 */
export default class RxResult {
  /**
   * @constructor
   * @protected
   * @param {Observable<Result>} result - An observable of single Result instance to relay requests.
   * @param {number} state - The streaming state
   */
  constructor (result, state) {
    // Share one subscription to the source and replay its latest emission
    // to every later subscriber (keys, records, consume).
    const replayedResult = result.pipe(publishReplay(1), refCount())

    this._result = replayedResult
    this._keys = replayedResult.pipe(
      mergeMap(r => from(r.keys())),
      publishReplay(1),
      refCount()
    )
    // Created lazily by _setupRecordsStream when streaming starts.
    this._records = undefined
    this._controls = new StreamControl()
    this._summary = new ReplaySubject()
    this._state = state || States.READY
  }

  /**
   * Returns an observable that exposes a single item containing field names
   * returned by the executing query.
   *
   * Errors raised by actual query execution can surface on the returned
   * observable stream.
   *
   * @public
   * @returns {Observable<string[]>} - An observable stream (with exactly one element) of field names.
   */
  keys () {
    return this._keys
  }

  /**
   * Returns an observable that exposes each record returned by the executing query.
   *
   * Errors raised during the streaming phase can surface on the returned observable stream.
   *
   * The returned observable also carries a `push` function which delegates to {@link RxResult#push}.
   *
   * @public
   * @returns {Observable<Record>} - An observable stream of records.
   */
  records () {
    const result = this._result.pipe(
      mergeMap(
        result =>
          new Observable(recordsObserver =>
            this._startStreaming({ result, recordsObserver })
          )
      )
    )
    // Delegate to the public push(); the class has no `_push` member.
    result.push = () => this.push()
    return result
  }

  /**
   * Returns an observable that exposes a single item of {@link ResultSummary} that is generated by
   * the server after the streaming of the executing query is completed.
   *
   * *Subscribing to this stream before subscribing to records() stream causes the results to be discarded on the server.*
   *
   * @public
   * @returns {Observable<ResultSummary>} - An observable stream (with exactly one element) of result summary.
   */
  consume () {
    return this._result.pipe(
      mergeMap(
        result =>
          new Observable(summaryObserver =>
            this._startStreaming({ result, summaryObserver })
          )
      )
    )
  }

  /**
   * Pauses the automatic streaming of records.
   *
   * This method provides a way to control the flow of records.
   *
   * @experimental
   */
  pause () {
    this._controls.pause()
  }

  /**
   * Resumes the automatic streaming of records.
   *
   * This method won't need to be called in normal stream operation. It only applies to the case when the stream is paused.
   *
   * This method won't start consuming records if the {@link records} stream has not been subscribed to.
   * @experimental
   * @returns {Promise<void>} - A promise that resolves when the stream is resumed.
   */
  resume () {
    return this._controls.resume()
  }

  /**
   * Pushes the next record to the stream.
   *
   * This method automatically pauses the auto-streaming of records and then pushes the next record to the stream.
   *
   * To return to the automatic streaming of records, use the {@link resume} method.
   *
   * @experimental
   * @returns {Promise<void>} - A promise that resolves when the push is completed.
   */
  push () {
    return this._controls.push()
  }

  /**
   * Wires the given observers to the records and summary streams and starts
   * streaming on the first call.
   *
   * Only the first call (while the state is below STREAMING) sets up the
   * records stream. A later call that passes a records observer receives an
   * error on that observer. A summary observer is always subscribed to the
   * summary subject.
   *
   * @private
   * @param {Object} param
   * @param {Result} param.result - The underlying result to stream from.
   * @param {Observer<Record>} [param.recordsObserver] - Observer for the records, if any.
   * @param {Observer<ResultSummary>} [param.summaryObserver] - Observer for the summary, if any.
   * @returns {function(): void} Teardown function which unsubscribes everything registered by this call.
   */
  _startStreaming ({
    result,
    recordsObserver = null,
    summaryObserver = null
  } = {}) {
    const subscriptions = []

    if (summaryObserver) {
      subscriptions.push(this._summary.subscribe(summaryObserver))
    }

    if (this._state < States.STREAMING) {
      this._state = States.STREAMING
      this._setupRecordsStream(result)
      if (recordsObserver) {
        subscriptions.push(this._records.subscribe(recordsObserver))
      } else {
        // Nobody is interested in the records, so cancel the result right away.
        result._cancel()
      }

      // Cancel the underlying result when the subscriber that started the streaming goes away.
      subscriptions.push({
        unsubscribe: () => {
          if (result._cancel) {
            result._cancel()
          }
        }
      })
    } else if (recordsObserver) {
      recordsObserver.error(
        newError(
          'Streaming has already started/consumed with a previous records or summary subscription.'
        )
      )
    }

    return () => {
      subscriptions.forEach(s => s.unsubscribe())
    }
  }

  /**
   * Create a {@link Observable} for the current {@link RxResult}
   *
   * Each underlying result is re-emitted wrapped in a new {@link RxResult}
   * which is created with the current streaming state of this instance.
   *
   * @package
   * @experimental
   * @since 5.0
   * @return {Observable<RxResult>}
   */
  _toObservable () {
    function wrap (result) {
      return new Observable(observer => {
        observer.next(result)
        observer.complete()
      })
    }
    // Returning the inner subscription makes it the teardown of the outer
    // observable, so unsubscribing from the outer one releases the inner one.
    return new Observable(observer =>
      this._result.subscribe({
        complete: () => observer.complete(),
        // The state is an argument of the RxResult constructor, not of observer.next.
        next: result =>
          observer.next(new RxResult(wrap(result), this._state)),
        error: e => observer.error(e)
      })
    )
  }

  /**
   * Lazily creates the records subject for the given result and connects
   * its termination to the summary subject.
   *
   * @private
   * @param {Result} result - The underlying result; iterated through its async iterator.
   * @returns {Subject<Record>} The records subject (the existing one if already created).
   */
  _setupRecordsStream (result) {
    if (this._records) {
      return this._records
    }

    this._records = createFullyControlledSubject(
      result[Symbol.asyncIterator](),
      {
        complete: async () => {
          this._state = States.COMPLETED
          try {
            this._summary.next(await result.summary())
            this._summary.complete()
          } catch (error) {
            // The caller does not await this callback; surface a failing
            // summary on the summary stream instead of leaving an
            // unhandled rejection and a never-ending summary subject.
            this._summary.error(error)
          }
        },
        error: error => {
          this._state = States.COMPLETED
          this._summary.error(error)
        }
      },
      this._controls
    )
    return this._records
  }
}

/**
 * Creates a Subject which is fed from an iterator, one `next()` call at a time.
 *
 * The first value is requested immediately. After each delivered value the
 * next one is requested automatically unless the stream control is paused.
 * The function used to request a value is registered on the stream control
 * as its `pusher`, so that the control can request values on demand.
 *
 * While at least one requested value has not been handled yet,
 * `streamControl.pushing` is `true`.
 *
 * @param {AsyncIterator} iterator - The source of values.
 * @param {{complete: function(): *, error: function(*): *}} completeObserver -
 *   Notified right after the subject completed or errored.
 * @param {StreamControl} [streamControl] - The flow-control state to honour and update.
 * @returns {Subject} The subject which emits the iterated values.
 */
function createFullyControlledSubject (
  iterator,
  completeObserver,
  streamControl = new StreamControl()
) {
  const subject = new Subject()
  // Number of requested values which were not handled yet. A plain boolean
  // reset in `finally` would be cleared by the invocation which has just
  // chained the next request, reporting "not pushing" while a request is
  // still in flight.
  let pendingPulls = 0

  const pushNextValue = async result => {
    pendingPulls++
    streamControl.pushing = true
    try {
      const { done, value } = await result
      if (done) {
        subject.complete()
        completeObserver.complete()
      } else {
        subject.next(value)
        if (!streamControl.paused) {
          // Chain the next request without awaiting it; iteration failures
          // are reported through the subject by the chained invocation.
          pushNextValue(iterator.next())
            .catch(() => {})
        }
      }
    } catch (error) {
      subject.error(error)
      completeObserver.error(error)
    } finally {
      pendingPulls--
      streamControl.pushing = pendingPulls > 0
    }
  }

  async function push (value) {
    await pushNextValue(iterator.next(value))
  }

  streamControl.pusher = push
  // Kick off the stream; the returned promise is intentionally not awaited.
  push()

  return subject
}

/**
 * Holds the flow-control state of a stream: whether it is paused, whether a
 * push is currently running, and the function used to push the next value.
 */
class StreamControl {
  /**
   * @param {function(): Promise<*>} [push] - Function which pushes the next value; an async no-op by default.
   */
  constructor (push = async () => {}) {
    this._paused = false
    this._pushing = false
    this._push = push
  }

  /** @returns {boolean} Whether the automatic streaming is paused. */
  get paused () {
    return this._paused
  }

  /** @returns {boolean} Whether a push is flagged as being in progress. */
  get pushing () {
    return this._pushing
  }

  set pushing (pushing) {
    this._pushing = pushing
  }

  /** @returns {function(): Promise<*>} The function currently used to push values. */
  get pusher () {
    return this._push
  }

  set pusher (push) {
    this._push = push
  }

  /** Marks the stream as paused. */
  pause () {
    this._paused = true
  }

  /**
   * Clears the paused flag and, when the stream was paused and no push is
   * flagged as in progress, triggers one push and waits for it.
   *
   * @returns {Promise<void>}
   */
  async resume () {
    const needsRestart = this._paused && !this._pushing
    this._paused = false
    if (!needsRestart) {
      return
    }
    await this._push()
  }

  /**
   * Pauses the stream and then triggers a single push.
   *
   * @returns {Promise<*>} Resolves with whatever the push function resolves with.
   */
  async push () {
    this.pause()
    const outcome = await this._push()
    return outcome
  }
}