Skip to content

Commit cc5cfec

Browse files
committed
Fix es.output.json for Pig's EsStorage. (#897)
Adding new tests to verify reading raw JSON works. fixes #871
1 parent 6b2654e commit cc5cfec

File tree

3 files changed

+279
-8
lines changed

3 files changed

+279
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.hadoop.integration.pig;
21+
22+
import com.google.common.collect.Lists;
23+
import org.elasticsearch.hadoop.Provisioner;
24+
import org.elasticsearch.hadoop.QueryTestParams;
25+
import org.elasticsearch.hadoop.mr.RestUtils;
26+
import org.junit.Assert;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.Parameterized;
31+
import org.junit.runners.Parameterized.Parameters;
32+
33+
import java.util.Collection;
34+
import java.util.List;
35+
36+
import static org.hamcrest.Matchers.is;
37+
import static org.hamcrest.Matchers.stringContainsInOrder;
38+
import static org.junit.Assert.assertThat;
39+
40+
@RunWith(Parameterized.class)
41+
public class AbstractPigReadAsJsonTest extends AbstractPigTests {
42+
43+
private static int testInstance = 0;
44+
private static String previousQuery;
45+
private boolean readMetadata;
46+
47+
@Parameters
48+
public static Collection<Object[]> queries() {
49+
return QueryTestParams.params();
50+
}
51+
52+
private final String query;
53+
54+
public AbstractPigReadAsJsonTest(String query, boolean metadata) {
55+
this.query = query;
56+
this.readMetadata = metadata;
57+
58+
if (!query.equals(previousQuery)) {
59+
previousQuery = query;
60+
testInstance++;
61+
}
62+
}
63+
64+
private String scriptHead;
65+
66+
@Before
67+
public void before() throws Exception {
68+
RestUtils.touch("json-pig");
69+
RestUtils.refresh("json-pig");
70+
71+
this.scriptHead = "REGISTER "+ Provisioner.ESHADOOP_TESTING_JAR + ";" +
72+
"DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.index.read.missing.as.empty=true','es.query=" + query + "','es.read.metadata=" + readMetadata +"','es.output.json=true');";
73+
}
74+
75+
@Test
76+
public void testTuple() throws Exception {
77+
String script = scriptHead +
78+
"A = LOAD 'json-pig/tupleartists' USING EsStorage();" +
79+
"X = LIMIT A 3;" +
80+
//"DESCRIBE A;";
81+
"STORE A INTO '" + tmpPig() + "/testtuple';";
82+
pig.executeScript(script);
83+
84+
String results = getResults("" + tmpPig() + "/testtuple");
85+
86+
List<String> doc1 = Lists.newArrayList(
87+
"{\"number\":\"12\",\"name\":\"Behemoth\",\"url\":\"http://www.last.fm/music/Behemoth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/54196161.jpg\",\"@timestamp\":\"2011-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
88+
);
89+
if (readMetadata) {
90+
doc1.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"tupleartists\",\"_id\":\"");
91+
doc1.add("\",\"_score\":null,\"sort\":[");
92+
}
93+
94+
List<String> doc2 = Lists.newArrayList(
95+
"{\"number\":\"918\",\"name\":\"Megadeth\",\"url\":\"http://www.last.fm/music/Megadeth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/8129787.jpg\",\"@timestamp\":\"2871-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
96+
);
97+
if (readMetadata) {
98+
doc2.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"tupleartists\",\"_id\":\"");
99+
doc2.add("\",\"_score\":null,\"sort\":[");
100+
}
101+
102+
List<String> doc3 = Lists.newArrayList(
103+
"{\"number\":\"982\",\"name\":\"Foo Fighters\",\"url\":\"http://www.last.fm/music/Foo+Fighters\",\"picture\":\"http://userserve-ak.last.fm/serve/252/59495563.jpg\",\"@timestamp\":\"2933-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
104+
);
105+
if (readMetadata) {
106+
doc3.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"tupleartists\",\"_id\":\"");
107+
doc3.add("\",\"_score\":null,\"sort\":[");
108+
}
109+
110+
assertThat(results, stringContainsInOrder(doc1));
111+
assertThat(results, stringContainsInOrder(doc2));
112+
assertThat(results, stringContainsInOrder(doc3));
113+
}
114+
115+
@Test
116+
public void testTupleWithSchema() throws Exception {
117+
String script = scriptHead +
118+
"A = LOAD 'json-pig/tupleartists' USING EsStorage() AS (name:chararray);" +
119+
"B = ORDER A BY name DESC;" +
120+
"X = LIMIT B 3;" +
121+
"STORE B INTO '" + tmpPig() + "/testtupleschema';";
122+
pig.executeScript(script);
123+
124+
String results = getResults("" + tmpPig() + "/testtupleschema");
125+
126+
List<String> doc1 = Lists.newArrayList(
127+
"{\"number\":\"999\",\"name\":\"Thompson Twins\",\"url\":\"http://www.last.fm/music/Thompson+Twins\",\"picture\":\"http://userserve-ak.last.fm/serve/252/6943589.jpg\",\"@timestamp\":\"2950-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
128+
);
129+
if (readMetadata) {
130+
doc1.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"tupleartists\",\"_id\":\"");
131+
doc1.add("\",\"_score\":null,\"sort\":[");
132+
}
133+
134+
List<String> doc2 = Lists.newArrayList(
135+
"{\"number\":\"12\",\"name\":\"Behemoth\",\"url\":\"http://www.last.fm/music/Behemoth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/54196161.jpg\",\"@timestamp\":\"2011-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
136+
);
137+
if (readMetadata) {
138+
doc2.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"tupleartists\",\"_id\":\"");
139+
doc2.add("\",\"_score\":null,\"sort\":[");
140+
}
141+
142+
List<String> doc3 = Lists.newArrayList(
143+
"{\"number\":\"230\",\"name\":\"Green Day\",\"url\":\"http://www.last.fm/music/Green+Day\",\"picture\":\"http://userserve-ak.last.fm/serve/252/15291249.jpg\",\"@timestamp\":\"2215-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
144+
);
145+
if (readMetadata) {
146+
doc3.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"tupleartists\",\"_id\":\"");
147+
doc3.add("\",\"_score\":null,\"sort\":[");
148+
}
149+
150+
assertThat(results, stringContainsInOrder(doc1));
151+
assertThat(results, stringContainsInOrder(doc2));
152+
assertThat(results, stringContainsInOrder(doc3));
153+
}
154+
155+
@Test
156+
public void testFieldAlias() throws Exception {
157+
String script = scriptHead
158+
+ "A = LOAD 'json-pig/fieldalias' USING EsStorage();"
159+
+ "X = LIMIT A 3;"
160+
+ "STORE A INTO '" + tmpPig() + "/testfieldalias';";
161+
pig.executeScript(script);
162+
163+
String results = getResults("" + tmpPig() + "/testfieldalias");
164+
165+
List<String> doc1 = Lists.newArrayList(
166+
"{\"number\":\"12\",\"name\":\"Behemoth\",\"url\":\"http://www.last.fm/music/Behemoth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/54196161.jpg\",\"@timestamp\":\"2011-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
167+
);
168+
if (readMetadata) {
169+
doc1.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"fieldalias\",\"_id\":\"");
170+
doc1.add("\",\"_score\":null,\"sort\":[");
171+
}
172+
173+
List<String> doc2 = Lists.newArrayList(
174+
"{\"number\":\"918\",\"name\":\"Megadeth\",\"url\":\"http://www.last.fm/music/Megadeth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/8129787.jpg\",\"@timestamp\":\"2871-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
175+
);
176+
if (readMetadata) {
177+
doc2.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"fieldalias\",\"_id\":\"");
178+
doc2.add("\",\"_score\":null,\"sort\":[");
179+
}
180+
181+
List<String> doc3 = Lists.newArrayList(
182+
"{\"number\":\"982\",\"name\":\"Foo Fighters\",\"url\":\"http://www.last.fm/music/Foo+Fighters\",\"picture\":\"http://userserve-ak.last.fm/serve/252/59495563.jpg\",\"@timestamp\":\"2933-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
183+
);
184+
if (readMetadata) {
185+
doc3.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"fieldalias\",\"_id\":\"");
186+
doc3.add("\",\"_score\":null,\"sort\":[");
187+
}
188+
189+
assertThat(results, stringContainsInOrder(doc1));
190+
assertThat(results, stringContainsInOrder(doc2));
191+
assertThat(results, stringContainsInOrder(doc3));
192+
}
193+
194+
@Test
195+
public void testMissingIndex() throws Exception {
196+
String script = scriptHead
197+
+ "A = LOAD 'foo/bar' USING EsStorage();"
198+
+ "X = LIMIT A 3;"
199+
+ "STORE A INTO '" + tmpPig() + "/testmissingindex';";
200+
pig.executeScript(script);
201+
202+
String results = getResults("" + tmpPig() + "/testmissingindex");
203+
assertThat(results.length(), is(0));
204+
}
205+
206+
@Test
207+
public void testParentChild() throws Exception {
208+
String script = scriptHead
209+
+ "A = LOAD 'json-pig/child' USING EsStorage();"
210+
+ "X = LIMIT A 3;"
211+
+ "STORE A INTO '" + tmpPig() + "/testparentchild';";
212+
pig.executeScript(script);
213+
214+
String results = getResults("" + tmpPig() + "/testparentchild");
215+
216+
List<String> doc1 = Lists.newArrayList(
217+
"{\"number\":\"12\",\"name\":\"Behemoth\",\"url\":\"http://www.last.fm/music/Behemoth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/54196161.jpg\",\"@timestamp\":\"2011-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
218+
);
219+
if (readMetadata) {
220+
doc1.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"child\",\"_id\":\"");
221+
doc1.add("\",\"_score\":null,\"_routing\":\"12\",\"_parent\":\"12\",\"sort\":[");
222+
}
223+
224+
List<String> doc2 = Lists.newArrayList(
225+
"{\"number\":\"918\",\"name\":\"Megadeth\",\"url\":\"http://www.last.fm/music/Megadeth\",\"picture\":\"http://userserve-ak.last.fm/serve/252/8129787.jpg\",\"@timestamp\":\"2871-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
226+
);
227+
if (readMetadata) {
228+
doc2.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"child\",\"_id\":\"");
229+
doc2.add("\",\"_score\":null,\"_routing\":\"918\",\"_parent\":\"918\",\"sort\":[");
230+
}
231+
232+
List<String> doc3 = Lists.newArrayList(
233+
"{\"number\":\"982\",\"name\":\"Foo Fighters\",\"url\":\"http://www.last.fm/music/Foo+Fighters\",\"picture\":\"http://userserve-ak.last.fm/serve/252/59495563.jpg\",\"@timestamp\":\"2933-10-06T19:20:25.000Z\",\"list\":[\"quick\", \"brown\", \"fox\"]"
234+
);
235+
if (readMetadata) {
236+
doc3.add(",\"_metadata\":{\"_index\":\"json-pig\",\"_type\":\"child\",\"_id\":\"");
237+
doc3.add("\",\"_score\":null,\"_routing\":\"982\",\"_parent\":\"982\",\"sort\":[");
238+
}
239+
240+
assertThat(results, stringContainsInOrder(doc1));
241+
assertThat(results, stringContainsInOrder(doc2));
242+
assertThat(results, stringContainsInOrder(doc3));
243+
}
244+
245+
@Test
246+
public void testDynamicPattern() throws Exception {
247+
Assert.assertTrue(RestUtils.exists("json-pig/pattern-1"));
248+
Assert.assertTrue(RestUtils.exists("json-pig/pattern-500"));
249+
Assert.assertTrue(RestUtils.exists("json-pig/pattern-990"));
250+
}
251+
252+
@Test
253+
public void testDynamicPatternFormat() throws Exception {
254+
Assert.assertTrue(RestUtils.exists("json-pig/pattern-format-2010-10-06"));
255+
Assert.assertTrue(RestUtils.exists("json-pig/pattern-format-2200-10-06"));
256+
Assert.assertTrue(RestUtils.exists("json-pig/pattern-format-2873-10-06"));
257+
}
258+
259+
private static String tmpPig() {
260+
return "tmp-pig/json-read-" + testInstance;
261+
}
262+
}

pig/src/itest/java/org/elasticsearch/hadoop/integration/pig/PigSuite.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.junit.runners.Suite;
3232

3333
@RunWith(Suite.class)
34-
@Suite.SuiteClasses({ AbstractPigSaveTest.class, AbstractPigSaveJsonTest.class, AbstractPigSearchTest.class, AbstractPigSearchJsonTest.class, AbstractPigExtraTests.class })
34+
@Suite.SuiteClasses({ AbstractPigSaveTest.class, AbstractPigSaveJsonTest.class, AbstractPigSearchTest.class, AbstractPigSearchJsonTest.class, AbstractPigReadAsJsonTest.class, AbstractPigExtraTests.class })
3535
//@Suite.SuiteClasses({ AbstractPigSaveTest.class, AbstractPigSearchTest.class })
3636
//@Suite.SuiteClasses({ AbstractPigExtraTests.class })
3737
public class PigSuite {

pig/src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.io.StringReader;
2323
import java.util.Arrays;
24+
import java.util.HashMap;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.Map.Entry;
@@ -58,7 +59,6 @@
5859
import org.elasticsearch.hadoop.rest.InitializationUtils;
5960
import org.elasticsearch.hadoop.util.IOUtils;
6061
import org.elasticsearch.hadoop.util.ObjectUtils;
61-
import org.elasticsearch.hadoop.util.SettingsUtils;
6262
import org.elasticsearch.hadoop.util.StringUtils;
6363

6464
/**
@@ -91,6 +91,8 @@ public class EsStorage extends LoadFunc implements LoadMetadata, LoadPushDown, S
9191
private RecordWriter<Object, Object> writer;
9292
private PigTuple pigTuple;
9393

94+
private boolean isJSON = false;
95+
9496
private List<String> aliasesTupleNames;
9597

9698
public EsStorage() {
@@ -103,6 +105,7 @@ public EsStorage(String... configuration) {
103105
for (String string : configuration) {
104106
// replace ; with line separators
105107
properties.load(new StringReader(string));
108+
log.trace(properties.toString());
106109
}
107110
} catch (IOException ex) {
108111
throw new EsHadoopIllegalArgumentException("Cannot parse options " + Arrays.toString(configuration), ex);
@@ -145,13 +148,13 @@ private void init(String location, Job job, boolean read) {
145148

146149
settings = (read ? settings.setResourceRead(location) : settings.setResourceWrite(location));
147150

148-
boolean changed = false;
149151
InitializationUtils.checkIdForOperation(settings);
152+
InitializationUtils.setValueWriterIfNotSet(settings, PigValueWriter.class, log);
153+
InitializationUtils.setValueReaderIfNotSet(settings, PigValueReader.class, log);
154+
InitializationUtils.setBytesConverterIfNeeded(settings, PigBytesConverter.class, log);
155+
InitializationUtils.setFieldExtractorIfNotSet(settings, PigFieldExtractor.class, log);
150156

151-
changed |= InitializationUtils.setValueWriterIfNotSet(settings, PigValueWriter.class, log);
152-
changed |= InitializationUtils.setValueReaderIfNotSet(settings, PigValueReader.class, log);
153-
changed |= InitializationUtils.setBytesConverterIfNeeded(settings, PigBytesConverter.class, log);
154-
changed |= InitializationUtils.setFieldExtractorIfNotSet(settings, PigFieldExtractor.class, log);
157+
isJSON = settings.getOutputAsJson();
155158
}
156159

157160
@SuppressWarnings("unchecked")
@@ -264,7 +267,13 @@ public Tuple getNext() throws IOException {
264267
return null;
265268
}
266269

267-
Map dataMap = reader.getCurrentValue();
270+
Map dataMap;
271+
if (isJSON) {
272+
dataMap = new HashMap(1);
273+
dataMap.put("data", reader.getCurrentValue());
274+
} else {
275+
dataMap = reader.getCurrentValue();
276+
}
268277
Tuple tuple = TupleFactory.getInstance().newTuple(dataMap.size());
269278

270279
if (dataMap.isEmpty()) {

0 commit comments

Comments
 (0)