Subscription engines
This page describes different ways to set up a GraphQL subscription along with a @neo4j/graphql
server.
Default
The default behavior is automatically set if the subscriptions
feature is set to true
, as described in Getting Started:
new Neo4jGraphQL({
typeDefs,
driver,
features: {
subscriptions: true
},
});
This behavior enables a simple subscription system that works on a single instance. It is ideal for development, testing, and servers that do not require horizontal scaling.
Change Data Capture Beta
If your database supports Change Data Capture (CDC), you can use it as your mechanism for subscriptions using Neo4jGraphQLSubscriptionsCDCEngine
.
Make sure to follow the steps described on the CDC Documentation to enable it for your Neo4j instance.
Note that CDC-based subscriptions behave differently from other subscription mechanisms. In this case, it uses the native CDC events from Neo4j database. This has the following implications:
-
Any database change, including those changes done outside of GraphQL, will be reported.
-
Relationship events are not supported at the moment.
-
No additional broker mechanism is required. All the events are received by all the instances of
@neo4j/graphql
. -
Events are not triggered immediately but are polled to the database.
Usage
Neo4jGraphQLSubscriptionsCDCEngine
can be imported directly from the library.
The Neo4j driver is the only required argument:
import { Neo4jGraphQL, Neo4jGraphQLSubscriptionsCDCEngine } from '@neo4j/graphql'; const engine = new Neo4jGraphQLSubscriptionsCDCEngine({ driver, }) const neoSchema = new Neo4jGraphQL({ typeDefs, driver, features: { subscriptions: engine, }, });
API
The following options can be passed to the constructor:
-
driver
: The driver to be used for CDC queries. -
pollTime
: The interval, in milliseconds, between queries to CDC. Defaults to 100ms. Note that poll time is the period between one request finishing and the next one starting. The actual time it takes for CDC events to trigger the subscription also depend on your network. -
queryConfig
: An object with the driver query options to be passed to CDC requests. Use thedb
field to define the target database for CDC.
AMQP
Using subscriptions on a server with multiple instances can be complex, as described in Horizontal scaling. Therefore, the recommended approach is to use a PubSub system, which can be achieved with an AMQP broker such as RabbitMQ. This is supported by the @neo4j/graphql-amqp-subscriptions-engine package.
The @neo4j/graphql-amqp-subscriptions-engine
plugin connects to message brokers through the AMQP 0-9-1
protocol to distribute subscription events across all server instances.
Some brokers supporting this protocol are:
The plugin can be installed with npm
:
npm install @neo4j/graphql-amqp-subscriptions-engine
AMQP 1.0 is not supported by this plugin. |
Usage
The AMQP plugin should be instanced and passed to the subscription
field in features.
This automatically enables the subscriptions with the AMQP broker as a message queue:
const { Neo4jGraphQLAMQPSubscriptionsEngine } = require("@neo4j/graphql-amqp-subscriptions-engine"); const amqpSubscription = new Neo4jGraphQLAMQPSubscriptionsEngine({ connection: { hostname: "localhost", username: "guest", password: "guest", } }); const neoSchema = new Neo4jGraphQL({ typeDefs, driver, features: { subscriptions: amqpSubscription, }, });
API
The following options can be passed to the constructor:
-
connection: an AMQP uri as a string or a configuration object.
-
hostname: hostname to be used. Defaults to
localhost
. -
username: defaults to
guest
. -
password: defaults to
guest
. -
port: port of the AMQP broker. Defaults to
5672
.
-
-
exchange: the exchange to be used in the broker. Defaults to
neo4j.graphql.subscriptions.fx
. -
version: the AMQP version to be used. Currently only
0-9-1
is supported.
Additionally, any option supported by amqplib can be passed to connection
.
To set these configurations up, use the following method:
-
close(): Promise<void>: Closes the connection and channel created, and unbinds the event emitter.
Custom subscription engine
If none of the existing engines is valid for your use case, you can create a new engine to connect to any broker you may need. For that, you need to create a new class defining your messaging behavior and it must contain:
-
An
EventEmitter
property calledevents
that should emit an event every time the broker sends a message. -
A
publish
method that should publish a new event to the broker. -
Optionally, an
init
method returning a promise that should be called ongetSchema
. This is useful for setting up the connection to a broker.
In case you want to handle subscriptions using redis:
// Note: This is an example of a custom subscription behavior and not a production ready redis implementation. class CustomRedisSubscriptionEngine { constructor(redisClient) { this.client = redisClient; this.events = new EventEmitter(); } // This method connects to Redis and sends messages to the eventEmitter when receiving events. async init(){ await this.client.connect(); this.subscriber = this.client.duplicate() this.publisher = this.client.duplicate(); await this.subscriber.connect(); await this.publisher.connect(); await this.subscriber.subscribe("graphql-subscriptions", (message) => { const eventMeta = JSON.parse(message); this.events.emit(eventMeta.event, eventMeta); // Emits a new event when receiving a new message from redis }); } async publish(eventMeta) { await this.publisher.publish("graphql-subscriptions", JSON.stringify(eventMeta)); // Sends a message to redis } } const client = createClient(); // From https://www.npmjs.com/package/redis const redisSubscriptions = new CustomRedisSubscriptionEngine(client) const neoSchema = new Neo4jGraphQL({ typeDefs, driver, features: { subscriptions: redisSubscriptions, }, });
Note that extra properties and methods are often needed to handle the connection to the broker.
However, as long as the messages are sent to the broker in the publish
method and that these messages are received and then emitted through the events
property, the subscriptions are properly handled.
Using Typescript
If using Typescript, you may import the interface Neo4jGraphQLSubscriptionsEngine
to implement your own class.
Ensure the API is correctly defined:
class CustomRedisEngine implements Neo4jGraphQLSubscriptionsEngine {}
Events are sent in order to the class.
However, order is not guaranteed once these events have been broadcasted through a broker.
For cases when ordering is important, you must set up the field |