forked from naturomics/CapsNet-Tensorflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
capsLayer.py
186 lines (156 loc) · 8.27 KB
/
capsLayer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""
License: Apache-2.0
Author: Huadong Liao
E-mail: naturomics.liao@gmail.com
"""
import numpy as np
import tensorflow as tf
from config import cfg
epsilon = 1e-9
class CapsLayer(object):
''' Capsule layer.
Args:
input: A 4-D tensor.
num_outputs: the number of capsule in this layer.
vec_len: integer, the length of the output vector of a capsule.
layer_type: string, one of 'FC' or "CONV", the type of this layer,
fully connected or convolution, for the future expansion capability
with_routing: boolean, this capsule is routing with the
lower-level layer capsule.
Returns:
A 4-D tensor.
'''
def __init__(self, num_outputs, vec_len, with_routing=True, layer_type='FC'):
self.num_outputs = num_outputs
self.vec_len = vec_len
self.with_routing = with_routing
self.layer_type = layer_type
def __call__(self, input, kernel_size=None, stride=None):
'''
The parameters 'kernel_size' and 'stride' will be used while 'layer_type' equal 'CONV'
'''
if self.layer_type == 'CONV':
self.kernel_size = kernel_size
self.stride = stride
if not self.with_routing:
# the PrimaryCaps layer, a convolutional layer
# input: [batch_size, 20, 20, 256]
assert input.get_shape() == [cfg.batch_size, 20, 20, 256]
'''
# version 1, computational expensive
capsules = []
for i in range(self.vec_len):
# each capsule i: [batch_size, 6, 6, 32]
with tf.variable_scope('ConvUnit_' + str(i)):
caps_i = tf.contrib.layers.conv2d(input, self.num_outputs,
self.kernel_size, self.stride,
padding="VALID", activation_fn=None)
caps_i = tf.reshape(caps_i, shape=(cfg.batch_size, -1, 1, 1))
capsules.append(caps_i)
assert capsules[0].get_shape() == [cfg.batch_size, 1152, 1, 1]
capsules = tf.concat(capsules, axis=2)
'''
# version 2, equivalent to version 1 but higher computational
# efficiency.
# NOTE: I can't find out any words from the paper whether the
# PrimaryCap convolution does a ReLU activation or not before
# squashing function, but experiment show that using ReLU get a
# higher test accuracy. So, which one to use will be your choice
capsules = tf.contrib.layers.conv2d(input, self.num_outputs * self.vec_len,
self.kernel_size, self.stride, padding="VALID",
activation_fn=tf.nn.relu)
# capsules = tf.contrib.layers.conv2d(input, self.num_outputs * self.vec_len,
# self.kernel_size, self.stride,padding="VALID",
# activation_fn=None)
capsules = tf.reshape(capsules, (cfg.batch_size, -1, self.vec_len, 1))
# [batch_size, 1152, 8, 1]
capsules = squash(capsules)
assert capsules.get_shape() == [cfg.batch_size, 1152, 8, 1]
return(capsules)
if self.layer_type == 'FC':
if self.with_routing:
# the DigitCaps layer, a fully connected layer
# Reshape the input into [batch_size, 1152, 1, 8, 1]
self.input = tf.reshape(input, shape=(cfg.batch_size, -1, 1, input.shape[-2].value, 1))
with tf.variable_scope('routing'):
# b_IJ: [batch_size, num_caps_l, num_caps_l_plus_1, 1, 1],
# about the reason of using 'batch_size', see issue #21
b_IJ = tf.constant(np.zeros([cfg.batch_size, input.shape[1].value, self.num_outputs, 1, 1], dtype=np.float32))
capsules = routing(self.input, b_IJ)
capsules = tf.squeeze(capsules, axis=1)
return(capsules)
def routing(input, b_IJ):
''' The routing algorithm.
Args:
input: A Tensor with [batch_size, num_caps_l=1152, 1, length(u_i)=8, 1]
shape, num_caps_l meaning the number of capsule in the layer l.
Returns:
A Tensor of shape [batch_size, num_caps_l_plus_1, length(v_j)=16, 1]
representing the vector output `v_j` in the layer l+1
Notes:
u_i represents the vector output of capsule i in the layer l, and
v_j the vector output of capsule j in the layer l+1.
'''
# W: [num_caps_i, num_caps_j, len_u_i, len_v_j]
W = tf.get_variable('Weight', shape=(1, 1152, 10, 8, 16), dtype=tf.float32,
initializer=tf.random_normal_initializer(stddev=cfg.stddev))
# Eq.2, calc u_hat
# do tiling for input and W before matmul
# input => [batch_size, 1152, 10, 8, 1]
# W => [batch_size, 1152, 10, 8, 16]
input = tf.tile(input, [1, 1, 10, 1, 1])
W = tf.tile(W, [cfg.batch_size, 1, 1, 1, 1])
assert input.get_shape() == [cfg.batch_size, 1152, 10, 8, 1]
# in last 2 dims:
# [8, 16].T x [8, 1] => [16, 1] => [batch_size, 1152, 10, 16, 1]
# tf.scan, 3 iter, 1080ti, 128 batch size: 10min/epoch
# u_hat = tf.scan(lambda ac, x: tf.matmul(W, x, transpose_a=True), input, initializer=tf.zeros([1152, 10, 16, 1]))
# tf.tile, 3 iter, 1080ti, 128 batch size: 6min/epoch
u_hat = tf.matmul(W, input, transpose_a=True)
assert u_hat.get_shape() == [cfg.batch_size, 1152, 10, 16, 1]
# In forward, u_hat_stopped = u_hat; in backward, no gradient passed back from u_hat_stopped to u_hat
u_hat_stopped = tf.stop_gradient(u_hat, name='stop_gradient')
# line 3,for r iterations do
for r_iter in range(cfg.iter_routing):
with tf.variable_scope('iter_' + str(r_iter)):
# line 4:
# => [batch_size, 1152, 10, 1, 1]
c_IJ = tf.nn.softmax(b_IJ, dim=2)
# At last iteration, use `u_hat` in order to receive gradients from the following graph
if r_iter == cfg.iter_routing - 1:
# line 5:
# weighting u_hat with c_IJ, element-wise in the last two dims
# => [batch_size, 1152, 10, 16, 1]
s_J = tf.multiply(c_IJ, u_hat)
# then sum in the second dim, resulting in [batch_size, 1, 10, 16, 1]
s_J = tf.reduce_sum(s_J, axis=1, keep_dims=True)
assert s_J.get_shape() == [cfg.batch_size, 1, 10, 16, 1]
# line 6:
# squash using Eq.1,
v_J = squash(s_J)
assert v_J.get_shape() == [cfg.batch_size, 1, 10, 16, 1]
elif r_iter < cfg.iter_routing - 1: # Inner iterations, do not apply backpropagation
s_J = tf.multiply(c_IJ, u_hat_stopped)
s_J = tf.reduce_sum(s_J, axis=1, keep_dims=True)
v_J = squash(s_J)
# line 7:
# reshape & tile v_j from [batch_size ,1, 10, 16, 1] to [batch_size, 1152, 10, 16, 1]
# then matmul in the last tow dim: [16, 1].T x [16, 1] => [1, 1], reduce mean in the
# batch_size dim, resulting in [1, 1152, 10, 1, 1]
v_J_tiled = tf.tile(v_J, [1, 1152, 1, 1, 1])
u_produce_v = tf.matmul(u_hat_stopped, v_J_tiled, transpose_a=True)
assert u_produce_v.get_shape() == [cfg.batch_size, 1152, 10, 1, 1]
# b_IJ += tf.reduce_sum(u_produce_v, axis=0, keep_dims=True)
b_IJ += u_produce_v
return(v_J)
def squash(vector):
'''Squashing function corresponding to Eq. 1
Args:
vector: A tensor with shape [batch_size, 1, num_caps, vec_len, 1] or [batch_size, num_caps, vec_len, 1].
Returns:
A tensor with the same shape as vector but squashed in 'vec_len' dimension.
'''
vec_squared_norm = tf.reduce_sum(tf.square(vector), -2, keep_dims=True)
scalar_factor = vec_squared_norm / (1 + vec_squared_norm) / tf.sqrt(vec_squared_norm + epsilon)
vec_squashed = scalar_factor * vector # element-wise
return(vec_squashed)