rentabilidad_indice.py
import sys

import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
# Create the Spark context and SQL context
sc = pyspark.SparkContext("local[*]")
sqlContext = pyspark.sql.SQLContext(sc)
print('Context created')
# Read command-line parameters
INPUT_CSV = sys.argv[1]   # e.g. 'gs://financials-data-bucket/data/GSPC-2.csv'
OUTPUT_CSV = sys.argv[2]  # e.g. 'gs://financials-data-bucket/data/2_staging/index_anuals.csv'
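# Example invocation (sketch; assumes spark-submit is available and the bucket paths above exist):
#   spark-submit rentabilidad_indice.py \
#       gs://financials-data-bucket/data/GSPC-2.csv \
#       gs://financials-data-bucket/data/2_staging/index_anuals.csv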
# Load the input CSV with the raw index quotes
indicesDF = sqlContext.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load(INPUT_CSV)
print(INPUT_CSV + ' input file read')
# Normalise column names
indicesDF = indicesDF.select(
    F.col('Date').alias('DATE'),
    F.col('Open').alias('OPEN'),
    F.col('High').alias('HIGH'),
    F.col('Low').alias('LOW'),
    F.col('Close').alias('CLOSE'),
    F.col('Adj Close').alias('ADJ_CLOSE'),
    F.col('Volume').alias('VOLUME'))
# Derive the YEAR from the date, tag every row with the index id (INDICE = 1)
# and keep only quotes after 1970-01-01, ordered by date
indicesDF = indicesDF.withColumn('YEAR', F.substring('DATE', 1, 4).cast(IntegerType()))
indicesDF = indicesDF.withColumn('INDICE', lit(1))
indicesDF = indicesDF.filter("DATE > '1970-01-01'").sort(F.asc('DATE'))
print('Columns created')
# Daily return
# AUX sums the previous and the current ADJ_CLOSE, so AUX - ADJ_CLOSE is the previous
# day's close and RETURNS = (close_t - close_t-1) / close_t-1
windowSpec = Window.orderBy(F.col("DATE")).rowsBetween(-1, 0)
indicesDF = indicesDF.withColumn('AUX', F.sum("ADJ_CLOSE").over(windowSpec))
indicesDF = indicesDF.withColumn("RETURNS",
                                 (F.col("ADJ_CLOSE") - (F.col("AUX") - F.col("ADJ_CLOSE")))
                                 / (F.col("AUX") - F.col("ADJ_CLOSE"))).drop("AUX")
print('Daily return computed')
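# Equivalent sketch with F.lag (assumption: not part of the original script, shown only
# for clarity):
#   w = Window.orderBy("DATE")
#   prev_close = F.lag("ADJ_CLOSE", 1).over(w)
#   indicesDF.withColumn("RETURNS", (F.col("ADJ_CLOSE") - prev_close) / prev_close)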
# Cumulative return within each year
# PRICE_START is intended to be the first OPEN of each year
precioIniDF = indicesDF.sort(F.desc('DATE')).groupBy('YEAR').agg(F.last('OPEN').alias('PRICE_START'))
indicesDF = indicesDF.join(precioIniDF, on=['YEAR'])
indicesDF = indicesDF.withColumn('CUMULATIVE_RETURNS',
                                 (F.col('ADJ_CLOSE') - F.col('PRICE_START')) / F.col('PRICE_START'))
print('Cumulative return computed')
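# Note: relying on sort() before groupBy() to pick the year's first OPEN is not guaranteed
# by Spark. A window-based sketch (assumption, not what the script runs) would be:
#   w_year = Window.partitionBy('YEAR').orderBy('DATE')
#   indicesDF.withColumn('PRICE_START', F.first('OPEN').over(w_year))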
# Write the final file
indicesDF \
    .write.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .save(OUTPUT_CSV)
print('Done')