Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1008][APPEND] create feature groups with struct feature #307

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions integrations/java/java/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ HOPSWORKS_PROJECT_NAME=REPLACE_WITH_YOUR_HOPSWORKS_PROJECT_NAME
FEATURE_GROUP_NAME=java_data
FEATURE_GROUP_VERSION=1
FEATURE_VIEW_NAME=products_fv
FEATURE_VIEW_VERSION=1
```
FEATURE_VIEW_VERSION=1```

```bash
python3 ./setup_fv_fg.py --host $HOPSWORKS_HOST --api_key $HOPSWORKS_API_KEY --project $HOPSWORKS_PROJECT_NAME --feature_group_name $FEATURE_GROUP_NAME --feature_group_version $FEATURE_GROUP_VERSION --feature_view_name $FEATURE_VIEW_NAME --feature_view_version $FEATURE_VIEW_VERSION
java -jar ./target/hopsworks-java-tutorial-3.9.0-RC9-jar-with-dependencies.jar $HOPSWORKS_HOST $HOPSWORKS_API_KEY $HOPSWORKS_PROJECT_NAME $FEATURE_GROUP_NAME $FEATURE_GROUP_VERSION $FEATURE_VIEW_NAME $FEATURE_VIEW_VERSION
java -jar ./target/hopsworks-java-tutorial-4.3.0-SNAPSHOT-jar-with-dependencies.jar $HOPSWORKS_HOST $HOPSWORKS_API_KEY $HOPSWORKS_PROJECT_NAME $FEATURE_GROUP_NAME $FEATURE_GROUP_VERSION $FEATURE_VIEW_NAME $FEATURE_VIEW_VERSION
```
2 changes: 1 addition & 1 deletion integrations/java/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.hopsworks.tutorials</groupId>
<artifactId>hopsworks-java-tutorial</artifactId>
<version>3.9.0-RC9</version>
<version>4.3.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down
59 changes: 59 additions & 0 deletions integrations/java/java/src/main/avro/java_struct_avro.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"type": "record",
"name": "JavaStructAvro",
"namespace": "com.hopsworks.tutorials",
"fields": [
{
"name": "pk",
"type": [
"null",
"string"
]
},
{
"name": "event_time",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
]
},
{
"name": "feat",
"type": [
"null",
{
"type": "array",
"items": [
"null",
{
"type": "record",
"name": "S_feat",
"fields": [
{
"name": "sku",
"type": [
"null",
"string"
]
},
{
"name": "ts",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-micros"
}
]
}
]
}
]
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package com.hopsworks.tutorials;


import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class JavaStructGenerator {

private static final Random random = new Random();

/**
* Generates a random microsecond timestamp.
* This value is computed between Jan 1, 2000 and the current time.
*/
private static Long getRandomMicroTimestamp() {
// Jan 1, 2000 in milliseconds (946684800000L)
long startMillis = 946684800000L;
long nowMillis = System.currentTimeMillis();
long randomMillis = startMillis + (long) (random.nextDouble() * (nowMillis - startMillis));
return randomMillis * 1000; // convert milliseconds to microseconds
}

/**
* Generates a random instance of JavaStructPojo.
*/
public static JavaStructPojo generateJavaStructPojo() {
// Generate a random primary key
String pk = UUID.randomUUID().toString();
// Generate a random event_time (timestamp in microseconds)
Long eventTime = getRandomMicroTimestamp();

// Create a random list of S_feat items (between 1 and 5 items)
int featSize = random.nextInt(5) + 1;
List<JavaStructPojo.S_feat> featList = new ArrayList<>();
for (int i = 0; i < featSize; i++) {
featList.add(generateRandomS_feat());
}

return new JavaStructPojo(pk, eventTime, featList);
}

public static JavaStructAvro generateJavaStructAvro(Integer id) {

JavaStructAvro javaStructAvro = new JavaStructAvro();

// Set primary key (union of null and string, so non-null value)
javaStructAvro.setPk(id.toString());

// Set event_time (as a Long for timestamp-micros)
javaStructAvro.setEventTime(getRandomMicroTimestamp());

// Create a random list of S_feat items (between 1 and 5 items)
// Generate a list of S_feat records.
List<S_feat> featList = new ArrayList<>();
int count = (int) (Math.random() * 5) + 1; // between 1 and 5 items
for (int i = 0; i < count; i++) {
S_feat feat = new S_feat();
String sku = "SKU-" + UUID.randomUUID().toString().substring(0, 8);
Long ts = getRandomMicroTimestamp();
feat.setSku(sku);
feat.setTs(ts);
featList.add(feat);
}
javaStructAvro.setFeat(featList);

return javaStructAvro;
}

/**
* Generates a random GenericRecord for the given schema.
*/
public static GenericRecord generateRandomRecord(Schema schema, Integer id) {
// Create the main record for the "java_struct_1" record.
GenericRecord record = new GenericData.Record(schema);

// "pk": union [null, string] -> choose a random UUID string.
record.put("pk", id.toString());

// "event_time": union [null, long] -> generate a random timestamp in microseconds.
record.put("event_time", getRandomMicroTimestamp());

// "feat": union [null, array<union[null, S_feat]>]
// Retrieve the union schema for the "feat" field.
Schema featFieldSchema = schema.getField("feat").schema();
// In the union, index 1 is the non-null array schema.
Schema arraySchema = featFieldSchema.getTypes().get(1); // non-null branch for the array
// The array’s element is itself a union: [null, S_feat]
Schema featElementUnionSchema = arraySchema.getElementType();
// In the union, index 1 is the S_feat record schema.
Schema sFeatSchema = featElementUnionSchema.getTypes().get(1); // non-null branch for S_feat

// Create a list of S_feat records (choosing a non-null value).
int featSize = random.nextInt(5) + 1; // between 1 and 5 items.
List<GenericRecord> featList = new ArrayList<>();
for (int i = 0; i < featSize; i++) {
GenericRecord featRecord = new GenericData.Record(sFeatSchema);
featRecord.put("sku", "SKU-" + UUID.randomUUID().toString().substring(0, 8));
featRecord.put("ts", getRandomMicroTimestamp());
featList.add(featRecord);
}
record.put("feat", featList);

return record;
}

/**
* Generates a random S_feat instance.
*/
public static JavaStructPojo.S_feat generateRandomS_feat() {
String sku = "SKU-" + UUID.randomUUID().toString().substring(0, 8);
Long ts = getRandomMicroTimestamp();
return new JavaStructPojo.S_feat(sku, ts);
}

public static List<JavaStructPojo> generateData(int size) {

List<JavaStructPojo> rows = new ArrayList<>(size);

for (int i = 0; i < size; i++) {
JavaStructPojo data = generateJavaStructPojo();
rows.add(data);
}
return rows;
}

public static List<JavaStructAvro> generateJavaStructAvroData(int size) {
List<JavaStructAvro> rows = new ArrayList<>(size);

for (int i = 0; i < size; i++) {
JavaStructAvro data = generateJavaStructAvro(i);
rows.add(data);
}
return rows;
}

/**
* Test method to generate and print a random Avro record.
*/
public static List<GenericRecord> generateGenericRecordData(int size) throws IOException {
String SCHEMA_JSON = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"JavaStructAvro\",\n" +
" \"namespace\": \"com.hopsworks.tutorials\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"pk\",\n" +
" \"type\": [\"null\", \"string\"]\n" +
" },\n" +
" {\n" +
" \"name\": \"event_time\",\n" +
" \"type\": [\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]\n" +
" },\n" +
" {\n" +
" \"name\": \"feat\",\n" +
" \"type\": [\"null\", {\"type\": \"array\", \"items\": [\"null\", {\"type\": \"record\", \"name\": \"S_feat\", \"fields\": [\n" +
" {\"name\": \"sku\", \"type\": [\"null\", \"string\"]},\n" +
" {\"name\": \"ts\", \"type\": [\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]}\n" +
" ]}]}]\n" +
" }\n" +
" ]\n" +
"}";

Schema.Parser parser = new Parser();
Schema schema = parser.parse(SCHEMA_JSON);

List<GenericRecord> rows = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
GenericRecord data = generateRandomRecord(schema, i);
rows.add(data);
}
return rows;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.hopsworks.tutorials;

import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.AvroSchema;
import java.util.List;

public class JavaStructPojo {

@Nullable
// This field is a union of null and string.
private String pk;

@Nullable
// Avro will treat this as a long with the logical type "timestamp-micros".
@AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
private Long event_time;

@Nullable
// This is a union of null and an array of unions (null or S_feat).
private List<S_feat> feat;

// Default constructor
public JavaStructPojo() {}

// Parameterized constructor
public JavaStructPojo(String pk, Long event_time, List<S_feat> feat) {
this.pk = pk;
this.event_time = event_time;
this.feat = feat;
}

public String getPk() {
return pk;
}

public void setPk(String pk) {
this.pk = pk;
}

public Long getEvent_time() {
return event_time;
}

public void setEvent_time(Long event_time) {
this.event_time = event_time;
}

public List<S_feat> getFeat() {
return feat;
}

public void setFeat(List<S_feat> feat) {
this.feat = feat;
}

@Override
public String toString() {
return "JavaStructPojo{" +
"pk='" + pk + '\'' +
", event_time=" + event_time +
", feat=" + feat +
'}';
}

// Nested static class corresponding to the S_feat record.
public static class S_feat {
@Nullable
// Union of null and string.
private String sku;

@Nullable
// Avro will treat this as a long with the logical type "timestamp-micros".
@AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
private Long ts;

// Default constructor
public S_feat() {}

// Parameterized constructor
public S_feat(String sku, Long ts) {
this.sku = sku;
this.ts = ts;
}

public String getSku() {
return sku;
}

public void setSku(String sku) {
this.sku = sku;
}

public Long getTs() {
return ts;
}

public void setTs(Long ts) {
this.ts = ts;
}

@Override
public String toString() {
return "S_feat{" +
"sku='" + sku + '\'' +
", ts=" + ts +
'}';
}
}
}
Loading