Query strategy
Query strategy allows users to define their own Cypher query to extract changes. This requires proper schema modifications, such as tracking changes through a dedicated change tracking property such as timestamps on nodes or relationships or using soft-deletes to track deletion of entities.
Configuration
First, you need to select QUERY strategy for the connector instance;
"neo4j.source-strategy": "QUERY"
Second, you need to define your query to track changes and where to publish them.
"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
1 | Topic name which will receive the message. |
2 | A Cypher query that returns changed entities since the last iteration, sent in by $lastCheck parameter. |
3 | The property (field name) that we use as a cursor to track changes. This needs to be part of the returned results. |
Creating Source instance
Based on the above example, you can use one of the following configurations.
Pick one of the message serialization format examples and save it as a file named source.query.neo4j.json
into a local directory.
We will now create the source instance by invoking the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.query.neo4j.json
This will create a Kafka Connect source instance that will send change event messages derived by the provided query over to the my-topic
topic, using your preferred serialization format.
In Control Center, confirm that the Source connector has been created in the Connect tab, under connect-default.
Generated change event messages in this case will have the following structure, which wraps each field into a dedicated structure which is type safe:
{
"name": {
"type": "S",
"B": null,
"I64": null,
"F64": null,
"S": "<name>",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
},
"surname": {
"type": "S",
"B": null,
"I64": null,
"F64": null,
"S": "<surname>",
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
},
"timestamp": {
"type": "I64",
"B": null,
"I64": <timestamp>,
"F64": null,
"S": null,
"BA": null,
"TLD": null,
"TLDT": null,
"TLT": null,
"TZDT": null,
"TOT": null,
"TD": null,
"SP": null,
"LB": null,
"LI64": null,
"LF64": null,
"LS": null,
"LTLD": null,
"LTLDT": null,
"LTLT": null,
"LZDT": null,
"LTOT": null,
"LTD": null,
"LSP": null
}
}
While the above serialized form of the change message is forward compatible in terms of property type changes, it might make the consumer side deserialization logic quite complex and not desired.
In this case, you can configure neo4j.payload-mode
setting as COMPACT
so that change event messages will instead be structured as follows:
{
"name": "<name>",
"surname": "<surname>",
"timestamp": <timestamp>
}
Refer to the payload mode page for more information about the neo4j.payload-mode
setting, including its limitations.