
ZMQError: Operation cannot be accomplished in current state #29

Closed
astariul opened this issue Nov 20, 2018 · 9 comments

Comments

astariul commented Nov 20, 2018

When running the client side, I encounter this error:
zmq.error.ZMQError: Operation cannot be accomplished in current state

Any idea how to solve this? Thanks!


On server side, everything seems fine :

I:WORKER-0:[ser:run:234]:ready and listening
I:WORKER-0:[ser:gen:253]:received 64 from b'6bbd50cb-b7e1-46b0-b14f-f3e0511c85aa'
I:WORKER-0:[ser:run:242]:job b'6bbd50cb-b7e1-46b0-b14f-f3e0511c85aa' samples: 64 done: 10.66s
I:SINK:[ser:run:175]:received 64 of client b'6bbd50cb-b7e1-46b0-b14f-f3e0511c85aa' (64/64)
I:SINK:[ser:run:183]:client b'6bbd50cb-b7e1-46b0-b14f-f3e0511c85aa' 64 samples are done! sending back to client

Full stack trace:

File "train.py", line 175, in bert_embed
    embeddings = bert_client.encode(sentences)
  File "/home/remondn/workspace/Siamese_BERT/resources/BERT_Service/service/client.py", line 51, in encode
    self.socket.send_pyobj(texts)
  File "/home/remondn/.local/lib/python3.5/site-packages/zmq/sugar/socket.py", line 603, in send_pyobj
    return self.send(msg, flags=flags, **kwargs)
  File "/home/remondn/.local/lib/python3.5/site-packages/zmq/sugar/socket.py", line 392, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  File "zmq/backend/cython/socket.pyx", line 725, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 772, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 247, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/socket.pyx", line 242, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation cannot be accomplished in current state
@astariul

This is due to multi-threading. Without multi-threading on the client side, the error no longer appears.


Note:
I put bc.encode(...) inside a Keras generator. By default, Keras fit_generator has use_multiprocessing set to False. However, this is not enough: you also need to set the workers argument to 0 (it defaults to 1), or several requests will be made at the same time, resulting in the above error.


hanxiao commented Nov 21, 2018

Hi, just FYI: the client side supports multi-thread/multi-process use; that's part of the design. It's just that you can't reuse one BertClient across multiple threads/processes. For example:

BAD

bc = BertClient()

# in Proc1/Thread1 scope:
bc.encode(lst_str)

# in Proc2/Thread2 scope:
bc.encode(lst_str)

Instead, please do:

GOOD

# in Proc1/Thread1 scope:
bc1 = BertClient()
bc1.encode(lst_str)

# in Proc2/Thread2 scope:
bc2 = BertClient()
bc2.encode(lst_str)
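The one-client-per-thread rule above can be demonstrated without a running server. The sketch below is hypothetical: FakeClient is a stand-in for BertClient (it just returns string lengths instead of embeddings) so the pattern is runnable on its own.

```python
import threading

class FakeClient:
    """Hypothetical stand-in for BertClient; no server needed."""
    def __init__(self):
        self.owner = threading.get_ident()   # remember the creating thread

    def encode(self, texts):
        # a real BertClient would return embeddings; we return string lengths
        assert threading.get_ident() == self.owner, "client reused across threads!"
        return [len(t) for t in texts]

results = {}

def worker(name, sentences):
    client = FakeClient()                    # one client per thread: the GOOD pattern
    results[name] = client.encode(sentences)

threads = [threading.Thread(target=worker, args=(i, ["hello", "hi"]))
           for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each thread constructs its own client, so no socket is ever shared between threads, which is exactly what the GOOD example above does with bc1 and bc2.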


hanxiao commented Nov 21, 2018

I think this is a quite common issue; I will add it to the FAQ.

@astariul

Thanks for the clarification.

One last question:
If I declare the BertClient inside my generator, multi-threading will work. But by doing so, a single thread will create a new BertClient instance every batch.

Which one is better?

  • Declare a new BertClient every batch, allowing multi-threading?
  • Declare just one shared BertClient among all batches, not allowing multi-threading?

hanxiao pushed a commit that referenced this issue Nov 21, 2018

hanxiao commented Nov 27, 2018

@colanim I believe this is close to what you need https://github.com/hanxiao/bert-as-service/blob/afe66168b45053e7052a102c396e8e2e70e1a744/example4.py#L33-L37

In this example, bert-as-service is used within tf.data, which I'm currently working on; that's why it lives in a separate branch.

The trick is to make multiple BertClient instances up front; each time, take one, do the encoding, then put it back. This way your tf.data.map() can leverage num_parallel_calls and encode strings very efficiently.

Note that there is a pending bug when creating multiple BertClient instances in a row, as I wrote in #60. For now you can simply turn off show_server_config as a workaround.

@hanxiao hanxiao reopened this Nov 27, 2018
@astariul

So, if I understand correctly, I need to work with a pool of BertClient instances?


hanxiao commented Nov 28, 2018

Yes, BertClient.encode() is not thread-safe, so you need to make a pool of BertClient instances and reuse them one at a time.
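That pool pattern can be sketched in a server-free, runnable form; StubClient here is a hypothetical stand-in for BertClient (it uppercases text instead of encoding it), and the queue/threading mechanics are the part that matters.

```python
import queue
import threading

class StubClient:
    """Hypothetical stand-in for BertClient; a real one would call the server."""
    def encode(self, texts):
        return [t.upper() for t in texts]

pool = queue.Queue()
for _ in range(4):              # pool size = number of concurrent workers
    pool.put(StubClient())

out = []
out_lock = threading.Lock()

def encode_batch(batch):
    client = pool.get()         # blocks until some client is free
    try:
        emb = client.encode(batch)
    finally:
        pool.put(client)        # always hand the client back to the pool
    with out_lock:
        out.append(emb)

threads = [threading.Thread(target=encode_batch, args=(["a b", "c"],))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because queue.Queue is itself thread-safe, no client is ever used by two threads at once even though there are more batches in flight than clients.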


astariul commented Nov 28, 2018

I did as you said, and it works like a charm!

For those who are interested, here is my Keras code for multi-threaded encoding.

The Sequence (documentation) — a class acting like a generator, but easier to use with multi-threading:

import math
import numpy as np
from keras.utils import Sequence

class PairSentenceSequence(Sequence):
    """ Please refer to : https://keras.io/utils/#sequence """

    def __init__(self, dataset, batch_size, bert_clients):
        self.dataset = dataset
        self.batch_size = batch_size
        self.bc = bert_clients      # a queue.Queue of BertClient instances

    def __len__(self):
        # number of batches per epoch (round up to keep the last partial batch)
        return math.ceil(len(self.dataset) / self.batch_size)

    def __getitem__(self, idx):
        # Extract from dataset the data for this batch
        batch_data = self.dataset[idx * self.batch_size:(idx + 1) * self.batch_size]

        # Extract the sentences and construct the sentence pair
        pair_sen = ["{} ||| {}".format(data.doc_1, data.doc_2) for data in batch_data]

        # Sentence embeddings using BERT service 
        bert_client = self.bc.get()     # Wait a BertClient to be available
        sen_emb = bert_client.encode(pair_sen)
        self.bc.put(bert_client)        # Put it back after use

        return np.array(sen_emb), np.array([data.label for data in batch_data])

As you can see, before calling the encode method, I take one BertClient from the queue (if it is in the queue, it means it is available). After I finish using it, I put it back in the queue so it is available to the other threads.

In my training script, I simply create a Queue of BertClient instances (among other things, like loading the dataset):

import queue

workers = 4
bert_clients = queue.Queue()
for _ in range(workers):
    bert_clients.put(bert_service_lib.BertClient(show_server_config=False))

And the actual training:

train_seq = PairSentenceSequence(dataset=train_data,
            batch_size=args.batch_size, bert_clients=bert_clients)
dev_seq = PairSentenceSequence(dataset=dev_data,  
            batch_size=args.batch_size, bert_clients=bert_clients)

history = model.fit_generator(generator=train_seq, epochs=args.epochs, 
        callbacks=callbacks, validation_data=dev_seq, workers=workers)


hanxiao commented Jan 18, 2019

FYI, from 1.7.3 you can use ConcurrentBertClient directly instead of popping and pushing manually; see
https://github.com/hanxiao/bert-as-service/blob/827d2742e4b107e6c25ae552aa24219095133712/example/example4.py#L29-L38

Again, run pip install -U bert-serving-server bert-serving-client for the update.
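For intuition, the pooled behaviour can be approximated in a few lines. The sketch below is hypothetical — PooledClient is not the real ConcurrentBertClient implementation, and DummyClient stands in for BertClient so no server is required.

```python
import queue

class DummyClient:
    """Hypothetical stand-in for BertClient (returns string lengths, not embeddings)."""
    def encode(self, texts):
        return [len(t) for t in texts]

class PooledClient:
    """Hypothetical facade in the spirit of ConcurrentBertClient:
    every encode() call borrows a pooled client and returns it afterwards."""
    def __init__(self, make_client, size=4):
        self._pool = queue.Queue()
        for _ in range(size):
            self._pool.put(make_client())

    def encode(self, texts):
        client = self._pool.get()       # borrow a free client (blocks if none)
        try:
            return client.encode(texts)
        finally:
            self._pool.put(client)      # return it for the next caller

cc = PooledClient(DummyClient, size=2)
vecs = cc.encode(["hello", "world!"])
```

Callers see a single object with a plain encode() method, while the queue underneath keeps each wrapped client used by only one caller at a time.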
