diff --git a/examples/README.md b/examples/README.md index 0f496da1..2f6317aa 100644 --- a/examples/README.md +++ b/examples/README.md @@ -84,11 +84,11 @@ Amazon Piggybank jar: http://aws.amazon.com/code/Elastic-MapReduce/2730 by_year = GROUP parsed_year BY (chararray)year; year_10yearavg = FOREACH by_year GENERATE group, AVG(parsed_year.bc) as tenyear_avg; - -- Args to MongoInsertStorage are: schema for output doc, field to use as '_id'. + -- Arg to MongoInsertStorage: field to use as '_id'. STORE year_10yearavg INTO 'mongodb://localhost:27017/demo.asfkjabfa' USING - com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group'); + com.mongodb.hadoop.pig.MongoInsertStorage('group'); @@ -315,4 +315,4 @@ After phase two is finished, the result documents look like this (the `logs_coun ], "logs_count": 1050616 } -``` \ No newline at end of file +``` diff --git a/pig/README.md b/pig/README.md index 28ea0092..71815381 100644 --- a/pig/README.md +++ b/pig/README.md @@ -4,28 +4,28 @@ ##### From a MongoDB collection -To load records from MongoDB database to use in a Pig script, a class called `MongoLoader` is provided. To use it, first register the +To load records from MongoDB database to use in a Pig script, a class called `MongoLoader` is provided. To use it, first register the dependency jars in your script and then specify the Mongo URI to load with the `MongoLoader` class. -- First, register jar dependencies - REGISTER mongo-java-driver.jar + REGISTER mongo-java-driver.jar REGISTER mongo-hadoop-core.jar REGISTER mongo-hadoop-pig.jar raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' USING com.mongodb.hadoop.pig.MongoLoader; -`MongoLoader` can be used in two ways - `Dynamic Schema` and `Fixed Schema` mode. By creating an instance of the class without -specifying any field names in the constructor (as in the previous snippet) each record will appear to Pig as a tuple containing a -single `Map` that corresponds to the document from the collection, for example: - +`MongoLoader` can be used in two ways - `Dynamic Schema` and `Fixed Schema` mode. By creating an instance of the class without +specifying any field names in the constructor (as in the previous snippet) each record will appear to Pig as a tuple containing a +single `Map` that corresponds to the document from the collection, for example: + ([bc2Year#7.87,bc3Year#7.9,bc1Month#,bc5Year#7.87,_id#631238400000,bc10Year#7.94,bc20Year#,bc7Year#7.98,bc6Month#7.89,bc3Month#7.83,dayOfWeek#TUESDAY,bc30Year#8,bc1Year#7.81]) -However, by creating a MongoLoader instance with a specific list of field names, you can map fields in the document to fields in a Pig +However, by creating a MongoLoader instance with a specific list of field names, you can map fields in the document to fields in a Pig named tuple datatype. When used this way, `MongoLoader` takes two arguments: `schema` - a schema (list of fields/datatypes) that will map fields in the document to fields in the Pig records. See section below on Datatype Mapping for details. -`idAlias` - an alias to use for the `_id` field in documents retrieved from the collection. The string "\_id" is not a legal pig variable name, so the contents of the field in `_id` will be mapped to a value in Pig accordingly by providing a value here. +`idAlias` - an alias to use for the `_id` field in documents retrieved from the collection. 
The string "\_id" is not a legal pig variable name, so the contents of the field in `_id` will be mapped to a value in Pig accordingly by providing a value here. Example: @@ -33,12 +33,12 @@ Example: -- map the "_id" field in the documents to the "id" field in pig > raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' using com.mongodb.hadoop.pig.MongoLoader('id, bc10Year', 'id'); > raw_limited = LIMIT raw 3; - > dump raw_limited; + > dump raw_limited; (631238400000,7.94) (631324800000,7.99) (631411200000,7.98) -**Note**: Pig 0.9 and earlier have issues with non-named tuples. You may need to unpack and name the tuples explicitly, for example: +**Note**: Pig 0.9 and earlier have issues with non-named tuples. You may need to unpack and name the tuples explicitly, for example: The tuple `(1,2,3)` can not be transformed into a MongoDB document. But, `FLATTEN((1,2,3)) as v1, v2, v3` can successfully be stored as `{'v1': 1, 'v2': 2, 'v3': 3}` Pig 0.10 and later handles both cases correctly, so avoiding Pig 0.9 or earlier is recommended. @@ -51,60 +51,65 @@ You can load records directly into a Pig relation from a BSON file using the `BS raw = LOAD 'file:///tmp/dump/yield_historical.in.bson' USING com.mongodb.hadoop.pig.BSONLoader; -As with `MongoLoader` you can also supply an optional `idAlias` argument to map the `_id` field to a named Pig field, along with a +As with `MongoLoader` you can also supply an optional `idAlias` argument to map the `_id` field to a named Pig field, along with a `schema` to select fields/types to extract from the documents. ##### Datatype Mapping -In the second optional argument to the `BSONLoader` and `MongoLoader` class constructors, you can explicitly provide a datatype for -each element of the schema by using the Pig schema syntax, for example `name:chararray, age:int`. If the types aren't provided, the +In the second optional argument to the `BSONLoader` and `MongoLoader` class constructors, you can explicitly provide a datatype for +each element of the schema by using the Pig schema syntax, for example `name:chararray, age:int`. If the types aren't provided, the output type will be inferred based on the values in the documents. Data mappings used for these inferred types are as follows: * Embedded Document/Object -> `Map` * Array → Unnamed `Tuple` -* Date/ISODate → a 64 bit integer containing the UNIX time. This can be manipulated by Pig UDF functions to extract month, day, +* Date/ISODate → a 64 bit integer containing the UNIX time. This can be manipulated by Pig UDF functions to extract month, day, year, or other information - see http://aws.amazon.com/code/Elastic-MapReduce/2730 for some examples. -Note: older versions of Pig may not be able to generate mappings when tuples are unnamed, due to -[PIG-2509](https://issues.apache.org/jira/browse/PIG-2509). If you get errors, try making sure that all top-level fields in the relation being stored +Note: older versions of Pig may not be able to generate mappings when tuples are unnamed, due to +[PIG-2509](https://issues.apache.org/jira/browse/PIG-2509). If you get errors, try making sure that all top-level fields in the relation being stored have names assigned to them or try using a newer version of Pig. ### Writing output from Pig -If writing to a MongoDB instance, it's recommended to set `mapreduce.map.speculative=false` -and `mapreduce.reduce.speculative=false` to prevent the possibility of duplicate records being written. 
You can do this on
+If writing to a MongoDB instance, it's recommended to set `mapreduce.map.speculative=false`
+and `mapreduce.reduce.speculative=false` to prevent the possibility of duplicate records being written. You can do this on
 the command line with `-D` switches or directly in the Pig script using the `SET` command.
 
 ##### Static BSON file output
-
-To store output from Pig in a .BSON file (which can then be imported into a MongoDB instance using `mongorestore`) use the BSONStorage
+
+To store output from Pig in a .BSON file (which can then be imported into a MongoDB instance using `mongorestore`), use the BSONStorage
 class. Example:
 
     STORE raw_out INTO 'file:///tmp/whatever.bson' USING com.mongodb.hadoop.pig.BSONStorage;
 
-If you want to supply a custom value for the `_id` field in the documents written out by `BSONStorage` you can give it an optional
+If you want to supply a custom value for the `_id` field in the documents written out by `BSONStorage`, you can give it an optional
 `idAlias` field which maps a value in the Pig record to the `_id` field in the output document, for example:
 
     STORE raw_out INTO 'file:///tmp/whatever.bson' USING com.mongodb.hadoop.pig.BSONStorage('id');
 
-The output URI for BSONStorage can be any accessible file system including `hdfs://` and `s3n://`. However, when using S3 for an output
+The output URI for BSONStorage can be any accessible file system, including `hdfs://` and `s3n://`. However, when using S3 for an output
 file, you will also need to set `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAccessKey` for your AWS account accordingly.
 
 ##### Inserting directly into a MongoDB collection
 
-To make each output record be used as an insert into a MongoDB collection, use the `MongoInsertStorage` class supplying the output URI.
+To insert each output record into a MongoDB collection, use the `MongoInsertStorage` class, supplying the output URI.
 For example:
 
+```
+STORE <aliasname> INTO 'mongodb://localhost:27017/<db>.<collection>'
+    USING com.mongodb.hadoop.pig.MongoInsertStorage('<idAlias>');
+```
+
     STORE dates_averages INTO 'mongodb://localhost:27017/demo.yield_aggregated' USING
         com.mongodb.hadoop.pig.MongoInsertStorage('', '' );
 
-The `MongoInsertStorage` class also takes one argument: an `idAlias` as described above. If `idAlias` is left blank, an `ObjectId` will be
+The `MongoInsertStorage` class also takes an optional `idAlias` argument, as described above. If `idAlias` is left blank, an `ObjectId` will be
 generated for the value of the `_id` field in each output document.
 
 ### Updating a MongoDB collection
 
-Just like in the MongoDB javascript shell, you can now update documents in a MongoDB collection within a Pig script via
+Just as in the MongoDB JavaScript shell, you can update documents in a MongoDB collection from within a Pig script via
 `com.mongodb.hadoop.pig.MongoUpdateStorage`. Use:
 
 ```
@@ -129,7 +134,7 @@ where
 
 Consider the following examples:
 
-Assume we have an alias `data` that is a bag of tuples.
+Assume we have an alias `data` that is a bag of tuples.
 ```
 data =
   {
@@ -141,7 +146,7 @@ data =
 with schema `f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}`.
 **Note**: Every pig data structure in a pig schema has to be named.
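As a minimal sketch of that naming rule (the alias `people` and its field names are hypothetical, not part of this patch): if a relation still contains unnamed values, for example raw expressions projected in a `FOREACH`, it can be given a fully named schema with `AS` before it is stored:

```
-- every generated field gets an explicit name, so the storage classes can map it to a document key
people = FOREACH data GENERATE f AS first:chararray, l AS last:chararray, g AS gender:chararray;
```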
-To insert the gender, first and last names of each person in `data` into a `test.persons_info` collection, +To insert the gender, first and last names of each person in `data` into a `test.persons_info` collection, making sure that we update any existing documents with the same `first` and `last` fields, use ``` STORE data INTO 'mongodb://localhost:27017/test.persons_info' diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java b/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java index 37cc36be..166ea1b3 100644 --- a/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java +++ b/pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java @@ -37,6 +37,9 @@ import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; +import org.joda.time.DateTime; + +import java.util.Date; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -58,7 +61,7 @@ public class BSONStorage extends StoreFunc implements StoreMetadata { private String idField = null; private final BSONFileOutputFormat outputFormat = new BSONFileOutputFormat(); - + public BSONStorage() { } @@ -113,6 +116,8 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi return o.toString(); case DataType.CHARARRAY: return o; + case DataType.DATETIME: + return (o==null ? null : ((DateTime) o).toDate()); //Given a TUPLE, create a Map so BSONEncoder will eat it case DataType.TUPLE: @@ -123,7 +128,8 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi ResourceFieldSchema[] fs = s.getFields(); Map m = new LinkedHashMap(); for (int j = 0; j < fs.length; j++) { - m.put(fs[j].getName(), getTypeForBSON(((Tuple) o).get(j), fs[j], toIgnore)); + String fn = FieldUtils.getEscFieldName(fs[j].getName()); + m.put(fn, getTypeForBSON(((Tuple) o).get(j), fs[j], toIgnore)); } return m; @@ -159,7 +165,9 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi for (Tuple t : (DataBag) o) { Map ma = new LinkedHashMap(); for (int j = 0; j < fs.length; j++) { - ma.put(fs[j].getName(), t.get(j)); + String fn = FieldUtils.getEscFieldName(fs[j].getName()); + Object data = getTypeForBSON(t.get(j), fs[j], null); + ma.put(fn, data); } a.add(ma); } @@ -184,7 +192,7 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi @SuppressWarnings("unchecked") protected void writeField(final BasicDBObjectBuilder builder, final ResourceFieldSchema field, final Object d) throws IOException { Object convertedType = getTypeForBSON(d, field, null); - String fieldName = field != null ? field.getName() : "value"; + String fieldName = field != null ? 
FieldUtils.getEscFieldName(field.getName()) : "value"; if (convertedType instanceof Map) { for (Map.Entry mapentry : ((Map) convertedType).entrySet()) { diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/FieldUtils.java b/pig/src/main/java/com/mongodb/hadoop/pig/FieldUtils.java new file mode 100644 index 00000000..1994cdf6 --- /dev/null +++ b/pig/src/main/java/com/mongodb/hadoop/pig/FieldUtils.java @@ -0,0 +1,13 @@ +package com.mongodb.hadoop.pig; + +public class FieldUtils { + // Escape name that starts with esc_ + private static final String ESC_PREFIX = "esc_"; + + public static String getEscFieldName(String fieldName){ + if (fieldName.startsWith(ESC_PREFIX)) { + fieldName = fieldName.replace(ESC_PREFIX, ""); + } + return fieldName; + } +} diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java b/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java index 763a3b1b..13f97d31 100644 --- a/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java +++ b/pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java @@ -20,6 +20,7 @@ import com.mongodb.BasicDBObjectBuilder; import com.mongodb.hadoop.MongoOutputFormat; import com.mongodb.hadoop.util.MongoConfigUtil; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -52,6 +53,8 @@ public class MongoInsertStorage extends StoreFunc implements StoreMetadata { private String udfcSignature = null; private String idField = null; + private String toIgnore = null; + private final MongoOutputFormat outputFormat = new MongoOutputFormat(); @@ -66,19 +69,25 @@ public MongoInsertStorage() { public MongoInsertStorage(final String idField, final String useUpsert) { this.idField = idField; } - + public MongoInsertStorage(final String idField) { this.idField = idField; } + public MongoInsertStorage(final String idField, final String useUpsert, final String toIgnore) { + this.idField = idField; + this.toIgnore = toIgnore; + } + protected void writeField(final BasicDBObjectBuilder builder, final ResourceFieldSchema field, final Object d) throws IOException { - Object convertedType = BSONStorage.getTypeForBSON(d, field, null); + Object convertedType = BSONStorage.getTypeForBSON(d, field, this.toIgnore); if (field.getName() != null && field.getName().equals(idField)) { builder.add("_id", convertedType); } else { - builder.add(field.getName(), convertedType); + String fieldName = FieldUtils.getEscFieldName(field.getName()); + builder.add(fieldName, convertedType); } } @@ -92,6 +101,10 @@ public void checkSchema(final ResourceSchema schema) throws IOException { p.setProperty(SCHEMA_SIGNATURE, schema.toString()); } + public void storeSchema(final ResourceSchema schema) { + this.schema = schema; + } + @Override public void storeSchema(final ResourceSchema schema, final String location, final Job job) { // not implemented @@ -122,7 +135,7 @@ public void putNext(final Tuple tuple) throws IOException { out.write(null, builder.get()); } catch (Exception e) { - throw new IOException("Couldn't convert tuple to bson: ", e); + throw new IOException("Couldn't convert tuple " + tuple + " to bson: ", e); } } @@ -172,7 +185,6 @@ public void setStoreLocation(final String location, final Job job) throws IOExce MongoConfigUtil.setOutputURI(config, location); } - @Override public void setStoreFuncUDFContextSignature(final String signature) { udfcSignature = signature; diff --git a/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java 
b/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java index 20a4e521..04942e97 100644 --- a/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java +++ b/pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java @@ -116,9 +116,11 @@ protected void writeField(final BasicDBObjectBuilder builder, final ResourceSchema.ResourceFieldSchema field, final Object d) throws IOException { + String fieldName = FieldUtils.getEscFieldName(field.getName()); + // If the field is missing or the value is null, write a null if (d == null) { - builder.add(field.getName(), null); + builder.add(fieldName, null); return; } @@ -127,35 +129,37 @@ protected void writeField(final BasicDBObjectBuilder builder, // Based on the field's type, write it out byte i = field.getType(); if (i == DataType.INTEGER) { - builder.add(field.getName(), d); + builder.add(fieldName, d); } else if (i == DataType.LONG) { - builder.add(field.getName(), d); + builder.add(fieldName, d); } else if (i == DataType.FLOAT) { - builder.add(field.getName(), d); + builder.add(fieldName, d); } else if (i == DataType.DOUBLE) { - builder.add(field.getName(), d); + builder.add(fieldName, d); + } else if (i == DataType.DATETIME) { + builder.add(fieldName, d); } else if (i == DataType.BYTEARRAY) { - builder.add(field.getName(), d.toString()); + builder.add(fieldName, d.toString()); } else if (i == DataType.CHARARRAY) { - builder.add(field.getName(), d); + builder.add(fieldName, d); } else if (i == DataType.TUPLE) { // Given a TUPLE, create a Map so BSONEncoder will eat it if (s == null) { throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field " - + field.getName()); + + fieldName); } ResourceFieldSchema[] fs = s.getFields(); Map m = new LinkedHashMap(); for (int j = 0; j < fs.length; j++) { m.put(fs[j].getName(), ((Tuple) d).get(j)); } - builder.add(field.getName(), (Map) m); + builder.add(fieldName, (Map) m); } else if (i == DataType.BAG) { // Given a BAG, create an Array so BSONEncoder will eat it. ResourceFieldSchema[] fs; if (s == null) { throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field " - + field.getName()); + + fieldName); } fs = s.getFields(); if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) { @@ -166,7 +170,7 @@ protected void writeField(final BasicDBObjectBuilder builder, s = fs[0].getSchema(); if (s == null) { throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field " - + field.getName()); + + fieldName); } fs = s.getFields(); @@ -179,7 +183,7 @@ protected void writeField(final BasicDBObjectBuilder builder, a.add(ma); } - builder.add(field.getName(), a); + builder.add(fieldName, a); } else if (i == DataType.MAP) { Map map = (Map) d; for (Object key : map.keySet()) {
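Taken together, the `FieldUtils` and `DATETIME` changes above mean that a Pig field whose name starts with `esc_` has that prefix stripped before the document is built, and Pig `datetime` values are written as BSON dates. Below is a minimal sketch of a script that relies on both, assuming the `esc_` prefix is intended for document field names that are not legal Pig identifiers (such as `_timestamp`); the input path, field names, and the `test.events` collection are hypothetical:

```
REGISTER mongo-java-driver.jar
REGISTER mongo-hadoop-core.jar
REGISTER mongo-hadoop-pig.jar

raw    = LOAD 'file:///tmp/events.tsv' AS (name:chararray, ts:chararray);

-- ToDate() yields a Pig datetime, which the storage classes now convert to a BSON date;
-- the 'esc_' prefix is stripped on output, so the document field is written as '_timestamp'
events = FOREACH raw GENERATE name, ToDate(ts, 'yyyy-MM-dd') AS esc__timestamp;

STORE events INTO 'mongodb://localhost:27017/test.events'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('');
```

With an empty `idAlias`, each document should get a generated `ObjectId` for `_id`, a `name` field, and a `_timestamp` field stored as a date rather than a raw long.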