This repository was archived by the owner on Jan 29, 2022. It is now read-only.

Fix MongoInsertStorage documentation #105

Open · wants to merge 12 commits into master
6 changes: 3 additions & 3 deletions examples/README.md
@@ -84,11 +84,11 @@ Amazon Piggybank jar: http://aws.amazon.com/code/Elastic-MapReduce/2730
by_year = GROUP parsed_year BY (chararray)year;
year_10yearavg = FOREACH by_year GENERATE group, AVG(parsed_year.bc) as tenyear_avg;

-- Args to MongoInsertStorage are: schema for output doc, field to use as '_id'.
-- Arg to MongoInsertStorage: field to use as '_id'.
STORE year_10yearavg
INTO 'mongodb://localhost:27017/demo.asfkjabfa'
USING
com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,tenyear_avg:float', 'group');
com.mongodb.hadoop.pig.MongoInsertStorage('group');



@@ -315,4 +315,4 @@ After phase two is finished, the result documents look like this (the `logs_coun
],
"logs_count": 1050616
}
```
59 changes: 32 additions & 27 deletions pig/README.md
@@ -4,41 +4,41 @@

##### From a MongoDB collection

To load records from a MongoDB database for use in a Pig script, a class called `MongoLoader` is provided. To use it, first register the
dependency jars in your script and then specify the Mongo URI to load with the `MongoLoader` class.

-- First, register jar dependencies
REGISTER mongo-java-driver.jar
REGISTER mongo-hadoop-core.jar
REGISTER mongo-hadoop-pig.jar

raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' USING com.mongodb.hadoop.pig.MongoLoader;

`MongoLoader` can be used in two ways - `Dynamic Schema` and `Fixed Schema` mode. By creating an instance of the class without
specifying any field names in the constructor (as in the previous snippet), each record will appear to Pig as a tuple containing a
single `Map` that corresponds to the document from the collection, for example:

([bc2Year#7.87,bc3Year#7.9,bc1Month#,bc5Year#7.87,_id#631238400000,bc10Year#7.94,bc20Year#,bc7Year#7.98,bc6Month#7.89,bc3Month#7.83,dayOfWeek#TUESDAY,bc30Year#8,bc1Year#7.81])
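
In this dynamic-schema mode the whole document arrives as one map, so individual values are pulled out with Pig's `#` operator. A minimal sketch, assuming the collection and field names from the sample above:

```
raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in'
    USING com.mongodb.hadoop.pig.MongoLoader;
-- $0 is the map built from each document; values come back as bytearray, so cast them
tenyear = FOREACH raw GENERATE (double) ($0#'bc10Year') AS bc10Year;
```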

However, by creating a MongoLoader instance with a specific list of field names, you can map fields in the document to fields in a Pig
named tuple datatype. When used this way, `MongoLoader` takes two arguments:

`schema` - a schema (a list of fields/datatypes) that maps fields in the document to fields in the Pig records. See the Datatype Mapping section below for details.

`idAlias` - an alias to use for the `_id` field in documents retrieved from the collection. The string "\_id" is not a legal Pig variable name, so providing a value here maps the contents of the `_id` field onto a Pig field with that name.

Example:

-- Load two fields from the documents in the collection specified by this URI
-- map the "_id" field in the documents to the "id" field in pig
> raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' using com.mongodb.hadoop.pig.MongoLoader('id, bc10Year', 'id');
> raw_limited = LIMIT raw 3;
> dump raw_limited;
(631238400000,7.94)
(631324800000,7.99)
(631411200000,7.98)

**Note**: Pig 0.9 and earlier have issues with non-named tuples. You may need to unpack and name the tuples explicitly; for example,
the tuple `(1,2,3)` cannot be transformed into a MongoDB document, but
`FLATTEN((1,2,3)) as v1, v2, v3` can successfully be stored as `{'v1': 1, 'v2': 2, 'v3': 3}`.
Pig 0.10 and later handle both cases correctly, so avoiding Pig 0.9 or earlier is recommended.
@@ -51,60 +51,65 @@ You can load records directly into a Pig relation from a BSON file using the `BS

raw = LOAD 'file:///tmp/dump/yield_historical.in.bson' USING com.mongodb.hadoop.pig.BSONLoader;

As with `MongoLoader` you can also supply an optional `idAlias` argument to map the `_id` field to a named Pig field, along with a
`schema` to select fields/types to extract from the documents.

##### Datatype Mapping

In the second optional argument to the `BSONLoader` and `MongoLoader` class constructors, you can explicitly provide a datatype for
each element of the schema by using the Pig schema syntax, for example `name:chararray, age:int`. If the types aren't provided, the
output type will be inferred based on the values in the documents. Data mappings used for these inferred types are as follows:

* Embedded Document/Object → `Map`

* Array → Unnamed `Tuple`

* Date/ISODate → a 64-bit integer containing the UNIX time in milliseconds. This can be manipulated by Pig UDF functions to extract month, day,
year, or other information - see http://aws.amazon.com/code/Elastic-MapReduce/2730 for some examples, and the short sketch below.
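
For example, a date loaded this way can be turned back into a Pig datetime. A small sketch, assuming Pig 0.11+ (where `ToDate(long)` is built in) and the collection from the earlier example, in which `_id` holds an ISODate:

```
raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in'
    USING com.mongodb.hadoop.pig.MongoLoader('id, bc10Year', 'id');
-- 'id' arrives as the UNIX time in milliseconds; ToDate(long) converts it to a datetime
with_dates = FOREACH raw GENERATE ToDate((long) id) AS obs_date, (double) bc10Year AS bc10Year;
```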

Note: older versions of Pig may not be able to generate mappings when tuples are unnamed, due to
[PIG-2509](https://issues.apache.org/jira/browse/PIG-2509). If you get errors, try making sure that all top-level fields in the relation being stored
have names assigned to them or try using a newer version of Pig.

### Writing output from Pig

If writing to a MongoDB instance, it's recommended to set `mapreduce.map.speculative=false`
and `mapreduce.reduce.speculative=false` to prevent the possibility of duplicate records being written. You can do this on
the command line with `-D` switches or directly in the Pig script using the `SET` command.
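
For instance, a minimal sketch of the `SET` form, using the property names given above; these lines go near the top of the script, before any `STORE` into MongoDB:

```
SET mapreduce.map.speculative false;
SET mapreduce.reduce.speculative false;
```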

##### Static BSON file output
To store output from Pig in a .BSON file (which can then be imported into a MongoDB instance using `mongorestore`), use the `BSONStorage`
class. Example:

STORE raw_out INTO 'file:///tmp/whatever.bson' USING com.mongodb.hadoop.pig.BSONStorage;

If you want to supply a custom value for the `_id` field in the documents written out by `BSONStorage` you can give it an optional
`idAlias` field which maps a value in the Pig record to the `_id` field in the output document, for example:

STORE raw_out INTO 'file:///tmp/whatever.bson' USING com.mongodb.hadoop.pig.BSONStorage('id');

The output URI for BSONStorage can be any accessible file system including `hdfs://` and `s3n://`. However, when using S3 for an output
file, you will also need to set `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAccessKey` for your AWS account accordingly.
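
A sketch of the S3 case; the bucket name and credentials here are placeholders, not values from this project:

```
SET fs.s3.awsAccessKeyId '<your-access-key-id>';
SET fs.s3.awsSecretAccessKey '<your-secret-access-key>';

STORE raw_out INTO 's3n://<your-bucket>/yield_historical.out.bson'
    USING com.mongodb.hadoop.pig.BSONStorage;
```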

##### Inserting directly into a MongoDB collection

To insert each output record into a MongoDB collection, use the `MongoInsertStorage` class, supplying the output URI.
For example:
```
STORE <aliasname> INTO 'mongodb://localhost:27017/<db>.<collection>'
USING com.mongodb.hadoop.pig.MongoInsertStorage('<idAlias>');
```


STORE dates_averages INTO 'mongodb://localhost:27017/demo.yield_aggregated' USING com.mongodb.hadoop.pig.MongoInsertStorage('', '' );

The `MongoInsertStorage` class also takes one argument: an `idAlias` as described above. If `idAlias` is left blank, an `ObjectId` will be
generated for the value of the `_id` field in each output document.
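
For instance, a sketch assuming the `dates_averages` relation from the example above carries a `group` field that should become the document `_id`:

```
-- 'group' in the Pig relation becomes '_id' in MongoDB; pass '' instead to get generated ObjectIds
STORE dates_averages INTO 'mongodb://localhost:27017/demo.yield_aggregated'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('group');
```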

### Updating a MongoDB collection

Just like in the MongoDB JavaScript shell, you can now update documents in a MongoDB collection within a Pig script via
`com.mongodb.hadoop.pig.MongoUpdateStorage`. Use:

```
@@ -129,7 +134,7 @@ where

Consider the following examples:

Assume we have an alias `data` that is a bag of tuples.
```
data =
{
@@ -141,7 +146,7 @@ data =
with schema `f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}`.
**Note**: Every Pig data structure in a Pig schema has to be named.

To insert the gender, first and last names of each person in `data` into a `test.persons_info` collection,
making sure that we update any existing documents with the same `first` and `last` fields, use
```
STORE data INTO 'mongodb://localhost:27017/test.persons_info'
16 changes: 12 additions & 4 deletions pig/src/main/java/com/mongodb/hadoop/pig/BSONStorage.java
@@ -37,6 +37,9 @@
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;

import org.joda.time.DateTime;

import java.util.Date;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -58,7 +61,7 @@ public class BSONStorage extends StoreFunc implements StoreMetadata {
private String idField = null;

private final BSONFileOutputFormat outputFormat = new BSONFileOutputFormat();

public BSONStorage() {
}

@@ -113,6 +116,8 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
return o.toString();
case DataType.CHARARRAY:
return o;
case DataType.DATETIME:
// Pig's DATETIME values are Joda-Time DateTime objects; convert to java.util.Date so the BSON encoder can store them
return o == null ? null : ((DateTime) o).toDate();

//Given a TUPLE, create a Map so BSONEncoder will eat it
case DataType.TUPLE:
@@ -123,7 +128,8 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
ResourceFieldSchema[] fs = s.getFields();
Map<String, Object> m = new LinkedHashMap<String, Object>();
for (int j = 0; j < fs.length; j++) {
m.put(fs[j].getName(), getTypeForBSON(((Tuple) o).get(j), fs[j], toIgnore));
String fn = FieldUtils.getEscFieldName(fs[j].getName());
m.put(fn, getTypeForBSON(((Tuple) o).get(j), fs[j], toIgnore));
}
return m;

@@ -159,7 +165,9 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
for (Tuple t : (DataBag) o) {
Map<String, Object> ma = new LinkedHashMap<String, Object>();
for (int j = 0; j < fs.length; j++) {
ma.put(fs[j].getName(), t.get(j));
String fn = FieldUtils.getEscFieldName(fs[j].getName());
Object data = getTypeForBSON(t.get(j), fs[j], null);
ma.put(fn, data);
}
a.add(ma);
}
@@ -184,7 +192,7 @@ public static Object getTypeForBSON(final Object o, final ResourceFieldSchema fi
@SuppressWarnings("unchecked")
protected void writeField(final BasicDBObjectBuilder builder, final ResourceFieldSchema field, final Object d) throws IOException {
Object convertedType = getTypeForBSON(d, field, null);
String fieldName = field != null ? field.getName() : "value";
String fieldName = field != null ? FieldUtils.getEscFieldName(field.getName()) : "value";

if (convertedType instanceof Map) {
for (Map.Entry<String, Object> mapentry : ((Map<String, Object>) convertedType).entrySet()) {
13 changes: 13 additions & 0 deletions pig/src/main/java/com/mongodb/hadoop/pig/FieldUtils.java
@@ -0,0 +1,13 @@
package com.mongodb.hadoop.pig;

public class FieldUtils {
// Pig aliases prefixed with "esc_" are written to MongoDB under the unprefixed field name
private static final String ESC_PREFIX = "esc_";

public static String getEscFieldName(String fieldName) {
if (fieldName.startsWith(ESC_PREFIX)) {
// strip only the leading prefix; replace() would drop every occurrence of "esc_" in the name
fieldName = fieldName.substring(ESC_PREFIX.length());
}
return fieldName;
}
}
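
The intent is not spelled out in this diff, but the effect of the helper is that a Pig alias carrying an `esc_` prefix is written to MongoDB under the unprefixed name, which is useful when the desired field name (for example one starting with a digit) is not a legal Pig alias. A hypothetical Pig-side sketch, reusing the `year_10yearavg` alias from the examples:

```
-- '10year_avg' is not a legal Pig alias, so use the 'esc_' prefix; it is stripped before the document is written
renamed = FOREACH year_10yearavg GENERATE group, tenyear_avg AS esc_10year_avg;
STORE renamed INTO 'mongodb://localhost:27017/demo.yield_escaped'
    USING com.mongodb.hadoop.pig.MongoInsertStorage('group');
```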
22 changes: 17 additions & 5 deletions pig/src/main/java/com/mongodb/hadoop/pig/MongoInsertStorage.java
@@ -20,6 +20,7 @@
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -52,6 +53,8 @@ public class MongoInsertStorage extends StoreFunc implements StoreMetadata {

private String udfcSignature = null;
private String idField = null;
private String toIgnore = null;


private final MongoOutputFormat outputFormat = new MongoOutputFormat();

@@ -66,19 +69,25 @@ public MongoInsertStorage() {
public MongoInsertStorage(final String idField, final String useUpsert) {
this.idField = idField;
}

public MongoInsertStorage(final String idField) {
this.idField = idField;
}

public MongoInsertStorage(final String idField, final String useUpsert, final String toIgnore) {
this.idField = idField;
this.toIgnore = toIgnore;
}

protected void writeField(final BasicDBObjectBuilder builder,
final ResourceFieldSchema field,
final Object d) throws IOException {
Object convertedType = BSONStorage.getTypeForBSON(d, field, null);
Object convertedType = BSONStorage.getTypeForBSON(d, field, this.toIgnore);
if (field.getName() != null && field.getName().equals(idField)) {
builder.add("_id", convertedType);
} else {
builder.add(field.getName(), convertedType);
String fieldName = FieldUtils.getEscFieldName(field.getName());
builder.add(fieldName, convertedType);
}

}
@@ -92,6 +101,10 @@ public void checkSchema(final ResourceSchema schema) throws IOException {
p.setProperty(SCHEMA_SIGNATURE, schema.toString());
}

public void storeSchema(final ResourceSchema schema) {
this.schema = schema;
}

@Override
public void storeSchema(final ResourceSchema schema, final String location, final Job job) {
// not implemented
@@ -122,7 +135,7 @@ public void putNext(final Tuple tuple) throws IOException {

out.write(null, builder.get());
} catch (Exception e) {
throw new IOException("Couldn't convert tuple to bson: ", e);
throw new IOException("Couldn't convert tuple " + tuple + " to bson: ", e);
}
}

@@ -172,7 +185,6 @@ public void setStoreLocation(final String location, final Job job) throws IOExce
MongoConfigUtil.setOutputURI(config, location);
}


@Override
public void setStoreFuncUDFContextSignature(final String signature) {
udfcSignature = signature;
28 changes: 16 additions & 12 deletions pig/src/main/java/com/mongodb/hadoop/pig/MongoStorage.java
@@ -116,9 +116,11 @@ protected void writeField(final BasicDBObjectBuilder builder,
final ResourceSchema.ResourceFieldSchema field,
final Object d) throws IOException {

String fieldName = FieldUtils.getEscFieldName(field.getName());

// If the field is missing or the value is null, write a null
if (d == null) {
builder.add(field.getName(), null);
builder.add(fieldName, null);
return;
}

@@ -127,35 +129,37 @@ protected void writeField(final BasicDBObjectBuilder builder,
// Based on the field's type, write it out
byte i = field.getType();
if (i == DataType.INTEGER) {
builder.add(field.getName(), d);
builder.add(fieldName, d);
} else if (i == DataType.LONG) {
builder.add(field.getName(), d);
builder.add(fieldName, d);
} else if (i == DataType.FLOAT) {
builder.add(field.getName(), d);
builder.add(fieldName, d);
} else if (i == DataType.DOUBLE) {
builder.add(field.getName(), d);
builder.add(fieldName, d);
} else if (i == DataType.DATETIME) {
builder.add(fieldName, d);
} else if (i == DataType.BYTEARRAY) {
builder.add(field.getName(), d.toString());
builder.add(fieldName, d.toString());
} else if (i == DataType.CHARARRAY) {
builder.add(field.getName(), d);
builder.add(fieldName, d);
} else if (i == DataType.TUPLE) {
// Given a TUPLE, create a Map so BSONEncoder will eat it
if (s == null) {
throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field "
+ field.getName());
+ fieldName);
}
ResourceFieldSchema[] fs = s.getFields();
Map<String, Object> m = new LinkedHashMap<String, Object>();
for (int j = 0; j < fs.length; j++) {
m.put(fs[j].getName(), ((Tuple) d).get(j));
}
builder.add(field.getName(), (Map) m);
builder.add(fieldName, (Map) m);
} else if (i == DataType.BAG) {
// Given a BAG, create an Array so BSONEncoder will eat it.
ResourceFieldSchema[] fs;
if (s == null) {
throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field "
+ field.getName());
+ fieldName);
}
fs = s.getFields();
if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
Expand All @@ -166,7 +170,7 @@ protected void writeField(final BasicDBObjectBuilder builder,
s = fs[0].getSchema();
if (s == null) {
throw new IOException("Schemas must be fully specified to use this storage function. No schema found for field "
+ field.getName());
+ fieldName);
}
fs = s.getFields();

Expand All @@ -179,7 +183,7 @@ protected void writeField(final BasicDBObjectBuilder builder,
a.add(ma);
}

builder.add(field.getName(), a);
builder.add(fieldName, a);
} else if (i == DataType.MAP) {
Map map = (Map) d;
for (Object key : map.keySet()) {