Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 81 additions & 25 deletions src/main/java/org/si4t/solr/SolrIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,15 @@ public class SolrIndexer implements SearchIndex
private List<Configuration> solrCores = null;
private List<Configuration> solrUrls = null;
private String defaultCoreUrl = null;
private ConcurrentHashMap<String, BaseIndexData> itemRemovals = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, SearchIndexData> itemAdds = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, BinaryIndexData> binaryAdds = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, SearchIndexData> itemUpdates = new ConcurrentHashMap<>();

private ConcurrentHashMap<String, ConcurrentHashMap<String, BaseIndexData>> publicationItemRemovals = new ConcurrentHashMap<String, ConcurrentHashMap<String, BaseIndexData>>();
private ConcurrentHashMap<String, ConcurrentHashMap<String, SearchIndexData>> publicationItemAdds = new ConcurrentHashMap<String, ConcurrentHashMap<String, SearchIndexData>>();
private ConcurrentHashMap<String, ConcurrentHashMap<String, BinaryIndexData>> publicationBinaryAdds = new ConcurrentHashMap<String, ConcurrentHashMap<String, BinaryIndexData>>();
private ConcurrentHashMap<String, ConcurrentHashMap<String, SearchIndexData>> publicationItemUpdates = new ConcurrentHashMap<String, ConcurrentHashMap<String, SearchIndexData>>();

public SolrIndexer(){

}

private void setSolrUrl(String pubId) throws ConfigurationException
{
Expand Down Expand Up @@ -189,7 +194,18 @@ public void addBinaryToIndex(BinaryIndexData data) throws IndexingException
LOG.error("Addition failed. Unique ID is empty");
return;
}
this.binaryAdds.put(data.getUniqueIndexId(), data);

ConcurrentHashMap<String, BinaryIndexData> binaryAdds = this.publicationBinaryAdds.get(data.getPublicationItemId());

//If no Concurrent Hash map for the publication yet make one
if(binaryAdds == null){
binaryAdds = new ConcurrentHashMap<String, BinaryIndexData>();
}

binaryAdds.put(data.getUniqueIndexId(), data);

//Update the cached items for the publication
this.publicationBinaryAdds.put(data.getPublicationItemId(), binaryAdds);
}

/*
Expand All @@ -213,10 +229,19 @@ public void addItemToIndex(SearchIndexData data) throws IndexingException
LOG.warn("Item is: " + data.toString());
}

if (!this.itemAdds.containsKey(data.getUniqueIndexId()))
ConcurrentHashMap<String, SearchIndexData> itemAdds = this.publicationItemAdds.get(data.getPublicationItemId());

//If no Concurrent Hash map for the publication yet make one
if(itemAdds == null){
itemAdds = new ConcurrentHashMap<String, SearchIndexData>();
}

if (!itemAdds.containsKey(data.getUniqueIndexId()))
{
this.itemAdds.put(data.getUniqueIndexId(), data);
itemAdds.put(data.getUniqueIndexId(), data);
}

publicationItemAdds.put(data.getPublicationItemId(), itemAdds);
}

/*
Expand All @@ -233,7 +258,17 @@ public void removeBinaryFromIndex(BaseIndexData data) throws IndexingException
LOG.error("Removal addition failed. Unique ID empty");
return;
}
this.itemRemovals.put(data.getUniqueIndexId(), data);

ConcurrentHashMap<String, BaseIndexData> itemRemovals = this.publicationItemRemovals.get(data.getPublicationItemId());

//If no Concurrent Hash map for the publication yet make one
if(itemRemovals == null){
itemRemovals = new ConcurrentHashMap<String, BaseIndexData>();
}

itemRemovals.put(data.getUniqueIndexId(), data);

this.publicationItemRemovals.put(data.getPublicationItemId(), itemRemovals);
}

/*
Expand All @@ -252,7 +287,16 @@ public void removeItemFromIndex(BaseIndexData data) throws IndexingException
return;
}

this.itemRemovals.put(data.getUniqueIndexId(), data);
ConcurrentHashMap<String, BaseIndexData> itemRemovals = this.publicationItemRemovals.get(data.getPublicationItemId());

//If no Concurrent Hash map for the publication yet make one
if(itemRemovals == null){
itemRemovals = new ConcurrentHashMap<String, BaseIndexData>();
}

itemRemovals.put(data.getUniqueIndexId(), data);

this.publicationItemRemovals.put(data.getPublicationItemId(), itemRemovals);
}

/*
Expand All @@ -269,8 +313,17 @@ public void updateItemInIndex(SearchIndexData data) throws IndexingException
LOG.error("Adding update item failed. Unique ID empty");
return;
}
this.itemUpdates.put(data.getUniqueIndexId(), data);

ConcurrentHashMap<String, SearchIndexData> itemUpdates = this.publicationItemUpdates.get(data.getPublicationItemId());

//If no Concurrent Hash map for the publication yet make one
if(itemUpdates == null){
itemUpdates = new ConcurrentHashMap<String, SearchIndexData>();
}

itemUpdates.put(data.getUniqueIndexId(), data);

this.publicationItemUpdates.put(data.getPublicationItemId(), itemUpdates);
}

/*
Expand All @@ -285,10 +338,10 @@ public void commit(String publicationId) throws IndexingException

this.setSolrUrl(publicationId);

this.commitAddContentToSolr(this.itemAdds);
this.commitAddBinariesToSolr();
this.removeItemsFromSolr();
this.processItemUpdates();
this.commitAddContentToSolr(this.publicationItemAdds.get(publicationId));
this.commitAddBinariesToSolr(this.publicationBinaryAdds.get(publicationId));
this.removeItemsFromSolr(this.publicationItemRemovals.get(publicationId));
this.processItemUpdates(this.publicationItemUpdates.get(publicationId));
}

catch (SolrServerException e)
Expand Down Expand Up @@ -325,10 +378,11 @@ public void commit(String publicationId) throws IndexingException

private void clearRegisters()
{
itemAdds.clear();
binaryAdds.clear();
itemRemovals.clear();
itemUpdates.clear();
//Need to loop through each publication ones
this.publicationItemAdds.clear();
this.publicationBinaryAdds.clear();
this.publicationItemRemovals.clear();
this.publicationItemUpdates.clear();
}

/**
Expand All @@ -343,14 +397,16 @@ private void clearRegisters()
* @throws IOException
* @throws SAXException
* @throws SolrServerException
* @param itemUpdates
*/
private void processItemUpdates() throws ParserConfigurationException, IOException, SAXException, SolrServerException
private void processItemUpdates(ConcurrentHashMap<String, SearchIndexData> itemUpdates) throws ParserConfigurationException, IOException, SAXException, SolrServerException
{
this.commitAddContentToSolr(this.itemUpdates);
this.commitAddContentToSolr(itemUpdates);
}

private void commitAddBinariesToSolr() throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException {
if (this.binaryAdds.size() > 0)
private void commitAddBinariesToSolr(ConcurrentHashMap<String, BinaryIndexData> binaryAdds) throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException
{
if (binaryAdds.size() > 0)
{
LOG.info("Adding binaries to Solr.");

Expand Down Expand Up @@ -443,13 +499,13 @@ private static SolrInputDocument constructInputDocument(SearchIndexData data, Lo
return doc;
}

private void removeItemsFromSolr() throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException {
if (this.itemRemovals.size() > 0)
private void removeItemsFromSolr(ConcurrentHashMap<String, BaseIndexData> itemRemovals) throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException {
if (itemRemovals.size() > 0)
{
LOG.info
(
SolrIndexDispatcher.INSTANCE.removeFromSolr(
this.itemRemovals.keySet(),
itemRemovals.keySet(),
new SolrClientRequest
(
this.solrHome + "-" + this.coreName,
Expand Down