Fix faiss index batch_size bug on python3.7 and update es config for … #2965

Merged: 5 commits, Aug 9, 2022
@@ -52,12 +52,14 @@ xpack.security.enabled: false
# Build the ANN index, using the Baike cities data as an example
python utils/offline_ann.py --index_name baike_cities --doc_dir data/baike
```

Parameter descriptions
* `index_name`: name of the index
* `doc_dir`: path to the txt text data
* `host`: IP address of Elasticsearch
* `port`: port of Elasticsearch
* `delete_index`: whether to delete the existing index and data (used to clear the data in ES); defaults to false
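
As a sketch, the parameters above can be combined into a single invocation; the host address below is only a placeholder for a remote Elasticsearch instance, while the flags themselves match the ones added to `utils/offline_ann.py` in this PR:
```
# example invocation; 192.168.1.10 stands in for your Elasticsearch host
python utils/offline_ann.py --index_name baike_cities \
                            --doc_dir data/baike \
                            --host 192.168.1.10 \
                            --port 9200 \
                            --delete_index
```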


After a successful run, a log like the following is printed:
```
INFO - pipelines.utils.logger - Logged parameters:
@@ -73,8 +73,14 @@ python examples/question-answering/dense_qa_example.py --device cpu
The whole web-based visual question answering system consists of three main components: 1. an ANN service based on Elasticsearch, 2. a model service built on a REST API, and 3. a WebUI built with Streamlit. Next we set up these three services in turn and chain them together into the visual QA system.

#### 3.4.1 Start the ANN service
1. Download [elasticsearch-8.1.2](https://www.elastic.co/cn/downloads/elasticsearch) as described in the official documentation and extract it.
1. Download [elasticsearch-8.3.2](https://www.elastic.co/cn/downloads/elasticsearch) as described in the official documentation and extract it.
2. Start the ES service
First modify the configuration in `config/elasticsearch.yml`:
```
xpack.security.enabled: false
```
Then start it:

```bash
./bin/elasticsearch
```
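
A quick way to confirm that Elasticsearch is up is to query its root endpoint (9200 is the default port assumed throughout this guide):
```
curl http://localhost:9200
```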
@@ -91,10 +97,11 @@ python utils/offline_ann.py --index_name baike_cities \
--doc_dir data/baike \
--delete_index
```

Parameter descriptions
* `index_name`: name of the index
* `doc_dir`: path to the txt text data
* `host`: IP address of Elasticsearch
* `port`: port of Elasticsearch
* `delete_index`: whether to delete the existing index and data (used to clear the data in ES); defaults to false

After a successful run, a log like the following is printed:
@@ -47,12 +47,13 @@ xpack.security.enabled: false
#### 1.4.2 Write document data into the ANN index
```
# Build the ANN index, using the DuReader-Robust dataset as an example
python utils/offline_ann.py --index_name dureader_robust_query_encoder --doc_dir data/dureader_robust_processed
python utils/offline_ann.py --index_name dureader_robust_query_encoder --doc_dir data/dureader_dev
```

Parameter descriptions
* `index_name`: name of the index
* `doc_dir`: path to the txt text data
* `host`: IP address of Elasticsearch
* `port`: port of Elasticsearch
* `delete_index`: whether to delete the existing index and data (used to clear the data in ES); defaults to false
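
To double-check that the documents were written, you can list the indices on the instance with the standard Elasticsearch cat API (assuming the default local address):
```
curl http://localhost:9200/_cat/indices?v
```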


@@ -78,6 +78,11 @@ python examples/semantic-search/semantic_search_example.py --device cpu
#### 3.4.1 Start the ANN service
1. Download [elasticsearch-8.3.2](https://www.elastic.co/cn/downloads/elasticsearch) as described in the official documentation and extract it.
2. Start the ES service
First modify the configuration in `config/elasticsearch.yml`:
```
xpack.security.enabled: false
```
Then start it:
```bash
./bin/elasticsearch
```
@@ -97,6 +102,8 @@ python utils/offline_ann.py --index_name dureader_robust_query_encoder \
Parameter descriptions
* `index_name`: name of the index
* `doc_dir`: path to the txt text data
* `host`: IP address of Elasticsearch
* `port`: port of Elasticsearch
* `delete_index`: whether to delete the existing index and data (used to clear the data in ES); defaults to false

#### 3.4.3 Start the REST API model service
@@ -111,7 +118,12 @@ Linux users are recommended to start the service with the shell script:
```bash
sh scripts/run_search_server.sh
```
After startup, you can verify that the service is running with a curl command:

```
curl -X POST -k http://localhost:8891/query -H 'Content-Type: application/json' -d '{"query": "亚马逊河流的介绍","params": {"Retriever": {"top_k": 5}, "Ranker":{"top_k": 5}}}'

```
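The same request can also be issued from Python; this is only a sketch using the `requests` library, with the endpoint and payload copied from the curl command above:
```python
import requests

# query the REST API service started above (port 8891, as in the curl example)
payload = {
    "query": "亚马逊河流的介绍",
    "params": {"Retriever": {"top_k": 5}, "Ranker": {"top_k": 5}},
}
resp = requests.post("http://localhost:8891/query", json=payload)
print(resp.json())
```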
#### 3.4.4 Start the WebUI
```bash
# Configure the model service address
@@ -169,6 +181,23 @@ Elasticsearch by default makes everything read-only once disk usage reaches 95%; you can free up some space
cluster.routing.allocation.disk.threshold_enabled: false
```
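
If you prefer not to edit the config file, the same setting can be applied at runtime through the standard cluster settings API; a sketch assuming ES is reachable on localhost:9200:
```
curl -X PUT "http://localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{"transient": {"cluster.routing.allocation.disk.threshold_enabled": false}}'
```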

#### nltk_data download failure: `[nltk_data] Error loading punkt: [Errno 60] Operation timed out`

Start `python` from the command line and run the following to download the data:

```
import nltk
nltk.download('punkt')
```
If the download is still too slow, you can [download](https://github.com/nltk/nltk_data/tree/gh-pages/packages/tokenizers) the package manually, place it under your local `~/nltk_data/tokenizers`, and extract it there.
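
If you unpack the data somewhere else, you can also point NLTK at a custom directory; a minimal sketch (the path is only an example):
```
import nltk

# make NLTK search an extra data directory and download punkt into it
nltk.data.path.append("/path/to/nltk_data")
nltk.download('punkt', download_dir="/path/to/nltk_data")
```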

#### The server reports a port-in-use error `[Errno 48] error while attempting to bind on address ('0.0.0.0', 8891): address already in use`

```
lsof -i:8891
kill -9 PID  # PID is the process listening on port 8891
```
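
The two steps can also be combined into one command, since `lsof -t` prints only the PID:
```
kill -9 $(lsof -t -i:8891)
```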

## Reference
[1]Y. Sun et al., “[ERNIE 3.0: Large-scale Knowledge Enhanced Pre-training for Language Understanding and Generation](https://arxiv.org/pdf/2107.02137.pdf),” arXiv:2107.02137 [cs], Jul. 2021, Accessed: Jan. 17, 2022. [Online]. Available: http://arxiv.org/abs/2107.02137

@@ -36,7 +36,7 @@ def semantic_search_tutorial():
embed_title=False,
)
else:
doc_dir = "data/dureader_robust_processed"
doc_dir = "data/dureader_dev"
dureader_data = "https://paddlenlp.bj.bcebos.com/applications/dureader_dev.zip"

fetch_archive_from_http(url=dureader_data, output_dir=doc_dir)
@@ -243,7 +243,7 @@ def write_documents(
self,
documents: Union[List[dict], List[Document]],
index: Optional[str] = None,
batch_size: int = 10_000,
batch_size: int = 1000,
duplicate_documents: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
) -> None:
@@ -349,7 +349,7 @@ def update_embeddings(
filters: Optional[Dict[
str,
Any]] = None, # TODO: Adapt type once we allow extended filters in FAISSDocStore
batch_size: int = 10_000,
batch_size: int = 10000,
):
"""
Updates the embeddings in the document store using the encoding model specified in the retriever.
@@ -432,7 +432,7 @@ def get_all_documents(
str,
Any]] = None, # TODO: Adapt type once we allow extended filters in FAISSDocStore
return_embedding: Optional[bool] = None,
batch_size: int = 10_000,
batch_size: int = 10000,
headers: Optional[Dict[str, str]] = None,
) -> List[Document]:
if headers:
@@ -454,7 +454,7 @@ def get_all_documents_generator(
str,
Any]] = None, # TODO: Adapt type once we allow extended filters in FAISSDocStore
return_embedding: Optional[bool] = None,
batch_size: int = 10_000,
batch_size: int = 10000,
headers: Optional[Dict[str, str]] = None,
) -> Generator[Document, None, None]:
"""
@@ -493,7 +493,7 @@ def get_documents_by_id(
self,
ids: List[str],
index: Optional[str] = None,
batch_size: int = 10_000,
batch_size: int = 10000,
headers: Optional[Dict[str, str]] = None,
) -> List[Document]:
if headers:
applications/experimental/pipelines/utils/offline_ann.py (30 changes: 25 additions & 5 deletions)
@@ -1,11 +1,17 @@
import argparse

import paddle
from pipelines.utils import convert_files_to_dicts
from pipelines.utils import convert_files_to_dicts, fetch_archive_from_http
from pipelines.document_stores import ElasticsearchDocumentStore
from pipelines.nodes import DensePassageRetriever
from pipelines.utils import launch_es

data_dict = {
'data/dureader_dev':
"https://paddlenlp.bj.bcebos.com/applications/dureader_dev.zip",
"data/baike": "https://paddlenlp.bj.bcebos.com/applications/baike.zip"
}

parser = argparse.ArgumentParser()
parser.add_argument("--index_name",
default='baike_cities',
@@ -15,6 +21,17 @@
default='data/baike/',
type=str,
help="The doc path of the corpus")

parser.add_argument('--host',
type=str,
default="127.0.0.1",
help='host ip of elastic search')

parser.add_argument('--port',
type=str,
default="9200",
help='port of elastic search')

parser.add_argument(
'--delete_index',
action='store_true',
Expand All @@ -27,8 +44,8 @@ def offline_ann(index_name, doc_dir):

launch_es()

document_store = ElasticsearchDocumentStore(host="127.0.0.1",
port="9200",
document_store = ElasticsearchDocumentStore(host=args.host,
port=args.port,
username="",
password="",
index=index_name)
@@ -59,8 +76,8 @@ def offline_ann(index_name, doc_dir):


def delete_data(index_name):
document_store = ElasticsearchDocumentStore(host="127.0.0.1",
port="9200",
document_store = ElasticsearchDocumentStore(host=args.host,
port=args.port,
username="",
password="",
index=index_name)
@@ -70,6 +87,9 @@ def delete_data(index_name):


if __name__ == "__main__":
if (args.doc_dir in data_dict):
fetch_archive_from_http(url=data_dict[args.doc_dir],
output_dir=args.doc_dir)
if (args.delete_index):
delete_data(args.index_name)
offline_ann(args.index_name, args.doc_dir)