Skip to content

Commit

Permalink
AVRO-530: Allow recursive types in protocol (#1768)
Browse files Browse the repository at this point in the history
* AVRO-530 recursives types in protocol
  • Loading branch information
clesaec authored Sep 12, 2023
1 parent 7f532ff commit c3b31f6
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 6 deletions.
12 changes: 9 additions & 3 deletions lang/java/avro/src/main/java/org/apache/avro/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public int hashCode() {
public String getDoc() {
return doc;
}

}

private class TwoWayMessage extends Message {
Expand Down Expand Up @@ -466,7 +465,9 @@ public byte[] getMD5() {

/** Read a protocol from a Json file. */
public static Protocol parse(File file) throws IOException {
return parse(Schema.FACTORY.createParser(file));
try (JsonParser jsonParser = Schema.FACTORY.createParser(file)) {
return parse(jsonParser);
}
}

/** Read a protocol from a Json stream. */
Expand Down Expand Up @@ -537,10 +538,15 @@ private void parseTypes(JsonNode json) {
return; // no types defined
if (!defs.isArray())
throw new SchemaParseException("Types not an array: " + defs);

for (JsonNode type : defs) {
if (!type.isObject())
throw new SchemaParseException("Type not an object: " + type);
Schema.parse(type, types);
Schema.parseNamesDeclared(type, types, types.space());

}
for (JsonNode type : defs) {
Schema.parseCompleteSchema(type, types, types.space());
}
}

Expand Down
2 changes: 1 addition & 1 deletion lang/java/avro/src/main/java/org/apache/avro/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -1865,7 +1865,7 @@ private static boolean isValidValue(Schema schema, JsonNode value) {
* @param currentNameSpace : current working name space.
* @return schema.
*/
private static Schema parseNamesDeclared(JsonNode schema, Names names, String currentNameSpace) {
static Schema parseNamesDeclared(JsonNode schema, Names names, String currentNameSpace) {
if (schema == null) {
return null;
}
Expand Down
75 changes: 74 additions & 1 deletion lang/java/avro/src/test/java/org/apache/avro/TestProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,86 @@
*/
package org.apache.avro;

import org.junit.jupiter.api.Test;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

import com.fasterxml.jackson.databind.JsonNode;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.*;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Collections;

import org.junit.jupiter.api.Test;

public class TestProtocol {

@Test
public void parse() throws IOException {
File fic = new File("../../../share/test/schemas/namespace.avpr");
Protocol protocol = Protocol.parse(fic);
assertNotNull(protocol);
assertEquals("TestNamespace", protocol.getName());
}

/**
* record type 'User' contains a field of type 'Status', which contains a field
* of type 'User'.
*/
@Test
public void crossProtocol() {
String userStatus = "{ \"protocol\" : \"p1\", " + "\"types\": ["
+ "{\"name\": \"User\", \"type\": \"record\", \"fields\": [{\"name\": \"current_status\", \"type\": \"Status\"}]},\n"
+ "\n"
+ "{\"name\": \"Status\", \"type\": \"record\", \"fields\": [{\"name\": \"author\", \"type\": \"User\"}]}"
+ "]}";

Protocol protocol = Protocol.parse(userStatus);
Schema userSchema = protocol.getType("User");
Schema statusSchema = protocol.getType("Status");
assertSame(statusSchema, userSchema.getField("current_status").schema());
assertSame(userSchema, statusSchema.getField("author").schema());

String parsingFormUser = SchemaNormalization.toParsingForm(userSchema);
assertEquals(
"{\"name\":\"User\",\"type\":\"record\",\"fields\":[{\"name\":\"current_status\",\"type\":{\"name\":\"Status\",\"type\":\"record\",\"fields\":[{\"name\":\"author\",\"type\":\"User\"}]}}]}",
parsingFormUser);

String parsingFormStatus = SchemaNormalization.toParsingForm(statusSchema);
assertEquals(
"{\"name\":\"Status\",\"type\":\"record\",\"fields\":[{\"name\":\"author\",\"type\":{\"name\":\"User\",\"type\":\"record\",\"fields\":[{\"name\":\"current_status\",\"type\":\"Status\"}]}}]}",
parsingFormStatus);
}

/**
* When one schema with a type used before it is defined, test normalization
* defined schema before it is used.
*/
@Test
void normalization() {
final String schema = "{\n" + " \"type\":\"record\", \"name\": \"Main\", " + " \"fields\":[\n"
+ " { \"name\":\"f1\", \"type\":\"Sub\" },\n" // use Sub
+ " { \"name\":\"f2\", " + " \"type\":{\n" + " \"type\":\"enum\", \"name\":\"Sub\",\n" // define
// Sub
+ " \"symbols\":[\"OPEN\",\"CLOSE\"]\n" + " }\n" + " }\n" + " ]\n" + "}";
Schema s = new Schema.Parser().parse(schema);
assertNotNull(s);

String parsingForm = SchemaNormalization.toParsingForm(s);
assertEquals(
"{\"name\":\"Main\",\"type\":\"record\",\"fields\":[{\"name\":\"f1\",\"type\":{\"name\":\"Sub\",\"type\":\"enum\",\"symbols\":[\"OPEN\",\"CLOSE\"]}},{\"name\":\"f2\",\"type\":\"Sub\"}]}",
parsingForm);
}

@Test
void namespaceAndNameRules() {
Protocol p1 = new Protocol("P", null, "foo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,8 @@ public static interface C {
void forwardReference() {
ReflectData data = ReflectData.get();
Protocol reflected = data.getProtocol(C.class);
Protocol reparsed = Protocol.parse(reflected.toString());
String ref = reflected.toString();
Protocol reparsed = Protocol.parse(ref);
assertEquals(reflected, reparsed);
assert (reparsed.getTypes().contains(data.getSchema(A.class)));
assert (reparsed.getTypes().contains(data.getSchema(B1.class)));
Expand Down

0 comments on commit c3b31f6

Please sign in to comment.