Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nazarov_Anton/part-3 & Node #62

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,59 @@
import java.util.function.Consumer;

public class ZipWithArraySpliterator<A, B> extends Spliterators.AbstractSpliterator<Pair<A, B>> {


private final Spliterator<A> inner;
private Spliterator<A> inner;
private final B[] array;
private int currentIndex;

public ZipWithArraySpliterator(Spliterator<A> inner, B[] array) {
super(Long.MAX_VALUE, 0); // FIXME:
// TODO
throw new UnsupportedOperationException();
public ZipWithArraySpliterator(final Spliterator<A> inner, final B[] array) {
this(0, inner, array);
}

private ZipWithArraySpliterator(final long firstIndex, final Spliterator<A> inner, final B[] array) {
super(Math.min(array.length - firstIndex, inner.estimateSize()), inner.characteristics());
this.inner = inner;
this.array = array;
this.currentIndex = (int) firstIndex;
}

@Override
public int characteristics() {
// TODO
throw new UnsupportedOperationException();
return inner.characteristics() & ~SORTED;
}

@Override
public boolean tryAdvance(Consumer<? super Pair<A, B>> action) {
// TODO
throw new UnsupportedOperationException();
public boolean tryAdvance(final Consumer<? super Pair<A, B>> action) {
return inner.tryAdvance(i -> {
action.accept(new Pair<>(i, array[currentIndex++]));
});
}


@Override
public void forEachRemaining(Consumer<? super Pair<A, B>> action) {
// TODO
throw new UnsupportedOperationException();
public void forEachRemaining(final Consumer<? super Pair<A, B>> action) {
for (long i = currentIndex; i < array.length; i++) {
inner.tryAdvance(t -> action.accept(new Pair<>(t, array[currentIndex++])));
}
}

@Override
public Spliterator<Pair<A, B>> trySplit() {
// TODO
throw new UnsupportedOperationException();
if (inner.hasCharacteristics(SUBSIZED)) {
final long size = estimateSize();
final Spliterator<A> aSpliterator = inner.trySplit();
if (aSpliterator == null) {
return null;
}
final ZipWithArraySpliterator<A, B> res = new ZipWithArraySpliterator<>(currentIndex, aSpliterator, array);
currentIndex += (int) (size / 2);
return res;
} else {
return super.trySplit();
}
}

@Override
public long estimateSize() {
// TODO
throw new UnsupportedOperationException();
return Math.min(array.length - currentIndex, inner.estimateSize());
}
}
18 changes: 12 additions & 6 deletions src/main/java/spliterators/part4/data/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

public class Node<T> {
private final T value;
private final Node<T> left;
private final Node<T> right;
private Node<T> left;
private Node<T> right;

public Node(T value, Node<T> left, Node<T> right) {
this.value = Objects.requireNonNull(value);
Expand All @@ -27,9 +27,15 @@ public Node<T> getRight() {
return right;
}

public Stream<T> stream() {
// TODO
throw new UnsupportedOperationException();
//return StreamSupport.stream(new NodeSplitertor(this));
public Stream<T> stream(boolean isParallel) {
return StreamSupport.stream(new NodeSpliterator<>(this), isParallel);
}

public void setNullChild(final Node<T> node) {
if (right == null) {
right = node;
} else {
left = node;
}
}
}
64 changes: 64 additions & 0 deletions src/main/java/spliterators/part4/data/NodeSpliterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package spliterators.part4.data;

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.Stack;
import java.util.function.Consumer;

public class NodeSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

private Node<T> node;
private final Stack<Node<T>> toDo;

public NodeSpliterator(final Node<T> node) {
super(Integer.MAX_VALUE, IMMUTABLE);
this.node = node;
toDo = new Stack<>();
}

@Override
public void forEachRemaining(final Consumer<? super T> action) {
if (node != null) {
forEach(node, toDo);
}
toDo.forEach(n -> {
action.accept(n.getValue());
});
}

private void forEach(final Node<T> node, final Stack<Node<T>> stack) {
stack.add(node);
if (node.getLeft() != null) {
forEach(node.getLeft(), stack);
}
if (node.getRight() != null) {
forEach(node.getRight(), stack);
}
}

@Override
public Spliterator<T> trySplit() {
if (node == null) {
return null;
}
final NodeSpliterator<T> left = new NodeSpliterator<>(node.getLeft());
toDo.add(node);
node = node.getRight();
return left;
}

@Override
public boolean tryAdvance(final Consumer<? super T> action) {
if (toDo.isEmpty()) {
return false;
}
final Node<T> pop = toDo.pop();
action.accept(pop.getValue());
return true;
}

@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package spliterators.part3.exercise;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static junit.framework.TestCase.assertEquals;

public class ZipWithArraySpliteratorTest {
private Integer[] array, index;
private final static int MAX_SIZE = 10000;
private final static int INDEX_SIZE = 10000;

private List<Pair> res;


@Before
public void setup() {
array = new Integer[MAX_SIZE];
index = new Integer[INDEX_SIZE];
res = new ArrayList<>();
for (int i = 0; i < array.length; i++) {
array[i] = ThreadLocalRandom.current().nextInt();
index[i] = i;
res.add(new Pair<>(array[i], index[i]));
}
}

private Stream<Pair<Integer, Integer>> getMyStream(final boolean isParallel, final int rightIndex, final int rightSpliterator) {
final Spliterator<Integer> spliterator = Arrays
.spliterator(Arrays.copyOf(array, rightSpliterator));
return StreamSupport.stream(new ZipWithArraySpliterator<>(spliterator, Arrays.copyOf(index, rightIndex)), isParallel);
}

@Test
public void testSeq() {
final Object collect = getMyStream(false, INDEX_SIZE, MAX_SIZE)
.collect(Collectors.toList());
assertEquals(res, collect);
}

@Test
public void testPar() {
final Object collect = getMyStream(true, INDEX_SIZE, MAX_SIZE)
.collect(Collectors.toList());
assertEquals(res, collect);
}

@Test
public void testSeqSub() {
final int offset = 5000;
final Object collect = getMyStream(false, INDEX_SIZE - offset, MAX_SIZE)
.collect(Collectors.toList());
assertEquals(res.subList(0, INDEX_SIZE - offset) , collect);
}

@Test
public void testParSub() {
final int offset = 5000;
final Object collect = getMyStream(true, INDEX_SIZE, MAX_SIZE - offset)
.collect(Collectors.toList());
assertEquals(res.subList(0, MAX_SIZE - offset), collect);
}

}
88 changes: 88 additions & 0 deletions src/test/java/spliterators/part4/data/NodeSpliteratorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package spliterators.part4.data;

import org.junit.Before;
import org.junit.Test;

import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;

public class NodeSpliteratorTest {
private Node<Integer> head;
private final static int LIMIT = 1000;
private Set<Integer> expected;

private Node<Integer> build(int left, int right) {
if (left > right) {
return null;
}
final int mid = (left + right) / 2;
final Node<Integer> l = build(left, mid - 1);
final Node<Integer> r = build(mid + 1, right);

return new Node<>(mid, l, r);
}

private int addRandom(Node<Integer> node) {
if ((node.getRight() == null) || (node.getLeft() == null)) {
final int add = ThreadLocalRandom.current().nextInt();
node.setNullChild(new Node<>(add, null, null));
return add;
}
final int res = LIMIT + ThreadLocalRandom.current().nextInt();
if (res > 0) {
return addRandom(node.getLeft());
} else {
return addRandom(node.getRight());
}
}

@Before
public void setUp() {
head = build(0, LIMIT);
expected = Stream.iterate(0, i -> i + 1).limit(LIMIT + 1).collect(Collectors.toSet());
}

@Test
public void testSeq() {
final Set<Integer> collect = head.stream(false).collect(Collectors.toSet());

assertEquals(collect, expected);

}

@Test
public void testSeqUnbalacnced() {
for (int i = 0; i < LIMIT; i++) {
expected.add(addRandom(head));
}

final Set<Integer> collect = head.stream(false).collect(Collectors.toSet());

assertEquals(collect, expected);

}

@Test
public void testPar() {
final Set<Integer> collect = head.stream(true).collect(Collectors.toSet());

assertEquals(collect, expected);

}

@Test
public void testParUnBalanced() {
for (int i = 0; i < LIMIT; i++) {
expected.add(addRandom(head));
}

final Set<Integer> collect = head.stream(true).collect(Collectors.toSet());

assertEquals(collect, expected);

}
}