Skip to content
This repository was archived by the owner on Feb 21, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package com.mozilla.pig.eval.date;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

Expand Down Expand Up @@ -50,8 +49,8 @@ public String exec(Tuple input) throws IOException {
try {
Date d = inputSdf.parse((String)input.get(0));
s = outputSdf.format(d);
} catch (ParseException e) {
pigLogger.warn(this, "Date parsing error", ERRORS.DateParseError);
} catch (Exception e) {
warn("Date parse error: " + e, ERRORS.DateParseError);
}

return s;
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/com/mozilla/pig/eval/date/FormatDate.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

import com.mozilla.pig.eval.date.ConvertDateFormat.ERRORS;

public class FormatDate extends EvalFunc<String> {

private Calendar cal = Calendar.getInstance();
Expand All @@ -41,10 +44,16 @@ public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0 || input.get(0) == null) {
return null;
}

cal.setTimeInMillis(((Number)input.get(0)).longValue());

String s = null;
try {
cal.setTimeInMillis(((Number)input.get(0)).longValue());
s = sdf.format(cal.getTime());
} catch (Exception e) {
warn("Date parse error: " + e, ERRORS.DateParseError);
}

return sdf.format(cal.getTime());
return s;
}

}
2 changes: 1 addition & 1 deletion src/main/java/com/mozilla/pig/eval/date/ParseDate.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Long exec(Tuple input) throws IOException {
Date d = sdf.parse((String)input.get(0));
t = d.getTime();
} catch (ParseException e) {
pigLogger.warn(this, "Date parsing error", ERRORS.DateParseError);
warn("Date parse error: " + e, ERRORS.DateParseError);
}

return t;
Expand Down
31 changes: 15 additions & 16 deletions src/main/java/com/mozilla/pig/eval/date/TimeDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

Expand All @@ -36,7 +35,7 @@ public static enum ERRORS { DateParseError };

private int deltaUnit;
private boolean parseDate = false;
private SimpleDateFormat sdf;
private String dateFormat;

public TimeDelta() {
deltaUnit = Calendar.MILLISECOND;
Expand All @@ -52,7 +51,7 @@ public TimeDelta(String deltaUnitStr, String dateFormat) throws ParseException {
deltaUnit = Integer.parseInt(deltaUnitStr);
if (dateFormat != null) {
parseDate = true;
sdf = new SimpleDateFormat(dateFormat);
this.dateFormat = dateFormat;
}
}

Expand All @@ -63,19 +62,19 @@ public Long exec(Tuple input) throws IOException {
return null;
}

long delta = 0;
if (parseDate) {
try {
Date d1 = sdf.parse((String)input.get(0));
Date d2 = sdf.parse((String)input.get(1));
delta = DateUtil.getTimeDelta(d1.getTime(), d2.getTime(), deltaUnit);
} catch (ParseException e) {
pigLogger.warn(this, "Date parse error", ERRORS.DateParseError);
}
} else {
long t1 = ((Number)input.get(0)).longValue();
long t2 = ((Number)input.get(1)).longValue();
delta = DateUtil.getTimeDelta(t1, t2, deltaUnit);
Long delta = null;
try {
if (parseDate) {
Date d1 = DateUtil.parseAndCacheDate(dateFormat, (String)input.get(0));
Date d2 = DateUtil.parseAndCacheDate(dateFormat, (String)input.get(1));
delta = DateUtil.getTimeDelta(d1.getTime(), d2.getTime(), deltaUnit);
} else {
long t1 = ((Number)input.get(0)).longValue();
long t2 = ((Number)input.get(1)).longValue();
delta = DateUtil.getTimeDelta(t1, t2, deltaUnit);
}
} catch (Exception e) {
warn("Date parse error: " + e, ERRORS.DateParseError);
}

return delta;
Expand Down
71 changes: 11 additions & 60 deletions src/main/java/com/mozilla/pig/eval/json/JsonMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,90 +21,41 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.mozilla.pig.eval.json.deserializers.JsonMapDeserializer;

public class JsonMap extends EvalFunc<Map<String, Object>> {

public static enum ERRORS {
JSONParseError, JSONMappingError, EOFError, GenericError
};

private static final BagFactory bagFactory = BagFactory.getInstance();
protected static final TupleFactory tupleFactory = TupleFactory.getInstance();
protected final ObjectMapper jsonMapper = new ObjectMapper();

/**
* Converts List objects to DataBag to keep Pig happy
*
* @param l
* @return
*/
@SuppressWarnings("unchecked")
private DataBag convertListToBag(List<Object> l) {
DataBag dbag = bagFactory.newDefaultBag();
Tuple t = tupleFactory.newTuple();
for (Object o : l) {
if (o instanceof List) {
dbag.addAll(convertListToBag((List<Object>) o));
} else {
t.append(o);
}
}

if (t.size() > 0) {
dbag.add(t);
}

return dbag;
private static ObjectMapper jsonMapper = null;

static {
jsonMapper = new ObjectMapper();
SimpleModule module = new SimpleModule("JsonMap");
module.addDeserializer(Object.class, new JsonMapDeserializer());
jsonMapper.registerModule(module);
}

/**
* Convert map and its values to types that Pig can handle
*
* @param m
* @return
*/
@SuppressWarnings("unchecked")
protected Map<String, Object> makeSafe(Map<String, Object> m) {
Map<String, Object> safeValues = new HashMap<String, Object>();
for (Map.Entry<String, Object> entry : m.entrySet()) {
Object v = entry.getValue();
if (v != null && v instanceof List) {
DataBag db = convertListToBag((List<Object>) v);
safeValues.put(entry.getKey(), db);
} else if (v != null && v instanceof Map) {
safeValues.put(entry.getKey(), makeSafe((Map<String, Object>) v));
} else {
safeValues.put(entry.getKey(), entry.getValue());
}
}

return safeValues;
}


@Override
public Map<String, Object> exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
return null;
}

try {
Map<String, Object> values = jsonMapper.readValue((String) input.get(0),
new TypeReference<Map<String, Object>>() {});
return makeSafe(values);
return jsonMapper.readValue((String) input.get(0), new TypeReference<Map<String, Object>>() {});
} catch (JsonParseException e) {
warn("JSON Parse Error: " + e.getMessage(), ERRORS.JSONParseError);
} catch (JsonMappingException e) {
Expand Down
82 changes: 39 additions & 43 deletions src/main/java/com/mozilla/pig/eval/json/JsonTupleMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,54 @@
*/
package com.mozilla.pig.eval.json;

import java.util.HashMap;
import java.util.List;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class JsonTupleMap extends JsonMap {

/**
* Converts List objects to Tuple to keep Pig happy
*
* @param l
* @return
*/
@SuppressWarnings("unchecked")
private Tuple convertListToTuple(List<Object> l) {
Tuple t = tupleFactory.newTuple();
for (Object o : l) {
if (o instanceof List) {
t.append(convertListToTuple((List<Object>) o));
} else {
t.append(o);
}
}
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.mozilla.pig.eval.json.deserializers.JsonTuppleDeserializer;

public class JsonTupleMap extends EvalFunc<Map<String, Object>> {

public static enum ERRORS {
JSONParseError, JSONMappingError, EOFError, GenericError
};

return t;
private static ObjectMapper jsonMapper = new ObjectMapper();

static {
jsonMapper = new ObjectMapper();
SimpleModule module = new SimpleModule("JsonTupleMap");
module.addDeserializer(Object.class, new JsonTuppleDeserializer());
jsonMapper.registerModule(module);
}

/**
* Convert map and its values to types that Pig can handle
*
* @param m
* @return
*/

@Override
@SuppressWarnings("unchecked")
protected Map<String, Object> makeSafe(Map<String, Object> m) {
Map<String, Object> safeValues = new HashMap<String, Object>();
for (Map.Entry<String, Object> entry : m.entrySet()) {
Object v = entry.getValue();
if (v != null && v instanceof List) {
// This is the main diff from the parent class
// in that it converts lists to
Tuple t = convertListToTuple((List<Object>) v);
safeValues.put(entry.getKey(), t);
} else if (v != null && v instanceof Map) {
safeValues.put(entry.getKey(), makeSafe((Map<String, Object>) v));
} else {
safeValues.put(entry.getKey(), v);
}
public Map<String, Object> exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
return null;
}

try {
return jsonMapper.readValue((String) input.get(0), new TypeReference<Map<String, Object>>() {});
} catch (JsonParseException e) {
warn("JSON Parse Error: " + e.getMessage(), ERRORS.JSONParseError);
} catch (JsonMappingException e) {
warn("JSON Mapping Error: " + e.getMessage(), ERRORS.JSONMappingError);
} catch (EOFException e) {
warn("Hit EOF unexpectedly", ERRORS.EOFError);
} catch (Exception e) {
warn("Generic error during JSON mapping", ERRORS.GenericError);
}

return safeValues;
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2013 Mozilla Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mozilla.pig.eval.json.deserializers;

import java.io.IOException;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;

final public class JsonMapDeserializer extends RegularJsonDeserializer {

private static final BagFactory bagFactory = BagFactory.getInstance();
private static final TupleFactory tupleFactory = TupleFactory.getInstance();

protected Object deserializeArray(JsonParser jp, DeserializationContext ctx) throws IOException, JsonParseException, JsonProcessingException {
Tuple tupple = tupleFactory.newTuple();
DataBag dbag = bagFactory.newDefaultBag();

while (jp.nextToken() != JsonToken.END_ARRAY) {
Object value = deserializeJson(jp, ctx);
if (value instanceof DataBag) {
dbag.addAll((DataBag) value);
} else {
tupple.append(deserializeJson(jp, ctx));
}
}

if (tupple.size() > 0) {
dbag.add(tupple);
}

return dbag;
}
}
Loading