Skip to content
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ FROM openjdk:17
EXPOSE 5000

COPY build/libs/*.jar .
CMD java -jar *.jar
CMD java -jar *.jar
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public void configure() throws Exception {
PaymentModeMapping mapping = paymentModeConfiguration.getByMode(paymentMode);

String tenantName = exchange.getProperty(TENANT_NAME, String.class);
tenantName = mapping.getDebulkingDfspid() == null ? tenantName : mapping.getDebulkingDfspid();
Map<String, Object> variables = exchange.getProperty(ZEEBE_VARIABLE, Map.class);
variables.put(PAYMENT_MODE, paymentMode);
variables.put(DEBULKINGDFSPID, mapping.getDebulkingDfspid() == null ? tenantName : mapping.getDebulkingDfspid());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.mifos.processor.bulk.camel.routes;

import static org.mifos.processor.bulk.camel.config.CamelProperties.LOCAL_FILE_PATH;
import static org.mifos.processor.bulk.camel.config.CamelProperties.OVERRIDE_HEADER;
import static org.mifos.processor.bulk.camel.config.CamelProperties.REGISTERING_INSTITUTE_ID;
import static org.mifos.processor.bulk.camel.config.CamelProperties.RESULT_TRANSACTION_LIST;
import static org.mifos.processor.bulk.camel.config.CamelProperties.SERVER_FILE_NAME;
import static org.mifos.processor.bulk.camel.config.CamelProperties.SERVER_SUB_BATCH_FILE_NAME_ARRAY;
import static org.mifos.processor.bulk.camel.config.CamelProperties.SUB_BATCH_COUNT;
Expand All @@ -22,7 +24,7 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -34,6 +36,7 @@
import org.apache.camel.LoggingLevel;
import org.mifos.processor.bulk.schema.SubBatchEntity;
import org.mifos.processor.bulk.schema.Transaction;
import org.mifos.processor.bulk.utility.TransactionParser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -69,18 +72,39 @@ public void configure() throws Exception {
List<String> subBatchFile = new ArrayList<>();
Set<String> distinctPayeeIds = transactionList.stream().map(Transaction::getPayeeDfspId).collect(Collectors.toSet());
logger.info("Payee id {}", distinctPayeeIds);
if (partyLookupEnabled && !distinctPayeeIds.isEmpty()) {
logger.info("Number of payeeId {}", distinctPayeeIds.size());
Boolean batchAccountLookup = (Boolean) exchange.getProperty("batchAccountLookup");
if (partyLookupEnabled && batchAccountLookup) {
// Create a map to store transactions for each payeeid
Map<String, List<Transaction>> transactionsByPayeeId = new HashMap<>();

// Split the list based on distinct payeeids
Map<String, String> subBatchIdPayeeMap = new HashMap<>();
Map<String, List<Transaction>> subBatchIdMap = new HashMap<>();
List<String> subBatchIdList = new ArrayList<>();
List<Transaction> updatedTransactionList = new ArrayList<Transaction>();
for (String payeeId : distinctPayeeIds) {
List<Transaction> transactionsForPayee = transactionList.stream()
.filter(transaction -> payeeId.equals(transaction.getPayeeDfspId())).collect(Collectors.toList());

transactionsByPayeeId.put(payeeId, transactionsForPayee);
String subBatchId = UUID.randomUUID().toString();
subBatchIdList.add(subBatchId);
subBatchIdPayeeMap.put(payeeId, subBatchId);
subBatchIdMap.put(subBatchId, transactionsForPayee);
}
logger.info("Number of SubBatch based on payeeId {}", subBatchIdList.size());
// mapping subBatchId in transactionList
for (String subBatchId : subBatchIdList) {
List<Transaction> transactions = subBatchIdMap.get(subBatchId);
for (Transaction transaction : transactions) {
for (Transaction originalTransaction : transactionList) {
if (originalTransaction.equals(transaction)) {
originalTransaction.setBatchId(subBatchId);
updatedTransactionList.add(originalTransaction);
}
}
}
}

for (String payeeId : distinctPayeeIds) {
List<Transaction> transactionsForSpecificPayee = transactionsByPayeeId.get(payeeId);
String filename = UUID.randomUUID() + "_" + "sub-batch-" + payeeId + ".csv";
Expand All @@ -90,9 +114,12 @@ public void configure() throws Exception {
File file = new File(filename);
SequenceWriter writer = csvMapper.writerWithSchemaFor(Transaction.class).with(csvSchema).writeValues(file);
for (Transaction transaction : transactionsForSpecificPayee) {
transaction.setBatchId(subBatchIdPayeeMap.get(payeeId));
writer.write(transaction);
}
exchange.setProperty(RESULT_TRANSACTION_LIST, updatedTransactionList);
subBatchFile.add(filename);
exchange.setProperty(TRANSACTION_LIST, updatedTransactionList);
}
} else {
List<String> lines = new ArrayList<>();
Expand All @@ -110,24 +137,40 @@ public void configure() throws Exception {
}

int subBatchCount = 1;
CsvSchema csvSchema = csvMapper.schemaFor(Transaction.class);
csvSchema = csvSchema.withHeader();
for (int i = 0; i < lines.size(); i += subBatchSize) {
String subBatchId = UUID.randomUUID().toString();
String filename = UUID.randomUUID() + "_" + "sub-batch-" + subBatchCount + ".csv";
FileWriter writer = new FileWriter(filename);
writer.write(header);
logger.info("SubBatch Id {}", subBatchId);

List<Transaction> subBatchTransactions = new ArrayList<>();
for (int j = i; j < Math.min(i + subBatchSize, lines.size()); j++) {
writer.write(lines.get(j) + System.lineSeparator());
Transaction transaction = TransactionParser.parseLineToTransaction(lines.get(j));
assert transaction != null;
transaction.setBatchId(subBatchId); // Set the subBatchId for the transaction
subBatchTransactions.add(transaction);
}

// Write the list of Transactions to the file
File file = new File(filename);
try (SequenceWriter writer = csvMapper.writer(csvSchema).writeValues(file)) {
writer.writeAll(subBatchTransactions);
} catch (IOException e) {
logger.error("Failed to write sub-batch file: " + filename, e);
}
writer.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the writer close is removed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since, I have used writer with try-with resource statement it automatically closes writer irrespectve of whether writing occurs or an exception is thrown.

logger.info("Created sub-batch with file name {}", filename);
subBatchFile.add(filename);
subBatchFile.add(filename); // Ensure this list is declared and accessible
subBatchCount++;
}
}
exchange.setProperty(SUB_BATCH_FILE_ARRAY, subBatchFile);
exchange.setProperty(SUB_BATCH_COUNT, subBatchFile.size());
exchange.setProperty(SUB_BATCH_CREATED, true);
exchange.setProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, new ArrayList<String>());
});
}).log("updating orignal").setProperty(LOCAL_FILE_PATH, exchangeProperty(SERVER_FILE_NAME))
.setProperty(OVERRIDE_HEADER, constant(true)) // default header in CSV file will be used
.to("direct:update-file-v2").to("direct:upload-file");

// Iterate through each CSVs of sub-batches and uploads in cloud
from("direct:upload-sub-batch-file").id("direct:upload-sub-batch-file").log("Starting upload of sub-batch file")
Expand Down Expand Up @@ -165,7 +208,7 @@ public void configure() throws Exception {

SubBatchEntity subBatchEntity = getDefaultSubBatchEntity();
subBatchEntity.setBatchId((String) zeebeVariables.get(BATCH_ID));
subBatchEntity.setSubBatchId(UUID.randomUUID().toString());
subBatchEntity.setSubBatchId(transactionList.get(0).getBatchId());
subBatchEntity.setRequestId((String) zeebeVariables.get(REQUEST_ID));
subBatchEntity.setCorrelationId((String) zeebeVariables.get(CLIENT_CORRELATION_ID));
subBatchEntity.setPayerFsp((String) zeebeVariables.get(PAYER_IDENTIFIER));
Expand Down
31 changes: 28 additions & 3 deletions src/main/java/org/mifos/processor/bulk/schema/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.Objects;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonPropertyOrder({ "id", "request_id", "payment_mode", "account_number", "payer_identifier_type", "payer_identifier",
"payee_identifier_type", "payee_identifier", "amount", "currency", "note", "program_shortcode", "cycle", "payee_dfsp_id" })
@JsonPropertyOrder({ "id", "request_id", "payment_mode", "payer_identifier_type", "payer_identifier",
"payee_identifier_type", "payee_identifier", "amount", "currency", "note", "program_shortcode", "cycle", "payee_dfsp_id",
"batch_id", "account_number" })
public class Transaction implements CsvSchema {

@JsonProperty("id")
Expand All @@ -32,6 +34,29 @@ public class Transaction implements CsvSchema {
@JsonProperty("currency")
private String currency;

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Transaction that = (Transaction) o;
return id == that.id && Objects.equals(requestId, that.requestId) && Objects.equals(paymentMode, that.paymentMode)
&& Objects.equals(accountNumber, that.accountNumber) && Objects.equals(amount, that.amount)
&& Objects.equals(currency, that.currency) && Objects.equals(note, that.note)
&& Objects.equals(payerIdentifierType, that.payerIdentifierType) && Objects.equals(payerIdentifier, that.payerIdentifier)
&& Objects.equals(payeeIdentifierType, that.payeeIdentifierType) && Objects.equals(payeeIdentifier, that.payeeIdentifier)
&& Objects.equals(payeeDfspId, that.payeeDfspId);
}

@Override
public int hashCode() {
return Objects.hash(id, requestId, paymentMode, accountNumber, amount, currency, note, payerIdentifierType, payerIdentifier,
payeeIdentifierType, payeeIdentifier, payeeDfspId);
}

@JsonProperty("note")
private String note;

Expand All @@ -56,7 +81,7 @@ public class Transaction implements CsvSchema {
@JsonProperty("payee_dfsp_id")
private String payeeDfspId;

@JsonIgnore
@JsonProperty("batch_id")
private String batchId;

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.mifos.processor.bulk.utility;

import org.mifos.processor.bulk.schema.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TransactionParser {

private static final Logger logger = LoggerFactory.getLogger(TransactionParser.class);

private TransactionParser() {
throw new IllegalStateException("Utility class");
}

public static Transaction parseLineToTransaction(String line) {
try {
String[] parts = line.split(",", -1);
Transaction transaction = new Transaction();

if (parts.length > 0 && !parts[0].isEmpty()) {
transaction.setId(Integer.parseInt(parts[0]));
}
if (parts.length > 1) {
transaction.setRequestId(parts[1]);
}
if (parts.length > 2) {
transaction.setPaymentMode(parts[2]);
}
if (parts.length > 4) {
transaction.setPayerIdentifierType(parts[3]);
}
if (parts.length > 5) {
transaction.setPayerIdentifier(parts[4]);
}
if (parts.length > 6) {
transaction.setPayeeIdentifierType(parts[5]);
}
if (parts.length > 7) {
transaction.setPayeeIdentifier(parts[6]);
}
if (parts.length > 8) {
transaction.setAmount(parts[7]);
}
if (parts.length > 9) {
transaction.setCurrency(parts[8]);
}
if (parts.length > 10) {
transaction.setNote(parts[9]);
}
if (parts.length > 11) {
transaction.setProgramShortCode(parts[10]);
}
if (parts.length > 12) {
transaction.setCycle(parts[11]);
}
if (parts.length > 13) {
transaction.setPayeeDfspId(parts[12]);
}

return transaction;
} catch (Exception e) {
logger.error("Error parsing line to Transaction object: {}", line, e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public void setup() {
String filename = (String) variables.get(FILE_NAME);
String batchAccountLookupCallback = (String) variables.get("batchAccountLookupCallback");
variables.put(PARTY_LOOKUP_FAILED, false);
variables.put("batchAccountLookup", true);
exchange.setProperty(SERVER_FILE_NAME, filename);
exchange.setProperty("batchAccountLookupCallback", batchAccountLookupCallback);
exchange.setProperty("workflowInstanceKey", job.getProcessInstanceKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public void setup() {

String fileName = subBatches.remove(0);
SubBatchEntity subBatchEntity = null;

for (SubBatchEntity subBatch : subBatchEntityList) {
if (subBatch.getRequestFile().contains(fileName)) {
subBatchEntity = subBatch;
logger.info("SubBatchEntity found");
if (isSplittingEnabled) {
for (SubBatchEntity subBatch : subBatchEntityList) {
if (subBatch.getRequestFile().contains(fileName)) {
subBatchEntity = subBatch;
logger.info("SubBatchEntity found");
}
}
logger.debug("BatchEntity for this subbatch is {}", objectMapper.writeValueAsString(subBatchEntity));
}
logger.debug("BatchEntity for this subbatch is {}", objectMapper.writeValueAsString(subBatchEntity));

Exchange exchange = new DefaultExchange(camelContext);
exchange.setProperty(TENANT_NAME, variables.get(TENANT_ID));
exchange.setProperty(SERVER_FILE_NAME, fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public void setup() {
exchange.setProperty(SERVER_FILE_NAME, filename);
exchange.setProperty(ZEEBE_VARIABLE, variables);
exchange.setProperty("partyLookupFailed", partyLookupFailed);
exchange.setProperty("batchAccountLookup",
variables.get("batchAccountLookup") != null ? variables.get("batchAccountLookup") : false);

exchange.setProperty(SUB_BATCH_DETAILS, new ArrayList<SubBatchEntity>());

try {
Expand Down
6 changes: 3 additions & 3 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ config:
enable: true
field: "payerIdentifier"
splitting:
enable: false
enable: true
sub-batch-size: 5
formatting:
enable: false
Expand Down Expand Up @@ -159,7 +159,7 @@ bulk_processor:
hostname : "https://ph-ee-connector-bulk:8443"

csv:
columnNames: "id,request_id,payment_mode,payer_identifier_type,payer_identifier,payee_identifier_type,payee_identifier,amount,currency,note,account_number,program_shortcode,cycle,payee_dfsp_id"
columnNames: "id,request_id,payment_mode,payer_identifier_type,payer_identifier,payee_identifier_type,payee_identifier,amount,currency,note,account_number,program_shortcode,cycle,payee_dfsp_id,batch_id"
size : 100000 # in bytes

budget-account:
Expand Down Expand Up @@ -232,4 +232,4 @@ bpmns:
batch-transactions: "bulk_processor_account_lookup-{dfspid}"
- id: "lion"
flows:
batch-transactions: "bulk_processor-{dfspid}"
batch-transactions: "bulk_processor-{dfspid}"