Apache Kafka
Updated on 17 Oct 2024
Overview
The Apache Kafka connector publishes all Sparkplug metrics to a Kafka broker using a JSON representation of the Sparkplug payload.
Features
Easy-to-Parse JSON Messages: Simplifies building data consumers, stream analytics pipelines, and integrations by producing a straightforward JSON message for each metric.
Scalable: Efficiently distributes load across multiple Kafka partitions to handle high-throughput workloads while preserving message order for each Sparkplug edge device (illustrated in the sketch after this list).
Flexible Authentication: Compatible with SASL/PLAIN, SASL/SCRAM, SSL, and AWS IAM authentication mechanisms.
Full Sparkplug v3.0 Data Type Support: Compatible with every Sparkplug B data type.
Compression: Reduces network bandwidth, Kafka disk usage, and infrastructure costs through efficient message compression.
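Kafka only guarantees ordering within a single partition, so per-device ordering depends on consistent record keys. The sketch below illustrates the general mechanism, not SparkPipe's internals: keying each record by its group/edge-node/device identifiers routes all of a device's metrics to the same partition. The broker address, topic, and key layout are illustrative assumptions (Python with the kafka-python package).
# Illustration only: how keyed publishing preserves per-device order in Kafka.
# The broker address, topic, and key layout are assumptions, not SparkPipe's
# actual internals.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # hypothetical broker
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

metric = {
    "name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR",
    "metadata": {
        "groupId": "My MQTT Group",
        "edgeNodeId": "Edge Node bfc0c4",
        "deviceId": "SUNN3RGY",
    },
    "value": 0.9987980127334595,
}

# All records sharing this key hash to the same partition, so Kafka preserves
# their relative order for this edge device.
key = "{groupId}/{edgeNodeId}/{deviceId}".format(**metric["metadata"])
producer.send("uns-data", key=key, value=metric)
producer.flush()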
Message format
SparkPipe serializes each Sparkplug metric into a JSON payload in one of two formats: simple or original.
Simple
The simple format produces a concise, easy-to-parse JSON message for each Sparkplug metric.
{
  "name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR",
  "metadata": {
    "groupId": "My MQTT Group",
    "edgeNodeId": "Edge Node bfc0c4",
    "deviceId": "SUNN3RGY"
  },
  "dataType": "Double",
  "value": 0.9987980127334595,
  "timestamp": 1718964747032,
  "properties": {
    "opcItemPath": "nsu=N3uron:OpcUaServer;s=SUNN3RGY.BLUELAKE.PVG001.PST004.INV001.POWER_FACTOR",
    "Quality": 192,
    "opcServer": "N3uron:OpcUaServer",
    "valueSource": "opc",
    "ConfiguredTagPath": "[default]SUNN3RGY/BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR"
  },
  "quality": "GOOD"
}
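Because each record is a flat JSON document, a consumer needs no Sparkplug or protobuf tooling. A minimal sketch using the kafka-python package (the broker address and consumer group are assumptions; the topic matches the default configuration below):
# Minimal consumer for the "simple" format, assuming a local broker and the
# default "uns-data" topic.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers="localhost:9092",  # hypothetical broker
    group_id="uns-analytics",            # hypothetical consumer group
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    metric = message.value
    print(
        metric["name"],
        metric["dataType"],
        metric["value"],
        metric["timestamp"],
        metric["quality"],
    )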
Original
This format produces the original JSON representation of the protobuf payload for each metric in the Sparkplug message.
{
  "EdgeNodeDescriptor": {
    "GroupID": "WIND3RNGY",
    "EdgeNodeID": "El Andévalo"
  },
  "DeviceID": "Device01",
  "Metric": {
    "name": "SUNN3RGY/BLUELAKE/PVG001/PST_01/KPI_ST/POA_IRRADIANCE_5MIN_AVG",
    "alias": 40,
    "timestamp": 1729077600000,
    "datatype": 10,
    "properties": {
      "keys": [
        "engUnit",
        "readOnly",
        "tooltip",
        "deadband",
        "deadbandMode",
        "Quality"
      ],
      "values": [
        {
          "type": 12,
          "Value": {
            "StringValue": "W/m^2"
          }
        },
        {
          "type": 11,
          "Value": {
            "BooleanValue": true
          }
        },
        {
          "type": 12,
          "Value": {
            "StringValue": " Plane of Array irradiance 5min average "
          }
        },
        {
          "type": 10,
          "Value": {
            "DoubleValue": 0
          }
        },
        {
          "type": 12,
          "Value": {
            "StringValue": "Absolute"
          }
        },
        {
          "type": 3,
          "Value": {
            "IntValue": 192
          }
        }
      ]
    },
    "Value": {
      "DoubleValue": 965.5749439333333
    }
  },
  "Quality": "GOOD"
}
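In the original format, data types appear as Sparkplug B numeric codes (for example, 3 = Int32, 10 = Double, 11 = Boolean, 12 = String) and properties arrive as parallel keys/values arrays. A sketch of a decoder that flattens this structure (only the datatype codes used in the example above are mapped):
# Sketch: flatten the "original" format's parallel property arrays and typed
# value wrappers into plain Python values. Datatype codes follow the
# Sparkplug B specification; only the codes seen above are mapped here.
SPARKPLUG_TYPES = {3: "Int32", 10: "Double", 11: "Boolean", 12: "String"}

def unwrap(value_obj):
    """Return the single wrapped scalar, e.g. {"DoubleValue": 965.57} -> 965.57."""
    return next(iter(value_obj.values()))

def decode_metric(payload):
    metric = payload["Metric"]
    props = metric.get("properties", {})
    # The keys and values arrays are positionally aligned.
    properties = {
        key: unwrap(val["Value"])
        for key, val in zip(props.get("keys", []), props.get("values", []))
    }
    return {
        "name": metric["name"],
        "dataType": SPARKPLUG_TYPES.get(metric["datatype"], metric["datatype"]),
        "value": unwrap(metric["Value"]),
        "timestamp": metric["timestamp"],
        "properties": properties,
        "quality": payload.get("Quality"),
    }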
Configuration
The default configuration for the Kafka connector:
...
# Connectors configuration
[connectors]
name = "kafka"
# Kafka
[connectors.kafka]
# Bootstrap servers
# Example:
# servers = [
# "broker01.kafka.cloud:9092",
# "broker02.kafka.cloud:9092",
# "broker03.kafka.cloud:9092"
# ]
servers = []
# SSL/TLS configuration
tls_enabled = true
tls_client_certificate = "" # Path to the TLS client certificate
tls_client_certificate_key = "" # Path to the TLS certificate key
tls_certificate_authority = "" # Path to the TLS Certificate Authority
tls_insecure_skip_verify = false # Disable certificate validation
# Authentication mechanism (NO_AUTH, SASL_PLAIN, SASL_SCRAM, AWS_MSK_IAM)
authentication = "NO_AUTH"
# Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
username = ""
password = ""
# SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
algorithm = "SHA512"
# Topic to publish messages
topic = "uns-data"
# JSON message format (simple, original)
#
# - simple: A simplified and easy-to-parse JSON representation of each Sparkplug metric.
# - original: The original JSON representation of the Sparkplug message from protobuf.
format = "simple"
# Message compression (none, gzip, lz4, snappy, zstd)
compression = "snappy"
SASL/SCRAM
Example configuration using SASL/SCRAM authentication:
...
# Connectors configuration
[connectors]
name = "kafka"
# Kafka
[connectors.kafka]
# Bootstrap servers
servers = [
"broker01.kafka.cloud:9096",
"broker02.kafka.cloud:9096",
"broker03.kafka.cloud:9096"
]
# SSL/TLS configuration
tls_enabled = true
# SASL/SCRAM authentication
authentication = "SASL_SCRAM"
# Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
username = "<USERNAME>"
password = "<PASSWORD>"
# SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
algorithm = "SHA512"
# Topic to publish messages
topic = "uns-data"
# JSON message format (simple, original)
format = "simple"
# Message compression (none, gzip, lz4, snappy, zstd)
compression = "snappy"
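To confirm that the cluster accepts these credentials, you can consume from the topic with matching settings. A sketch with kafka-python (SCRAM-SHA-512 over TLS, mirroring the configuration above; the placeholders are assumptions):
# Verification sketch: consume with the same SASL/SCRAM settings as SparkPipe.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=[
        "broker01.kafka.cloud:9096",
        "broker02.kafka.cloud:9096",
        "broker03.kafka.cloud:9096",
    ],
    security_protocol="SASL_SSL",    # TLS + SASL, matching tls_enabled = true
    sasl_mechanism="SCRAM-SHA-512",  # matches algorithm = "SHA512"
    sasl_plain_username="<USERNAME>",
    sasl_plain_password="<PASSWORD>",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    print(message.value["name"], message.value["value"])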
mTLS
Example configuration using mutual TLS authentication (client certificates):
...
# Connectors configuration
[connectors]
name = "kafka"
# Kafka
[connectors.kafka]
# Bootstrap servers
servers = [
"broker01.kafka.cloud:9092",
"broker02.kafka.cloud:9092",
"broker03.kafka.cloud:9092"
]
# SSL/TLS configuration
tls_enabled = true
tls_client_certificate = "/etc/sparkpipe/certs/client.crt"
tls_client_certificate_key = "/etc/sparkpipe/certs/client.key"
tls_certificate_authority = "/etc/sparkpipe/certs/ca.crt"
# Topic to publish messages
topic = "uns-data"
# JSON message format (simple, original)
format = "simple"
# Message compression (none, gzip, lz4, snappy, zstd)
compression = "snappy"
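The client side of a consumer uses the same certificate files. A sketch with kafka-python (paths mirror the configuration above):
# Verification sketch: mutual TLS with the same certificate files.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=["broker01.kafka.cloud:9092"],
    security_protocol="SSL",
    ssl_cafile="/etc/sparkpipe/certs/ca.crt",       # Certificate Authority
    ssl_certfile="/etc/sparkpipe/certs/client.crt", # client certificate
    ssl_keyfile="/etc/sparkpipe/certs/client.key",  # client key
)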
Amazon MSK
Example configuration for connecting to an Amazon MSK cluster using IAM access control. See the official MSK documentation for more details on how to set up IAM access control.
IAM policy used in this example:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-1:0123456789012:cluster/myTestCluster/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-1:0123456789012:topic/myTestCluster/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-1:0123456789012:group/myTestCluster/*"
      ]
    }
  ]
}
Create an IAM role, attach the policy above to it, and associate the role with the EC2 instance running SparkPipe (see the sketch below).
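A sketch of those steps with boto3 (all names, the instance id, and the policy file path are placeholders; the policy document is the JSON above saved as msk-policy.json):
# Sketch: create the role, attach the policy, and associate the role with the
# EC2 instance running SparkPipe. All names and ids are hypothetical.
import json
import boto3

iam = boto3.client("iam")
ec2 = boto3.client("ec2")

# Trust policy allowing EC2 to assume the role.
assume_role = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "ec2.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

policy = iam.create_policy(
    PolicyName="sparkpipe-msk-policy",
    PolicyDocument=open("msk-policy.json").read(),  # the IAM policy above
)
iam.create_role(
    RoleName="sparkpipe-msk-role",
    AssumeRolePolicyDocument=json.dumps(assume_role),
)
iam.attach_role_policy(
    RoleName="sparkpipe-msk-role",
    PolicyArn=policy["Policy"]["Arn"],
)
# EC2 consumes roles through an instance profile.
iam.create_instance_profile(InstanceProfileName="sparkpipe-msk-profile")
iam.add_role_to_instance_profile(
    InstanceProfileName="sparkpipe-msk-profile",
    RoleName="sparkpipe-msk-role",
)
ec2.associate_iam_instance_profile(
    IamInstanceProfile={"Name": "sparkpipe-msk-profile"},
    InstanceId="i-0123456789abcdef0",  # the SparkPipe EC2 instance
)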
...
# Connectors configuration
[connectors]
name = "kafka"
# Kafka
[connectors.kafka]
servers = [
"b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
"b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098"
]
# SSL/TLS configuration
tls_enabled = true
# AWS_MSK_IAM authentication
authentication = "AWS_MSK_IAM"
# Topic to publish messages
topic = "uns-data"
# JSON message format (simple, original)
format = "simple"
# Message compression (none, gzip, lz4, snappy, zstd)
compression = "snappy"
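To verify IAM access from the same instance, the aws-msk-iam-sasl-signer-python package can supply OAUTHBEARER tokens to kafka-python. A sketch under that assumption (region and brokers mirror the configuration above):
# Verification sketch: consume from MSK with IAM auth via OAUTHBEARER tokens.
# Assumes the aws-msk-iam-sasl-signer-python package is installed.
from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

class IamTokenProvider:
    """Token provider that signs requests with the instance's IAM role."""
    def token(self):
        token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-west-1")
        return token

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=[
        "b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
        "b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
    ],
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=IamTokenProvider(),
)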
Confluent Cloud
Example configuration for connecting to a Confluent Cloud cluster using TLS and SASL/PLAIN authentication.
...
# Connectors configuration
[connectors]
name = "kafka"
# Kafka
[connectors.kafka]
# Bootstrap servers
servers = [
"cluster-id.region.aws.confluent.cloud:9092"
]
# SSL/TLS configuration
tls_enabled = true
# SASL/PLAIN authentication
authentication = "SASL_PLAIN"
username = "<CONFLUENT_API_KEY>"
password = "<CONFLUENT_API_SECRET>"
# Topic to publish messages
topic = "uns-data"
# JSON message format (simple, original)
format = "simple"
# Message compression (none, gzip, lz4, snappy, zstd)
compression = "snappy"
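The same verification pattern as the SASL/SCRAM example applies, with the Confluent API key and secret serving as the SASL/PLAIN username and password (placeholders as above):
# Verification sketch: SASL/PLAIN against Confluent Cloud with kafka-python.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=["cluster-id.region.aws.confluent.cloud:9092"],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="<CONFLUENT_API_KEY>",
    sasl_plain_password="<CONFLUENT_API_SECRET>",
)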