Skip to content

Commit

Permalink
renamed
Browse files Browse the repository at this point in the history
  • Loading branch information
lindt committed Jun 20, 2021
1 parent c74df28 commit 45bb08c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.github.mobility-university</groupId>
<artifactId>partitioned-blocking-queue</artifactId>
<version>0.1</version>
<version>0.1.1</version>
<name>Partitionied Blocking Queue</name>
<description>BlockingQueue but for partitionied data to allow multi threaded streaming and keep order within partition</description>
<url>https://github.com/mobility-university/partitioned-blocking-queue</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* @param <Value> the value which should be queued
*/
@ThreadSafe
public class PartitionedBlockedQueue<Key, Value> {
public class PartitionedBlockingQueue<Key, Value> {
private final int numberOfPartitions;
private final List<Semaphore> semaphores;
private final List<Deque<Value>> queues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ public class PartitioniedBlockingQueueTest {
@Test()
public void constructs() {
// execute & verify
assertNotNull(new PartitionedBlockedQueue(1));
assertNotNull(new PartitionedBlockingQueue(1));
}

@Test()
public void acquiresValue() throws InterruptedException {
// setup
var queue = new PartitionedBlockedQueue<String,String>(1);
var queue = new PartitionedBlockingQueue<String,String>(1);
// execute
var actual = queue.acquire("key", "value");
// verify
Expand All @@ -35,7 +35,7 @@ public void acquiresValue() throws InterruptedException {
@Test()
public void acquiresParallelForDifferentPartitions() throws InterruptedException {
// setup
var queue = new PartitionedBlockedQueue<SpecificHash,String>(2);
var queue = new PartitionedBlockingQueue<SpecificHash,String>(2);

assertEquals(queue.acquire(new SpecificHash(0), "first value"), "first value");
// execute
Expand All @@ -47,7 +47,7 @@ public void acquiresParallelForDifferentPartitions() throws InterruptedException
@Test()
public void releasesAcquiredPartition() throws InterruptedException {
// setup
var queue = new PartitionedBlockedQueue<String,String>(1);
var queue = new PartitionedBlockingQueue<String,String>(1);
queue.acquire("key", "value");
// execute & verify
queue.release("key");
Expand All @@ -56,7 +56,7 @@ public void releasesAcquiredPartition() throws InterruptedException {
@Test()
public void acquiresAgainAfterRelease() throws InterruptedException {
// setup
var queue = new PartitionedBlockedQueue<String,String>(1);
var queue = new PartitionedBlockingQueue<String,String>(1);

assertEquals(queue.acquire("key", "value"), "value");
queue.release("key");
Expand All @@ -70,7 +70,7 @@ public void acquiresAgainAfterRelease() throws InterruptedException {
@Test()
public void blocksParallelAcquireToSamePartition() throws InterruptedException{
// setup
var queue = new PartitionedBlockedQueue<String,String>(1);
var queue = new PartitionedBlockingQueue<String,String>(1);
var eaters = Executors.newFixedThreadPool(1);
assertEquals(queue.acquire("key", "value"), "value");
var counter = new CountDownLatch(1);
Expand All @@ -92,7 +92,7 @@ public void blocksParallelAcquireToSamePartition() throws InterruptedException{
@Test()
public void providesAllValues() throws InterruptedException{
// setup
var queue = new PartitionedBlockedQueue<String,String>(10);
var queue = new PartitionedBlockingQueue<String,String>(10);
var workers = Executors.newFixedThreadPool(30);
var numberOfTasks = 1_000;
var expectedValues = IntStream.range(0, numberOfTasks).boxed().map(task -> String.format("value %s", task)).collect(Collectors.toList());
Expand Down Expand Up @@ -122,7 +122,7 @@ public void providesAllValues() throws InterruptedException{
@Test()
public void keepsOrder() throws InterruptedException{
// setup
var queue = new PartitionedBlockedQueue<String,String>(1);
var queue = new PartitionedBlockingQueue<String,String>(1);
var workers = Executors.newFixedThreadPool(30);
var numberOfTasks = 100;

Expand Down

0 comments on commit 45bb08c

Please sign in to comment.