Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions fdb-extensions/fdb-extensions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
}
api(libs.fdbJava)
implementation(libs.guava)
implementation(libs.half4j)
implementation(libs.slf4j.api)
compileOnly(libs.jsr305)

Expand All @@ -41,6 +42,38 @@ dependencies {
testFixturesAnnotationProcessor(libs.autoService)
}

def siftSmallFile = layout.buildDirectory.file('downloads/siftsmall.tar.gz')
def extractDir = layout.buildDirectory.dir("extracted")

// Task that downloads the CSV exactly once unless it changed
tasks.register('downloadSiftSmall', de.undercouch.gradle.tasks.download.Download) {
src 'https://huggingface.co/datasets/vecdata/siftsmall/resolve/3106e1b83049c44713b1ce06942d0ab474bbdfb6/siftsmall.tar.gz'
dest siftSmallFile.get().asFile
onlyIfModified true
tempAndMove true
retries 3
}

tasks.register('extractSiftSmall', Copy) {
dependsOn 'downloadSiftSmall'
from(tarTree(resources.gzip(siftSmallFile)))
into extractDir

doLast {
println "Extracted files into: ${extractDir.get().asFile}"
fileTree(extractDir).visit { details ->
if (!details.isDirectory()) {
println " - ${details.file}"
}
}
}
}

test {
dependsOn tasks.named('extractSiftSmall')
inputs.dir extractDir
}

publishing {
publications {
library(MavenPublication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.util.LoggableException;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -42,9 +44,13 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand Down Expand Up @@ -1051,6 +1057,93 @@
return result;
}

/**
* Method that provides the functionality of a for loop, however, in an asynchronous way. The result of this method
* is a {@link CompletableFuture} that represents the result of the last iteration of the loop body.
* @param startI an integer analogous to the starting value of a loop variable in a for loop
* @param startU an object of some type {@code U} that represents some initial state that is passed to the loop's
* initial state
* @param conditionPredicate a predicate on the loop variable that must be true before the next iteration is
* entered; analogous to the condition in a for loop
* @param stepFunction a unary operator used for modifying the loop variable after each iteration
* @param body a bi-function to be called for each iteration; this function is initially invoked using
* {@code startI} and {@code startU}; the result of the body is then passed into the next iterator's body
* together with a new value for the loop variable. In this way callers can access state inside an iteration
* that was computed in a previous iteration.
* @param executor the executor
* @param <U> the type of the result of the body {@link BiFunction}
* @return a {@link CompletableFuture} containing the result of the last iteration's body invocation.
*/
@Nonnull
public static <U> CompletableFuture<U> forLoop(final int startI, @Nullable final U startU,
@Nonnull final IntPredicate conditionPredicate,
@Nonnull final IntUnaryOperator stepFunction,
@Nonnull final BiFunction<Integer, U, CompletableFuture<U>> body,
@Nonnull final Executor executor) {
final AtomicInteger loopVariableAtomic = new AtomicInteger(startI);
final AtomicReference<U> lastResultAtomic = new AtomicReference<>(startU);
return whileTrue(() -> {
final int loopVariable = loopVariableAtomic.get();
if (!conditionPredicate.test(loopVariable)) {
return AsyncUtil.READY_FALSE;
}
return body.apply(loopVariable, lastResultAtomic.get())
.thenApply(result -> {
loopVariableAtomic.set(stepFunction.applyAsInt(loopVariable));
lastResultAtomic.set(result);
return true;
});

Check warning on line 1095 in fdb-extensions/src/main/java/com/apple/foundationdb/async/MoreAsyncUtil.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-extensions/src/main/java/com/apple/foundationdb/async/MoreAsyncUtil.java#L1091-L1095

Method always returns the same value (true) https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3598%2Fnormen662%2Fhnsw%3AHEAD&id=8070E99D8E3F8370FE548A32704A6A30
}, executor).thenApply(ignored -> lastResultAtomic.get());
}

/**
* Method to iterate over some items, for each of which a body is executed asynchronously. The result of each such
* executed is then collected in a list and returned as a {@link CompletableFuture} over that list.
* @param items the items to iterate over
* @param body a function to be called for each item
* @param parallelism the maximum degree of parallelism this method should use
* @param executor the executor
* @param <T> the type of item
* @param <U> the type of the result
* @return a {@link CompletableFuture} containing a list of results collected from the individual body invocations
*/
@Nonnull
@SuppressWarnings("unchecked")
public static <T, U> CompletableFuture<List<U>> forEach(@Nonnull final Iterable<T> items,
@Nonnull final Function<T, CompletableFuture<U>> body,
final int parallelism,
@Nonnull final Executor executor) {
// this deque is only modified by once upon creation
final ArrayDeque<T> toBeProcessed = new ArrayDeque<>();
for (final T item : items) {
toBeProcessed.addLast(item);
}

final List<CompletableFuture<Void>> working = Lists.newArrayList();
final AtomicInteger indexAtomic = new AtomicInteger(0);
final Object[] resultArray = new Object[toBeProcessed.size()];

return whileTrue(() -> {
working.removeIf(CompletableFuture::isDone);

while (working.size() <= parallelism) {
final T currentItem = toBeProcessed.pollFirst();
if (currentItem == null) {
break;
}

final int index = indexAtomic.getAndIncrement();
working.add(body.apply(currentItem)
.thenAccept(result -> resultArray[index] = result));
}

if (working.isEmpty()) {
return AsyncUtil.READY_FALSE;
}
return whenAny(working).thenApply(ignored -> true);
}, executor).thenApply(ignored -> Arrays.asList((U[])resultArray));
}

/**
* A {@code Boolean} function that is always true.
* @param <T> the type of the (ignored) argument to the function
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* AbstractNode.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.async.hnsw;

import com.apple.foundationdb.tuple.Tuple;
import com.google.common.collect.ImmutableList;

import javax.annotation.Nonnull;
import java.util.List;

/**
* An abstract base class implementing the {@link Node} interface.
* <p>
* This class provides the fundamental structure for a node within the HNSW graph,
* managing a unique {@link Tuple} primary key and an immutable list of its neighbors.
* Subclasses are expected to provide concrete implementations, potentially adding
* more state or behavior.
*
* @param <N> the type of the node reference used for neighbors, which must extend {@link NodeReference}
*/
abstract class AbstractNode<N extends NodeReference> implements Node<N> {
@Nonnull
private final Tuple primaryKey;

@Nonnull
private final List<N> neighbors;

/**
* Constructs a new {@code AbstractNode} with a specified primary key and a list of neighbors.
* <p>
* This constructor creates a defensive, immutable copy of the provided {@code neighbors} list.
* This ensures that the internal state of the node cannot be modified by external
* changes to the original list after construction.
*
* @param primaryKey the unique identifier for this node; must not be {@code null}
* @param neighbors the list of nodes connected to this node; must not be {@code null}
*/
protected AbstractNode(@Nonnull final Tuple primaryKey,
@Nonnull final List<N> neighbors) {
this.primaryKey = primaryKey;
this.neighbors = ImmutableList.copyOf(neighbors);
}

/**
* Gets the primary key that uniquely identifies this object.
* @return the primary key {@link Tuple}, which will never be {@code null}.
*/
@Nonnull
@Override
public Tuple getPrimaryKey() {
return primaryKey;
}

/**
* Gets the list of neighbors connected to this node.
* <p>
* This method returns a direct reference to the internal list which is
* immutable.
* @return a non-null, possibly empty, list of neighbors.
*/
@Nonnull
@Override
public List<N> getNeighbors() {
return neighbors;
}

/**
* Gets the neighbor at the specified index.
* <p>
* This method provides access to a specific neighbor by its zero-based position
* in the internal list of neighbors.
* @param index the zero-based index of the neighbor to retrieve.
* @return the neighbor at the specified index, guaranteed to be non-null.
*/
@Nonnull
@Override
public N getNeighbor(final int index) {
return neighbors.get(index);
}

Check warning on line 97 in fdb-extensions/src/main/java/com/apple/foundationdb/async/hnsw/AbstractNode.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Test Gaps

fdb-extensions/src/main/java/com/apple/foundationdb/async/hnsw/AbstractNode.java#L95-L97

[Test Gap] Added method `getNeighbor` has not been tested. https://fdb.teamscale.io/metrics/code/foundationdb-fdb-record-layer/fdb-extensions%2Fsrc%2Fmain%2Fjava%2Fcom%2Fapple%2Ffoundationdb%2Fasync%2Fhnsw%2FAbstractNode.java?coverage-mode=test-gap&t=FORK_MR%2F3598%2Fnormen662%2Fhnsw%3AHEAD&selection=char-3329-3410&merge-request=FoundationDB%2Ffdb-record-layer%2F3598
}
Loading