-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Supporting class for the kite-crunch example CDK-754. #25
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
|
||
<!-- | ||
~ Copyright 2013 Cloudera Inc. | ||
~ | ||
~ Licensed under the Apache License, Version 2.0 (the "License"); | ||
~ you may not use this file except in compliance with the License. | ||
~ You may obtain a copy of the License at | ||
~ | ||
~ http://www.apache.org/licenses/LICENSE-2.0 | ||
~ | ||
~ Unless required by applicable law or agreed to in writing, software | ||
~ distributed under the License is distributed on an "AS IS" BASIS, | ||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
~ See the License for the specific language governing permissions and | ||
~ limitations under the License. | ||
--> | ||
|
||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
|
||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>org.kitesdk.examples</groupId> | ||
<artifactId>demo-crunch</artifactId> | ||
<version>1.0.0</version> | ||
|
||
<parent> | ||
<groupId>org.kitesdk.examples</groupId> | ||
<artifactId>demo</artifactId> | ||
<version>1.0.0</version> | ||
</parent> | ||
|
||
<packaging>jar</packaging> | ||
|
||
<name>demo-crunch</name> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.kitesdk</groupId> | ||
<artifactId>kite-maven-plugin</artifactId> | ||
<configuration> | ||
<toolClass>org.kitesdk.examples.tutorials.crunch.AggregateEvents</toolClass> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>org.kitesdk.examples</groupId> | ||
<artifactId>demo-core</artifactId> | ||
<version>${project.parent.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.google.guava</groupId> | ||
<artifactId>guava</artifactId> | ||
<version>${hadoop.guava.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>log4j</groupId> | ||
<artifactId>log4j</artifactId> | ||
<version>${hadoop.log4j.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-log4j12</artifactId> | ||
<version>${hadoop.slf4j.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.hive</groupId> | ||
<artifactId>hive-exec</artifactId> | ||
<version>${kite.hive.version}</version> | ||
<scope>compile</scope> <!-- provide Hive dependencies --> | ||
</dependency> | ||
|
||
</dependencies> | ||
|
||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
/** | ||
* Copyright 2015 Cloudera Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.kitesdk.examples.tutorials.crunch; | ||
|
||
import java.io.Serializable; | ||
import java.net.URI; | ||
import java.util.Calendar; | ||
import java.util.Iterator; | ||
import java.util.TimeZone; | ||
import org.apache.crunch.DoFn; | ||
import org.apache.crunch.Emitter; | ||
import org.apache.crunch.MapFn; | ||
import org.apache.crunch.PCollection; | ||
import org.apache.crunch.Pair; | ||
import org.apache.crunch.Target; | ||
import org.apache.crunch.types.avro.Avros; | ||
import org.apache.crunch.util.CrunchTool; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.util.ToolRunner; | ||
import org.kitesdk.data.Dataset; | ||
import org.kitesdk.data.Datasets; | ||
import org.kitesdk.data.crunch.CrunchDatasets; | ||
import org.kitesdk.data.event.StandardEvent; | ||
import org.kitesdk.data.spi.filesystem.FileSystemDatasets; | ||
import org.kitesdk.examples.demo.event.Session; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class AggregateEvents extends CrunchTool implements Serializable { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(AggregateEvents.class); | ||
|
||
@Override | ||
public int run(String[] args) throws Exception { | ||
|
||
// Turn debug on while in development. | ||
getPipeline().enableDebug(); | ||
getPipeline().getConfiguration().set("crunch.log.job.progress", "true"); | ||
|
||
// Step 1. Load the dataset into the Pipeline. | ||
Dataset<StandardEvent> eventsDataset = Datasets.load( | ||
"dataset:hive:events", StandardEvent.class); | ||
|
||
// If the dataset is empty, stop. | ||
if (eventsDataset.isEmpty()) { | ||
LOG.info("No records to process."); | ||
return 0; | ||
} | ||
|
||
// Create a parallel collection from the Kite CrunchDatasets | ||
// Source "events" dataset. | ||
PCollection<StandardEvent> events = read( | ||
CrunchDatasets.asSource(eventsDataset)); | ||
|
||
/* Step 2. Process the data. | ||
* a. Create a session key by combining the user ID and session ID. | ||
* b. Group together all events with the same session key. | ||
* c. For each group, create a Session record as an Avro Specific object. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean "create a Session record as an Avro Specific object"? How is that distinct from "create a Session record"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code commentary reflects the terminology and activity in the code. These serve as "anchors" that readers can latch onto as they try to parse the examples. People new to the Avro object model are not as familiar with what's happening, so it doesn't hurt to give them footholds. |
||
*/ | ||
PCollection<Session> sessions = events | ||
.by(new GetSessionKey(), Avros.strings()) | ||
.groupByKey() | ||
.parallelDo(new MakeSession(), Avros.specifics(Session.class)); | ||
|
||
// Step 3. Append the derived sessions to the Kite CrunchDatasets | ||
// Target "sessions" dataset. | ||
getPipeline().write(sessions, | ||
CrunchDatasets.asTarget("dataset:hive:sessions"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This creates a Crunch Target that is configured to write to the sessions dataset in Hive. The rest of the statement instructs Crunch to append the (parallel) collection of sessions to that target. The way you phrase Step 3 here doesn't quite explain what is happening and why the word "target" is introduced. I'd like to make it a little more clear that a Target is Crunch's notion of a place to put data and the CrunchDatasets utility method is returning the given dataset (by URI) as a Target for Crunch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is inline commentary for the example code. The tutorial, https://github.com/kite-sdk/kite-docs/compare/addCrunchExample, sets the context for demonstrating the utility methods for setting the source and the target. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even with discussion elsewhere, this sentence should make sense on its own. I agree that discussion elsewhere can provide the definition of a Target, but I think this still needs to be rephrased. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a difference between being precise and being clear. This description uses anchor words from the code to say that the session records are appended to the sessions dataset. The description is concise and understandable, and suitable for inline commentary. |
||
Target.WriteMode.APPEND); | ||
|
||
return run().succeeded() ? 0 : 1; | ||
} | ||
|
||
private static class GetSessionKey extends MapFn<StandardEvent, String> { | ||
@Override | ||
public String map(StandardEvent event) { | ||
// Create a key by combining the session id and user id | ||
return event.getSessionId() + event.getUserId(); | ||
} | ||
} | ||
|
||
private static class MakeSession | ||
extends DoFn<Pair<String, Iterable<StandardEvent>>, Session> { | ||
|
||
// The process method iterates through a group of events that have | ||
// the same sessionKey. | ||
@Override | ||
public void process( | ||
Pair<String, Iterable<StandardEvent>> keyAndEvents, | ||
Emitter<Session> emitter) { | ||
final Iterator<StandardEvent> events = keyAndEvents.second().iterator(); | ||
if (!events.hasNext()) { | ||
return; | ||
} | ||
|
||
// Initialize the values needed to create a session object for | ||
// this group. | ||
final StandardEvent firstEvent = events.next(); | ||
long startTime = firstEvent.getTimestamp(); | ||
long endTime = firstEvent.getTimestamp(); | ||
int numEvents = 1; | ||
|
||
// Inspect each event in this session group. Track the earliest | ||
// timestamp (start time) and latest timestamp (end time). Keep a | ||
// count of the events in this session group. | ||
while (events.hasNext()) { | ||
final StandardEvent event = events.next(); | ||
|
||
// Reset the start time if the timestamp is earlier than the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The word reset isn't quite appropriate here. It would be better to say "if the current timestamp is before the earliest seen so far, update the earliest." There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The existing sentence is less awkward than the suggested replacement, and can be understood easily in context. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to have a place for the tutorial code separate from the current examples (even if it's a subdirectory of the current examples). As you say, adding this at the top level could be confusing. Whatever structure you want to use is fine with me, so long as I can start pointing to an unambiguous location for tutorial code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The word reset should be replaced with something more clear, however you want to phrase it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See discussion here regarding the kite-examples/kite-tutorials directory structure. https://issues.cloudera.org/browse/CDK-774 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I reviewed the use of "reset" with our editor, Cris Morris, who feels that the word reset is clear in this context. |
||
// current start time value. | ||
startTime = Math.min(startTime, event.getTimestamp()); | ||
|
||
// Reset the end time if the timestamp is later then the current | ||
// end time value. | ||
endTime = Math.max(endTime, event.getTimestamp()); | ||
|
||
// Keep a count of the events. | ||
numEvents += 1; | ||
} | ||
|
||
// Create a session. Use values from the first event in the group | ||
// for fields that don't change. | ||
emitter.emit(Session.newBuilder() | ||
.setUserId(firstEvent.getUserId()) | ||
.setSessionId(firstEvent.getSessionId()) | ||
.setIp(firstEvent.getIp()) | ||
.setStartTimestamp(startTime) | ||
.setDuration(endTime - startTime) | ||
.setSessionEventCount(numEvents) | ||
.build()); | ||
} | ||
} | ||
|
||
public static void main(String... args) throws Exception { | ||
int rc = ToolRunner.run(new AggregateEvents(), args); | ||
System.exit(rc); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is a standalone project, it shouldn't rely on the demo example as a parent. This should instead rely directly on the Kite parent POM.