Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK-928: Utility to generate events to existing table. #23

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dataset/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<stringType>Utf8</stringType>
<createSetters>false</createSetters>
<fieldVisibility>private</fieldVisibility>
<imports>
<import>src/main/avro/standard_event.avsc</import>
</imports>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
39 changes: 39 additions & 0 deletions dataset/src/main/avro/standard_event.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"name": "StandardEvent",
"namespace": "org.kitesdk.data.event",
"type": "record",
"doc": "A standard event type for logging, based on the paper 'The Unified Logging Infrastructure for Data Analytics at Twitter' by Lee et al, http://vldb.org/pvldb/vol5/p1771_georgelee_vldb2012.pdf",
"fields": [
{
"name": "event_initiator",
"type": "string",
"doc": "Where the event was triggered from in the format {client,server}_{user,app}, e.g. 'client_user'. Required."
},
{
"name": "event_name",
"type": "string",
"doc": "A hierarchical name for the event, with parts separated by ':'. Required."
},
{
"name": "user_id",
"type": "long",
"doc": "A unique identifier for the user. Required."
},
{
"name": "session_id",
"type": "string",
"doc": "A unique identifier for the session. Required."
},
{
"name": "ip",
"type": "string",
"doc": "The IP address of the host where the event originated. Required."
},
{
"name": "timestamp",
"type": "long",
"doc": "The point in time when the event occurred, represented as the number of milliseconds since January 1, 1970, 00:00:00 GMT. Required."
}

]
}
115 changes: 115 additions & 0 deletions dataset/src/main/java/org/kitesdk/examples/data/GenerateEvents.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2015 Cloudera, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kitesdk.examples.data;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.View;
import org.kitesdk.data.event.StandardEvent;

public class GenerateEvents extends Configured implements Tool {
protected Random random;
protected long baseTimestamp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specific to a single run, so I think it makes sense to move it into run as a local variable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

baseTimestamp is also used in the randomTimestamp method. Are you suggesting that I pass it as a variable each time? Does it really make a difference?


public GenerateEvents() {
random = new Random();
}
@Override
public int run(String[] args) throws Exception {
long counter = 0l;
baseTimestamp = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a local variable because it affects how the loop runs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

baseTimeStamp is used in more than one method, and so it is set as a global variable. This is the way the code appeared before the prior tech review. This example is based on the published code sample CreateEvents.java, which also uses global variables.

https://github.com/kite-sdk/kite-examples/blob/master/spark/src/main/java/org/kitesdk/examples/spark/CreateEvents.java

For me, "it affects how the loop runs" does not explain why this change is essential before the code is suitable for publication. Does instantiating baseTimeStamp as a global variable somehow make it invalid? How does it affect how the loop runs?

If the goal is to teach me to write better code, I need more explanation. If the available examples are flawed, I need better examples to work from.


View<StandardEvent> events = Datasets.load(
(args.length==1 ? args[0] : "dataset:hive:events"), StandardEvent.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noted this elsewhere, but I think it would be better to use a variable rather than the inline test here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this wrong, or just different? Are you suggesting that the test should set the variable before the load method? If the argument is invalid, does it change the result by setting it outside the load method? If the code must change before publication, please provide the acceptable alternate code, rather than have me guess at what I should do.


DatasetWriter<StandardEvent> writer = events.newWriter();
try {
Utf8 sessionId = new Utf8("sessionId");
long userId = 0;
Utf8 ip = new Utf8("ip");
int randomEventCount = 0;

while (System.currentTimeMillis() - baseTimestamp < 36000) {
sessionId = randomSessionId();
userId = randomUserId();
ip = randomIp();
randomEventCount = random.nextInt(25);
for (int i=0; i < randomEventCount; i++) {
writer.write(generateRandomEvent(sessionId, userId, ip, counter++));
}
}
} finally {
writer.close();
}

System.out.println("Generated " + counter + " events");

return 0;
}

public StandardEvent generateRandomEvent(Utf8 sessionId, long userId, Utf8 ip, long counter) {
return StandardEvent.newBuilder()
.setEventInitiator(new Utf8("client_user"))
.setEventName(randomEventName(counter))
.setUserId(userId)
.setSessionId(sessionId)
.setIp(ip)
.setTimestamp(randomTimestamp())
.build();
}

public Utf8 randomEventName(long counter) {
return new Utf8("event"+counter);
}

public long randomUserId() {
return random.nextInt(10);
}

public Utf8 randomSessionId() {
return new Utf8(UUID.randomUUID().toString());
}

public Utf8 randomIp() {
return new Utf8("192.168." + (random.nextInt(254) + 1) + "."
+ (random.nextInt(254) + 1));
}

public long randomTimestamp() {
long delta = System.currentTimeMillis() - baseTimestamp;
delta = delta*1000l+random.nextInt(5000);
return baseTimestamp+delta;
}

public static void main(String... args) throws Exception {
int rc = ToolRunner.run(new Configuration(), new GenerateEvents(), args);

System.exit(rc);
}

}