Skip to content

Commit

Permalink
more readable parsing canonical format processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Karrick S. McDermott committed Jan 7, 2019
1 parent 4a9979e commit 1455199
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 55 deletions.
108 changes: 60 additions & 48 deletions canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,57 +8,60 @@ import (
)

// pcfProcessor is a function type that given a parsed JSON object, returns its
// Parsing Canonical Form accroding to Avro specification.
type pcfProcessor func(s interface{}) string
// Parsing Canonical Form according to the Avro specification.
type pcfProcessor func(s interface{}) (string, error)

// parsingCanonialForm returns the "Parsing Canonical Form" (pcf) for a parsed
// JSON structure of a *valid* Avro schema.
func parsingCanonicalForm(schema interface{}) string {
var proc pcfProcessor

proc = func(s interface{}) string {
switch val := s.(type) {
case map[string]interface{}: // JSON object
return pcfMap(val, proc)
case []interface{}: // JSON array
return pcfArray(val, proc)
case string: // Standalone string
return pcfString(val, proc)
case float64:
return pcfFloat64(val, proc)
default: // Invalid JSON element within the schema; ignore
return ""
}
// JSON structure of a valid Avro schema, or an error describing the schema
// error.
func parsingCanonicalForm(schema interface{}) (string, error) {
switch val := schema.(type) {
case map[string]interface{}:
// JSON objects are decoded as a map of strings to empty interfaces
return pcfObject(val)
case []interface{}:
// JSON arrays are decoded as a slice of empty interfaces
return pcfArray(val)
case string:
// JSON string values are decoded as a Go string
return pcfString(val)
case float64:
// JSON numerical values are decoded as Go float64
return pcfNumber(val)
default:
return "", fmt.Errorf("cannot parse schema with invalid schema type; ought to be map[string]interface{}, []interface{}, string, or float64; received: %T: %v", schema, schema)
}

return proc(schema)
}

// pcfFloat64 returns the parsing canonical form for a float64 value
func pcfFloat64(val float64, proc pcfProcessor) string {
return strconv.FormatFloat(val, 'g', -1, 64)
// pcfNumber returns the parsing canonical form for a numerical value.
func pcfNumber(val float64) (string, error) {
return strconv.FormatFloat(val, 'g', -1, 64), nil
}

// pcfString returns the parsing canonical form for a string value
func pcfString(val string, proc pcfProcessor) string {
return `"` + val + `"`
// pcfString returns the parsing canonical form for a string value.
func pcfString(val string) (string, error) {
return `"` + val + `"`, nil
}

// pcfArray returns the parsing canonical form for a JSON array
func pcfArray(val []interface{}, proc pcfProcessor) string {
var elements = make([]string, len(val))
// pcfArray returns the parsing canonical form for a JSON array.
func pcfArray(val []interface{}) (string, error) {
items := make([]string, len(val))
for i, el := range val {
elements[i] = proc(el)
p, err := parsingCanonicalForm(el)
if err != nil {
return "", err
}
items[i] = p
}
return "[" + strings.Join(elements, ",") + "]"
return "[" + strings.Join(items, ",") + "]", nil
}

// pcfMap returns the parsing canonical form for a JSON map
func pcfMap(jsonMap map[string]interface{}, proc pcfProcessor) string {
var els = make(stringPairs, 0, len(jsonMap))
// pcfObject returns the parsing canonical form for a JSON object.
func pcfObject(jsonMap map[string]interface{}) (string, error) {
pairs := make(stringPairs, 0, len(jsonMap))

namespace := ""
//Remember the namespace to fully qualify names later
// Remember the namespace to fully qualify names later
var namespace string
if namespaceJSON, ok := jsonMap["namespace"]; ok {
if namespaceStr, ok := namespaceJSON.(string); ok {
// and it's value is string (otherwise invalid schema)
Expand All @@ -71,19 +74,19 @@ func pcfMap(jsonMap map[string]interface{}, proc pcfProcessor) string {
// Reduce primitive schemas to their simple form.
if len(jsonMap) == 1 && k == "type" {
if t, ok := v.(string); ok {
return "\"" + t + "\""
return "\"" + t + "\"", nil
}
}

// Only keep relevant attributes (strip 'doc', 'alias' or 'namespace')
// Only keep relevant attributes (strip 'doc', 'alias', 'namespace')
if _, ok := fieldOrder[k]; !ok {
continue
}

// Add namespace to a non-qualified name.
if k == "name" && namespace != "" {
// Check if the name isn't already qualified.
if t, ok := v.(string); ok && !strings.Contains(t, ".") {
if t, ok := v.(string); ok && !strings.ContainsRune(t, '.') {
v = namespace + "." + t
}
}
Expand All @@ -95,18 +98,27 @@ func pcfMap(jsonMap map[string]interface{}, proc pcfProcessor) string {
s, err := strconv.ParseUint(s, 10, 0)
if err != nil {
// should never get here because already validated schema
panic(fmt.Errorf("Fixed size ought to be number greater than zero: %v", s))
return "", fmt.Errorf("Fixed size ought to be number greater than zero: %v", s)
}
v = float64(s)
}
}

els = append(els, stringPair{k, proc(k) + ":" + proc(v)})
pk, err := parsingCanonicalForm(k)
if err != nil {
return "", err
}
pv, err := parsingCanonicalForm(v)
if err != nil {
return "", err
}

pairs = append(pairs, stringPair{k, pk + ":" + pv})
}

// Sort keys by their order in specification.
sort.Sort(byAvroFieldOrder(els))
return "{" + strings.Join(els.Bs(), ",") + "}"
sort.Sort(byAvroFieldOrder(pairs))
return "{" + strings.Join(pairs.Bs(), ",") + "}", nil
}

// stringPair represents a pair of string values.
Expand All @@ -115,16 +127,16 @@ type stringPair struct {
B string
}

// stringPairs is a sortable array of pair of strings.
// stringPairs is a sortable slice of pairs of strings.
type stringPairs []stringPair

// Bs returns an array of second values of an array of pairs.
func (sp *stringPairs) Bs() []string {
out := make([]string, len(*sp))
items := make([]string, len(*sp))
for i, el := range *sp {
out[i] = el.B
items[i] = el.B
}
return out
return items
}

// fieldOrder defines fields that show up in canonical schema and specifies
Expand Down
12 changes: 5 additions & 7 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,12 @@ func NewCodec(schemaSpecification string) (*Codec, error) {

c, err := buildCodec(st, nullNamespace, schema)
if err == nil {
// // compact schema and save it
// compact, err := json.Marshal(schema)
// if err != nil {
// return nil, fmt.Errorf("cannot remarshal schema: %s", err)
// }
// c.schemaOriginal = string(compact)
c.schemaOriginal = schemaSpecification
c.schemaCanonical = parsingCanonicalForm(schema)
c.schemaCanonical, err = parsingCanonicalForm(schema)
if err != nil {
// Should not get here because schema is already validated above.
return nil, err
}
}

return c, err
Expand Down

0 comments on commit 1455199

Please sign in to comment.