Causal Clustering lifecycle
In this section we will develop some deeper knowledge of how the cluster operates. By developing our understanding of how the cluster works we will be better equipped to design, deploy, and troubleshoot our production systems.
Our in-depth tour will follow the lifecycle of a cluster. We will boot a Core cluster and pick up key architectural foundations as the cluster forms and transacts. We will then add in Read Replicas and show how they bootstrap, join the cluster, and then catch up and remain caught up with the Core Servers. We will then see how backup is used in live cluster environments before shutting down Read Replicas and Core Servers.
The discovery protocol is the first step in forming a Causal Cluster. It takes in some information about existing Core cluster servers, and uses this to initiate a network join protocol.
Using this information, the server will either join an existing cluster or form one of its own.
The discovery protocol targets Core Servers only, regardless of whether it is a Core Server or a Read Replica performing discovery. This is because we expect Read Replicas to be both numerous and, relatively speaking, transient, whereas Core Servers will likely be fewer in number and relatively stable over time.
The discovery protocol takes information from causal_clustering.initial_discovery_members in neo4j.conf, which lists the IP addresses and ports of the servers that form the cluster on startup.
Detailed information about discovery and discovery configuration options is given in the Initial discovery of cluster members section.
When consuming this information, the server will try to handshake with the other listed servers.
On successful handshake with another server (or servers), the current server will discover the whole current topology.
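As a rough illustration of the input that discovery consumes, the sketch below parses a value for causal_clustering.initial_discovery_members into host and port pairs. It assumes the value is a comma-separated list of host:port entries. The hostnames, the port number, and the function name parse_discovery_members are hypothetical and chosen only for the example.

```python
def parse_discovery_members(value):
    """Split a comma-separated 'host:port' list into (host, port) tuples."""
    members = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate stray commas or whitespace
        # rpartition splits on the last ':' so the port is always the final field
        host, _, port = item.rpartition(":")
        members.append((host, int(port)))
    return members


# Hypothetical setting value; the hostnames and port are illustrative only.
setting = "core01.example.com:5000, core02.example.com:5000, core03.example.com:5000"
members = parse_discovery_members(setting)
```

Each resulting pair is a server that the starting instance would attempt to handshake with.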
The discovery protocol continues to run throughout the lifetime of the Causal Cluster and is used to maintain the current state of available servers and to help clients route queries to an appropriate server via the client-side drivers.
If it is a Core Server that is performing discovery, once it has made a connection to one of the existing Core Servers, it then joins the Raft protocol. Each database is replicated by a logically separate Raft group, so the process below is repeated for every one.
Raft is a distributed algorithm for maintaining a consistent log across multiple shared-nothing servers designed by Diego Ongaro for his 2014 Ph.D. thesis. See the Raft thesis for details.
Raft handles cluster membership by making it a normal part of keeping a distributed log in sync. Joining a cluster involves the insertion of a cluster membership entry into the Raft log which is then reliably replicated around the existing cluster. Once that entry is applied to enough members of the Raft consensus group (those machines running the specific instance of the algorithm), they update their view of the cluster to include the new server. Thus membership changes benefit from the same safety properties as other data transacted via Raft (see Transacting via the Raft protocol for more information).
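The idea that membership is just another log entry can be sketched as follows. This is a deliberately simplified model, not Neo4j's implementation: the RaftMember class, the "add_member" entry kind, and the server names are all hypothetical, and replication is reduced to appending the same entry to every member's log.

```python
from dataclasses import dataclass, field


@dataclass
class RaftMember:
    name: str
    log: list = field(default_factory=list)    # this member's copy of the Raft log
    members: set = field(default_factory=set)  # this member's view of the group
    applied: int = 0                           # number of log entries applied so far

    def append(self, entry):
        self.log.append(entry)

    def apply_committed(self, commit_index):
        """Apply entries up to commit_index; membership entries update the view."""
        while self.applied < commit_index:
            kind, payload = self.log[self.applied]
            if kind == "add_member":
                self.members.add(payload)
            self.applied += 1


# Three existing Core Servers (hypothetical names) that all agree on the group.
group = [
    RaftMember(name, members={"core1", "core2", "core3"})
    for name in ("core1", "core2", "core3")
]

# A join is replicated like any other entry, then applied once committed.
join_entry = ("add_member", "core4")
for member in group:
    member.append(join_entry)
    member.apply_committed(commit_index=1)
```

After the entry is applied, every member's view of the group includes the new server, which is the behaviour the paragraph above describes.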
The new Core Server must also catch up its own Raft logs with respect to the other Core Servers as it initializes its internal Raft instance. This is the normal case when a cluster is first booted and has performed few operations. There will be a delay before the new Core Server becomes available if it also needs to catch up (as per Catchup protocol) graph data from other servers. This is the normal case for a long-lived cluster where the servers hold a great deal of graph data.
Where a joining Neo4j instance has databases whose names match databases which already exist in the cluster, the database stores on the joining instance must be the same as their counterparts on cluster members (although they are allowed to be in previous states). For example, if a cluster contains a database named products, a new instance may join with a backup of products, but not a database named products with different contents. A new instance may also join a cluster if it does not contain any matching databases.
The described catchup process is repeated for each database which exists in the cluster.
When a Read Replica performs discovery, once it has made a connection to any of the available Core Servers it proceeds to add itself into a shared whiteboard.
This whiteboard provides a view of all live Read Replicas and is used both for routing requests from database drivers that support end-user applications and for monitoring the state of the cluster.
The Read Replicas are not involved in the Raft protocol, nor are they able to influence cluster topology. Hence a shared whiteboard outside of Raft comfortably scales to very large numbers of Read Replicas.
The whiteboard is kept up to date as Read Replicas join and leave the cluster, even if they fail abruptly rather than leaving gracefully.
Once bootstrapped, each Core Server spends its time processing database transactions. Updates are reliably replicated around Core Servers via the Raft protocol. Updates appear in the form of a (committed) Raft log entry containing transaction commands which is subsequently applied to update the database.
One of Raft’s primary design goals is to be easily understandable so that there are fewer places for tricky bugs to hide in implementations. As a side-effect, it is also possible for database operators to reason about their Core Servers in their Causal Clusters.
The Raft Leader for the current term (a logical clock) appends the transaction (an 'entry' in Raft terminology) to the head of its local log and asks the other instances to do the same. When the Leader can see that a majority of instances have appended the entry, it can be considered committed into the Raft log. The client application can now be informed that the transaction has safely committed since there is sufficient redundancy in the system to tolerate any (non-pathological) faults.
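The majority rule can be written down in a couple of lines. The sketch below is illustrative only: the function names are hypothetical, and it assumes the acknowledgement count includes the Leader's own append.

```python
def majority(cluster_size):
    """Smallest number of instances that forms a strict majority."""
    return cluster_size // 2 + 1


def is_committed(acks, cluster_size):
    """True once enough instances (Leader included) have appended the entry."""
    return acks >= majority(cluster_size)
```

For example, in a three-member group an entry appended by the Leader and one Follower is committed, whereas two appends are not enough in a five-member group.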
The Raft protocol describes three roles that an instance can be playing: Leader, Follower, and Candidate. These are transient roles and any Core Server can expect to play them throughout the lifetime of a cluster. While it is interesting from a computing science point of view to understand those states, operators should not be overly concerned: they are an implementation detail.
As each database operates within a logically separate Raft group, a core server can have multiple roles: one for each database.
For example, it may be the Leader for database
For safety, within any Raft protocol instance there is only one Leader able to make forward progress in any given term. The Leader bears the responsibility for imposing order on Raft log entries and driving the log forward with respect to the Followers.
Followers maintain their logs with respect to the current Leader’s log. Should any participant in the cluster suspect that the Leader has failed (not receiving new entries or heartbeats), then they can instigate a leadership election by entering the Candidate state. In Neo4j Core Servers this failure detection window is set by default above 20s to enable more stable leaders.
Whichever instance is in the best state (including the existing Leader, if it remains available) can emerge from the election as Leader. The "best state" for a Leader is decided by highest term, then by longest log, then by highest committed entry.
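The ordering described above (highest term, then longest log, then highest committed entry) maps naturally onto a tuple comparison. The sketch below only illustrates that ordering. It is not how an election is actually run, since real Raft elections involve voting between instances, and the CandidateState type and its field names are hypothetical.

```python
from typing import NamedTuple


class CandidateState(NamedTuple):
    name: str
    term: int        # latest term this instance has seen
    log_length: int  # number of entries in its Raft log
    committed: int   # index of its highest committed entry


def best_placed(candidates):
    """Pick the instance in the best state: term, then log length, then commit index."""
    return max(candidates, key=lambda c: (c.term, c.log_length, c.committed))


candidates = [
    CandidateState("core1", term=3, log_length=10, committed=9),
    CandidateState("core2", term=3, log_length=12, committed=9),
    CandidateState("core3", term=2, log_length=20, committed=20),
]
winner = best_placed(candidates)
```

Here core2 is best placed: core3 has a longer log but a lower term, and core1 ties with core2 on term but has a shorter log.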
The ability to fail over roles without losing data allows forward progress even in the event of faults. Even where Raft instances fail, the protocol can rapidly piece together which of the remaining instances is best placed to take over from the failed instance (or instances) without data loss. This is the essence of a non-blocking consensus protocol which allows Neo4j Causal Clustering to provide continuous availability to applications.
Read Replicas spend their time concurrently processing graph queries and applying a stream of transactions from the Core Servers to update their local graph store.
Updates from Core Servers to Read Replicas are propagated by transaction shipping. Transaction shipping is instigated by Read Replicas frequently polling any of the Core Servers specifying the ID of the last transaction they received and processed. The frequency of polling is an operational choice.
Neo4j transaction IDs are strictly monotonic integer values (they always increase). This makes it possible to determine whether or not a transaction has been applied to a Read Replica by comparing its last processed transaction ID with that of a Core Server.
If there is a large difference between a Read Replica’s transaction history and that of a Core Server, polling may not result in any transactions being shipped. This is quite expected, for example when a new Read Replica is introduced to a long-running cluster or where a Read Replica has been down for some significant period of time. In such cases the catchup protocol will realize the gap between the Core Servers and Read Replica is too large to fill via transaction shipping and will fall back to copying the database store directly from Core Server to Read Replica. Since we are working with a live system, at the end of the database store copy the Core Server’s database is likely to have changed. The Read Replica completes the catchup by asking for any transactions missed during the copy operation before becoming available.
A very slow database store copy could conceivably leave the Read Replica too far behind to catch up via transaction log shipping as the Core Server has substantially moved on. In such cases the Read Replica server repeats the catchup protocol. In pathological cases the operator can intervene to snapshot, restore, or file copy recent store files from a fast backup.
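The choice between transaction shipping and a store copy can be sketched as a comparison of transaction IDs. This is a simplified model under stated assumptions: it supposes that a Core Server can only ship transactions it still retains, represented here by a hypothetical oldest_retained_tx value. The function name and return values are likewise illustrative and not part of any Neo4j API.

```python
def plan_catchup(replica_last_tx, core_last_tx, oldest_retained_tx):
    """Decide how a Read Replica should catch up with a Core Server.

    replica_last_tx    -- ID of the last transaction the replica applied
    core_last_tx       -- ID of the last transaction committed on the Core Server
    oldest_retained_tx -- assumption: oldest transaction the Core Server can still ship
    """
    if replica_last_tx >= core_last_tx:
        return "up_to_date"
    # The next transaction the replica needs is replica_last_tx + 1.
    if replica_last_tx + 1 >= oldest_retained_tx:
        return "transaction_shipping"
    # The gap cannot be filled from shipped transactions, so copy the store,
    # then pull whatever was committed during the copy.
    return "store_copy"
```

A replica that is only a few transactions behind gets them shipped, while a new or long-absent replica falls back to a store copy and then repeats the check for transactions committed in the meantime.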
On clean shutdown, a Read Replica will invoke the discovery protocol to remove itself from the shared whiteboard overview of the cluster. It will also ensure that the database is shut down cleanly and is consistent, immediately ready for future use.
On an unclean shutdown such as a power outage, the Core Servers maintaining the overview of the cluster will notice that the Read Replica’s connection has been abruptly cut. The discovery machinery will initially hide the Read Replica’s whiteboard entry, and if the Read Replica does not reappear quickly its modest memory use in the shared whiteboard will be reclaimed.
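The hide-then-reclaim behaviour of the whiteboard can be modelled with a small class. This is a toy sketch, not the discovery implementation: the Whiteboard class, its method names, and the grace period of 30 seconds are all assumptions made for illustration.

```python
class Whiteboard:
    """Toy model of the shared view of live Read Replicas."""

    def __init__(self, grace_seconds):
        self.grace_seconds = grace_seconds
        self.live = set()  # replicas visible for routing and monitoring
        self.hidden = {}   # replica -> time its connection was lost

    def join(self, replica):
        self.hidden.pop(replica, None)  # a reappearing replica is un-hidden
        self.live.add(replica)

    def leave(self, replica):
        """Clean shutdown: the replica removes its own entry."""
        self.live.discard(replica)
        self.hidden.pop(replica, None)

    def connection_lost(self, replica, now):
        """Unclean shutdown: hide the entry but keep it for a while."""
        if replica in self.live:
            self.live.remove(replica)
            self.hidden[replica] = now

    def reclaim(self, now):
        """Drop hidden entries whose grace period has expired."""
        expired = [r for r, t in self.hidden.items() if now - t >= self.grace_seconds]
        for replica in expired:
            del self.hidden[replica]
        return expired


board = Whiteboard(grace_seconds=30)  # hypothetical grace period
board.join("replica1")
board.join("replica2")
board.connection_lost("replica1", now=0)
early = board.reclaim(now=10)  # still within the grace period
late = board.reclaim(now=30)   # grace period has elapsed
```

In this model the failed replica disappears from the live view immediately, but its entry is only discarded once the grace period has passed without it rejoining.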
On unclean shutdown it is possible the Read Replica will not have entirely consistent store files or transaction logs. On subsequent reboot the Read Replica will roll back any partially applied transactions such that the database is in a consistent state.
A shutdown of a Core Server, like Core Server booting, is handled via the Raft protocol. When a member is shut down, either cleanly or by force, it will eventually be voted out from the Raft group. All remaining instances accept that the cluster has grown smaller, and is therefore less fault tolerant. For any databases where the leaver was playing the Leader role, each of those leaderships will be transferred to other Core Servers. Once the new Leader is established, the Core cluster continues albeit with less redundancy.
If more members than the current fault tolerance leave the cluster within a very short time period, the cluster loses quorum and cannot proceed. However, if members are lost gradually, the Raft protocol may have time to reduce the size of the cluster. A Core cluster of 5 members that loses 2 can continue to operate normally with the remaining 3, but its fault tolerance drops from 2 to 0. Once the Raft protocol has voted out the lost members, changing the cluster size from 5 (fault tolerance of 2) to 3 (fault tolerance of 1), the cluster can lose yet another member and keep operating.
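The arithmetic behind these numbers follows from the majority rule. The sketch below uses hypothetical function names and distinguishes the voting size of the Raft group from the number of members that are actually alive.

```python
def fault_tolerance(voting_members):
    """Members a fully healthy Raft group of this size can lose and keep a majority."""
    return (voting_members - 1) // 2


def remaining_tolerance(voting_members, live_members):
    """Further losses the group can absorb right now; negative means quorum is lost."""
    majority = voting_members // 2 + 1
    return live_members - majority
```

With 5 voting members of which only 3 are alive, remaining_tolerance(5, 3) is 0. Once the two lost members have been voted out, the group has 3 voting members, all alive, and remaining_tolerance(3, 3) is 1.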
Raft may only reduce a cluster size to the configured causal_clustering.minimum_core_cluster_size_at_runtime. Once the cluster has reached this size, it will stop voting out members.
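The lower bound on shrinking can be expressed as a clamp. In this sketch, minimum_size stands in for the value of causal_clustering.minimum_core_cluster_size_at_runtime. The function name and the example values are hypothetical.

```python
def group_size_after_vote_out(voting_members, lost_members, minimum_size):
    """Voting size of the Raft group after lost members are voted out.

    The group never shrinks below minimum_size, and voting out never grows it.
    """
    shrunk = max(voting_members - lost_members, minimum_size)
    return min(voting_members, shrunk)
```

With a minimum of 3, a group of 5 that loses 2 members shrinks to 3, but a group of 3 that loses 1 stays at a voting size of 3, so the lost member still counts towards the majority calculation.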