Skip to content

Commit b03cf68

Browse files
committed
Add basic Solr backbone component for reference use
1 parent 371ed3b commit b03cf68

1 file changed

Lines changed: 73 additions & 0 deletions

File tree

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.ohnlp.backbone.io.solr;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import org.apache.beam.sdk.io.solr.SolrIO;
5+
import org.apache.beam.sdk.schemas.Schema;
6+
import org.apache.beam.sdk.transforms.DoFn;
7+
import org.apache.beam.sdk.transforms.ParDo;
8+
import org.apache.beam.sdk.values.PBegin;
9+
import org.apache.beam.sdk.values.PCollection;
10+
import org.apache.beam.sdk.values.Row;
11+
import org.apache.solr.common.SolrDocument;
12+
import org.ohnlp.backbone.api.Extract;
13+
import org.ohnlp.backbone.api.exceptions.ComponentInitializationException;
14+
15+
/**
16+
* Reads in documents from Solr
17+
*
18+
* configuration format:
19+
* <code>
20+
* {
21+
* "host": "solrHost",
22+
* "user": "solrUser or NONE if no credentials needed",
23+
* "password": "solrPass or NONE if no credentials needed",
24+
* "query": "solr query to run",
25+
* "doc_id_field": "field_name_for_document_id",
26+
* "doc_text_field": "field_name_for_document_text"
27+
* }
28+
* </code>
29+
*/
30+
// TODO currently input is limited strictly to three fields, make this more flexible
31+
// Most likely possibility is to see a schema.xml as input
32+
public class SolrExtract extends Extract {
33+
private String solrHost;
34+
private String user;
35+
private String pass;
36+
private String query;
37+
private String docIdField;
38+
private String docTextField;
39+
40+
@Override
41+
public void initFromConfig(JsonNode config) throws ComponentInitializationException {
42+
this.solrHost = config.get("host").asText();
43+
this.user = config.get("user").asText();
44+
this.pass = config.get("password").asText();
45+
this.query = config.get("query").asText();
46+
this.docIdField = config.get("doc_id_field").asText();
47+
this.docTextField = config.get("doc_text_field").asText();
48+
}
49+
50+
@Override
51+
public PCollection<Row> expand(PBegin input) {
52+
Schema rowSchema = Schema.builder()
53+
.addStringField("note_id")
54+
.addStringField("note_text").build();
55+
SolrIO.ConnectionConfiguration config = SolrIO.ConnectionConfiguration.create(solrHost);
56+
if (!user.equals("NONE")) {
57+
config = config.withBasicCredentials(user, pass);
58+
}
59+
return SolrIO
60+
.read()
61+
.withConnectionConfiguration(config)
62+
.withQuery(query)
63+
.expand(input)
64+
.apply("Convert Solr Documents to Rows", ParDo.of(new DoFn<SolrDocument, Row>() {
65+
@ProcessElement
66+
public void processElement(@Element SolrDocument input, OutputReceiver<Row> output) {
67+
output.output(Row.withSchema(rowSchema).addValues(
68+
input.getFieldValue(docIdField),
69+
input.getFieldValue(docTextField)).build());
70+
}
71+
}));
72+
}
73+
}

0 commit comments

Comments
 (0)