Stand out toy example & fix bugs in child threads (PaddlePaddle#219)
* Stand out toy example & fix bugs in child threads

* Refine comments
Yibing Liu authored Apr 14, 2020
1 parent 502f761 commit df0dff8
Showing 9 changed files with 101 additions and 89 deletions.
2 changes: 0 additions & 2 deletions demo/pantheon/README.md

This file was deleted.

54 changes: 54 additions & 0 deletions demo/pantheon/toy/README.md
@@ -0,0 +1,54 @@
## Toy example for Pantheon

See more details about Pantheon in [PaddleSlim/Pantheon](../../../paddleslim/pantheon).

This example implements two teacher models (not trainable, just for demo): teacher1 takes an integer **x** as input and predicts the value **2x-1**, see [run_teacher1.py](run_teacher1.py); teacher2 also takes **x** as input and predicts **2x+1**, see [run_teacher2.py](run_teacher2.py). The two teachers share a data reader that feeds a sequence of increasing natural numbers, from zero up to some positive integer **max_n**, from which they generate different knowledge. The schema keys for teacher1's knowledge are [**"x", "2x-1", "result"**], and those for teacher2's knowledge are [**"2x+1", "result"**], where **"result"** is the common schema holding a copy of each teacher's prediction. When instantiating the **Student** object, the merging strategy for the common schema **"result"** must be specified; the schema keys of the merged knowledge are then [**"x", "2x-1", "2x+1", "result"**], and the merged **"result"** equals **"2x"** when the merging strategy is **"mean"** and **"4x"** when it is **"sum"**. The student model fetches the merged knowledge from the teachers and prints it out, see [run_student.py](run_student.py).
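
As a quick sanity check of the merge arithmetic above, the following sketch (illustrative only, not part of the demo scripts; `merge_result` is a hypothetical helper) reproduces the merged **"result"** value for both strategies:

```python
# Hypothetical helper, for illustration only: computes the merged "result"
# value the student should observe for an input x under each strategy.
def merge_result(x, strategy="mean"):
    teacher1_result = 2 * x - 1  # teacher1's prediction
    teacher2_result = 2 * x + 1  # teacher2's prediction
    if strategy == "mean":
        return (teacher1_result + teacher2_result) / 2.0  # equals 2x
    if strategy == "sum":
        return teacher1_result + teacher2_result  # equals 4x
    raise ValueError("unknown merging strategy: {}".format(strategy))

assert merge_result(3, "mean") == 6   # "2x"
assert merge_result(3, "sum") == 12   # "4x"
```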

The toy "knowledge distillation" system can be launched in three different modes, i.e., offline, online and their hybrid. All three modes should have the same outputs, and the correctness of results can be verified by checking the order and values of outputs.

### Offline

The two teachers work in offline mode; start them with local file paths for their output knowledge files.

```shell
export PYTHONPATH=../../../:$PYTHONPATH
export CUDA_VISIBLE_DEVICES=0,1
export NUM_POSTPROCESS_THREADS=10 # default 8
nohup python -u run_teacher1.py --use_cuda true --out_path teacher1_offline.dat > teacher1_offline.log 2>&1&
export CUDA_VISIBLE_DEVICES=2
nohup python -u run_teacher2.py --use_cuda true --out_path teacher2_offline.dat > teacher2_offline.log 2>&1&
```
After both runs have finished, start the student model with the two generated knowledge files.

```shell
export PYTHONPATH=../../../:$PYTHONPATH
python -u run_student.py \
--in_path0 teacher1_offline.dat \
--in_path1 teacher2_offline.dat
```


### Online

The two teachers work in online mode; start them with the TCP/IP ports they should serve on. Please make sure the ports are available.

```shell
export PYTHONPATH=../../../:$PYTHONPATH
export CUDA_VISIBLE_DEVICES=0
nohup python -u run_teacher1.py --use_cuda true --out_port 8080 > teacher1_online.log 2>&1&
export CUDA_VISIBLE_DEVICES=1,2
nohup python -u run_teacher2.py --use_cuda true --out_port 8081 > teacher2_online.log 2>&1&
```
Start the student model with IP addresses that can reach the two teachers' ports, e.g., when running on the same node:

```shell
export PYTHONPATH=../../../:$PYTHONPATH
python -u run_student.py \
--in_address0 127.0.0.1:8080 \
--in_address1 127.0.0.1:8081
```
**Note:** in online mode, the starting order of the teachers and the student doesn't matter; they will wait for each other to establish the connection.
### Hybrid of offline and online
One teacher works in offline mode and the other works in online mode. In this case, start the offline teacher first. Once the offline knowledge file is ready, start the online teacher and the student at the same time.
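
A possible launch sequence is sketched below (a sketch only; it assumes, as the flags above suggest, that `run_student.py` accepts a file path for one teacher and a TCP/IP address for the other; adjust devices and ports to your environment):

```shell
export PYTHONPATH=../../../:$PYTHONPATH

# Step 1: run the offline teacher to completion to produce its knowledge file
export CUDA_VISIBLE_DEVICES=0
python -u run_teacher1.py --use_cuda true --out_path teacher1_offline.dat

# Step 2: once the file is ready, start the online teacher and the student together
export CUDA_VISIBLE_DEVICES=1
nohup python -u run_teacher2.py --use_cuda true --out_port 8081 > teacher2_online.log 2>&1&
python -u run_student.py \
    --in_path0 teacher1_offline.dat \
    --in_address1 127.0.0.1:8081
```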
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
55 changes: 3 additions & 52 deletions paddleslim/pantheon/README.md
@@ -199,57 +199,8 @@ data_generator = student.get_knowledge_generator(
batch_size=32, drop_last=False)
```

### Example
## Examples

A toy example is provided here to show how the knowledge data is transferred from the teachers to the student model and merged.
### Toy Example

In the directory [demo/pantheon/](../../demo/pantheon/), two teacher models are implemented (not trainable, just for demo): teacher1 takes an integer **x** as input and predicts the value **2x-1**, see [run_teacher1.py](../../demo/pantheon/run_teacher1.py); teacher2 also takes **x** as input and predicts **2x+1**, see [run_teacher2.py](../../demo/pantheon/run_teacher2.py). The two teachers share a data reader that feeds a sequence of increasing natural numbers, from zero up to some positive integer **max_n**, from which they generate different knowledge. The schema keys for teacher1's knowledge are [**"x", "2x-1", "result"**], and those for teacher2's knowledge are [**"2x+1", "result"**], where **"result"** is the common schema holding a copy of each teacher's prediction. When instantiating the **Student** object, the merging strategy for the common schema **"result"** must be specified; the schema keys of the merged knowledge are then [**"x", "2x-1", "2x+1", "result"**], and the merged **"result"** equals **"2x"** when the merging strategy is **"mean"** and **"4x"** when it is **"sum"**. The student model fetches the merged knowledge from the teachers and prints it out, see [run_student.py](../../demo/pantheon/run_student.py).

The toy "knowledge distillation" system can be launched in three different modes, i.e., offline, online and their hybrid. All three modes should have the same outputs, and the correctness of results can be verified by checking the order and values of outputs.

1) **Offline**

The two teachers work in offline mode; start them with local file paths for their output knowledge files.

```shell
export PYTHONPATH=../../:$PYTHONPATH
export CUDA_VISIBLE_DEVICES=0,1
export NUM_POSTPROCESS_THREADS=10 # default 8
nohup python -u run_teacher1.py --use_cuda true --out_path teacher1_offline.dat > teacher1_offline.log 2>&1&
export CUDA_VISIBLE_DEVICES=2
nohup python -u run_teacher2.py --use_cuda true --out_path teacher2_offline.dat > teacher2_offline.log 2>&1&
```
After both runs have finished, start the student model with the two generated knowledge files.

```shell
export PYTHONPATH=../../:$PYTHONPATH
python -u run_student.py \
--in_path0 teacher1_offline.dat \
--in_path1 teacher2_offline.dat
```


2) **Online**

The two teachers work in online mode; start them with the TCP/IP ports they should serve on. Please make sure the ports are available.

```shell
export PYTHONPATH=../../:$PYTHONPATH
export CUDA_VISIBLE_DEVICES=0
nohup python -u run_teacher1.py --use_cuda true --out_port 8080 > teacher1_online.log 2>&1&
export CUDA_VISIBLE_DEVICES=1,2
nohup python -u run_teacher2.py --use_cuda true --out_port 8081 > teacher2_online.log 2>&1&
```
Start the student model with IP addresses that can reach the two teachers' ports, e.g., when running on the same node:

```shell
export PYTHONPATH=../../:$PYTHONPATH
python -u run_student.py \
--in_address0 127.0.0.1:8080 \
--in_address1 127.0.0.1:8081
```
**Note:** in online mode, the starting order of the teachers and the student doesn't matter; they will wait for each other to establish the connection.
3) **Hybrid of offline and online**
One teacher works in offline mode and the other works in online mode. In this case, start the offline teacher first. Once the offline knowledge file is ready, start the online teacher and the student at the same time.
A toy example is provided to show how knowledge data is transferred from the teachers to the student model and merged, covering the offline and online modes and their hybrid. See [demo/pantheon/toy](../../demo/pantheon/toy).
46 changes: 27 additions & 19 deletions paddleslim/pantheon/student.py
@@ -104,9 +104,9 @@ def register_teacher(self, in_path=None, in_address=None):
manager = BaseManager(
address=(ip, int(port)), authkey=public_authkey.encode())

# Wait for the teacher model to start before establishing the connection
print("Connecting to {}, with public key {} ...".format(
in_address, public_authkey))
# Wait for the teacher model to start before establishing the connection
while True:
try:
manager.connect()
@@ -122,27 +122,37 @@ def merge(knowledge_queues):

def receive(queue, local_queue):
while True:
data = queue.get()
queue.task_done()
local_queue.put(data)
if isinstance(data, EndSignal):
try:
data = queue.get()
queue.task_done()
local_queue.put(data)
except EOFError:
break

knowledge_queue = Queue.Queue(100)

def gather(local_queues, knowledge_queue):
num = len(local_queues)
end_received = False
end_received = [0] * num
while True:
for i in range(num):
data = local_queues[i].get()
local_queues[i].task_done()
if isinstance(data, SyncSignal) and i > 0:
continue
elif isinstance(data, EndSignal):
end_received = True
knowledge_queue.put(data)
if end_received:
try:
for i in range(num):
data = local_queues[i].get()
local_queues[i].task_done()

if isinstance(data, SyncSignal):
if i == 0:
knowledge_queue.put(data)
elif isinstance(data, EndSignal):
end_received[i] = 1
if i == 0:
knowledge_queue.put(data)
if sum(end_received) == num:
end_received = [0] * num
break
else:
knowledge_queue.put(data)
except EOFError:
break

# threads to receive knowledge from the online teacher
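
For readers skimming the diff, here is a minimal, self-contained sketch (plain `queue`/`threading`, not the Pantheon source, and simplified to a single round without the `EOFError` handling) of the gathering pattern the updated `gather` implements: keep one end flag per local queue, forward the end marker downstream only once, and finish only after every queue has delivered its end signal.

```python
import queue
import threading

END = object()  # stand-in for Pantheon's EndSignal


def gather(local_queues, out_queue):
    num = len(local_queues)
    end_received = [0] * num  # one end flag per local queue
    while True:
        for i in range(num):
            data = local_queues[i].get()
            local_queues[i].task_done()
            if data is END:
                end_received[i] = 1
                if i == 0:
                    out_queue.put(END)  # forward a single end marker downstream
            else:
                out_queue.put(data)
        if sum(end_received) == num:  # stop once every queue has ended
            break


local_queues = [queue.Queue() for _ in range(2)]
merged = queue.Queue()
worker = threading.Thread(target=gather, args=(local_queues, merged), daemon=True)
worker.start()
for i, q in enumerate(local_queues):
    q.put({"teacher": i, "result": 2 * i})
    q.put(END)
worker.join()
print([merged.get() for _ in range(merged.qsize())])
```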
@@ -419,7 +429,6 @@ def get_knowledge_generator(self, batch_size, drop_last=False):
"Return None.")
return None
self._is_knowledge_gen_locked = True

self.get_knowledge_desc()

def split_batch(batch, num):
@@ -536,17 +545,17 @@ def data_receiver(queue):
queue.put(StartSignal())
queue.join()

# launch threads to listen on all knowledge queues
local_queues = [Queue.Queue(100) for i in range(self._num_teachers)]
# launch threads to listen on all knowledge queues
for i in range(self._num_teachers):
listen_thread = Thread(
target=listen,
args=(self._teacher_knowledge_queues[i], local_queues[i]))
listen_thread.daemon = True
listen_thread.start()

# launch threads to make new batch for student
med_queues = [Queue.Queue(100) for i in range(self._num_teachers)]
# launch threads to make new batch for student
for i in range(self._num_teachers):
listen_thread = Thread(
target=make_new_batch,
@@ -560,7 +569,6 @@
merge_thread.daemon = True
merge_thread.start()

# yield knowledge data
def wrapper():
while True:
knowledge = self._knowledge_queue.get()
33 changes: 17 additions & 16 deletions paddleslim/pantheon/teacher.py
@@ -122,6 +122,7 @@ def func():
else:
for q in self._local_in_queues:
q.put(EndSignal())
break

t = Thread(target=func)
t.daemon = True
@@ -138,13 +139,18 @@ def _run(self, worker, args):

def _gather(self):
def func():
end_received = False
while True:
for idx, q in enumerate(self._local_out_queues):
data = q.get()
q.task_done()
if isinstance(data, EndSignal) and idx > 0:
continue
if isinstance(data, EndSignal):
end_received = True
if idx > 0:
continue
self._out_queue.put(data)
if end_received:
break

t = Thread(target=func)
t.daemon = True
@@ -539,6 +545,8 @@ def know_maker(in_queue, out_queue, use_fp16):
else:
# forward other types of data directly (maybe knowledge desc or EndSignal)
out_queue.put(data)
if isinstance(data, EndSignal):
break

know_make_queue = Queue.Queue(self._buf_size)
if self._out_file:
@@ -569,8 +577,6 @@ def offline_write(queue):
know_make_queue,
self._knowledge_queues)

make_knowledge(worker=know_maker, args=(self._use_fp16, ))

compiled_program = fluid.compiler.CompiledProgram(
self._program).with_data_parallel()

@@ -579,14 +585,15 @@ def offline_write(queue):
" Teacher begins to serve ...")

data_reader = MixedDataReader(data_loader, dev_count)
# For online mode, send knowledge description every time
for repeated in range(self._times):
make_knowledge(worker=know_maker, args=(self._use_fp16, ))
if self._knowledge_queues:
# wait for the accessing of knowledge desc and data
while True:
if self._sync_required:
for q in self._knowledge_queues:
q.put(SyncSignal())
# For online mode, send knowledge description every sync
know_make_queue.put(self._knowledge_desc)
self._sync_required = False
if self._data_required:
@@ -601,17 +608,11 @@ def offline_write(queue):
data_reader.multi_dev_generator()):
if self._sync_required:
break
tic = time.time()
outputs = self._exe.run(compiled_program,
feed=dev_batches,
fetch_list=schema_in_fetch_vars)
toc = time.time()
print("teacher predict time = {}".format(toc - tic))
know_make_queue.put((dev_batches, outputs))
#out_buf_queue.put(know)
tic = time.time()

print("teacher out time = {}".format(tic - toc))
num_batches_sent += dev_count
if num_batches_sent % (100 * dev_count) == 0:
log = "Processed {} batch samples.".format(
@@ -641,18 +642,18 @@ def offline_write(queue):
outputs = copy.deepcopy(output)
if dev_batches or outputs:
know_make_queue.put((dev_batches, outputs))
#out_buf_queue.put(know)
num_batches_sent += (index + 1)

print("Processed {} batch samples in total.".format(
num_batches_sent))

know_make_queue.put(EndSignal())
know_make_queue.join()

if self._knowledge_queues:
for q in self._knowledge_queues:
q.join()
if self._knowledge_queues:
for q in self._knowledge_queues:
q.join()
if self._out_file:
offline_write_queue.join()
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) +
" Teacher ends serving.")

