Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dependency verification #23

Open
wants to merge 55 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
efb456d
add basic support of references to POST/GET subjects
libretto Apr 19, 2022
843a1da
fixups
libretto Apr 19, 2022
44f853a
fixup
libretto Apr 19, 2022
3498962
fixup
libretto Apr 19, 2022
9e9b16d
fixup
libretto Apr 20, 2022
944cd39
fixup
libretto Apr 20, 2022
956b68e
fixup
libretto Apr 26, 2022
d677bd5
merge with master and fixup conflicts
libretto Apr 26, 2022
764facf
debugging
libretto May 1, 2022
c1c4c50
referencedby/delete workarounds
libretto May 11, 2022
732a9ed
referencedby/delete workarounds
libretto May 11, 2022
1f33b7d
Add basic support of references with basic tests
libretto May 16, 2022
ebe889f
Merge branch 'master' into deps
sujayinstaclustr May 23, 2022
93cd241
removed reference for ujson
sujayinstaclustr May 23, 2022
2e9a8c6
added comma at the end
sujayinstaclustr May 23, 2022
516d280
dependencies verification without known deps
libretto Jun 14, 2022
cc1a691
fixup cyclic dependencies
libretto Jun 17, 2022
7f2cf01
Update schema_models.py
libretto Jun 19, 2022
0f2fab6
Update schema_models.py
libretto Jun 19, 2022
bbbb5a3
Update schema_models.py
libretto Jun 19, 2022
1d2b7c4
Update schema_reader.py
libretto Jun 19, 2022
4634373
Update karapace/schema_registry_apis.py
libretto Jun 19, 2022
4b99fc8
fixup style
libretto Jun 19, 2022
efe29f4
update code by PR review
libretto Jun 19, 2022
0854296
add reference_key()
libretto Jun 21, 2022
8a1b129
Merge branch 'master' into deps
sujayinstaclustr Jun 23, 2022
46146dd
fixed undefined variable missing due to merge
sujayinstaclustr Jun 23, 2022
967c822
removed extra line feeds
sujayinstaclustr Jun 29, 2022
73ff0f3
implementation of protobuf dependency verifications beta
libretto Jul 7, 2022
c38c952
fixup
libretto Jul 12, 2022
1f198af
delete workaround file
libretto Jul 13, 2022
da4171a
merge with main branch
libretto Jul 18, 2022
98d15e9
merge with main branch
libretto Jul 18, 2022
b0d2532
fixup issue
libretto Jul 18, 2022
69c5293
add info about confluent known dependencies
libretto Jul 18, 2022
70e8077
Update karapace/schema_registry_apis.py
libretto Jul 26, 2022
ef05bd4
improve PR1 code
libretto Jul 29, 2022
144c5ec
merge with deps
libretto Aug 1, 2022
8e57eeb
resolve conflicts with aiven/main
libretto Aug 4, 2022
1d4be0b
fixup merge issues
libretto Aug 8, 2022
fad2c0e
merge with aiven/master
libretto Aug 8, 2022
45c6c54
lint
libretto Aug 8, 2022
b454e1a
Merge branch 'master' into branch pr2
libretto Aug 18, 2022
e3ace7d
Merge remote-tracking branch 'origin' into pr2
libretto Aug 24, 2022
4bd7ff7
move dependency classes into separate files
libretto Sep 6, 2022
d8c54e1
change dependencies class
libretto Sep 7, 2022
d1fa06e
fixup references issues
libretto Sep 20, 2022
f72f855
fixup issues in code
libretto Sep 26, 2022
4d14ff0
fixup lint
libretto Sep 26, 2022
e1f39b9
add dependency verifier unit test
libretto Sep 28, 2022
90ef33c
add dependency verifier unit test
libretto Sep 28, 2022
341d355
merge with master
libretto Oct 4, 2022
7bd1ab6
fixup test bug with protobuf
libretto Oct 4, 2022
d5fdf59
fixup bug2
libretto Oct 5, 2022
37e8adb
fixup bug2
libretto Oct 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixups
libretto committed Apr 19, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 843a1da4a5c9538ad13060d392f85a67ce0053ad
68 changes: 26 additions & 42 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
from karapace.karapace import KarapaceBase
from karapace.master_coordinator import MasterCoordinator
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
from karapace.schema_models import InvalidSchema, InvalidSchemaType, ValidatedTypedSchema, References, InvalidReferences
from karapace.schema_models import InvalidReferences, InvalidSchema, InvalidSchemaType, References, ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader, SchemaType, TypedSchema
from karapace.utils import json_encode, KarapaceKafkaClient
from typing import Any, Dict, Optional, Tuple
@@ -115,8 +115,7 @@ def _add_schema_registry_routes(self) -> None:
self.route("/config", callback=self.config_get, method="GET", schema_request=True)
self.route("/config", callback=self.config_set, method="PUT", schema_request=True)
self.route(
"/schemas/ids/<schema_id:path>/versions", callback=self.schemas_get_versions, method="GET",
schema_request=True
"/schemas/ids/<schema_id:path>/versions", callback=self.schemas_get_versions, method="GET", schema_request=True
)
self.route("/schemas/ids/<schema_id:path>", callback=self.schemas_get, method="GET", schema_request=True)
self.route("/schemas/types", callback=self.schemas_types, method="GET", schema_request=True)
@@ -271,14 +270,14 @@ def send_kafka_message(self, key, value):
return future

def send_schema_message(
self,
*,
subject: str,
schema: Optional[TypedSchema],
schema_id: int,
version: int,
deleted: bool,
references: Optional[References],
self,
*,
subject: str,
schema: Optional[TypedSchema],
schema_id: int,
version: int,
deleted: bool,
references: Optional[References],
):
key = '{{"subject":"{}","version":{},"magic":1,"keytype":"SCHEMA"}}'.format(subject, version)
if schema:
@@ -315,8 +314,7 @@ async def compatibility_check(self, content_type, *, subject, version, request):
"""Check for schema compatibility"""
body = request.json
self.log.info("Got request to check subject: %r, version_id: %r compatibility", subject, version)
old = await self.subject_version_get(content_type=content_type, subject=subject, version=version,
return_dict=True)
old = await self.subject_version_get(content_type=content_type, subject=subject, version=version, return_dict=True)
self.log.info("Existing schema: %r, new_schema: %r", old["schema"], body["schema"])
try:
schema_type = SchemaType(body.get("schemaType", "AVRO"))
@@ -504,8 +502,7 @@ async def subjects_list(self, content_type):
async def _subject_delete_local(self, content_type: str, subject: str, permanent: bool):
subject_data = self._subject_get(subject, content_type, include_deleted=permanent)

if permanent and [version for version, value in subject_data["schemas"].items() if
not value.get("deleted", False)]:
if permanent and [version for version, value in subject_data["schemas"].items() if not value.get("deleted", False)]:
self.r(
body={
"error_code": SchemaErrorCodes.SUBJECT_NOT_SOFT_DELETED.value,
@@ -524,10 +521,10 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent
if permanent:
for version, value in list(subject_data["schemas"].items()):
schema_id = value.get("id")
self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version,
schema_id)
self.send_schema_message(subject=subject, schema=None, schema_id=schema_id, version=version,
deleted=True)
self.log.info("Permanently deleting subject '%s' version %s (schema id=%s)", subject, version, schema_id)
self.send_schema_message(
subject=subject, schema=None, schema_id=schema_id, version=version, deleted=True, references=None
)
else:
self.send_delete_subject_message(subject, latest_schema_id)
self.r(version_list, content_type, status=HTTPStatus.OK)
@@ -583,7 +580,7 @@ async def subject_version_get(self, content_type, *, subject, version, return_di
"schema": schema.schema_str,
}
references = schema_data.get("references")
if references :
if references:
ret["references"] = references
if schema.schema_type is not SchemaType.AVRO:
ret["schemaType"] = schema.schema_type
@@ -633,7 +630,12 @@ async def _subject_version_delete_local(self, content_type: str, subject: str, v
schema_id = subject_schema_data["id"]
schema = subject_schema_data["schema"]
self.send_schema_message(
subject=subject, schema=None if permanent else schema, schema_id=schema_id, version=version, deleted=True
subject=subject,
schema=None if permanent else schema,
schema_id=schema_id,
version=version,
deleted=True,
references=None,
)
self.r(str(version), content_type, status=HTTPStatus.OK)

@@ -669,27 +671,10 @@ async def subject_version_schema_get(self, content_type, *, subject, version):
content_type=content_type,
status=HTTPStatus.NOT_FOUND,
)
self.r(self.get_referencedby(subject, version), content_type)
self.r(schema_data["schema"].schema_str, content_type)

async def subject_version_referencedby_get(self, content_type, *, subject, version):
self._validate_version(content_type, version)
subject_data = self._subject_get(subject, content_type)
max_version = max(subject_data["schemas"])

if version == "latest":
schema_data = subject_data["schemas"][max_version]
elif int(version) <= max_version:
schema_data = subject_data["schemas"].get(int(version))
else:
self.r(
body={
"error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value,
"message": f"Version {version} not found.",
},
content_type=content_type,
status=HTTPStatus.NOT_FOUND,
)
self.r(schema_data["schema"].schema_str, content_type)
pass

async def subject_versions_list(self, content_type, *, subject):
subject_data = self._subject_get(subject, content_type)
@@ -840,7 +825,6 @@ def write_new_schema_local(self, subject, body, content_type):
)
new_schema_references = None
if body.get("references"):
b = body.get("references")
try:
new_schema_references = References(schema_type, body["references"])
except InvalidReferences:
@@ -885,7 +869,7 @@ def write_new_schema_local(self, subject, body, content_type):
schema_id=schema_id,
version=version,
deleted=False,
references=new_schema_references
references=new_schema_references,
)
self.r({"id": schema_id}, content_type)

28 changes: 14 additions & 14 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
from karapace.client import Client
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.schema_models import InvalidSchema, SchemaType, TypedSchema, ValidatedTypedSchema, References
from karapace.schema_models import InvalidSchema, References, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.utils import json_encode
from typing import Any, Dict, Optional, Tuple
from urllib.parse import quote
@@ -74,9 +74,8 @@ def __init__(self, schema_registry_url: str = "http://localhost:8081", server_ca

async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema, references: Optional[References]) -> int:
if schema.schema_type is SchemaType.PROTOBUF:
if references :
payload = {"schema": str(schema), "schemaType": schema.schema_type.value,
"references": references.json()}
if references:
payload = {"schema": str(schema), "schemaType": schema.schema_type.value, "references": references.json()}
else:
payload = {"schema": str(schema), "schemaType": schema.schema_type.value}
else:
@@ -86,7 +85,7 @@ async def post_new_schema(self, subject: str, schema: ValidatedTypedSchema, refe
raise SchemaRetrievalError(result.json())
return result.json()["id"]

async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema, Optional[References]]:
async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSchema, References]:
result = await self.client.get(f"subjects/{quote(subject)}/versions/latest")
if not result.ok:
raise SchemaRetrievalError(result.json())
@@ -95,9 +94,8 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche
raise SchemaRetrievalError(f"Invalid result format: {json_result}")
try:
schema_type = SchemaType(json_result.get("schemaType", "AVRO"))

if json_result["references"]:
references = References(json_result["references"])
references = References(schema_type, json_result["references"])
else:
references = None

@@ -106,7 +104,7 @@ async def get_latest_schema(self, subject: str) -> Tuple[int, ValidatedTypedSche
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e

async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, Optional[References]] :
async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema, Optional[References]]:
result = await self.client.get(f"schemas/ids/{schema_id}")
if not result.ok:
raise SchemaRetrievalError(result.json()["message"])
@@ -116,7 +114,7 @@ async def get_schema_for_id(self, schema_id: int) -> Tuple[ValidatedTypedSchema,
try:
schema_type = SchemaType(json_result.get("schemaType", "AVRO"))
if json_result["references"]:
references = References(json_result["references"])
references = References(schema_type, json_result["references"])
else:
references = None

@@ -131,10 +129,10 @@ async def close(self):

class SchemaRegistrySerializerDeserializer:
def __init__(
self,
config: dict,
name_strategy: str = "topic_name",
**cfg, # pylint: disable=unused-argument
self,
config: dict,
name_strategy: str = "topic_name",
**cfg, # pylint: disable=unused-argument
) -> None:
self.config = config
self.state_lock = asyncio.Lock()
@@ -168,6 +166,7 @@ def get_subject_name(self, topic_name: str, schema: str, subject_type: str, sche

async def get_schema_for_subject(self, subject: str) -> TypedSchema:
assert self.registry_client, "must not call this method after the object is closed."
# pylint: disable=unused-variable
schema_id, schema, references = await self.registry_client.get_latest_schema(subject)
async with self.state_lock:
schema_ser = schema.__str__()
@@ -184,7 +183,8 @@ async def get_id_for_schema(self, schema: str, subject: str, schema_type: Schema
schema_ser = schema_typed.__str__()
if schema_ser in self.schemas_to_ids:
return self.schemas_to_ids[schema_ser]
schema_id = await self.registry_client.post_new_schema(subject, schema_typed)
schema_id = await self.registry_client.post_new_schema(subject, schema_typed) # pylint: disable=E1120

async with self.state_lock:
self.schemas_to_ids[schema_ser] = schema_id
self.ids_to_schemas[schema_id] = schema_typed
4 changes: 2 additions & 2 deletions tests/integration/test_client.py
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ async def test_remote_client(registry_async_client: Client) -> None:
reg_cli = SchemaRegistryClient()
reg_cli.client = registry_async_client
subject = new_random_name("subject")
sc_id = await reg_cli.post_new_schema(subject, schema_avro)
sc_id = await reg_cli.post_new_schema(subject, schema_avro) # pylint: disable=E1120
assert sc_id >= 0
stored_schema = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}"
@@ -23,7 +23,7 @@ async def test_remote_client_tls(registry_async_client_tls: Client) -> None:
reg_cli = SchemaRegistryClient()
reg_cli.client = registry_async_client_tls
subject = new_random_name("subject")
sc_id = await reg_cli.post_new_schema(subject, schema_avro)
sc_id = await reg_cli.post_new_schema(subject, schema_avro) # pylint: disable=E1120
assert sc_id >= 0
stored_schema = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}"
25 changes: 9 additions & 16 deletions tests/integration/test_schema_protobuf.py
Original file line number Diff line number Diff line change
@@ -8,9 +8,9 @@
from karapace.protobuf.kotlin_wrapper import trim_margin
from tests.utils import create_subject_name_factory

import json
import logging
import pytest
import json

baseurl = "http://localhost:8081"

@@ -109,21 +109,21 @@ async def test_protobuf_schema_compatibility(registry_async_client: Client, trai

async def test_protobuf_schema_references(registry_async_client: Client) -> None:

customer_schema = """
customer_schema = """
|syntax = "proto3";
|package a1;
|message Customer {
| string name = 1;
| int32 code = 2;
|}
|"""

customer_schema = trim_margin(customer_schema)
res = await registry_async_client.post(
f"subjects/customer/versions", json={"schemaType": "PROTOBUF", "schema": customer_schema}
"subjects/customer/versions", json={"schemaType": "PROTOBUF", "schema": customer_schema}
)
assert res.status_code == 200
assert "id" in res.json()

original_schema = """
|syntax = "proto3";
|package a1;
@@ -139,24 +139,17 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None
|"""

original_schema = trim_margin(original_schema)
references = [{"name": "Customer.proto",
"subject": "customer",
"version": 1}]
references = [{"name": "Customer.proto", "subject": "customer", "version": 1}]
res = await registry_async_client.post(
f"subjects/test_schema/versions",
json={"schemaType": "PROTOBUF",
"schema": original_schema,
"references": references}
"subjects/test_schema/versions",
json={"schemaType": "PROTOBUF", "schema": original_schema, "references": references},
)
assert res.status_code == 200
assert "id" in res.json()
res = await registry_async_client.get(
f"subjects/test_schema/versions/latest", json={} )
res = await registry_async_client.get("subjects/test_schema/versions/latest", json={})
assert res.status_code == 200
myjson = res.json()
assert "id" in myjson
references = [{"name": "Customer.proto",
"subject": "customer",
"version": 1}]
references = [{"name": "Customer.proto", "subject": "customer", "version": 1}]
refs2 = json.loads(myjson["references"])
assert not any(x != y for x, y in zip(refs2, references))