Skip to content

Conversation

parisni
Copy link

@parisni parisni commented Sep 18, 2025

What is the purpose of the change

This pull request adds support for primitive types in ConfluentRegistryAvroSerializationSchema by introducing a new forPrimitiveType factory method. This enables serialization of primitive Avro types (string, int, long, boolean, float, double) with Confluent Schema Registry, which was previously only possible for SpecificRecord and GenericRecord types.

Brief change log

  • Added forPrimitiveType static factory method to ConfluentRegistryAvroSerializationSchema
  • Method accepts primitive Avro schemas and uses Object.class as the record class
  • Follows same pattern as existing forSpecific and forGeneric methods
  • Added comprehensive test coverage for all primitive types

Verifying this change

This change added tests and can be verified as follows:

  • Added ConfluentRegistryAvroSerializationSchemaTest with 9 comprehensive unit tests
  • Tests validate factory method creation for all Avro primitive types (string, int, long, boolean, float, double)
  • Tests verify configuration handling with both null and custom registry configs
  • Tests ensure API consistency with existing forSpecific and forGeneric methods
  • All tests follow existing Flink test patterns and conventions
  • Tests use mock schema registry approach to avoid network dependencies

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Copy link
Collaborator

flinkbot commented Sep 18, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@@ -140,4 +140,19 @@ public static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric
DEFAULT_IDENTITY_MAP_CAPACITY,
registryConfigs));
}

public static ConfluentRegistryAvroSerializationSchema<Object> forPrimitiveType(
Copy link
Contributor

@davidradl davidradl Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please could you raise a Flip if you think it is a feature; you should have a Jira anyway to track this and details the motivation for the change. Then the PR title should be changed to reference the Jira as per the process.

I assume this effects the externals in someway to enable primitive processing - I suggest including this in the docs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Done.

@parisni parisni changed the title feat: support avro primitive keys [FLINK-38382]feat: support avro primitive keys Sep 18, 2025
@parisni parisni changed the title [FLINK-38382]feat: support avro primitive keys [FLINK-38382][kafka] Support avro primitive keys in Confluent Schema Registry binding Sep 18, 2025
@parisni parisni changed the title [FLINK-38382][kafka] Support avro primitive keys in Confluent Schema Registry binding [FLINK-38382][kafka] Support avro primitive types in Confluent Schema Registry binding Sep 18, 2025
@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Sep 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-reviewed PR has been reviewed by the community.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants