---
title: "SparkPipe | Connectors | Apache Kafka"
slug: "sparkpipe-connectors-apache-kafka"
description: "SparkPipe is a cloud-native service that utilizes the MQTT Sparkplug specification to securely connect to any MQTT 3.1 compatible broker, capturing all the events published by any Sparkplug-enabled edge node, and routing this data to cloud-based services and applications such as Apache Kafka."
updated: 2025-12-19T15:11:36Z
published: 2025-12-19T15:11:36Z
---

# Apache Kafka

## Overview

The [Apache Kafka](https://kafka.apache.org/) connector publishes all the Sparkplug metrics to a Kafka broker using a JSON representation of the Sparkplug payload.

![AWS-N3-SparkPipe-Architecture-02](https://cdn.document360.io/54093ab5-6b22-4542-a265-04377931f11a/Images/Documentation/AWS-N3-SparkPipe-Architecture-02.png)

### **Features**

- **Easy-to-Parse JSON Messages:** Simplifies building data consumers, stream analytics pipelines and integrations by producing a straightforward JSON message for each metric.
- **Scalable:** Efficiently distributes the load over multiple Kafka partitions to handle high-throughput workloads while maintaining the message order for each Sparkplug edge device.
- **Flexible Authentication:** Compatible with SASL/PLAIN, SASL/SCRAM, SSL, and AWS IAM authentication mechanisms.
- **Supports all Sparkplug v3.0 Data Types:** Compatible with all [Sparkplug B](https://sparkplug.eclipse.org/) data types.
- **Compression:** Reduces network bandwidth, Kafka disk usage and infrastructure costs through efficient message compression.
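The **Scalable** feature rests on a standard Kafka property: records with the same key land in the same partition (as long as the partition count does not change), and Kafka preserves order only within a partition. The sketch below illustrates that idea under explicit assumptions: the key built from `groupId/edgeNodeId/deviceId` is hypothetical, and `zlib.crc32` is a stand-in hash. SparkPipe's actual key and partitioner are not documented here, and real Kafka clients use their own hash functions (murmur2 in the Java client, for example).

```python
import zlib
from collections import defaultdict


def device_key(group_id: str, edge_node_id: str, device_id: str) -> str:
    """Hypothetical message key: one key per Sparkplug edge device."""
    return f"{group_id}/{edge_node_id}/{device_id}"


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition; crc32 stands in for the client's real hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign(messages, num_partitions):
    """Append each (key, payload) pair to its partition in arrival order."""
    partitions = defaultdict(list)
    for key, payload in messages:
        partitions[partition_for(key, num_partitions)].append((key, payload))
    return partitions


if __name__ == "__main__":
    # Two devices publishing interleaved sequence numbers
    stream = []
    for seq in range(3):
        stream.append((device_key("G1", "Edge1", "DevA"), seq))
        stream.append((device_key("G1", "Edge1", "DevB"), seq))
    for partition, records in sorted(assign(stream, 12).items()):
        print(partition, records)
```

Whatever the partition each device hashes to, all of its messages share it, so a consumer of that partition sees them in publish order; different devices are spread across partitions and can be processed in parallel.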

### Message format

SparkPipe serializes each Sparkplug metric into a JSON payload in one of two formats, `simple` or `original`, selected with the connector's `format` setting.

#### **Simple**

The simple format produces a simplified and easy-to-parse JSON message for each Sparkplug metric.

```json
{
  "name": "BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR",
  "metadata": {
    "groupId": "My MQTT Group",
    "edgeNodeId": "Edge Node bfc0c4",
    "deviceId": "SUNN3RGY"
  },
  "dataType": "Double",
  "value": 0.9987980127334595,
  "timestamp": 1718964747032,
  "properties": {
    "opcItemPath": "nsu=N3uron:OpcUaServer;s=SUNN3RGY.BLUELAKE.PVG001.PST004.INV001.POWER_FACTOR",
    "Quality": 192,
    "opcServer": "N3uron:OpcUaServer",
    "valueSource": "opc",
    "ConfiguredTagPath": "[default]SUNN3RGY/BLUELAKE/PVG001/PST004/INV001/POWER_FACTOR"
  }
}
```
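Because each simple message is a flat JSON object, a consumer needs little more than a JSON parser. The following is a minimal Python sketch that assumes the field names shown above; `SimpleMetric` and `parse_simple` are illustrative names, and treating a missing `deviceId` as an edge-node-level metric is an assumption, not documented SparkPipe behavior.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Union


@dataclass
class SimpleMetric:
    name: str
    group_id: str
    edge_node_id: str
    device_id: Optional[str]
    data_type: str
    value: Any
    timestamp: datetime
    properties: Dict[str, Any] = field(default_factory=dict)


def parse_simple(raw: Union[str, bytes]) -> SimpleMetric:
    """Parse one message produced with format = "simple"."""
    msg = json.loads(raw)
    meta = msg.get("metadata", {})
    return SimpleMetric(
        name=msg["name"],
        group_id=meta.get("groupId", ""),
        edge_node_id=meta.get("edgeNodeId", ""),
        # Assumption: deviceId may be missing for edge-node-level metrics
        device_id=meta.get("deviceId"),
        data_type=msg["dataType"],
        value=msg.get("value"),
        # Sparkplug timestamps are milliseconds since the Unix epoch (UTC)
        timestamp=datetime.fromtimestamp(msg["timestamp"] / 1000, tz=timezone.utc),
        properties=msg.get("properties", {}),
    )
```

For the example message above, `parse_simple` yields a `SimpleMetric` with `device_id == "SUNN3RGY"` and a timestamp on 2024-06-21 (UTC).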

#### **Original**

This format produces the original JSON representation of the [protobuf](https://protobuf.dev/) payload for each metric in the Sparkplug message.

```json
{
    "EdgeNodeDescriptor": {
        "GroupID": "WIND3RNGY",
        "EdgeNodeID": "El Andévalo"
    },
    "DeviceID": "Device01",
    "Metric": {
        "name": "SUNN3RGY/BLUELAKE/PVG001/PST_01/KPI_ST/POA_IRRADIANCE_5MIN_AVG",
        "alias": 40,
        "timestamp": 1729077600000,
        "datatype": 10,
        "properties": {
            "keys": [
                "engUnit",
                "readOnly",
                "tooltip",
                "deadband",
                "deadbandMode",
                "Quality"
            ],
            "values": [
                {
                    "type": 12,
                    "Value": {
                        "StringValue": "W/m^2"
                    }
                },
                {
                    "type": 11,
                    "Value": {
                        "BooleanValue": true
                    }
                },
                {
                    "type": 12,
                    "Value": {
                        "StringValue": " Plane of Array irradiance 5min average "
                    }
                },
                {
                    "type": 10,
                    "Value": {
                        "DoubleValue": 0
                    }
                },
                {
                    "type": 12,
                    "Value": {
                        "StringValue": "Absolute"
                    }
                },
                {
                    "type": 3,
                    "Value": {
                        "IntValue": 192
                    }
                }
            ]
        },
        "Value": {
            "DoubleValue": 965.5749439333333
        }
    },
    "Quality": "GOOD"
}
```
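In the original format, metric properties are split into parallel `keys` and `values` arrays, and every value is wrapped in an object named after its type (`DoubleValue`, `StringValue` and so on). The sketch below, whose helper names are hypothetical, pairs `keys[i]` with `values[i]`, unwraps the typed values, and translates the numeric `datatype` using the Sparkplug B `DataType` enumeration (only the scalar codes 1 to 15 are mapped here).

```python
import json
from typing import Any, Dict

# Scalar codes of the Sparkplug B DataType enumeration
DATATYPE_NAMES = {
    1: "Int8", 2: "Int16", 3: "Int32", 4: "Int64",
    5: "UInt8", 6: "UInt16", 7: "UInt32", 8: "UInt64",
    9: "Float", 10: "Double", 11: "Boolean", 12: "String",
    13: "DateTime", 14: "Text", 15: "UUID",
}


def unwrap(value_obj: Dict[str, Any]) -> Any:
    """Return the payload of a {"DoubleValue": 1.0}-style wrapper."""
    if not value_obj:
        return None
    # Assumption: exactly one typed field is present per value
    return next(iter(value_obj.values()))


def flatten_original(raw) -> Dict[str, Any]:
    """Flatten one message produced with format = "original"."""
    msg = json.loads(raw)
    node = msg["EdgeNodeDescriptor"]
    metric = msg["Metric"]
    props = metric.get("properties", {})
    keys, values = props.get("keys", []), props.get("values", [])
    return {
        "group_id": node["GroupID"],
        "edge_node_id": node["EdgeNodeID"],
        "device_id": msg.get("DeviceID"),
        "name": metric["name"],
        "timestamp_ms": metric.get("timestamp"),
        "datatype": DATATYPE_NAMES.get(metric.get("datatype"), "Unknown"),
        "value": unwrap(metric.get("Value", {})),
        # keys[i] describes values[i]
        "properties": {k: unwrap(v.get("Value", {})) for k, v in zip(keys, values)},
        "quality": msg.get("Quality"),
    }
```

Applied to the example above, the result holds `"datatype": "Double"` and a `properties` dictionary such as `{"engUnit": "W/m^2", ..., "Quality": 192}`.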

## Configuration

This is the default configuration for the Kafka connector:

```ini
...
# Connectors configuration
[connectors]
  name = "kafka"

...
# Kafka
[connectors.kafka]
  # Bootstrap servers
  # Example:
  #   servers = [ 
  #     "broker01.kafka.cloud:9092",
  #     "broker02.kafka.cloud:9092",
  #     "broker03.kafka.cloud:9092"
  #   ]
  servers = []

  # SSL/TLS configuration
  tls_enabled                = true
  tls_client_certificate     = ""    # Path to the TLS client certificate
  tls_client_certificate_key = ""    # Path to the TLS certificate key
  tls_certificate_authority  = ""    # Path to the TLS Certificate Authority
  tls_insecure_skip_verify   = false # Disable certificate validation

  # Authentication mechanism (NO_AUTH, SASL_PLAIN, SASL_SCRAM, AWS_MSK_IAM)
  authentication = "NO_AUTH"

  # Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
  username = ""
  password = ""

  # SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
  algorithm = "SHA512"

  # Topic to publish messages to; it is created if it does not exist
  topic = "uns-data"

  # Number of partitions to create in the topic
  num_partitions = 12

  # Number of copies of each partition in the cluster
  #  1 -> No data replication, avoid in production environments
  #  2 -> Data is replicated to two brokers, can withstand the loss of 1 broker
  #  3 -> Data is replicated to three brokers, can withstand the loss of 2 brokers
  replication_factor = 3

  # JSON message format (simple, original)
  #
  # - simple: A simplified and easy-to-parse JSON representation of each Sparkplug metric.
  # - original: The original JSON representation of the Sparkplug message from protobuf.
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
```
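Several of these settings accept only the values listed in their comments. As a hedged sketch, a pre-flight check could validate the `[connectors.kafka]` table before deployment. It assumes the file is TOML (the syntax suggests so, but the parser SparkPipe uses is not stated here), and `load_kafka_section`, `check_kafka_section` and `tolerated_broker_losses` are hypothetical helpers, not part of SparkPipe.

```python
from typing import Any, Dict, List

# Allowed values, taken from the comments in the default configuration
ALLOWED = {
    "authentication": {"NO_AUTH", "SASL_PLAIN", "SASL_SCRAM", "AWS_MSK_IAM"},
    "algorithm": {"SHA256", "SHA512"},
    "format": {"simple", "original"},
    "compression": {"none", "gzip", "lz4", "snappy", "zstd"},
}


def load_kafka_section(path: str) -> Dict[str, Any]:
    """Read [connectors.kafka] from a TOML file (assumes TOML syntax, Python 3.11+)."""
    import tomllib  # imported here so the checks below also run on older Pythons

    with open(path, "rb") as f:
        return tomllib.load(f).get("connectors", {}).get("kafka", {})


def check_kafka_section(kafka: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means no issue was found."""
    problems = []
    if not kafka.get("servers"):
        problems.append("servers: at least one bootstrap server is required")
    for key, allowed in ALLOWED.items():
        if key in kafka and kafka[key] not in allowed:
            problems.append(f"{key}: {kafka[key]!r} is not one of {sorted(allowed)}")
    auth = kafka.get("authentication", "NO_AUTH")
    if auth in ("SASL_PLAIN", "SASL_SCRAM") and not (kafka.get("username") and kafka.get("password")):
        problems.append(f"authentication: {auth} requires username and password")
    if kafka.get("replication_factor", 3) < 2:
        problems.append("replication_factor: values below 2 keep a single copy of the data")
    return problems


def tolerated_broker_losses(replication_factor: int) -> int:
    """Brokers that can fail while every partition still keeps one copy of its data.

    Write availability is stricter: it also depends on the topic's min.insync.replicas.
    """
    return max(replication_factor - 1, 0)
```

With the default `replication_factor = 3`, `tolerated_broker_losses(3)` returns 2, matching the comment in the configuration.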

### SASL/SCRAM

Example configuration using SASL/SCRAM authentication:

```ini
...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "broker01.kafka.cloud:9096",
    "broker02.kafka.cloud:9096",
    "broker03.kafka.cloud:9096"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # SASL/SCRAM authentication
  authentication = "SASL_SCRAM"

  # Username/password for SASL_PLAIN and SASL_SCRAM (leave empty if unused)
  username = "<USERNAME>"
  password = "<PASSWORD>"

  # SHA algorithm used in SASL_SCRAM authentication (SHA256 or SHA512)
  algorithm = "SHA512"

  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
```
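A client that reads the `uns-data` topic needs matching security settings. The sketch below assumes a consumer built on librdkafka (for example the `confluent-kafka` Python package) and translates the SparkPipe-style SASL settings into librdkafka configuration properties. The function name, the default group id and the `auto.offset.reset` choice are illustrative assumptions; `AWS_MSK_IAM` is deliberately left out.

```python
from typing import Dict, List

# SparkPipe's SASL_SCRAM "algorithm" values mapped to librdkafka mechanism names
SCRAM_MECHANISMS = {"SHA256": "SCRAM-SHA-256", "SHA512": "SCRAM-SHA-512"}


def consumer_properties(servers: List[str], authentication: str, username: str = "",
                        password: str = "", algorithm: str = "SHA512",
                        tls_enabled: bool = True,
                        group_id: str = "uns-consumers") -> Dict[str, str]:
    """Translate SparkPipe-style settings into librdkafka consumer properties."""
    props = {
        "bootstrap.servers": ",".join(servers),
        "group.id": group_id,  # hypothetical consumer group name
        "auto.offset.reset": "earliest",
    }
    if authentication == "NO_AUTH":
        props["security.protocol"] = "SSL" if tls_enabled else "PLAINTEXT"
    elif authentication in ("SASL_PLAIN", "SASL_SCRAM"):
        props["security.protocol"] = "SASL_SSL" if tls_enabled else "SASL_PLAINTEXT"
        props["sasl.mechanism"] = (
            "PLAIN" if authentication == "SASL_PLAIN" else SCRAM_MECHANISMS[algorithm]
        )
        props["sasl.username"] = username
        props["sasl.password"] = password
    else:
        raise ValueError(f"unsupported authentication in this sketch: {authentication}")
    return props
```

The resulting dictionary can be passed to `confluent_kafka.Consumer(...)`, after which the consumer subscribes to the topic configured in SparkPipe. The same function covers the Confluent Cloud example (`SASL_PLAIN` with an API key and secret).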

### mTLS

Example configuration using mutual TLS authentication (client certificates):

```ini
...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "broker01.kafka.cloud:9092",
    "broker02.kafka.cloud:9092",
    "broker03.kafka.cloud:9092"
  ]

  # SSL/TLS configuration
  tls_enabled                = true
  tls_client_certificate     = "/etc/sparkpipe/certs/client.crt"
  tls_client_certificate_key = "/etc/sparkpipe/certs/client.key"
  tls_certificate_authority  = "/etc/sparkpipe/certs/ca.crt"

  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
```
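To make the role of each `tls_*` setting concrete, here is a sketch that uses Python's standard `ssl` module to build the kind of client context the mTLS configuration describes. It illustrates TLS semantics and is not SparkPipe's code: the function name is hypothetical, and treating an empty CA path as "use the system trust store" is an assumption about SparkPipe's behavior.

```python
import ssl


def tls_context(ca_file: str = "", cert_file: str = "", key_file: str = "",
                insecure_skip_verify: bool = False) -> ssl.SSLContext:
    """Build a client-side TLS context mirroring the tls_* settings."""
    if bool(cert_file) != bool(key_file):
        raise ValueError("tls_client_certificate and tls_client_certificate_key must be set together")
    # tls_certificate_authority: an empty path falls back to the system trust store
    ctx = ssl.create_default_context(cafile=ca_file or None)
    if cert_file:
        # tls_client_certificate / tls_client_certificate_key: presented to the broker (mTLS)
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    if insecure_skip_verify:
        # tls_insecure_skip_verify = true: no chain or hostname verification.
        # check_hostname must be disabled before verify_mode can be CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx
```

Calling `tls_context("/etc/sparkpipe/certs/ca.crt", "/etc/sparkpipe/certs/client.crt", "/etc/sparkpipe/certs/client.key")` on the target host is a quick way to confirm that the files are readable and that the key matches the certificate: `load_cert_chain` raises an error otherwise.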

### Amazon MSK

Example configuration for connecting to an **Amazon MSK** cluster using IAM access control. See the [official MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html) for details on setting up IAM access control.

IAM policy used in this example:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:cluster/myTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:topic/myTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-1:0123456789012:group/myTestCluster/*"
            ]
        }
    ]
}
```
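The three statements follow the pattern `arn:aws:kafka:<region>:<account-id>:<cluster|topic|group>/<cluster-name>/*`. As a convenience sketch (the helper name is hypothetical), the same policy document can be generated for another region, account and cluster; the actions and wildcards are kept exactly as in the example above.

```python
import json


def msk_producer_policy(region: str, account_id: str, cluster_name: str) -> dict:
    """Rebuild the example IAM policy for a given region, account and cluster."""
    arn = f"arn:aws:kafka:{region}:{account_id}"

    def statement(actions, resource):
        return {"Effect": "Allow", "Action": actions, "Resource": [resource]}

    return {
        "Version": "2012-10-17",
        "Statement": [
            statement(["kafka-cluster:Connect", "kafka-cluster:AlterCluster",
                       "kafka-cluster:DescribeCluster"],
                      f"{arn}:cluster/{cluster_name}/*"),
            statement(["kafka-cluster:*Topic*", "kafka-cluster:WriteData",
                       "kafka-cluster:ReadData"],
                      f"{arn}:topic/{cluster_name}/*"),
            statement(["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
                      f"{arn}:group/{cluster_name}/*"),
        ],
    }


if __name__ == "__main__":
    # Reproduces the policy shown above
    print(json.dumps(msk_producer_policy("us-west-1", "0123456789012", "myTestCluster"), indent=4))
```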

Create an IAM role that includes this policy and attach the role to the EC2 instance running SparkPipe (through an instance profile).

```ini
...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  servers = [
      "b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098",
      "b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # AWS_MSK_IAM authentication
  authentication = "AWS_MSK_IAM"
  
  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
```

### Confluent Cloud

Example configuration for connecting to a **Confluent Cloud** cluster using TLS and SASL/PLAIN authentication.

```ini
...
# Connectors configuration
[connectors]
  name = "kafka"

# Kafka
[connectors.kafka]
  # Bootstrap servers
  servers = [
    "cluster-id.region.aws.confluent.cloud:9092"
  ]

  # SSL/TLS configuration
  tls_enabled = true

  # SASL/PLAIN authentication
  authentication = "SASL_PLAIN"
  username       = "<CONFLUENT_API_KEY>"
  password       = "<CONFLUENT_API_SECRET>"

  # Topic to publish messages
  topic = "uns-data"

  # JSON message format (simple, original)
  format = "simple"

  # Message compression (none, gzip, lz4, snappy, zstd)
  compression = "snappy"
```
