Skip to content

Commit b69025c

Browse files
committed
Fix es.output.json for Hive (#899)
Hive will select the first compatible field in the table's schema to set the json value to. This field must be a String value. fixes #896
1 parent cc5cfec commit b69025c

File tree

5 files changed

+267
-10
lines changed

5 files changed

+267
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.hadoop.integration.hive;
21+
22+
import org.apache.hive.service.cli.HiveSQLException;
23+
import org.elasticsearch.hadoop.QueryTestParams;
24+
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
25+
import org.elasticsearch.hadoop.mr.RestUtils;
26+
import org.junit.After;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.Parameterized;
31+
import org.junit.runners.Parameterized.Parameters;
32+
33+
import java.util.ArrayList;
34+
import java.util.Arrays;
35+
import java.util.Collection;
36+
import java.util.List;
37+
38+
import static org.elasticsearch.hadoop.integration.hive.HiveSuite.provisionEsLib;
39+
import static org.elasticsearch.hadoop.integration.hive.HiveSuite.server;
40+
import static org.junit.Assert.assertEquals;
41+
import static org.junit.Assert.assertTrue;
42+
import static org.junit.Assert.fail;
43+
44+
@SuppressWarnings("Duplicates")
45+
@RunWith(Parameterized.class)
46+
public class AbstractHiveReadJsonTest {
47+
48+
private static int testInstance = 0;
49+
private final boolean readMetadata;
50+
51+
@Parameters
52+
public static Collection<Object[]> queries() {
53+
return QueryTestParams.params();
54+
}
55+
56+
private final String query;
57+
58+
public AbstractHiveReadJsonTest(String query, boolean readMetadata) {
59+
this.query = query;
60+
this.readMetadata = readMetadata;
61+
}
62+
63+
@Before
64+
public void before() throws Exception {
65+
provisionEsLib();
66+
RestUtils.refresh("json-hive");
67+
}
68+
69+
@After
70+
public void after() throws Exception {
71+
testInstance++;
72+
HiveSuite.after();
73+
}
74+
75+
@Test
76+
public void basicLoad() throws Exception {
77+
78+
String create = "CREATE EXTERNAL TABLE jsonartistsread" + testInstance + " (data INT, garbage INT, garbage2 STRING) "
79+
+ tableProps("json-hive/artists", "'es.output.json' = 'true'", "'es.mapping.names'='garbage2:refuse'");
80+
81+
String select = "SELECT * FROM jsonartistsread" + testInstance;
82+
83+
server.execute(create);
84+
List<String> result = server.execute(select);
85+
assertTrue("Hive returned null", containsNoNull(result));
86+
assertContains(result, "Marilyn");
87+
assertContains(result, "last.fm/music/MALICE");
88+
assertContains(result, "last.fm/serve/252/5872875.jpg");
89+
}
90+
91+
@Test
92+
public void basicLoadWithNameMappings() throws Exception {
93+
94+
String create = "CREATE EXTERNAL TABLE jsonartistsread" + testInstance + " (refuse INT, garbage INT, data STRING) "
95+
+ tableProps("json-hive/artists", "'es.output.json' = 'true'", "'es.mapping.names'='data:boomSomethingYouWerentExpecting'");
96+
97+
String select = "SELECT * FROM jsonartistsread" + testInstance;
98+
99+
server.execute(create);
100+
List<String> result = server.execute(select);
101+
assertTrue("Hive returned null", containsNoNull(result));
102+
assertContains(result, "Marilyn");
103+
assertContains(result, "last.fm/music/MALICE");
104+
assertContains(result, "last.fm/serve/252/5872875.jpg");
105+
}
106+
107+
@Test(expected = HiveSQLException.class)
108+
public void basicLoadWithNoGoodCandidateField() throws Exception {
109+
110+
String create = "CREATE EXTERNAL TABLE jsonartistsread" + testInstance + " (refuse INT, garbage INT) "
111+
+ tableProps("json-hive/artists", "'es.output.json' = 'true'");
112+
113+
String select = "SELECT * FROM jsonartistsread" + testInstance;
114+
115+
server.execute(create);
116+
server.execute(select);
117+
118+
fail("Should have broken because there are no String fields in the table schema to place the JSON data.");
119+
}
120+
121+
@Test
122+
public void testMissingIndex() throws Exception {
123+
String create = "CREATE EXTERNAL TABLE jsonmissingread" + testInstance + " (data STRING) "
124+
+ tableProps("foobar/missing", "'es.index.read.missing.as.empty' = 'true'", "'es.output.json' = 'true'");
125+
126+
String select = "SELECT * FROM jsonmissingread" + testInstance;
127+
128+
server.execute(create);
129+
List<String> result = server.execute(select);
130+
assertEquals(0, result.size());
131+
}
132+
133+
@Test
134+
public void testParentChild() throws Exception {
135+
String create = "CREATE EXTERNAL TABLE jsonchildread" + testInstance + " (data STRING) "
136+
+ tableProps("json-hive/child", "'es.index.read.missing.as.empty' = 'true'", "'es.output.json' = 'true'");
137+
138+
String select = "SELECT * FROM jsonchildread" + testInstance;
139+
140+
System.out.println(server.execute(create));
141+
List<String> result = server.execute(select);
142+
assertTrue("Hive returned null", containsNoNull(result));
143+
assertTrue(result.size() > 1);
144+
assertContains(result, "Marilyn");
145+
assertContains(result, "last.fm/music/MALICE");
146+
assertContains(result, "last.fm/serve/252/2181591.jpg");
147+
}
148+
149+
private static boolean containsNoNull(List<String> str) {
150+
for (String string : str) {
151+
if (string.contains("NULL")) {
152+
return false;
153+
}
154+
}
155+
156+
return true;
157+
}
158+
159+
private static void assertContains(List<String> str, String content) {
160+
for (String string : str) {
161+
if (string.contains(content)) {
162+
return;
163+
}
164+
}
165+
fail(String.format("'%s' not found in %s", content, str));
166+
}
167+
168+
169+
private String tableProps(String resource, String... params) {
170+
List<String> copy = new ArrayList(Arrays.asList(params));
171+
copy.add("'" + ConfigurationOptions.ES_READ_METADATA + "'='" + readMetadata + "'");
172+
return HiveSuite.tableProps(resource, query, copy.toArray(new String[copy.size()]));
173+
}
174+
}

hive/src/itest/java/org/elasticsearch/hadoop/integration/hive/HiveSuite.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737
import org.junit.runners.Suite;
3838

3939
@RunWith(Suite.class)
40-
@Suite.SuiteClasses({ AbstractHiveSaveTest.class, AbstractHiveSaveJsonTest.class, AbstractHiveSearchTest.class, AbstractHiveSearchJsonTest.class, AbstractHiveExtraTests.class})
40+
@Suite.SuiteClasses({ AbstractHiveSaveTest.class, AbstractHiveSaveJsonTest.class, AbstractHiveSearchTest.class, AbstractHiveSearchJsonTest.class, AbstractHiveReadJsonTest.class, AbstractHiveExtraTests.class})
4141
//@Suite.SuiteClasses({ AbstractHiveSaveJsonTest.class, AbstractHiveSearchJsonTest.class })
42+
//@Suite.SuiteClasses({ AbstractHiveSaveJsonTest.class, AbstractHiveReadJsonTest.class })
4243
//@Suite.SuiteClasses({ AbstractHiveSaveTest.class })
4344
public class HiveSuite {
4445

hive/src/main/java/org/elasticsearch/hadoop/hive/EsHiveInputFormat.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ public FileSplit[] getSplits(JobConf job, int numSplits) throws IOException {
102102
Log log = LogFactory.getLog(getClass());
103103
// move on to initialization
104104
InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
105-
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));
105+
if (settings.getOutputAsJson() == false) {
106+
// Only set the fields if we aren't asking for raw JSON
107+
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));
108+
}
106109
// set read resource
107110
settings.setResourceRead(settings.getResourceRead());
108111
HiveUtils.init(settings, log);

hive/src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java

+27-3
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,8 @@
4242
import org.apache.hadoop.io.NullWritable;
4343
import org.apache.hadoop.io.Text;
4444
import org.apache.hadoop.io.Writable;
45-
import org.apache.hadoop.mapred.JobTracker;
45+
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
4646
import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
47-
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
4847
import org.elasticsearch.hadoop.cfg.Settings;
4948
import org.elasticsearch.hadoop.rest.InitializationUtils;
5049
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
@@ -71,6 +70,8 @@ public class EsSerDe extends AbstractSerDe {
7170

7271
private boolean writeInitialized = false;
7372
private boolean trace = false;
73+
private boolean outputJSON = false;
74+
private Text jsonFieldName = null;
7475

7576

7677
// introduced in Hive 0.14
@@ -89,6 +90,10 @@ public void initialize(Configuration conf, Properties tbl, Properties partitionP
8990
this.tableProperties = tbl;
9091

9192
trace = log.isTraceEnabled();
93+
outputJSON = settings.getOutputAsJson();
94+
if (outputJSON) {
95+
jsonFieldName = new Text(HiveUtils.discoverJsonFieldName(settings, alias));
96+
}
9297
}
9398

9499

@@ -102,7 +107,13 @@ public Object deserialize(Writable blob) throws SerDeException {
102107
if (blob == null || blob instanceof NullWritable) {
103108
return null;
104109
}
105-
Object des = hiveFromWritable(structTypeInfo, blob, alias);
110+
111+
Writable deserialize = blob;
112+
if (outputJSON) {
113+
deserialize = wrapJsonData(blob);
114+
}
115+
116+
Object des = hiveFromWritable(structTypeInfo, deserialize, alias);
106117

107118
if (trace) {
108119
log.trace(String.format("Deserialized [%s] to [%s]", blob, des));
@@ -111,6 +122,19 @@ public Object deserialize(Writable blob) throws SerDeException {
111122
return des;
112123
}
113124

125+
private Writable wrapJsonData(Writable blob) {
126+
Assert.isTrue(blob instanceof Text, "Property `es.output.json` is enabled, but returned data was not of type Text...");
127+
128+
switch (structTypeInfo.getCategory()) {
129+
case STRUCT:
130+
Map<Writable, Writable> mapContainer = new MapWritable();
131+
mapContainer.put(jsonFieldName, blob);
132+
return (Writable) mapContainer;
133+
default:
134+
throw new EsHadoopIllegalStateException("Could not correctly wrap JSON data for structural type " + structTypeInfo.getCategory());
135+
}
136+
}
137+
114138
@Override
115139
public ObjectInspector getObjectInspector() throws SerDeException {
116140
return inspector;

hive/src/main/java/org/elasticsearch/hadoop/hive/HiveUtils.java

+60-5
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import java.util.ArrayList;
2222
import java.util.Collection;
2323
import java.util.Collections;
24+
import java.util.HashSet;
25+
import java.util.Iterator;
2426
import java.util.LinkedHashMap;
2527
import java.util.List;
2628
import java.util.Map;
2729
import java.util.Map.Entry;
2830
import java.util.Properties;
31+
import java.util.Set;
2932

3033
import org.apache.commons.logging.Log;
3134
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
@@ -38,6 +41,7 @@
3841
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
3942
import org.elasticsearch.hadoop.cfg.Settings;
4043
import org.elasticsearch.hadoop.rest.InitializationUtils;
44+
import org.elasticsearch.hadoop.util.Assert;
4145
import org.elasticsearch.hadoop.util.FieldAlias;
4246
import org.elasticsearch.hadoop.util.ObjectUtils;
4347
import org.elasticsearch.hadoop.util.SettingsUtils;
@@ -71,15 +75,18 @@ static StructTypeInfo typeInfo(StructObjectInspector inspector) {
7175
return (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(inspector);
7276
}
7377

78+
/**
79+
* Renders the full collection of field names needed from ES by combining the names of
80+
* the hive table fields with the user provided name mappings.
81+
* @param settings Settings to pull hive column names and user name mappings from.
82+
* @return A collection of ES field names
83+
*/
7484
static Collection<String> columnToAlias(Settings settings) {
7585
FieldAlias fa = alias(settings);
7686
List<String> columnNames = StringUtils.tokenize(settings.getProperty(HiveConstants.COLUMNS), ",");
87+
7788
// eliminate virtual columns
7889
// we can't use virtual columns since some distro don't have this field...
79-
// for (VirtualColumn vc : VirtualColumn.VIRTUAL_COLUMNS) {
80-
// columnNames.remove(vc.getName());
81-
// }
82-
8390
for (String vc : HiveConstants.VIRTUAL_COLUMNS) {
8491
columnNames.remove(vc);
8592
}
@@ -94,6 +101,12 @@ static Collection<String> columnToAlias(Settings settings) {
94101
return columnNames;
95102
}
96103

104+
/**
105+
* Reads the current aliases, and then the set of hive column names. Remaps the raw hive column names (_col1, _col2)
106+
* to the names used in the hive table, or, if the mappings exist, the names in the mappings instead.
107+
* @param settings Settings to pull user name mappings and hive column names from
108+
* @return FieldAlias mapping object to go from hive column name to ES field name
109+
*/
97110
static FieldAlias alias(Settings settings) {
98111
Map<String, String> aliasMap = SettingsUtils.aliases(settings.getProperty(HiveConstants.MAPPING_NAMES), true);
99112

@@ -117,11 +130,53 @@ static FieldAlias alias(Settings settings) {
117130
return new FieldAlias(aliasMap, true);
118131
}
119132

133+
/**
134+
* Selects an appropriate field from the given Hive table schema to insert JSON data into if the feature is enabled
135+
* @param settings Settings to read schema information from
136+
* @return A FieldAlias object that projects the json source field into the select destination field
137+
*/
138+
static String discoverJsonFieldName(Settings settings, FieldAlias alias) {
139+
Set<String> virtualColumnsToBeRemoved = new HashSet<String>(HiveConstants.VIRTUAL_COLUMNS.length);
140+
Collections.addAll(virtualColumnsToBeRemoved, HiveConstants.VIRTUAL_COLUMNS);
141+
142+
List<String> columnNames = StringUtils.tokenize(settings.getProperty(HiveConstants.COLUMNS), ",");
143+
Iterator<String> nameIter = columnNames.iterator();
144+
145+
List<String> columnTypes = StringUtils.tokenize(settings.getProperty(HiveConstants.COLUMNS_TYPES), ":");
146+
Iterator<String> typeIter = columnTypes.iterator();
147+
148+
String candidateField = null;
149+
150+
while(nameIter.hasNext() && candidateField == null) {
151+
String columnName = nameIter.next();
152+
String type = typeIter.next();
153+
154+
if ("string".equalsIgnoreCase(type) && !virtualColumnsToBeRemoved.contains(columnName)) {
155+
candidateField = columnName;
156+
}
157+
}
158+
159+
Assert.hasText(candidateField, "Could not identify a field to insert JSON data into " +
160+
"from the given fields : {" + columnNames + "} of types {" + columnTypes + "}");
161+
162+
// If the candidate field is aliased to something else, find the alias name and use that for the field name:
163+
candidateField = alias.toES(candidateField);
164+
165+
return candidateField;
166+
}
167+
168+
/**
169+
* @param settings The settings to extract the list of hive columns from
170+
* @return A map of column names to "_colX" where "X" is the place in the struct
171+
*/
120172
static Map<String, String> columnMap(Settings settings) {
121173
return columnMap(settings.getProperty(HiveConstants.COLUMNS));
122174
}
123175

124-
// returns a map of {<column-name>:_colX}
176+
/**
177+
* @param columnString The comma separated list of hive column names
178+
* @return A map of column names to "_colX" where "X" is the place in the struct
179+
*/
125180
private static Map<String, String> columnMap(String columnString) {
126181
// add default aliases for serialization (mapping name -> _colX)
127182
List<String> columnNames = StringUtils.tokenize(columnString, ",");

0 commit comments

Comments
 (0)