spark_sourcecode.scala
// First, set the NUM_PARTITIONS, EPSILON, ALPHA and ETA placeholders below.
// package spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
import java.io._
import scala.util.control.Breaks._
class RWRHNFF {
  val conf = new SparkConf().setAppName("spark").setMaster("local[2]")
  val sc = new SparkContext(conf)

  def calculate_rwrhn(path_heternet: String, path_ranknet: String, path_result: String): Unit = {
    // the uppercase names below are placeholders; replace them with concrete values
    // set the number of partitions
    val num_partitions = NUM_PARTITIONS
    // set the convergence threshold
    val epsilon = EPSILON
    // set the walk weight (alpha) and the restart weight (eta)
    val alpha = ALPHA
    val eta = ETA
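    // Expected input formats, inferred from the parsing code below (the sample
    // values are illustrative, not from the original):
    //   heterogeneous network file: "nodeId,neighborId,weight"  e.g. "2,1,0.5"
    //     (the rank of the first-column node is propagated to the second-column node)
    //   initial rank file:          "nodeId,rank"               e.g. "1,0.25"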
    // read the heterogeneous network, parse each line into an interaction, then
    // groupByKey (passing the partitioner to groupByKey avoids a second shuffle)
    val interactions = sc.textFile(path_heternet).map { line =>
      val parts = line.split(",")
      val id2 = parts(0)
      val id1 = parts(1)
      val weight = parts(2)
      (id2.toDouble, (id1.toDouble, weight.toDouble))
    }.groupByKey(new HashPartitioner(num_partitions))
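    // Because interactions keeps this hash partitioner, the join executed on
    // every loop iteration does not re-shuffle the (large) adjacency lists;
    // only the small ranks RDD moves across the network.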
    // read the initial node-rank file and parse it
    val rank = sc.textFile(path_ranknet).map { line =>
      val parts = line.split(",")
      val node = parts(0)
      val r = parts(1)
      (node.toDouble, r.toDouble)
    }
    // join the two RDDs above to build the ranks RDD (note: this join runs
    // only once, which keeps the join inside the while loop cheap)
    val join_two_rdd = interactions.join(rank)
    var ranks = join_two_rdd.map { case (id, (iterable, r)) => (id, r) }
    // read the initial ranks again (note: these are the restart ranks that are
    // added back into the updated ranks on every iteration)
    val ranks_again = sc.textFile(path_ranknet).map { line =>
      val parts = line.split(",")
      (parts(0).toDouble, parts(1).toDouble)
    }.groupByKey()
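    // Each iteration below computes, for every node v,
    //   rank(v) = alpha * sum over edges (u,v) of weight(u,v) * rank(u) + eta * rank_0(v)
    // and the loop stops once no rank changes by more than epsilon.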
    // loop until the node ranks converge
    breakable {
      while (true) {
        val before_iteration_ranks = ranks
        // each node distributes weight * rank along its interactions
        val contribs = interactions.join(ranks).values.flatMap { case (iterable, r) =>
          iterable.map { case (id, weight) => (id, weight * r) }
        }
        // update the rank of each node with the damped contributions
        ranks = contribs.reduceByKey(_ + _).mapValues(alpha * _)
        // add the restart term: eta times the initial rank of each node
        val update_ranks = ranks_again.join(ranks).mapValues {
          case (rank_ini, rank_updated) =>
            // each group holds a single initial rank; .max just unwraps it
            rank_ini.max * eta + rank_updated
        }
        // final ranks for this iteration
        ranks = update_ranks.reduceByKey(_ + _)
        // check for convergence: count the nodes whose rank moved by more than epsilon
        val check = ranks.join(before_iteration_ranks).filter {
          case (id, (after_rank, before_rank)) => (after_rank - before_rank).abs > epsilon
        }
        val size = check.count()
        // once every rank has converged, write the result and exit the loop
        if (size < 1) {
          val output = ranks.collect()
          val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path_result)))
          for (result <- output) {
            writer.write(result + " ")
            writer.write("\n")
          }
          writer.close()
          break
        }
      }
    }
  }
}
object RWRHN {
  def main(args: Array[String]): Unit = {
    // set the paths of the network, the initial ranks, and the output file
    val path1 = "path_matrix"
    val path2 = "path_init_rankns"
    val result_out = "path_out"
    val ob1 = new RWRHNFF()
    ob1.calculate_rwrhn(path1, path2, result_out)
    // release Spark resources
    ob1.sc.stop()
  }
}
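// To run (illustrative command; the jar name is a placeholder, and the master
// is already set to local[2] in the code):
//   spark-submit --class RWRHN rwrhn-assembly.jar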