Skip to content

Commit

Permalink
Merge pull request #94 from slipsoft/feat/multinode
Browse files Browse the repository at this point in the history
Feat/multinode
  • Loading branch information
n-peugnet authored Jun 5, 2019
2 parents 24e711b + 7e9804d commit e69739f
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 119 deletions.
77 changes: 67 additions & 10 deletions docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,54 @@
"name" : "table"
} ],
"paths" : {
"/db/addNode" : {
"put" : {
"tags" : [ "db" ],
"operationId" : "addNodes",
"consumes" : [ "application/json" ],
"produces" : [ "application/json" ],
"parameters" : [ {
"in" : "body",
"name" : "body",
"description" : "nodeData",
"required" : true,
"schema" : {
"type" : "array",
"items" : {
"$ref" : "#/definitions/Node"
}
}
} ],
"responses" : {
"default" : {
"description" : "successful operation"
}
}
}
},
"/db/check" : {
"get" : {
"tags" : [ "db" ],
"operationId" : "checkNode",
"consumes" : [ "application/json" ],
"produces" : [ "application/json" ],
"parameters" : [ {
"name" : "InternalToken",
"in" : "header",
"required" : false,
"type" : "string",
"default" : "null"
} ],
"responses" : {
"200" : {
"description" : "successful operation",
"schema" : {
"$ref" : "#/definitions/HttpResponse"
}
}
}
}
},
"/db/tables" : {
"get" : {
"tags" : [ "db" ],
Expand Down Expand Up @@ -50,11 +98,8 @@
"default" : "null"
} ],
"responses" : {
"200" : {
"description" : "successful operation",
"schema" : {
"$ref" : "#/definitions/HttpResponse"
}
"default" : {
"description" : "successful operation"
}
}
}
Expand Down Expand Up @@ -92,11 +137,8 @@
"type" : "string"
} ],
"responses" : {
"200" : {
"description" : "successful operation",
"schema" : {
"$ref" : "#/definitions/HttpResponse"
}
"default" : {
"description" : "successful operation"
}
}
}
Expand Down Expand Up @@ -282,6 +324,21 @@
}
}
},
"Node" : {
"type" : "object",
"properties" : {
"name" : {
"type" : "string"
},
"address" : {
"type" : "string"
},
"port" : {
"type" : "integer",
"format" : "int32"
}
}
},
"SortEntity" : {
"type" : "object",
"properties" : {
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/com/dant/app/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@

import javax.ws.rs.BadRequestException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class Controller {
Expand Down Expand Up @@ -59,8 +57,13 @@ public static void addTables(ArrayList<TableEntity> allTableEntities, boolean ad
tablesToAdd.stream().forEach(t -> Database.getInstance().getAllTables().add(t));

SerialStructure.saveStructure();

if (!addImmediately) {

}
}


public static HttpResponse getTable(String tableName) {
Table table = Controller.getTableByName(tableName);
return new HttpResponse(table.convertToEntity());
Expand Down Expand Up @@ -90,7 +93,5 @@ public static HttpResponse doSearch(ViewEntity viewEntity) throws SearchExceptio
View viewToExecute = viewEntity.convertToView();
ResultSet resultSet = viewToExecute.execute();
return new HttpResponse(resultSet);

}

}
82 changes: 77 additions & 5 deletions src/main/java/com/dant/app/DBEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@

import com.dant.entity.HttpResponse;
import com.dant.entity.TableEntity;
import com.dant.utils.Log;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import db.structure.Database;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiParam;
import network.Network;
import network.Node;

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;


@Api("db")
Expand All @@ -17,15 +27,64 @@
@Consumes(MediaType.APPLICATION_JSON)
public class DBEndpoint {

@GET
@Path("/check")
public HttpResponse checkNode(@DefaultValue("null") @HeaderParam("InternalToken") String InternalToken) {
if (!Database.getInstance().config.SuperSecretPassphrase.equals(InternalToken)) {
throw new JsonSyntaxException("passPhraseDoesNotMatch");
}
return new HttpResponse("ok");
}
@PUT
@Path("/addNode")
public void addNodes(@ApiParam(value = "nodeData", required = true) List<Node> allNodes, final @Suspended AsyncResponse responseToClient) {
//TODO check for duplicates && remove the "http://" part
List<CompletableFuture<java.net.http.HttpResponse<String>>> completableFutures = allNodes.stream().map(Node::checkNode).collect(Collectors.toList());
// met toutes mes completableFutures dans une liste

CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));
// appelle la fonction allOff pour retourner 1 seul objet Future

CompletableFuture<List<java.net.http.HttpResponse<String>>> allCompletableFutures = allFutures.thenApply(future ->
completableFutures.stream()
.map(completableFuture -> completableFuture.join())
.collect(Collectors.toList())
);
// on met tous les resultats dans une liste qu'on passe à un nouveau Future

allCompletableFutures.thenAccept(responses -> {
responses.stream().forEach(response -> {
if (response.statusCode() != 200) {
throw new JsonSyntaxException("one or more nodes could not be validated " + response.request().uri() + "error code " + response.statusCode() + " " + response.body());
}
responseToClient.resume(new HttpResponse("ok"));
Database.getInstance().allNodes.addAll(allNodes);
});
});
}

@PUT
@Path("/tables")
public HttpResponse createTables(
public void createTables(
@ApiParam(value = "content", required = true) ArrayList<TableEntity> allTables,
@DefaultValue("null") @HeaderParam("InternalToken") String InternalToken) {
@DefaultValue("null") @HeaderParam("InternalToken") String InternalToken, final @Suspended AsyncResponse responseToClient) {
// si la requète vient d'un endpoint, pas besoin de valider
boolean addImmediatly = InternalToken.equals(Database.getInstance().config.SuperSecretPassphrase);
Controller.addTables(allTables, addImmediatly);
return new HttpResponse("table successfully inserted");
if (!addImmediatly && Database.getInstance().allNodes.size() > 0) {
Gson gson = new Gson();
String body = gson.toJson(allTables);
Network.broadcast("/db/tables", "PUT", body).thenAccept(responses -> {
responses.stream().forEach(response -> {
if (response.statusCode() != 200) {
responseToClient.resume(new JsonSyntaxException("error when broadcasting to node " + response.statusCode() + " " +response.body() + response.uri()));
}
responseToClient.resume(new HttpResponse("ok"));
});
});
} else {
responseToClient.resume(new HttpResponse("ok"));
}
}

@GET
Expand All @@ -42,7 +101,20 @@ public HttpResponse getTable(@PathParam("tableName") String tableName) {

@DELETE
@Path("/tables/{tableName}")
public HttpResponse deleteTable(@PathParam("tableName") String tableName) {
return Controller.deleteTable(tableName);
public void deleteTable(@PathParam("tableName") String tableName, final @Suspended AsyncResponse responseToClient) {
Controller.deleteTable(tableName);
if (Database.getInstance().allNodes.size() > 0) {
Network.broadcast("/db/tables/" + tableName, "DELETE").thenAccept(responses ->
responses.stream().forEach(response -> {
if (response.statusCode() != 200) {
throw new JsonSyntaxException("the table on node" + response.request().uri() + "could not be deleted");
}
responseToClient.resume(new HttpResponse("ok"));
})
);
} else {
responseToClient.resume(new HttpResponse("ok"));
}

}
}
2 changes: 1 addition & 1 deletion src/main/java/com/dant/entity/ColumnEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Column convertToColumn() {
return new Column(this.name, dataType);
} catch (ReflectiveOperationException exp) {
Log.error(exp);
throw new RuntimeException("unable to create table");
throw new RuntimeException("unable to create column");
}
}
}
27 changes: 0 additions & 27 deletions src/main/java/db/network/Network.java

This file was deleted.

10 changes: 0 additions & 10 deletions src/main/java/db/network/Node.java

This file was deleted.

1 change: 1 addition & 0 deletions src/main/java/db/structure/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ public class Config {
public Integer loaderNbThread;
public String NTPHostName;
public String SuperSecretPassphrase;

}
4 changes: 2 additions & 2 deletions src/main/java/db/structure/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import db.network.Node;
import network.Node;

public final class Database {

private ArrayList<Table> allTables;
public Config config;
public ArrayList<Node> allNodes;
public ArrayList<Node> allNodes = new ArrayList<Node>();

private AtomicInteger nextTableID = new AtomicInteger(1);
private AtomicInteger nextIndexTreeDicUniqueId = new AtomicInteger(1);
Expand Down
74 changes: 74 additions & 0 deletions src/main/java/network/Network.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package network;

import db.structure.Database;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class Network {

public static CompletableFuture<List<java.net.http.HttpResponse<String>>> broadcast(String endpoint, String method) {
ArrayList<Node> allNodes = Database.getInstance().allNodes;
HttpClient client = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.ALWAYS)
.build();

ArrayList<HttpRequest> allRequests = allNodes.stream().map(node -> HttpRequest.newBuilder()
.uri(URI.create(node.address + ":" + node.port + endpoint))
.headers("Content-Type", "application/json", "InternalToken", Database.getInstance().config.SuperSecretPassphrase)
.method(method, HttpRequest.BodyPublishers.noBody())
.build()
).collect(Collectors.toCollection(ArrayList::new));

List<CompletableFuture<HttpResponse<String>>> completableFutures = allRequests.stream().map(request ->
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
).collect(Collectors.toList());

CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));

return allFutures.thenApply(future ->
completableFutures.stream()
.map(completableFuture -> completableFuture.join())
.collect(Collectors.toList())
);
}

public static CompletableFuture<List<java.net.http.HttpResponse<String>>> broadcast(String endpoint, String method, String body) {
ArrayList<Node> allNodes = Database.getInstance().allNodes;
HttpClient client = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.ALWAYS)
.build();

ArrayList<HttpRequest> allRequests = allNodes.stream().map(node -> HttpRequest.newBuilder()
.uri(URI.create(node.address + ":" + node.port + endpoint))
.headers("Content-Type", "application/json", "InternalToken", Database.getInstance().config.SuperSecretPassphrase)
.method(method, HttpRequest.BodyPublishers.ofString(body))
.build()
).collect(Collectors.toCollection(ArrayList::new));

List<CompletableFuture<HttpResponse<String>>> completableFutures = allRequests.stream().map(request ->
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
).collect(Collectors.toList());
// met toutes mes completableFutures dans une liste

CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]));
// appelle la fonction allOff pour retourner 1 seul objet Future

return allFutures.thenApply(future ->
completableFutures.stream()
.map(completableFuture -> completableFuture.join())
.collect(Collectors.toList())
);
// on met tous les resultats dans une liste qu'on passe à un nouveau Future
}


}
Loading

0 comments on commit e69739f

Please sign in to comment.