Home Reference Source

lib6/transaction.js

/**
 * Copyright (c) "Neo4j"
 * Neo4j Sweden AB [https://neo4j.com]
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
    return new (P || (P = Promise))(function (resolve, reject) {
        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
        function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
/* eslint-disable @typescript-eslint/promise-function-async */
import { validateQueryAndParameters } from './internal/util';
import { ReadOnlyConnectionHolder, EMPTY_CONNECTION_HOLDER } from './internal/connection-holder';
import { Bookmarks } from './internal/bookmarks';
import { TxConfig } from './internal/tx-config';
import { FailedObserver, CompletedObserver } from './internal/observers';
import { newError } from './error';
import Result from './result';
/**
 * Represents a transaction in the Neo4j database.
 *
 * @access public
 */
class Transaction {
    /**
     * @constructor
     * @param {object} args
     * @param {ConnectionHolder} args.connectionHolder - the connection holder to get connection from.
     * @param {function()} args.onClose - Function to be called when transaction is committed or rolled back.
     * @param {function(bookmarks: Bookmarks)} args.onBookmarks callback invoked when new bookmark is produced.
     * @param {function()} args.onConnection - Function to be called when a connection is obtained to ensure the conneciton
     * is not yet released.
     * @param {boolean} args.reactive whether this transaction generates reactive streams
     * @param {number} args.fetchSize - the record fetch size in each pulling batch.
     * @param {string} args.impersonatedUser - The name of the user which should be impersonated for the duration of the session.
     * @param {number} args.highRecordWatermark - The high watermark for the record buffer.
     * @param {number} args.lowRecordWatermark - The low watermark for the record buffer.
     * @param {NotificationFilter} args.notificationFilter - The notification filter used for this transaction.
     * @param {NonAutoCommitApiTelemetryConfig} args.apiTelemetryConfig - The api telemetry configuration. Empty/Null for disabling telemetry
     */
    constructor({ connectionHolder, onClose, onBookmarks, onConnection, reactive, fetchSize, impersonatedUser, highRecordWatermark, lowRecordWatermark, notificationFilter, apiTelemetryConfig }) {
        this._connectionHolder = connectionHolder;
        this._reactive = reactive;
        this._state = _states.ACTIVE;
        this._onClose = onClose;
        this._onBookmarks = onBookmarks;
        this._onConnection = onConnection;
        this._onError = this._onErrorCallback.bind(this);
        this._fetchSize = fetchSize;
        this._onComplete = this._onCompleteCallback.bind(this);
        this._results = [];
        this._impersonatedUser = impersonatedUser;
        this._lowRecordWatermak = lowRecordWatermark;
        this._highRecordWatermark = highRecordWatermark;
        this._bookmarks = Bookmarks.empty();
        this._notificationFilter = notificationFilter;
        this._apiTelemetryConfig = apiTelemetryConfig;
        this._acceptActive = () => { }; // satisfy DenoJS
        this._activePromise = new Promise((resolve, reject) => {
            this._acceptActive = resolve;
        });
    }
    /**
     * @private
     * @param {Bookmarks | string |  string []} bookmarks
     * @param {TxConfig} txConfig
     * @param {Object} events List of observers to events
     * @returns {void}
     */
    _begin(getBookmarks, txConfig, events) {
        this._connectionHolder
            .getConnection()
            .then((connection) => __awaiter(this, void 0, void 0, function* () {
            this._onConnection();
            if (connection != null) {
                this._bookmarks = yield getBookmarks();
                return connection.beginTransaction({
                    bookmarks: this._bookmarks,
                    txConfig,
                    mode: this._connectionHolder.mode(),
                    database: this._connectionHolder.database(),
                    impersonatedUser: this._impersonatedUser,
                    notificationFilter: this._notificationFilter,
                    apiTelemetryConfig: this._apiTelemetryConfig,
                    beforeError: (error) => {
                        if (events != null) {
                            events.onError(error);
                        }
                        this._onError(error).catch(() => { });
                    },
                    afterComplete: (metadata) => {
                        if (events != null) {
                            events.onComplete(metadata);
                        }
                        this._onComplete(metadata);
                    }
                });
            }
            else {
                throw newError('No connection available');
            }
        }))
            .catch(error => {
            if (events != null) {
                events.onError(error);
            }
            this._onError(error).catch(() => { });
        })
            // It should make the transaction active anyway
            // further errors will be treated by the existing
            // observers
            .finally(() => this._acceptActive());
    }
    /**
     * Run Cypher query
     * Could be called with a query object i.e.: `{text: "MATCH ...", parameters: {param: 1}}`
     * or with the query and parameters as separate arguments.
     * @param {mixed} query - Cypher query to execute
     * @param {Object} parameters - Map with parameters to use in query
     * @return {Result} New Result
     */
    run(query, parameters) {
        const { validatedQuery, params } = validateQueryAndParameters(query, parameters);
        const result = this._state.run(validatedQuery, params, {
            connectionHolder: this._connectionHolder,
            onError: this._onError,
            onComplete: this._onComplete,
            onConnection: this._onConnection,
            reactive: this._reactive,
            fetchSize: this._fetchSize,
            highRecordWatermark: this._highRecordWatermark,
            lowRecordWatermark: this._lowRecordWatermak,
            preparationJob: this._activePromise
        });
        this._results.push(result);
        return result;
    }
    /**
     * Commits the transaction and returns the result.
     *
     * After committing the transaction can no longer be used.
     *
     * @returns {Promise<void>} An empty promise if committed successfully or error if any error happened during commit.
     */
    commit() {
        const committed = this._state.commit({
            connectionHolder: this._connectionHolder,
            onError: this._onError,
            onComplete: (meta) => this._onCompleteCallback(meta, this._bookmarks),
            onConnection: this._onConnection,
            pendingResults: this._results,
            preparationJob: this._activePromise
        });
        this._state = committed.state;
        // clean up
        this._onClose();
        return new Promise((resolve, reject) => {
            committed.result.subscribe({
                onCompleted: () => resolve(),
                onError: (error) => reject(error)
            });
        });
    }
    /**
     * Rollbacks the transaction.
     *
     * After rolling back, the transaction can no longer be used.
     *
     * @returns {Promise<void>} An empty promise if rolled back successfully or error if any error happened during
     * rollback.
     */
    rollback() {
        const rolledback = this._state.rollback({
            connectionHolder: this._connectionHolder,
            onError: this._onError,
            onComplete: this._onComplete,
            onConnection: this._onConnection,
            pendingResults: this._results,
            preparationJob: this._activePromise
        });
        this._state = rolledback.state;
        // clean up
        this._onClose();
        return new Promise((resolve, reject) => {
            rolledback.result.subscribe({
                onCompleted: () => resolve(),
                onError: (error) => reject(error)
            });
        });
    }
    /**
     * Check if this transaction is active, which means commit and rollback did not happen.
     * @return {boolean} `true` when not committed and not rolled back, `false` otherwise.
     */
    isOpen() {
        return this._state === _states.ACTIVE;
    }
    /**
     * Closes the transaction
     *
     * This method will roll back the transaction if it is not already committed or rolled back.
     *
     * @returns {Promise<void>} An empty promise if closed successfully or error if any error happened during
     */
    close() {
        return __awaiter(this, void 0, void 0, function* () {
            if (this.isOpen()) {
                yield this.rollback();
            }
        });
    }
    // eslint-disable-next-line
    // @ts-ignore
    [Symbol.asyncDispose]() {
        return this.close();
    }
    _onErrorCallback(error) {
        // error will be "acknowledged" by sending a RESET message
        // database will then forget about this transaction and cleanup all corresponding resources
        // it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it
        if (this._state === _states.FAILED) {
            // already failed, nothing to do
            // if we call onError for each result again, we might run into an infinite loop, that causes an OOM eventually
            return Promise.resolve(null);
        }
        this._state = _states.FAILED;
        this._onClose();
        this._results.forEach(result => {
            if (result.isOpen()) {
                // @ts-expect-error
                result._streamObserverPromise
                    .then(resultStreamObserver => resultStreamObserver.onError(error))
                    // Nothing to do since we don't have a observer to notify the error
                    // the result will be already broke in other ways.
                    .catch((_) => { });
            }
        });
        // release connection back to the pool
        return this._connectionHolder.releaseConnection();
    }
    /**
     * @private
     * @param {object} meta The meta with bookmarks
     * @returns {void}
     */
    _onCompleteCallback(meta, previousBookmarks) {
        this._onBookmarks(new Bookmarks(meta === null || meta === void 0 ? void 0 : meta.bookmark), previousBookmarks !== null && previousBookmarks !== void 0 ? previousBookmarks : Bookmarks.empty(), meta === null || meta === void 0 ? void 0 : meta.db);
    }
}
const _states = {
    // The transaction is running with no explicit success or failure marked
    ACTIVE: {
        commit: ({ connectionHolder, onError, onComplete, onConnection, pendingResults, preparationJob }) => {
            return {
                result: finishTransaction(true, connectionHolder, onError, onComplete, onConnection, pendingResults, preparationJob),
                state: _states.SUCCEEDED
            };
        },
        rollback: ({ connectionHolder, onError, onComplete, onConnection, pendingResults, preparationJob }) => {
            return {
                result: finishTransaction(false, connectionHolder, onError, onComplete, onConnection, pendingResults, preparationJob),
                state: _states.ROLLED_BACK
            };
        },
        run: (query, parameters, { connectionHolder, onError, onComplete, onConnection, reactive, fetchSize, highRecordWatermark, lowRecordWatermark, preparationJob }) => {
            // RUN in explicit transaction can't contain bookmarks and transaction configuration
            // No need to include mode and database name as it shall be included in begin
            const requirements = preparationJob !== null && preparationJob !== void 0 ? preparationJob : Promise.resolve();
            const observerPromise = connectionHolder.getConnection()
                .then(conn => requirements.then(() => conn))
                .then(conn => {
                onConnection();
                if (conn != null) {
                    return conn.run(query, parameters, {
                        bookmarks: Bookmarks.empty(),
                        txConfig: TxConfig.empty(),
                        beforeError: onError,
                        afterComplete: onComplete,
                        reactive,
                        fetchSize,
                        highRecordWatermark,
                        lowRecordWatermark
                    });
                }
                else {
                    throw newError('No connection available');
                }
            })
                .catch(error => new FailedObserver({ error, onError }));
            return newCompletedResult(observerPromise, query, parameters, connectionHolder, highRecordWatermark, lowRecordWatermark);
        }
    },
    // An error has occurred, transaction can no longer be used and no more messages will
    // be sent for this transaction.
    FAILED: {
        commit: ({ connectionHolder, onError, onComplete }) => {
            return {
                result: newCompletedResult(new FailedObserver({
                    error: newError('Cannot commit this transaction, because it has been rolled back either because of an error or explicit termination.'),
                    onError
                }), 'COMMIT', {}, connectionHolder, 0, // high watermark
                0 // low watermark
                ),
                state: _states.FAILED
            };
        },
        rollback: ({ connectionHolder, onError, onComplete }) => {
            return {
                result: newCompletedResult(new CompletedObserver(), 'ROLLBACK', {}, connectionHolder, 0, // high watermark
                0 // low watermark
                ),
                state: _states.FAILED
            };
        },
        run: (query, parameters, { connectionHolder, onError, onComplete }) => {
            return newCompletedResult(new FailedObserver({
                error: newError('Cannot run query in this transaction, because it has been rolled back either because of an error or explicit termination.'),
                onError
            }), query, parameters, connectionHolder, 0, // high watermark
            0 // low watermark
            );
        }
    },
    // This transaction has successfully committed
    SUCCEEDED: {
        commit: ({ connectionHolder, onError, onComplete }) => {
            return {
                result: newCompletedResult(new FailedObserver({
                    error: newError('Cannot commit this transaction, because it has already been committed.'),
                    onError
                }), 'COMMIT', {}, EMPTY_CONNECTION_HOLDER, 0, // high watermark
                0 // low watermark
                ),
                state: _states.SUCCEEDED,
                connectionHolder
            };
        },
        rollback: ({ connectionHolder, onError, onComplete }) => {
            return {
                result: newCompletedResult(new FailedObserver({
                    error: newError('Cannot rollback this transaction, because it has already been committed.'),
                    onError
                }), 'ROLLBACK', {}, EMPTY_CONNECTION_HOLDER, 0, // high watermark
                0 // low watermark
                ),
                state: _states.SUCCEEDED,
                connectionHolder
            };
        },
        run: (query, parameters, { connectionHolder, onError, onComplete }) => {
            return newCompletedResult(new FailedObserver({
                error: newError('Cannot run query in this transaction, because it has already been committed.'),
                onError
            }), query, parameters, connectionHolder, 0, // high watermark
            0 // low watermark
            );
        }
    },
    // This transaction has been rolled back
    ROLLED_BACK: {
        commit: ({ connectionHolder, onError, onComplete }) => {
            return {
                result: newCompletedResult(new FailedObserver({
                    error: newError('Cannot commit this transaction, because it has already been rolled back.'),
                    onError
                }), 'COMMIT', {}, connectionHolder, 0, // high watermark
                0 // low watermark
                ),
                state: _states.ROLLED_BACK
            };
        },
        rollback: ({ connectionHolder, onError, onComplete }) => {
            return {
                result: newCompletedResult(new FailedObserver({
                    error: newError('Cannot rollback this transaction, because it has already been rolled back.')
                }), 'ROLLBACK', {}, connectionHolder, 0, // high watermark
                0 // low watermark
                ),
                state: _states.ROLLED_BACK
            };
        },
        run: (query, parameters, { connectionHolder, onError, onComplete }) => {
            return newCompletedResult(new FailedObserver({
                error: newError('Cannot run query in this transaction, because it has already been rolled back.'),
                onError
            }), query, parameters, connectionHolder, 0, // high watermark
            0 // low watermark
            );
        }
    }
};
/**
 *
 * @param {boolean} commit
 * @param {ConnectionHolder} connectionHolder
 * @param {function(err:Error): any} onError
 * @param {function(metadata:object): any} onComplete
 * @param {function() : any} onConnection
 * @param {list<Result>>}pendingResults all run results in this transaction
 */
function finishTransaction(commit, connectionHolder, onError, onComplete, onConnection, pendingResults, preparationJob) {
    const requirements = preparationJob !== null && preparationJob !== void 0 ? preparationJob : Promise.resolve();
    const observerPromise = connectionHolder.getConnection()
        .then(conn => requirements.then(() => conn))
        .then(connection => {
        onConnection();
        pendingResults.forEach(r => r._cancel());
        return Promise.all(pendingResults.map(result => result.summary())).then(results => {
            if (connection != null) {
                if (commit) {
                    return connection.commitTransaction({
                        beforeError: onError,
                        afterComplete: onComplete
                    });
                }
                else {
                    return connection.rollbackTransaction({
                        beforeError: onError,
                        afterComplete: onComplete
                    });
                }
            }
            else {
                throw newError('No connection available');
            }
        });
    })
        .catch(error => new FailedObserver({ error, onError }));
    // for commit & rollback we need result that uses real connection holder and notifies it when
    // connection is not needed and can be safely released to the pool
    return new Result(observerPromise, commit ? 'COMMIT' : 'ROLLBACK', {}, connectionHolder, {
        high: Number.MAX_VALUE,
        low: Number.MAX_VALUE
    });
}
/**
 * Creates a {@link Result} with empty connection holder.
 * For cases when result represents an intermediate or failed action, does not require any metadata and does not
 * need to influence real connection holder to release connections.
 * @param {ResultStreamObserver} observer - an observer for the created result.
 * @param {string} query - the cypher query that produced the result.
 * @param {Object} parameters - the parameters for cypher query that produced the result.
 * @param {ConnectionHolder} connectionHolder - the connection holder used to get the result
 * @return {Result} new result.
 * @private
 */
function newCompletedResult(observerPromise, query, parameters, connectionHolder = EMPTY_CONNECTION_HOLDER, highRecordWatermark, lowRecordWatermark) {
    return new Result(Promise.resolve(observerPromise), query, parameters, new ReadOnlyConnectionHolder(connectionHolder !== null && connectionHolder !== void 0 ? connectionHolder : EMPTY_CONNECTION_HOLDER), {
        low: lowRecordWatermark,
        high: highRecordWatermark
    });
}
export default Transaction;