-
Notifications
You must be signed in to change notification settings - Fork 0
/
keyPig
37 lines (34 loc) · 2.42 KB
/
keyPig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
-- Read the comma-delimited air-traffic export and give every field a name and
-- a type. Columns are declared in the order PigStorage reads them from each
-- line; values that cannot be converted to the declared type load as null.
airtraffic = LOAD '/user/daniel/project/sqoop/airtraffic_yearly'
    USING PigStorage(',')
    AS (
        Year:int,
        Month:int,
        DayofMonth:int,
        DayOfWeek:int,
        DepTime:int,
        CRSDepTime:int,
        ArrTime:int,
        CRSArrTime:int,
        UniqueCarrier:chararray,
        FlightNum:int,
        TailNum:chararray,
        ActualElapsedTime:int,
        CRSElapsedTime:int,
        AirTime:int,
        ArrDelay:int,
        DepDelay:int,
        Origin:chararray,
        Dest:chararray,
        Distance:int,
        TaxiIn:int,
        TaxiOut:int,
        Cancelled:int,
        CancellationCode:chararray,
        Diverted:int,
        CarrierDelay:int,
        WeatherDelay:int,
        NASDelay:int,
        SecurityDelay:int,
        LateAircraftDelay:int
    );
--TOTAL AIRTIME
-- Sum AirTime over the whole data set and store the single resulting value.
-- GROUP ... ALL puts every row into one group; grouping BY AirTime instead
-- would emit one partial sum per distinct AirTime value rather than a total.
-- SUM ignores null AirTime values.
split_airtime = GROUP airtraffic ALL;
total_airtime = FOREACH split_airtime GENERATE SUM(airtraffic.AirTime) AS airtime_sum;
STORE total_airtime INTO '/user/daniel/project/pig/total_airtime';
--CANCELLED FLIGHTS
-- Store the flights whose Cancelled flag is greater than 0, grouped by that
-- flag: one output row per flag value, holding the flag and a bag of flights.
-- The filter runs before the GROUP so that non-cancelled rows are dropped
-- up front instead of being shuffled into a group that is thrown away.
-- Rows with a null Cancelled value do not satisfy the comparison and are excluded.
cancelled_only = FILTER airtraffic BY Cancelled > 0;
split_cancelled = GROUP cancelled_only BY Cancelled;
cancelled_flights = FOREACH split_cancelled GENERATE group, cancelled_only;
STORE cancelled_flights INTO '/user/daniel/project/pig/cancelled_flights';
--JOURNEY
-- For every aircraft (TailNum) and date of journey, find the first and the
-- last destination flown on that date, then attach that information and a
-- Unix arrival timestamp to each flight.

-- Tag each flight with a 'Month-DayofMonth-Year' date string. The date parts
-- are ints and CONCAT only accepts chararray/bytearray, so they are cast.
flights_dated = FOREACH airtraffic GENERATE *,
    CONCAT((chararray)Month, '-', (chararray)DayofMonth, '-', (chararray)Year) AS Date;

journeyDate = FOREACH flights_dated GENERATE Date, Year, Dest, DepTime, TailNum;

-- Rows without a departure time cannot be placed within the day, and an
-- ascending ORDER lists nulls first, so they would otherwise be picked as the
-- "first" leg.
journeyDate_departed = FILTER journeyDate BY DepTime IS NOT NULL;

-- A nested FOREACH needs a grouped relation: one group per aircraft per date.
journey_by_plane_day = GROUP journeyDate_departed BY (TailNum, Date);

journey = FOREACH journey_by_plane_day {
    earliest_first = ORDER journeyDate_departed BY DepTime ASC;
    first_leg = LIMIT earliest_first 1;
    latest_first = ORDER journeyDate_departed BY DepTime DESC;
    last_leg = LIMIT latest_first 1;
    GENERATE group.TailNum AS TailNum,
             group.Date AS DateOfJourney,
             FLATTEN(first_leg.Dest) AS firstDestination,
             FLATTEN(last_leg.Dest) AS lastDestination;
};
STORE journey INTO '/user/daniel/project/pig/journey';

-- Attach the per-day journey to each flight of the same aircraft on the same
-- date. Joining on TailNum alone would pair every flight of an aircraft with
-- every day that aircraft ever flew.
dimensionJourney = JOIN journey BY (TailNum, DateOfJourney), flights_dated BY (TailNum, Date);

-- NOTE(review): this relation is not consumed by any STORE below, so Pig never
-- materializes it. As written the predicate also drops every January row and
-- every first-of-month row; confirm whether "after 2003-01-01" was intended
-- before wiring it into the output.
dimensionJourney_filtered = FILTER dimensionJourney BY (flights_dated::Year > 2003)
    AND (flights_dated::Month > 1)
    AND (flights_dated::DayofMonth > 1);

-- unix_time = midnight (GMT) of the journey date, in seconds, plus the
-- time-of-day offset taken from ArrTime.
-- Assumes ArrTime is encoded as an hhmm integer (e.g. 1435 = 14:35) - TODO confirm.
-- Doing the offset arithmetically avoids zero-padding problems when parsing
-- values such as 5 or 2400. A null ArrTime yields a null unix_time.
-- NOTE(review): the date used is the journey date; an arrival after midnight
-- would need one more day added - verify against the data.
-- The timestamp is added to each row directly; joining it back on TailNum
-- would multiply the rows per aircraft.
dimensionTimestamp = FOREACH dimensionJourney GENERATE
    (ToUnixTime(ToDate(DateOfJourney, 'M-d-yyyy', 'GMT'))
        + (long)(ArrTime / 100) * 3600L
        + (long)(ArrTime % 100) * 60L) AS unix_time,
    *;
STORE dimensionTimestamp INTO '/user/daniel/project/pig/dimensionTimestamp';