-
Notifications
You must be signed in to change notification settings - Fork 28
/
Spark Solutions.txt
41 lines (22 loc) · 2.31 KB
/
Spark Solutions.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
02.Find_sales_for_each_month.txt
# Goal: total sales per calendar month.
# order_items carries the per-item subtotal and orders carries the order
# date, so the two datasets are joined on the order id and then grouped by
# the year and month extracted from the order date.

# Orders are stored as Avro files.
orders = spark.read.format('com.databricks.spark.avro').load('/user/sanchit089/retail_db/orders/*.avro')

# order_date is divided by 1000 before conversion (presumably it is stored as
# epoch milliseconds -- confirm against the Avro schema). from_unixtime
# renders the result as a timestamp string in the Spark session time zone.
orders = orders.withColumn(
    'order_date_converted',
    from_unixtime(orders['order_date'] / 1000.0)
)

# Order items are stored as Parquet files.
order_items = spark.read.parquet('/user/sanchit089/retail_db/order_items/*.parquet')

# Join each order to its items and keep only the columns needed downstream.
orders_join = orders.join(
    order_items,
    on=(orders['order_id'] == order_items['order_item_order_id'])
).select(
    'order_id',
    'order_status',
    'order_date_converted',
    'order_item_subtotal'
)

# Sum the item subtotals per (year, month) and give the generated
# "sum(order_item_subtotal)" column a friendlier name.
sales = (
    orders_join
    .groupBy(
        year(orders_join['order_date_converted']).alias('year'),
        month(orders_join['order_date_converted']).alias('month')
    )
    .agg({"order_item_subtotal": "sum"})
    .withColumnRenamed("sum(order_item_subtotal)", 'total_sales')
)

# Round the monthly totals to two decimal places.
sales = sales.withColumn('total_sales', round(sales['total_sales'], 2))

# Look up the session's default Parquet compression codec (the value is only
# echoed when this is run in an interactive shell).
spark.conf.get('spark.sql.parquet.compression.codec')

# Write the result as Parquet, explicitly requesting snappy compression.
sales.write.parquet(path='/user/sanchit089/output/sales', compression='snappy')
# Price statistics per product category (first 5 rows).
# A dict literal keeps only the LAST value for a repeated key, so passing
# 'product_price' three times in a dict would silently compute just
# min(product_price). Column expressions are used instead so that max, count,
# mean and min are all returned; without aliases they produce the same default
# column names the dict form would (e.g. "max(product_price)").
# Assumes max, count, avg and min are the pyspark.sql.functions versions
# (not the Python builtins).
productfilter.groupBy('product_category_id').agg(
    max(productfilter.product_price),
    count(productfilter.product_id),
    avg(productfilter.product_price),
    min(productfilter.product_price)
).show(5)
# Price statistics per product category with explicit output column names:
# highest price, lowest price, row count and average price.
productfilter.groupBy('product_category_id').agg(
    max(productfilter['product_price']).alias('max'),
    min(productfilter['product_price']).alias('min'),
    count(productfilter['product_category_id']).alias('count'),
    avg(productfilter['product_price']).alias('mean')
).show()
# Import the rows of sanchit_products with product_id <= 1111 into HDFS as
# text: '*' between fields, newline between records, -1000 for NULL in
# non-string columns and 'NA' for NULL in string columns, using 2 mappers.
# The target directory is deleted first if it already exists.
# Fixes: the --where filter now names the column product_id (it was written
# with a space, which is not valid SQL), and the stray $CONDITION token was
# removed from --boundary-query (inside double quotes the shell expanded it to
# an empty string; a boundary query does not need that placeholder).
# NOTE(review): --password on the command line is visible in shell history and
# process listings; consider -P or --password-file.
sqoop import \
  --connect jdbc:mysql://nn01.itversity.com:3306/retail_import \
  --username retail_dba \
  --password itversity \
  --table sanchit_products \
  --fields-terminated-by '*' \
  --lines-terminated-by '\n' \
  --null-non-string -1000 \
  --null-string 'NA' \
  --where 'product_id <= 1111' \
  -m 2 \
  --delete-target-dir \
  --target-dir '/user/sanchit089/problem5/products-text-part1' \
  --boundary-query "select min(product_id), max(product_id) from sanchit_products where product_id <= 1111"
sqoop export --connect jdbc:mysql://nn01.itversity.com:3306/retail_export --username retail_dba --password itversity --