Apache Kafka

Overview

The Apache Kafka connector publishes all Sparkplug metrics to a Kafka broker using a JSON representation of the Sparkplug payload.

[Figure: SparkPipe Kafka connector architecture (AWS-N3-SparkPipe-Architecture-02)]

Features

  • Easy-to-Parse JSON Messages: Simplifies building data consumers, stream analytics pipelines and integrations by producing a straightforward JSON message for each metric.

  • Scalable: Efficiently distributes load across multiple Kafka partitions to handle high-throughput workloads while preserving message order for each Sparkplug edge device (see the sketch after this list).

  • Flexible Authentication: Compatible with SASL/PLAIN, SASL/SCRAM, SSL, and AWS IAM authentication mechanisms.

  • Supports all Sparkplug v3.0 Data Types: Compatible with every Sparkplug B data type.

  • Compression: Reduces network bandwidth, Kafka disk usage and infrastructure costs through efficient message compression.
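
Per-device ordering follows from Kafka's keyed partitioning: records that share a key are hashed to the same partition, and Kafka preserves order within a partition. Below is a minimal sketch of that principle in Python with the kafka-python package; the broker address, topic, and the per-edge-node key are placeholders for illustration, not SparkPipe's actual keying scheme.

import json
from kafka import KafkaProducer

# Illustration of keyed partitioning, not SparkPipe's internal producer.
# Broker address and key format are hypothetical placeholders.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

metric = {"name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR", "value": 0.99}

# Records with the same key always land in the same partition, so their
# relative order is preserved; different keys can spread across
# partitions and be consumed in parallel.
key = "My MQTT Group/Edge Node bfc0c4"
producer.send("uns-data", key=key, value=metric)
producer.flush()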

Message format

SparkPipe serializes each Sparkplug metric into a JSON payload in one of two formats: simple or original.

Simple

The simple format produces a flat, easy-to-parse JSON message for each Sparkplug metric.

{
  "name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR",
  "metadata": {
    "groupId": "My MQTT Group",
    "edgeNodeId": "Edge Node bfc0c4",
    "deviceId": "SUNN3RGY"
  },
  "dataType": "Double",
  "value": 0.9987980127334595,
  "timestamp": 1718964747032,
  "properties": {
    "opcItemPath": "nsu=N3uron:OpcUaServer;s=SUNN3RGY.BLUELAKE.PVG001.PST004.INV001.POWER_FACTOR",
    "Quality": 192,
    "opcServer": "N3uron:OpcUaServer",
    "valueSource": "opc",
    "ConfiguredTagPath": "[default]SUNN3RGY/BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR"
  },
  "quality": "GOOD"
}
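
Because every field of interest sits at the top level, a consumer needs no Sparkplug or protobuf tooling. A rough sketch using the kafka-python package; the broker address and consumer group are placeholders to adapt to your cluster:

import json
from kafka import KafkaConsumer

# Minimal consumer for the simple format; broker address, group id and
# TLS/auth settings are placeholders.
consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=["broker01.kafka.cloud:9092"],
    group_id="uns-data-readers",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for record in consumer:
    metric = record.value
    # Each message carries exactly one metric with top-level fields.
    print(metric["name"], metric["value"], metric["quality"])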

Original

This format produces the original JSON representation of the protobuf payload for each metric in the Sparkplug message.

{
    "EdgeNodeDescriptor": {
        "GroupID": "WIND3RNGY",
        "EdgeNodeID": "El Andévalo"
    },
    "DeviceID": "Device01",
    "Metric": {
        "name": "SUNN3RGY/BLUELAKE/PVG001/PST_01/KPI_ST/POA_IRRADIANCE_5MIN_AVG",
        "alias": 40,
        "timestamp": 1729077600000,
        "datatype": 10,
        "properties": {
            "keys": [
                "engUnit",
                "readOnly",
                "tooltip",
                "deadband",
                "deadbandMode",
                "Quality"
            ],
            "values": [
                {
                    "type": 12,
                    "Value": {
                        "StringValue": "W/m^2"
                    }
                },
                {
                    "type": 11,
                    "Value": {
                        "BooleanValue": true
                    }
                },
                {
                    "type": 12,
                    "Value": {
                        "StringValue": " Plane of Array irradiance 5min average "
                    }
                },
                {
                    "type": 10,
                    "Value": {
                        "DoubleValue": 0
                    }
                },
                {
                    "type": 12,
                    "Value": {
                        "StringValue": "Absolute"
                    }
                },
                {
                    "type": 3,
                    "Value": {
                        "IntValue": 192
                    }
                }
            ]
        },
        "Value": {
            "DoubleValue": 965.5749439333333
        }
    },
    "Quality": "GOOD"
}
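
In the original format, the datatype field and each property's type carry Sparkplug B numeric type codes (in the sample above: 10 = Double, 11 = Boolean, 12 = String, 3 = Int32), and properties arrive as parallel keys and values arrays. A sketch of a helper that pairs them up; the type map covers the scalar codes from the Sparkplug v3.0 specification:

# Scalar data type codes from the Sparkplug v3.0 specification.
SPARKPLUG_TYPES = {
    1: "Int8", 2: "Int16", 3: "Int32", 4: "Int64",
    5: "UInt8", 6: "UInt16", 7: "UInt32", 8: "UInt64",
    9: "Float", 10: "Double", 11: "Boolean", 12: "String",
    13: "DateTime", 14: "Text",
}

def decode_properties(properties: dict) -> dict:
    """Pair each property key with its value and decoded type name.

    Each entry in `values` wraps its payload in a single field such as
    "StringValue" or "DoubleValue", so unwrapping takes one step.
    """
    decoded = {}
    for key, entry in zip(properties["keys"], properties["values"]):
        (value,) = entry["Value"].values()
        decoded[key] = (value, SPARKPLUG_TYPES.get(entry["type"], "Unknown"))
    return decoded

For the sample above, decode_properties would return, for example, "engUnit": ("W/m^2", "String") and "Quality": (192, "Int32").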

Configuration

The default configuration for the Kafka connector:

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  # Example:
  #   servers = [ 
  #     "broker01.kafka.cloud:9092",
  #     "broker02.kafka.cloud:9092",
  #     "broker03.kafka.cloud:9092"
  #   ]
  servers = []

  # SSL/TLS configuration
  tls_enabled                = true
  tls_client_certificate     = ""    # Path to the TLS client certificate
  tls_client_certificate_key = ""    # Path to the TLS certificate key
  tls_certificate_authority  = ""    # Path to the TLS Certificate Authority
  tls_insecure_skip_verify   = false # Disable certificate validation

  # Authentication mechanism (NO_AUTH, SASL_PLAIN, SASL_SCRAM, AWS_MSK_IAM)
  authentication = "NO_AUTH"

  # Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
  username = ""
  password = ""

  # SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
  algorithm = "SHA512"

  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  #
  # - simple: A simplified and easy-to-parse JSON representation of each Sparkplug metric.
  # - original: The original JSON representation of the Sparkplug message from protobuf.
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"

SASL/SCRAM

Example configuration using SASL/SCRAM authentication:

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "broker01.kafka.cloud:9096",
    "broker02.kafka.cloud:9096",
    "broker03.kafka.cloud:9096"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # SASL/SCRAM authentication
  authentication = "SASL_SCRAM"

  # Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
  username = "<USERNAME>"
  password = "<PASSWORD>"

  # SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
  algorithm = "SHA512"

  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
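
A quick way to confirm that messages are flowing with these credentials is to consume with the same SCRAM settings. A sketch with the kafka-python package; brokers and credentials mirror the placeholders above:

import json
from kafka import KafkaConsumer

# Consumer using the same SASL/SCRAM-over-TLS settings as the connector.
consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=[
        "broker01.kafka.cloud:9096",
        "broker02.kafka.cloud:9096",
        "broker03.kafka.cloud:9096",
    ],
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<USERNAME>",
    sasl_plain_password="<PASSWORD>",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for record in consumer:
    print(record.value["name"], record.value["value"])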

mTLS

Example configuration using mutual TLS authentication (client certificates):

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "broker01.kafka.cloud:9092",
    "broker02.kafka.cloud:9092",
    "broker03.kafka.cloud:9092"
  ]
  
  # SSL/TLS configuration
  tls_enabled                = true
  tls_client_certificate     = "/etc/sparkpipe/certs/client.crt"
  tls_client_certificate_key = "/etc/sparkpipe/certs/client.key"
  tls_certificate_authority  = "/etc/sparkpipe/certs/ca.crt"

  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
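
The same certificates can be reused to verify connectivity from a client. A sketch with kafka-python, assuming the certificate paths from the configuration above:

import json
from kafka import KafkaConsumer

# Consumer presenting the same client certificate as the connector.
consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=["broker01.kafka.cloud:9092"],
    security_protocol="SSL",
    ssl_cafile="/etc/sparkpipe/certs/ca.crt",
    ssl_certfile="/etc/sparkpipe/certs/client.crt",
    ssl_keyfile="/etc/sparkpipe/certs/client.key",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for record in consumer:
    print(record.value["name"], record.value["value"])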

Amazon MSK

Example configuration for connecting to an Amazon MSK cluster using IAM access control. See the official Amazon MSK documentation for details on setting up IAM access control.

IAM policy used in this example:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:cluster/myTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:topic/myTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:group/myTestCluster/*"
            ]
        }
    ]
}

Create an IAM role with the policy above and attach the role to the EC2 instance running SparkPipe.

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  servers = [
      "b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
      "b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # AWS_MSK_IAM authentication
  authentication = "AWS_MSK_IAM"
  
  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"

Confluent Cloud

Example configuration for connecting to a Confluent Cloud cluster using TLS and SASL/PLAIN authentication.

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "cluster-id.region.aws.confluent.cloud:9092"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # SASL/PLAIN authentication
  authentication = "SASL_PLAIN"
  username       = "<CONFLUENT_API_KEY>"
  password       = "<CONFLUENT_API_SECRET>"

  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
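
Confluent Cloud treats the API key and secret as SASL/PLAIN credentials, so a consumer can connect the same way. A sketch with kafka-python, reusing the placeholders above:

import json
from kafka import KafkaConsumer

# Confluent Cloud authenticates SASL/PLAIN with an API key/secret pair.
consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=["cluster-id.region.aws.confluent.cloud:9092"],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="<CONFLUENT_API_KEY>",
    sasl_plain_password="<CONFLUENT_API_SECRET>",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for record in consumer:
    print(record.value["name"], record.value["value"])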

