Apache Kafka

Overview

The Apache Kafka connector publishes all Sparkplug metrics to a Kafka topic using an easy-to-parse JSON representation of the Sparkplug payload.

[Figure: SparkPipe architecture (AWS-N3-SparkPipe-Architecture-02)]

Features

  • Easy-to-Parse JSON Messages: Simplifies building data consumers, stream analytics pipelines, and integrations by producing a straightforward JSON message for each metric.

  • Scalable: Efficiently distributes the load over multiple Kafka partitions to handle high-throughput workloads while maintaining message order for each Sparkplug edge device (see the sketch after this list).

  • Flexible Authentication: Compatible with SASL/PLAIN, SASL/SCRAM, mTLS, and AWS IAM authentication mechanisms.

  • Supports all Sparkplug v3.0 Data Types: Compatible with every Sparkplug B data type.

  • Compression: Reduces network bandwidth, Kafka disk usage, and infrastructure costs through efficient message compression.
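
Kafka preserves ordering only within a partition, so per-device ordering follows from keying every record with the identity of its edge device. The sketch below illustrates that keying pattern with the kafka-python library; it is purely illustrative, not SparkPipe's internal producer code, and the key scheme shown is an assumption.

# Illustrative only: keying records by edge device keeps per-device order,
# because Kafka guarantees ordering within a single partition.
import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers=["broker01.kafka.cloud:9092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

metric = {
    "name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR",
    "dataType": "Double",
    "value": 0.9987980127334595,
    "timestamp": 1718964747032,
}

# Hypothetical key scheme: group/edge-node/device identifies the source device.
# Every record with the same key lands on the same partition, in order.
key = "My MQTT Group/Edge Node bfc0c4/SUNN3RGY"
producer.send("uns-data", key=key, value=metric)
producer.flush()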

Message format

SparkPipe serializes each Sparkplug metric into a simple and easy-to-parse JSON payload.

Example JSON:

{
  "name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR",
  "metadata": {
    "groupId": "My MQTT Group",
    "edgeNodeId": "Edge Node bfc0c4",
    "deviceId": "SUNN3RGY"
  },
  "dataType": "Double",
  "value": 0.9987980127334595,
  "timestamp": 1718964747032,
  "properties": {
    "opcItemPath": "nsu=N3uron:OpcUaServer;s=SUNN3RGY.BLUELAKE.PVG001.PST004.INV001.POWER_FACTOR",
    "Quality": 192,
    "opcServer": "N3uron:OpcUaServer",
    "valueSource": "opc",
    "ConfiguredTagPath": "[default]SUNN3RGY/BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR"
  },
  "quality": "GOOD"
}
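
Because each message is a self-contained JSON document, a consumer needs nothing beyond a JSON parser to work with it. A minimal sketch using only the Python standard library (the parse_metric helper is hypothetical; the field names match the example above):

# Minimal sketch: decode one SparkPipe metric message with the standard library.
import json
from datetime import datetime, timezone

def parse_metric(raw: bytes) -> dict:
    # Hypothetical helper: decode a metric and derive a readable timestamp.
    metric = json.loads(raw)
    # Sparkplug timestamps are milliseconds since the Unix epoch.
    metric["datetime"] = datetime.fromtimestamp(metric["timestamp"] / 1000, tz=timezone.utc)
    return metric

raw = b'{"name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR", "dataType": "Double", "value": 0.9987980127334595, "timestamp": 1718964747032, "quality": "GOOD"}'
m = parse_metric(raw)
print(m["name"], m["value"], m["datetime"], m["quality"])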

Configuration

The default configuration for the Kafka connector:

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  # Example:
  #   servers = [ 
  #     "broker01.kafka.cloud:9092",
  #     "broker02.kafka.cloud:9092",
  #     "broker03.kafka.cloud:9092"
  #   ]
  servers = []

  # SSL/TLS configuration
  tls_enabled                = true
  tls_client_certificate     = ""    # Path to the TLS client certificate
  tls_client_certificate_key = ""    # Path to the TLS certificate key
  tls_certificate_authority  = ""    # Path to the TLS Certificate Authority
  tls_insecure_skip_verify   = false # Disable certificate validation

  # Authentication mechanism (NO_AUTH, SASL_PLAIN, SASL_SCRAM, AWS_MSK_IAM)
  authentication = "NO_AUTH"

  # Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
  username = ""
  password = ""

  # SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
  algorithm = "SHA512"

  # Topic to publish messages
  topic = "uns-data"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"

SASL/SCRAM

Example configuration using SASL/SCRAM authentication:

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "broker01.kafka.cloud:9096",
    "broker02.kafka.cloud:9096",
    "broker03.kafka.cloud:9096"
  ]

  # SSL/TLS configuration
  tls_enabled                = true

  # SASL/SCRAM authentication
  authentication = "SASL_SCRAM"

  # Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
  username = "<USERNAME>"
  password = "<PASSWORD>"

  # SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
  algorithm = "SHA512"

  # Topic to publish messages
  topic = "uns-data"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
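
To confirm that the credentials and topic work end to end, a quick check can be run from Python. This is a sketch assuming the kafka-python library; the broker addresses and credentials are the placeholders from the configuration above.

# End-to-end check mirroring the SASL/SCRAM settings above (assumes kafka-python).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=[
        "broker01.kafka.cloud:9096",
        "broker02.kafka.cloud:9096",
        "broker03.kafka.cloud:9096",
    ],
    security_protocol="SASL_SSL",    # TLS transport, matching tls_enabled = true
    sasl_mechanism="SCRAM-SHA-512",  # matches algorithm = "SHA512"
    sasl_plain_username="<USERNAME>",
    sasl_plain_password="<PASSWORD>",
    auto_offset_reset="earliest",
)

for message in consumer:
    print(message.value)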

mTLS

Example configuration using mutual TLS authentication:

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "broker01.kafka.cloud:9092",
    "broker02.kafka.cloud:9092",
    "broker03.kafka.cloud:9092"
  ]
  
  # SSL/TLS configuration
  tls_enabled                = true
  tls_client_certificate     = "/etc/sparkpipe/certs/client.crt"
  tls_client_certificate_key = "/etc/sparkpipe/certs/client.key"
  tls_certificate_authority  = "/etc/sparkpipe/certs/ca.crt"

  # Topic to publish messages
  topic = "uns-data"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
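
The same kind of check works for mutual TLS; with kafka-python (an assumption, as above), the certificate, key, and CA paths from the configuration map onto the ssl_* parameters.

# mTLS check mirroring the configuration above (assumes kafka-python).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=["broker01.kafka.cloud:9092"],
    security_protocol="SSL",
    ssl_cafile="/etc/sparkpipe/certs/ca.crt",
    ssl_certfile="/etc/sparkpipe/certs/client.crt",
    ssl_keyfile="/etc/sparkpipe/certs/client.key",
)

for message in consumer:
    print(message.value)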

Amazon MSK

Example configuration for connecting to an Amazon MSK cluster using IAM access control. See the official MSK documentation for details on setting up IAM access control.

IAM policy used in this example:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:cluster/myTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:topic/myTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:group/myTestCluster/*"
            ]
        }
    ]
}

Create an IAM role with the policy above and attach the role to the EC2 instance running SparkPipe.

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  servers = [
      "b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
      "b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # AWS_MSK_IAM authentication
  authentication = "AWS_MSK_IAM"
  
  # Topic to publish messages
  topic = "uns-data"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
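
Consuming the topic from your own code with the same IAM identity requires signing the SASL/OAUTHBEARER handshake with SigV4. A sketch of that pattern, assuming the official aws-msk-iam-sasl-signer-python package together with kafka-python (broker addresses are the placeholders from above):

# IAM-authenticated consumer sketch (assumes aws-msk-iam-sasl-signer-python
# and kafka-python are installed, and the instance role from above is in effect).
from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

class MSKTokenProvider:
    # Supplies short-lived IAM auth tokens for the OAUTHBEARER handshake.
    def token(self):
        token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-west-1")
        return token

consumer = KafkaConsumer(
    "uns-data",
    bootstrap_servers=[
        "b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
        "b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
    ],
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=MSKTokenProvider(),
)

for message in consumer:
    print(message.value)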

Confluent Cloud

Example configuration for connecting to a Confluent Cloud cluster using TLS and SASL/PLAIN authentication.

...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "cluster-id.region.aws.confluent.cloud:9092"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # SASL/PLAIN authentication
  authentication = "SASL_PLAIN"
  username       = "<CONFLUENT_API_KEY>"
  password       = "<CONFLUENT_API_SECRET>"

  # Topic to publish messages
  topic = "uns-data"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"

