-
Notifications
You must be signed in to change notification settings - Fork 3
/
particle.py
107 lines (96 loc) · 4.47 KB
/
particle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
class Particle:
    """A single PSO particle: one candidate set of cluster centroids.

    Each particle carries positions for ``n_clusters`` centroids, a velocity
    per centroid, and its personal best (position / fitness / clustering)
    found so far, following the PSO clustering formulation referenced by the
    original comments ("the paper", Equation (4)).
    """

    def __init__(self, n_clusters, data, use_kmeans=True, w=0.72, c1=1.49, c2=1.49):
        """
        :param n_clusters: number of centroids this particle carries
        :param data: (n_samples, n_features) array used to seed centroid positions
        :param use_kmeans: if True, seed centroids from a K-Means fit; otherwise
            seed from randomly chosen data points
        :param w: inertia weight
        :param c1: cognitive (personal-best) acceleration coefficient
        :param c2: social (global-best) acceleration coefficient
        """
        self.n_clusters = n_clusters
        if use_kmeans:
            # Seed from a K-Means solution so the swarm starts near a good optimum.
            # n_init is pinned explicitly so behavior stays stable across
            # sklearn versions (the default changed to "auto" in newer releases).
            k_means = KMeans(n_clusters=self.n_clusters, n_init=10)
            k_means.fit(data)
            self.centroids_pos = k_means.cluster_centers_
        else:
            # Seed with distinct random data points; replace=False prevents the
            # degenerate case of two centroids starting at the same point.
            self.centroids_pos = data[np.random.choice(len(data), self.n_clusters, replace=False)]
        # best fitness value seen by this particle so far (lower is better)
        self.pb_val = np.inf
        # personal best centroid positions so far
        self.pb_pos = self.centroids_pos.copy()
        self.velocity = np.zeros_like(self.centroids_pos)
        # best per-point cluster assignment (indices) seen so far
        self.pb_clustering = None
        # PSO hyper-parameters
        self.w = w
        self.c1 = c1
        self.c2 = c2

    def update_pb(self, data: np.ndarray):
        """Re-evaluate fitness at the current position and update the personal best.

        Fitness follows Equation (4) of the paper: the mean, over clusters, of
        the average point-to-centroid distance.

        :param data: (n_samples, n_features) array of input points
        :return: None
        """
        # Assign every point to its nearest centroid.
        distances = self._get_distances(data=data)
        clusters = np.argmin(distances, axis=0)  # shape: (len(data),)
        clusters_ids = np.unique(clusters)
        # If any cluster ended up empty, re-seed its centroid on a random data
        # point and re-assign, repeating until every cluster owns >= 1 point.
        while len(clusters_ids) != self.n_clusters:
            deleted_clusters = np.where(~np.isin(np.arange(self.n_clusters), clusters_ids))[0]
            self.centroids_pos[deleted_clusters] = data[np.random.choice(len(data), len(deleted_clusters))]
            distances = self._get_distances(data=data)
            clusters = np.argmin(distances, axis=0)
            clusters_ids = np.unique(clusters)

        new_val = self._fitness_function(clusters=clusters, distances=distances)
        if new_val < self.pb_val:
            self.pb_val = new_val
            self.pb_pos = self.centroids_pos.copy()
            self.pb_clustering = clusters.copy()

    def update_velocity(self, gb_pos: np.ndarray):
        """Update velocity from inertia, personal-best pull, and global-best pull.

        NOTE: a single random scalar is drawn per attraction term (not per
        dimension), matching the original implementation.

        :param gb_pos: best centroid positions found by the whole swarm so far
        :return: None
        """
        inertia = self.w * self.velocity
        cognitive = self.c1 * np.random.random() * (self.pb_pos - self.centroids_pos)
        social = self.c2 * np.random.random() * (gb_pos - self.centroids_pos)
        self.velocity = inertia + cognitive + social

    def move_centroids(self, gb_pos):
        """Advance one PSO step: update the velocity, then translate the centroids.

        :param gb_pos: best centroid positions among all particles so far
        :return: None
        """
        self.update_velocity(gb_pos=gb_pos)
        # Addition already yields a fresh array, so no extra copy is needed.
        self.centroids_pos = self.centroids_pos + self.velocity

    def _get_distances(self, data: np.ndarray) -> np.ndarray:
        """Euclidean distances between every centroid and every data point.

        :param data: (n_samples, n_features) array
        :return: (n_clusters, n_samples) array of distances
        """
        # Broadcast to (n_clusters, n_samples, n_features) and reduce the
        # feature axis — one vectorized call instead of a per-centroid loop.
        diff = self.centroids_pos[:, None, :] - data[None, :, :]
        return np.linalg.norm(diff, axis=2)

    def _fitness_function(self, clusters: np.ndarray, distances: np.ndarray) -> float:
        """Quantization-error fitness (Equation (4)).

        J is the mean, over the particle's clusters, of the average distance
        between each member point and its centroid; empty clusters contribute
        zero to the sum (lower J is better).

        :param clusters: (n_samples,) cluster index assigned to each point
        :param distances: (n_clusters, n_samples) centroid-to-point distances
        :return: the fitness value J
        """
        J = 0.0
        for cluster_id in range(self.n_clusters):
            members = np.where(clusters == cluster_id)[0]
            if len(members):
                J += distances[cluster_id, members].mean()
        return J / self.n_clusters