diff --git a/pom.xml b/pom.xml index 84e6a2ed..23c0bdc9 100755 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,12 @@ + + com.amazonaws.services.dynamodbv2 + amazon-dynamodb-transactions + 1.1.2 + + org.springframework diff --git a/src/main/java/org/socialsignin/spring/data/dynamodb/tx/DynamoDBTransactionManager.java b/src/main/java/org/socialsignin/spring/data/dynamodb/tx/DynamoDBTransactionManager.java new file mode 100644 index 00000000..5fe1c207 --- /dev/null +++ b/src/main/java/org/socialsignin/spring/data/dynamodb/tx/DynamoDBTransactionManager.java @@ -0,0 +1,104 @@ +package org.socialsignin.spring.data.dynamodb.tx; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.transactions.Transaction; +import com.amazonaws.services.dynamodbv2.transactions.TransactionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.support.AbstractPlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; + +@Component +public class DynamoDBTransactionManager extends AbstractPlatformTransactionManager { + private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBTransactionManager.class); + + private final AmazonDynamoDB client; + private final TransactionManager txManager; + private final String transactionsTableName; + private final String transactionsImageTableName; + private final long readCapacityUnits; + private final long writeCapacityUnits; + private final long waitTimeSeconds; + + @Autowired + public DynamoDBTransactionManager(AmazonDynamoDB client, @Value("${Transactions}") String transactionsTableName, + @Value("${TransactionImages}") String transactionsImageTableName, + @Value("${TransactionImages}") long readCapacityUnits, + @Value("${TransactionImages}") long writeCapacityUnits, + @Value("${TransactionImages}") long waitTimeSeconds) { + this.client = client; + this.transactionsTableName = transactionsTableName; + this.transactionsImageTableName = transactionsImageTableName; + + this.readCapacityUnits = readCapacityUnits; + this.writeCapacityUnits = writeCapacityUnits; + this.waitTimeSeconds = waitTimeSeconds; + + try { + initialize(); + } catch (InterruptedException e) { + throw new BeanInitializationException("Initialization of " + this.transactionsTableName + + "/" + this.transactionsImageTableName+ " failed!", e); + } + + this.txManager = new TransactionManager (client, this.transactionsTableName, this.transactionsImageTableName) ; + } + + private void initialize() throws InterruptedException { + LOGGER.trace("Initialize tables"); + + TransactionManager.verifyOrCreateTransactionTable(client, this.transactionsTableName, + this.readCapacityUnits, this.writeCapacityUnits, this.waitTimeSeconds); + + TransactionManager.verifyOrCreateTransactionImagesTable(client, this.transactionsImageTableName, + this.readCapacityUnits, this.writeCapacityUnits, this.waitTimeSeconds); + + LOGGER.debug("Finished table initialization"); + } + + private Transaction getTransaction(DefaultTransactionStatus status) { + return (Transaction)status.getTransaction(); + } + + @Override + protected Object doGetTransaction() throws TransactionException { + // Create a new transaction from the transaction manager + Transaction tx = txManager.newTransaction(); + + return tx; + } + + @Override + protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { + Transaction tx = (Transaction)transaction; + + LOGGER.debug("Begin for <{}>", tx); + } + + @Override + protected void doCommit(DefaultTransactionStatus status) throws TransactionException { + Transaction tx = getTransaction(status); + LOGGER.debug("Commit for <{}>", tx); + + tx.commit(); + LOGGER.trace("Delete for <{}>", tx); + tx.delete(); + } + + @Override + protected void doRollback(DefaultTransactionStatus status) throws TransactionException { + Transaction tx = getTransaction(status); + LOGGER.debug("Rollback for <{}>", tx); + + tx.rollback(); + LOGGER.trace("Delete for <{}>", tx); + tx.delete(); + } + +} diff --git a/src/test/java/org/socialsignin/spring/data/dynamodb/tx/DynamoDBTransactionManagerTest.java b/src/test/java/org/socialsignin/spring/data/dynamodb/tx/DynamoDBTransactionManagerTest.java new file mode 100644 index 00000000..2e451817 --- /dev/null +++ b/src/test/java/org/socialsignin/spring/data/dynamodb/tx/DynamoDBTransactionManagerTest.java @@ -0,0 +1,93 @@ +package org.socialsignin.spring.data.dynamodb.tx; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; +import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.amazonaws.services.dynamodbv2.model.TableStatus; +import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; +import com.amazonaws.services.dynamodbv2.model.UpdateItemResult; +import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import org.springframework.transaction.support.DefaultTransactionStatus; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Random; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.intThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DynamoDBTransactionManagerTest { + private static final Random r = new Random(); + private final long readCapacityUnits = r.nextLong(); + private final long writeCapacityUnits = r.nextLong(); + private final long waitTimeSeconds = 1; + private final String transactionsTableName = "transactionsTableName"; + private final String transactionsImageTableName = "transactionsImageTableName"; + @Mock + private AmazonDynamoDB client; + @Mock + private TransactionDefinition txDefinition; + + private DynamoDBTransactionManager underTest; + + @Before + public void setUp() { + + when(client.describeTable(any(DescribeTableRequest.class))).thenReturn( + new DescribeTableResult() + .withTable(new TableDescription() + .withAttributeDefinitions(new AttributeDefinition() + .withAttributeName("_TxId") + .withAttributeType(ScalarAttributeType.S)) + .withKeySchema(new KeySchemaElement() + .withAttributeName("_TxId") + .withKeyType(KeyType.HASH)) + .withTableStatus(TableStatus.ACTIVE)), + new DescribeTableResult() + .withTable(new TableDescription() + .withAttributeDefinitions(new AttributeDefinition() + .withAttributeName("_TxI") + .withAttributeType(ScalarAttributeType.S)) + .withKeySchema(new KeySchemaElement() + .withAttributeName("_TxI") + .withKeyType(KeyType.HASH)) + .withTableStatus(TableStatus.ACTIVE)) + ); + + when(client.updateItem(any(UpdateItemRequest.class))).thenReturn( + new UpdateItemResult().withAttributes(Collections.emptyMap())); + + underTest = new DynamoDBTransactionManager(client, transactionsTableName, + transactionsImageTableName, readCapacityUnits, writeCapacityUnits, waitTimeSeconds); + + } + + @Test + public void simulateCommit() { + TransactionDefinition txDefinition = new DefaultTransactionDefinition(); + TransactionStatus tx = underTest.getTransaction(txDefinition); + + + underTest.commit(tx); + + } + +}