Confluent kafka support #61

Merged
Mayuresh Pawar (Mayureshpawar29) merged 7 commits into main from confluent-kafka-support
May 13, 2026

@Mayureshpawar29
Contributor

Description

  • Migrated Kafka task from segmentio/kafka-go to confluent-kafka-go/v2 for better SCRAM-SHA-512, TLS, and
    Schema Registry support
  • Added Avro serialization/deserialization via Confluent Schema Registry (schema_registry_url,
    schema_registry_username, schema_registry_password)
  • Added isolation.level=read_committed to both group and standalone consumers as the consumer-side
    complement to idempotent: true on the producer
  • Cleaned up kafkaTask struct — removed duplicate unexported fields (timeout, batchFlushInterval), dead
    fields (BatchSize, UserCert, UserCertPath) and unused constant (defaultBatchSize)
  • Added unit tests covering serialization, deserialization, round-trip, config builders, security protocol,
    and retry logic
  • Added test pipeline YAMLs for schema registry write, standalone read, and group read

Types of changes

  • Docs change / refactoring / dependency upgrade
  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation and I have updated the documentation accordingly.
  • I have added tests to cover my changes.

- Replaced the existing Kafka implementation with Confluent Kafka Go client for improved functionality.
- Updated task configuration to include new fields for schema registry and idempotent producer support.
- Enhanced README documentation to reflect changes in task behavior and configuration options.
- Added example pipelines for reading and writing with schema support using Confluent Kafka.
@Mayureshpawar29 Mayuresh Pawar (Mayureshpawar29) requested a review from a team as a code owner May 8, 2026 16:53
Comment thread internal/pkg/pipeline/task/kafka/kafka.go
@prasadlohakpure
Contributor

Since the library depends on CGO, let's also test it with and without the CGO_ENABLED flag.
Conditions to test:
With the flag enabled, we will get a bigger build.
Without the flag, there is a chance of producing corrupted builds when deploying across operating systems.
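
One way to confirm which mode a given binary was built in is to read the build settings that the Go toolchain embeds. The sketch below is illustrative only and is not part of this PR; `cgoSetting` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// cgoSetting returns the CGO_ENABLED value recorded in the build settings,
// or "unknown" when the key is absent.
func cgoSetting(settings []debug.BuildSetting) string {
	for _, s := range settings {
		if s.Key == "CGO_ENABLED" {
			return s.Value
		}
	}
	return "unknown"
}

func main() {
	info, ok := debug.ReadBuildInfo()
	if !ok {
		fmt.Println("no build info embedded in this binary")
		return
	}
	fmt.Println("CGO_ENABLED =", cgoSetting(info.Settings))
}
```

Running the same check against binaries built with `CGO_ENABLED=1` and `CGO_ENABLED=0` would show which setting each artifact actually used.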

Comment thread internal/pkg/pipeline/task/kafka/kafka.go Outdated
Comment thread internal/pkg/pipeline/task/kafka/kafka.go Outdated
Comment thread internal/pkg/pipeline/task/kafka/kafka.go Outdated
… task configuration

- Updated the build process in release.yaml and Dockerfile to enable CGO for the caterpillar binary.
- Added a new serializer implementation for Avro format with Schema Registry support.
- Enhanced README documentation to clarify configuration options and provide examples for using Avro serialization.
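
For context on what a Schema Registry serializer produces: the Confluent wire format prefixes each payload with a zero magic byte and a 4-byte big-endian schema ID. The sketch below shows only that framing step, under the assumption that Avro encoding and schema registration happen elsewhere; `frame` and `unframe` are hypothetical names, not functions from this PR.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte marks a message as using the Confluent wire format.
const magicByte = 0x0

// frame prepends the 5-byte header (magic byte + schema ID) to an
// already-encoded payload.
func frame(schemaID uint32, payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, payload...)
}

// unframe validates the header and returns the schema ID and the payload.
func unframe(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than 5-byte header")
	}
	if msg[0] != magicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%x", msg[0])
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	msg := frame(42, []byte("avro-bytes"))
	id, body, err := unframe(msg)
	fmt.Println(id, string(body), err)
}
```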
Comment thread build/Dockerfile Outdated
@@ -1,13 +1,13 @@
-FROM golang:1.24.7-alpine AS builder
+FROM golang:1.24.7 AS builder
Contributor Author

Added this because confluent-kafka-go vendors librdkafka_glibc_linux, which cannot link against Alpine's musl libc.

Comment thread build/Dockerfile Outdated

# build executable
-RUN go build -o caterpillar ./cmd/caterpillar/caterpillar.go
+RUN CGO_ENABLED=1 go build -o caterpillar ./cmd/caterpillar/caterpillar.go
Contributor Author

confluent-kafka-go wraps native librdkafka via CGO

Comment thread internal/pkg/pipeline/task/kafka/serializer.go
Comment thread test/pipelines/kafka_schema_read.yaml Outdated
…rocess

- Changed base image from Debian to Alpine for a smaller footprint.
- Added necessary packages for building with CGO and librdkafka support.
- Updated build command to include dynamic tags for the caterpillar binary.
- Introduced a new codec format handler for Avro serialization in Kafka tasks, improving schema registry integration.
- Updated the `newCodecForFormat` function to utilize a map for codec format handling, enhancing maintainability.
- Added `schema_format` configuration option to Kafka pipeline YAML files for specifying the serialization format.
- Minor comment updates in the Dockerfile to clarify the build process.
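
The map-based dispatch described for `newCodecForFormat` can be sketched roughly as follows. Only the function name comes from the commit message; its signature, the `codec` interface, the `codecFactories` map, and the `json` and `raw` formats are hypothetical stand-ins chosen so the example runs without a Schema Registry.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// codec is a hypothetical interface for a format-specific encoder.
type codec interface {
	Encode(v any) ([]byte, error)
}

type jsonCodec struct{}

func (jsonCodec) Encode(v any) ([]byte, error) { return json.Marshal(v) }

type rawCodec struct{}

// Encode passes byte slices through unchanged and rejects other types.
func (rawCodec) Encode(v any) ([]byte, error) {
	b, ok := v.([]byte)
	if !ok {
		return nil, fmt.Errorf("raw codec expects []byte, got %T", v)
	}
	return b, nil
}

// codecFactories maps a schema_format value to a constructor.
// Supporting a new format means adding one entry here.
var codecFactories = map[string]func() codec{
	"json": func() codec { return jsonCodec{} },
	"raw":  func() codec { return rawCodec{} },
}

// newCodecForFormat looks up the constructor instead of branching on format.
func newCodecForFormat(format string) (codec, error) {
	factory, ok := codecFactories[format]
	if !ok {
		return nil, fmt.Errorf("unsupported schema_format %q", format)
	}
	return factory(), nil
}

func main() {
	c, err := newCodecForFormat("json")
	if err != nil {
		fmt.Println(err)
		return
	}
	out, _ := c.Encode(map[string]int{"id": 1})
	fmt.Println(string(out))
}
```

An Avro handler would be registered the same way, with its factory closing over whatever Schema Registry client the task configures.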
Comment thread build/Dockerfile
WORKDIR /go/src/github.com/patterninc/caterpillar

# Alpine 3.20 ships librdkafka 2.4.0; confluent-kafka-go v2 requires 2.14.0+.
RUN apk add --no-cache gcc musl-dev pkgconf \
Contributor Author

Build time dependency.

Comment thread build/Dockerfile
RUN chmod 755 caterpillar

FROM alpine:3.20
RUN apk add --no-cache \
Contributor Author

Run time dependency.

Contributor

For the standalone reader, add a unique suffix on every run so that it always behaves as a new reader.
Otherwise, drop the standalone reader functionality completely.

- Updated the Kafka consumer group ID to include a unique UUID, ensuring better identification and management of consumer instances.
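
A minimal sketch of the per-run unique group ID follows. The commit uses a UUID; to stay dependency-free this example substitutes 16 random bytes from `crypto/rand`, hex-encoded, which serves the same purpose. `uniqueGroupID` and the base name are hypothetical.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// uniqueGroupID appends a random 128-bit suffix to base so that each run
// joins as a brand-new consumer group with no committed offsets.
func uniqueGroupID(base string) (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return base + "-" + hex.EncodeToString(buf), nil
}

func main() {
	id, err := uniqueGroupID("caterpillar-standalone")
	if err != nil {
		fmt.Println("could not generate group ID:", err)
		return
	}
	fmt.Println(id)
}
```

Because a fresh group has no stored offsets, where reading starts is then governed by the consumer's `auto.offset.reset` setting.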
Contributor

LGTM.

@Mayureshpawar29 Mayuresh Pawar (Mayureshpawar29) merged commit e98f063 into main May 13, 2026
7 checks passed
@Mayureshpawar29 Mayuresh Pawar (Mayureshpawar29) deleted the confluent-kafka-support branch May 13, 2026 11:12

3 participants