Source Configuration
The documentation on the deprecated Neo4j Streams plugin can be found here.
In this chapter we’ll discuss how the Source instance is configured.
Create the Source Instance
You can create a new Source instance with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @source.avro.neo4j.json
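Once the call returns, you can verify that the instance was created by querying the Kafka Connect REST API (a quick check, assuming the Connect worker is reachable at localhost:8083 as in the call above, and using the connector name from the configuration file):
curl http://localhost:8083/connectors/Neo4jSourceConnectorAVRO/status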
Let’s look at the source.avro.neo4j.json file:
{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "topic": "my-topic",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.enforce.schema": true,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp"
  }
}
This will create a Kafka Connect Source instance that sends AVRO messages over the topic named my-topic.
Every message in the topic will have the following structure:
{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}
Please check the Configuration Summary for a detailed guide about the supported configuration parameters.
How the Source module pushes the data to the defined Kafka topic
Timestamps aren’t necessarily unique. As a result, since the source query filters on ts.timestamp > $lastCheck, records committed with a timestamp equal to or lower than the last check value are not returned by subsequent polls, so some changes may be missed.
We use the query provided in the neo4j.source.query field, polling the database at the interval (in milliseconds) specified in the neo4j.streaming.poll.interval.msecs field.
So, given the JSON configuration above, the connector will perform:
MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp
every 5000 milliseconds, publishing events like:
{"name":{"string":"John Doe"},"timestamp":{"long":1624551349362}}
In this case we use neo4j.enforce.schema=true, which means that a schema is attached to each record. If you want to stream plain JSON strings instead, use the corresponding serializer together with neo4j.enforce.schema=false, which produces the following output:
{"name": "John Doe", "timestamp": 1624549598834}