Kafka Source Connector: Payload Mode Configuration

The Kafka Source Connector for Neo4j supports two payload modes to control the format of data serialized and published to Kafka topics: EXTENDED and COMPACT. This feature is configurable through the neo4j.payload-mode property, allowing users to select the preferred serialization format based on data requirements.

Payload Modes

The neo4j.payload-mode configuration offers the following options:

  • EXTENDED (Default): Wraps each property in a detailed, typed structure that keeps the message schema consistent. This format is especially useful when property types may change over time or when temporal types are present.

  • COMPACT: Produces a simpler format that includes only the essential fields. This format is lighter and may be preferable when schema compatibility or complex data types are not required.

Limitations of COMPACT Mode

  • Property Type Changes: COMPACT mode does not support changes in property types. If a property type changes in Neo4j (e.g., from integer to string), the change can break the schema of the published messages.

  • Protobuf Compatibility: COMPACT mode is not supported with the Protobuf converter. In this combination, temporal types (e.g., LocalDate, LocalDateTime) cannot be serialized.
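
To illustrate the first limitation, the following Python sketch infers a flat field-to-type schema from COMPACT-style payloads and reports fields whose type differs between two messages. The function names (infer_flat_schema, find_type_conflicts) and the sample records are hypothetical; in a real deployment, schema enforcement is performed by the configured converter, not by code like this.

```python
from typing import Any, Dict, Tuple


def infer_flat_schema(payload: Dict[str, Any]) -> Dict[str, str]:
    """Map each field of a COMPACT-style payload to its Python type name."""
    return {field: type(value).__name__ for field, value in payload.items()}


def find_type_conflicts(
    old: Dict[str, Any], new: Dict[str, Any]
) -> Dict[str, Tuple[str, str]]:
    """Return fields present in both payloads whose inferred types differ."""
    old_schema = infer_flat_schema(old)
    new_schema = infer_flat_schema(new)
    return {
        field: (old_schema[field], new_schema[field])
        for field in old_schema.keys() & new_schema.keys()
        if old_schema[field] != new_schema[field]
    }


# Hypothetical messages: the "code" property changes from integer to string.
before = {"name": "mary", "code": 42}
after = {"name": "mary", "code": "42"}

conflicts = find_type_conflicts(before, after)
print(conflicts)  # {'code': ('int', 'str')}
```

In a flat payload the field's own type is part of the schema, so the second message no longer matches the schema derived from the first.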

Configuration

The payload mode can be configured in the source connector’s settings as follows (use "COMPACT" instead of "EXTENDED" if required):

"neo4j.payload-mode": "EXTENDED"

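As a minimal sketch, the Python helper below checks the neo4j.payload-mode value in a connector configuration before it is submitted. Only the property name, its two values, and the EXTENDED default come from this page; the helper name resolve_payload_mode, the sample dictionaries, and the strict exact-match comparison are assumptions made for illustration, since this page does not state whether other spellings are accepted.

```python
from typing import Dict

PAYLOAD_MODE_KEY = "neo4j.payload-mode"
VALID_PAYLOAD_MODES = ("EXTENDED", "COMPACT")
DEFAULT_PAYLOAD_MODE = "EXTENDED"  # documented default


def resolve_payload_mode(config: Dict[str, str]) -> str:
    """Return the payload mode from a connector config, or the default if unset."""
    mode = config.get(PAYLOAD_MODE_KEY, DEFAULT_PAYLOAD_MODE)
    # Assumption: only the exact upper-case values shown on this page are valid.
    if mode not in VALID_PAYLOAD_MODES:
        raise ValueError(
            f"{PAYLOAD_MODE_KEY} must be one of {VALID_PAYLOAD_MODES}, got {mode!r}"
        )
    return mode


print(resolve_payload_mode({"neo4j.payload-mode": "COMPACT"}))  # COMPACT
print(resolve_payload_mode({}))  # EXTENDED
```
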
Example Data Formats

The following examples show how data will be published in each payload mode.

COMPACT Mode Example

The COMPACT mode produces a minimalistic payload with only the essential fields:

{
  "name": "mary",
  "surname": "doe",
  "timestamp": 1729779296311
}

This mode is useful when performance and simplicity are priorities. It suits scenarios where schema evolution and temporal types are not a primary concern.

EXTENDED Mode Example

The EXTENDED mode includes additional structure and metadata to support complex types and schema consistency, preventing issues when property types change over time:

{
  "name": {
    "type": "S",
    "B": null,
    "I64": null,
    "F64": null,
    "S": "mary",
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  },
  "surname": {
    "type": "S",
    "B": null,
    "I64": null,
    "F64": null,
    "S": "doe",
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  },
  "timestamp": {
    "type": "I64",
    "B": null,
    "I64": 1729779365447,
    "F64": null,
    "S": null,
    "BA": null,
    "TLD": null,
    "TLDT": null,
    "TLT": null,
    "TZDT": null,
    "TOT": null,
    "TD": null,
    "SP": null,
    "LB": null,
    "LI64": null,
    "LF64": null,
    "LS": null,
    "LTLD": null,
    "LTLDT": null,
    "LTLT": null,
    "LZDT": null,
    "LTOT": null,
    "LTD": null,
    "LSP": null
  }
}

This mode is especially beneficial for data with complex schema requirements, as it ensures compatibility even if property types change on the Neo4j side.

Understanding the EXTENDED Payload Structure

In EXTENDED mode, each property includes fields for every supported Neo4j type. Only the field corresponding to the actual property type will contain a non-null value, while all others are set to null. This structure ensures that any change in the type of a property does not cause schema enforcement errors at either the source or sink connector.
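
The structure described above can be sketched in Python as follows. The field list is copied from the EXTENDED example on this page; the helper names (type_code, to_extended) are hypothetical, and the mapping from Python values to type codes covers only a small scalar subset chosen for illustration. It is not the connector's actual implementation.

```python
from typing import Any, Dict

# Value fields in the order shown in the EXTENDED example on this page.
VALUE_FIELDS = (
    "B", "I64", "F64", "S", "BA", "TLD", "TLDT", "TLT", "TZDT", "TOT", "TD", "SP",
    "LB", "LI64", "LF64", "LS", "LTLD", "LTLDT", "LTLT", "LZDT", "LTOT", "LTD", "LSP",
)


def type_code(value: Any) -> str:
    """Pick a type code for a few scalar Python values (illustrative subset)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "B"
    if isinstance(value, int):
        return "I64"
    if isinstance(value, float):
        return "F64"
    if isinstance(value, str):
        return "S"
    raise TypeError(f"unsupported type in this sketch: {type(value).__name__}")


def to_extended(value: Any) -> Dict[str, Any]:
    """Build a wrapper with every value field null except the one for the type."""
    code = type_code(value)
    wrapper: Dict[str, Any] = {"type": code}
    wrapper.update({field: None for field in VALUE_FIELDS})
    wrapper[code] = value
    return wrapper


as_int = to_extended(42)
as_str = to_extended("42")

# The set of fields is identical; only "type" and the populated field differ.
print(as_int.keys() == as_str.keys())  # True
print(as_int["type"], as_str["type"])  # I64 S
```

Because both wrappers expose the same fields, a property that switches from integer to string changes only the values inside the wrapper, not its shape.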

Field                           Description
type                            Indicates the type of the property. Possible values include: B, I64, F64, S, BA, TLD, TLDT, TLT, TZDT, TOT, TD, SP, or their list equivalents (e.g., LB, LI64, LF64, LS, LTLD, etc.).
B                               Boolean type (true or false)
I64                             64-bit integer
F64                             64-bit floating point
S                               String
BA                              Byte array
TLD                             Temporal Local Date
TLDT                            Temporal Local DateTime
TLT                             Temporal Local Time
TZDT                            Temporal Zoned DateTime
TOT                             Temporal Offset Time
TD                              Temporal Duration
SP                              Spatial Point
LB, LI64, LF64, LS, LTLD, etc.  Lists of each corresponding type

For example, a string field will be represented as:

{
  "type": "S",
  "B": null,
  "I64": null,
  "F64": null,
  "S": "actual_value",
  ...
}
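
A consumer that only needs the plain values can read the field named by "type". The Python sketch below shows one way to do this, assuming the message has already been deserialized into dictionaries; the helper names (from_extended, flatten_extended) are hypothetical, and the sample record is shortened to a few fields for brevity.

```python
from typing import Any, Dict


def from_extended(wrapper: Dict[str, Any]) -> Any:
    """Return the value stored in the field named by the wrapper's "type"."""
    code = wrapper["type"]
    if code not in wrapper:
        raise KeyError(f"no field for type code {code!r}")
    return wrapper[code]


def flatten_extended(payload: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Convert an EXTENDED-style payload into a flat name-to-value mapping."""
    return {name: from_extended(wrapper) for name, wrapper in payload.items()}


# Shortened sample: real wrappers carry every field listed in the table above.
record = {
    "name": {"type": "S", "B": None, "I64": None, "F64": None, "S": "mary"},
    "timestamp": {
        "type": "I64", "B": None, "I64": 1729779365447, "F64": None, "S": None,
    },
}

print(flatten_extended(record))  # {'name': 'mary', 'timestamp': 1729779365447}
```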

Configuration Recommendations

COMPACT mode is easier to work with when the generated messages are consumed by other connectors or applications and you can relax the schema compatibility mode on the target topics. If your environment requires schema compatibility or temporal data types, or has strong type-safety requirements with different converters (AVRO, JSON Schema, PROTOBUF, or JSON Embedded), prefer EXTENDED mode.

Compatibility with Sink Connectors

The EXTENDED format was introduced in connector version 5.1.0 to ensure that all data published to Kafka topics adheres to a consistent schema. This prevents issues when a property changes type on the Neo4j side (e.g., a name property changes from integer to string), enabling smooth data processing across connectors and Kafka consumers. When a Neo4j sink connector is fed by a Neo4j source connector, EXTENDED mode is recommended, because the Neo4j sink connector can consume the EXTENDED format directly.