Skip to content

Commit

Permalink
Fix bug of describeCollection V2 (#1030)
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <yihua.mo@zilliz.com>
  • Loading branch information
yhmo authored Aug 8, 2024
1 parent d3b50dc commit a191b8d
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/milvus/client/MilvusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public interface MilvusClient {
void setLogLevel(LogLevel level);

/**
* Disconnects from a Milvus server with timeout of 1 minute
* Disconnects from a Milvus server with timeout of 1 second
*/
default void close() {
try {
Expand Down
49 changes: 48 additions & 1 deletion src/main/java/io/milvus/pool/ClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ protected ClientPool(PoolConfig config, PoolClientFactory clientFactory) {
this.clientPool = new GenericKeyedObjectPool<String, T>(clientFactory, poolConfig);
}

/**
* Get a client object which is idle from the pool.
* Once the client is hold by the caller, it will be marked as active state and cannot be fetched by other caller.
* If the number of clients hits the MaxTotalPerKey value, this method will be blocked for MaxBlockWaitDuration.
* If no idle client available after MaxBlockWaitDuration, this method will return a null object to caller.
*
* @param key the key of a group where the client belong
* @return MilvusClient or MilvusClientV2
*/
public T getClient(String key) {
try {
return clientPool.borrowObject(key);
Expand All @@ -44,7 +53,14 @@ public T getClient(String key) {
}
}


/**
* Return a client object. Once a client is returned, it becomes idle state and wait the next caller.
* The caller should ensure the client is returned. Otherwise, the client will keep in active state and cannot be used by the next caller.
* Throw exceptions if the key doesn't exist or the client is not belong to this key group.
*
* @param key the key of a group where the client belong
* @param grpcClient the client object to return
*/
public void returnClient(String key, T grpcClient) {
try {
clientPool.returnObject(key, grpcClient);
Expand All @@ -54,37 +70,68 @@ public void returnClient(String key, T grpcClient) {
}
}

/**
* Release/disconnect all clients of all key groups, close the pool.
*
*/
public void close() {
if (clientPool != null && !clientPool.isClosed()) {
clientPool.close();
clientPool = null;
}
}

/**
* Release/disconnect idle clients of all key groups.
*
*/
public void clear() {
if (clientPool != null && !clientPool.isClosed()) {
clientPool.clear();
}
}

/**
* Release/disconnect idle clients of a key group.
*
* @param key the key of a group
*/
public void clear(String key) {
if (clientPool != null && !clientPool.isClosed()) {
clientPool.clear(key);
}
}

/**
* Return the number of idle clients of a key group
*
* @param key the key of a group
*/
public int getIdleClientNumber(String key) {
return clientPool.getNumIdle(key);
}

/**
* Return the number of active clients of a key group
*
* @param key the key of a group
*/
public int getActiveClientNumber(String key) {
return clientPool.getNumActive(key);
}

/**
* Return the number of idle clients of all key group
*
*/
public int getTotalIdleClientNumber() {
return clientPool.getNumIdle();
}

/**
* Return the number of active clients of all key group
*
*/
public int getTotalActiveClientNumber() {
return clientPool.getNumActive();
}
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/io/milvus/v2/client/MilvusClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -693,12 +693,12 @@ public DescribeAliasResp describeAlias(DescribeAliasReq request) {
*
* @return String
*/
public String getVersion() {
return retry(()->clientUtils.getVersion(this.blockingStub));
public String getServerVersion() {
return retry(()->clientUtils.getServerVersion(this.blockingStub));
}

/**
* close client
* Disconnects from a Milvus server with configurable timeout
*
* @param maxWaitSeconds max wait seconds
* @throws InterruptedException if the client failed to close connection
Expand All @@ -710,6 +710,18 @@ public void close(long maxWaitSeconds) throws InterruptedException {
}
}

/**
* Disconnects from a Milvus server with timeout of 1 second
*
*/
public void close() {
try {
close(TimeUnit.MINUTES.toSeconds(1));
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown Milvus client!");
}
}

public boolean clientIsReady() {
return channel != null && !channel.isShutdown() && !channel.isTerminated();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,26 +189,7 @@ public DescribeCollectionResp describeCollection(MilvusServiceGrpc.MilvusService
.build();
DescribeCollectionResponse response = blockingStub.describeCollection(describeCollectionRequest);
rpcUtils.handleResponse(title, response.getStatus());
return convertDescCollectionResp(response);
}

public static DescribeCollectionResp convertDescCollectionResp(DescribeCollectionResponse response) {
DescribeCollectionResp describeCollectionResp = DescribeCollectionResp.builder()
.collectionName(response.getCollectionName())
.databaseName(response.getDbName())
.description(response.getSchema().getDescription())
.numOfPartitions(response.getNumPartitions())
.collectionSchema(SchemaUtils.convertFromGrpcCollectionSchema(response.getSchema()))
.autoID(response.getSchema().getFieldsList().stream().anyMatch(FieldSchema::getAutoID))
.enableDynamicField(response.getSchema().getEnableDynamicField())
.fieldNames(response.getSchema().getFieldsList().stream().map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()))
.vectorFieldNames(response.getSchema().getFieldsList().stream().filter(fieldSchema -> ParamUtils.isVectorDataType(fieldSchema.getDataType())).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()))
.primaryFieldName(response.getSchema().getFieldsList().stream().filter(FieldSchema::getIsPrimaryKey).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()).get(0))
.createTime(response.getCreatedTimestamp())
.consistencyLevel(io.milvus.v2.common.ConsistencyLevel.valueOf(response.getConsistencyLevel().name().toUpperCase()))
.build();

return describeCollectionResp;
return convertUtils.convertDescCollectionResp(response);
}

public Void renameCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RenameCollectionReq request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Data
@SuperBuilder
Expand All @@ -43,4 +46,6 @@ public class DescribeCollectionResp {
private CreateCollectionReq.CollectionSchema collectionSchema;
private Long createTime;
private ConsistencyLevel consistencyLevel;
@Builder.Default
private final Map<String, String> properties = new HashMap<>();
}
6 changes: 3 additions & 3 deletions src/main/java/io/milvus/v2/service/vector/VectorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ public SearchResp search(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
QueryIteratorReq request) {
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp);
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
return new QueryIterator(request, blockingStub, pkField);
}

public SearchIterator searchIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
SearchIteratorReq request) {
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp);
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
return new SearchIterator(request, blockingStub, pkField);
}
Expand All @@ -189,7 +189,7 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
}

DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp);
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
if (request.getFilter() == null) {
request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/milvus/v2/utils/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void checkDatabaseExist(MilvusServiceGrpc.MilvusServiceBlockingStub block
throw new IllegalArgumentException("Database " + dbName + " not exist");
}
}
public String getVersion(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) {
public String getServerVersion(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) {
GetVersionResponse response = blockingStub.getVersion(GetVersionRequest.newBuilder().build());
rpcUtils.handleResponse("Get server version", response.getStatus());
return response.getVersion();
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/milvus/v2/utils/ConvertUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import io.milvus.grpc.*;
import io.milvus.param.Constant;
import io.milvus.param.ParamUtils;
import io.milvus.response.QueryResultsWrapper;
import io.milvus.response.SearchResultsWrapper;
import io.milvus.v2.common.IndexBuildState;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import io.milvus.v2.service.index.response.DescribeIndexResp;
import io.milvus.v2.service.vector.response.QueryResp;
import io.milvus.v2.service.vector.response.SearchResp;
Expand Down Expand Up @@ -112,4 +114,26 @@ public DescribeIndexResp convertToDescribeIndexResp(List<IndexDescription> respo

return DescribeIndexResp.builder().indexDescriptions(descs).build();
}

public DescribeCollectionResp convertDescCollectionResp(DescribeCollectionResponse response) {
Map<String, String> properties = new HashMap<>();
response.getPropertiesList().forEach(prop->properties.put(prop.getKey(), prop.getValue()));

DescribeCollectionResp describeCollectionResp = DescribeCollectionResp.builder()
.collectionName(response.getCollectionName())
.databaseName(response.getDbName())
.description(response.getSchema().getDescription())
.numOfPartitions(response.getNumPartitions())
.collectionSchema(SchemaUtils.convertFromGrpcCollectionSchema(response.getSchema()))
.autoID(response.getSchema().getFieldsList().stream().anyMatch(FieldSchema::getAutoID))
.enableDynamicField(response.getSchema().getEnableDynamicField())
.fieldNames(response.getSchema().getFieldsList().stream().map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()))
.vectorFieldNames(response.getSchema().getFieldsList().stream().filter(fieldSchema -> ParamUtils.isVectorDataType(fieldSchema.getDataType())).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()))
.primaryFieldName(response.getSchema().getFieldsList().stream().filter(FieldSchema::getIsPrimaryKey).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()).get(0))
.createTime(response.getCreatedTimestamp())
.consistencyLevel(io.milvus.v2.common.ConsistencyLevel.valueOf(response.getConsistencyLevel().name().toUpperCase()))
.properties(properties)
.build();
return describeCollectionResp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ void testClientPool() {
Thread t = new Thread(() -> {
for (int i = 0; i < requestPerThread; i++) {
MilvusClientV2 client = pool.getClient(key);
String version = client.getVersion();
String version = client.getServerVersion();
// System.out.printf("%d, %s%n", i, version);
System.out.printf("idle %d, active %d%n", pool.getIdleClientNumber(key), pool.getActiveClientNumber(key));
pool.returnClient(key, client);
Expand Down

0 comments on commit a191b8d

Please sign in to comment.