-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Security: don't call prepare index for reads #34246
Changes from 3 commits
97d5ac3
97f5fcc
42ee9af
facc0fc
5f50fa2
6e9247c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -361,30 +361,34 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce | |
final Cipher cipher = getDecryptionCipher(iv, decodeKey, version, decodedSalt); | ||
if (version.onOrAfter(Version.V_6_2_0)) { | ||
// we only have the id and need to get the token from the doc! | ||
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> | ||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { | ||
final GetRequest getRequest = | ||
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> { | ||
if (securityIndex.isAvailable() == false) { | ||
logger.warn("failed to get token [{}] since index is not available", tokenId); | ||
listener.onResponse(null); | ||
} else { | ||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { | ||
final GetRequest getRequest = | ||
client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, | ||
getTokenDocumentId(tokenId)).request(); | ||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, | ||
getTokenDocumentId(tokenId)).request(); | ||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, | ||
ActionListener.<GetResponse>wrap(response -> { | ||
if (response.isExists()) { | ||
Map<String, Object> accessTokenSource = | ||
(Map<String, Object>) response.getSource().get("access_token"); | ||
(Map<String, Object>) response.getSource().get("access_token"); | ||
if (accessTokenSource == null) { | ||
listener.onFailure(new IllegalStateException("token document is missing " + | ||
"the access_token field")); | ||
"the access_token field")); | ||
} else if (accessTokenSource.containsKey("user_token") == false) { | ||
listener.onFailure(new IllegalStateException("token document is missing " + | ||
"the user_token field")); | ||
"the user_token field")); | ||
} else { | ||
Map<String, Object> userTokenSource = | ||
(Map<String, Object>) accessTokenSource.get("user_token"); | ||
(Map<String, Object>) accessTokenSource.get("user_token"); | ||
listener.onResponse(UserToken.fromSourceMap(userTokenSource)); | ||
} | ||
} else { | ||
listener.onFailure( | ||
new IllegalStateException("token document is missing and must be present")); | ||
new IllegalStateException("token document is missing and must be present")); | ||
} | ||
}, e -> { | ||
// if the index or the shard is not there / available we assume that | ||
|
@@ -397,7 +401,8 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce | |
listener.onFailure(e); | ||
} | ||
}), client::get); | ||
}), listener::onFailure)); | ||
}); | ||
}}, listener::onFailure)); | ||
} else { | ||
decryptToken(in, cipher, version, listener); | ||
} | ||
|
@@ -673,30 +678,36 @@ private void findTokenFromRefreshToken(String refreshToken, ActionListener<Tuple | |
.setVersion(true) | ||
.request(); | ||
|
||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> | ||
if (securityIndex.isAvailable() == false) { | ||
logger.debug("security index is not available to find token from refresh token, retrying"); | ||
attemptCount.incrementAndGet(); | ||
findTokenFromRefreshToken(refreshToken, listener, attemptCount); | ||
} else { | ||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> | ||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, | ||
ActionListener.<SearchResponse>wrap(searchResponse -> { | ||
if (searchResponse.isTimedOut()) { | ||
attemptCount.incrementAndGet(); | ||
findTokenFromRefreshToken(refreshToken, listener, attemptCount); | ||
} else if (searchResponse.getHits().getHits().length < 1) { | ||
logger.info("could not find token document with refresh_token [{}]", refreshToken); | ||
listener.onFailure(invalidGrantException("could not refresh the requested token")); | ||
} else if (searchResponse.getHits().getHits().length > 1) { | ||
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token")); | ||
} else { | ||
listener.onResponse(new Tuple<>(searchResponse, attemptCount)); | ||
} | ||
}, e -> { | ||
if (isShardNotAvailableException(e)) { | ||
logger.debug("failed to search for token document, retrying", e); | ||
attemptCount.incrementAndGet(); | ||
findTokenFromRefreshToken(refreshToken, listener, attemptCount); | ||
} else { | ||
listener.onFailure(e); | ||
} | ||
}), | ||
client::search)); | ||
ActionListener.<SearchResponse>wrap(searchResponse -> { | ||
if (searchResponse.isTimedOut()) { | ||
attemptCount.incrementAndGet(); | ||
findTokenFromRefreshToken(refreshToken, listener, attemptCount); | ||
} else if (searchResponse.getHits().getHits().length < 1) { | ||
logger.info("could not find token document with refresh_token [{}]", refreshToken); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may be debug or trace? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not changed by this PR so I am leaving it as is. |
||
listener.onFailure(invalidGrantException("could not refresh the requested token")); | ||
} else if (searchResponse.getHits().getHits().length > 1) { | ||
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token")); | ||
} else { | ||
listener.onResponse(new Tuple<>(searchResponse, attemptCount)); | ||
} | ||
}, e -> { | ||
if (isShardNotAvailableException(e)) { | ||
logger.debug("failed to search for token document, retrying", e); | ||
attemptCount.incrementAndGet(); | ||
findTokenFromRefreshToken(refreshToken, listener, attemptCount); | ||
} else { | ||
listener.onFailure(e); | ||
} | ||
}), | ||
client::search)); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -831,32 +842,33 @@ public void findActiveTokensForRealm(String realmName, ActionListener<Collection | |
|
||
if (Strings.isNullOrEmpty(realmName)) { | ||
listener.onFailure(new IllegalArgumentException("Realm name is required")); | ||
return; | ||
} | ||
|
||
final Instant now = clock.instant(); | ||
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() | ||
} else if (securityIndex.isAvailable() == false) { | ||
listener.onResponse(Collections.emptyList()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better to |
||
} else { | ||
final Instant now = clock.instant(); | ||
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() | ||
.filter(QueryBuilders.termQuery("doc_type", "token")) | ||
.filter(QueryBuilders.termQuery("access_token.realm", realmName)) | ||
.filter(QueryBuilders.boolQuery() | ||
.should(QueryBuilders.boolQuery() | ||
.must(QueryBuilders.termQuery("access_token.invalidated", false)) | ||
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli())) | ||
) | ||
.should(QueryBuilders.termQuery("refresh_token.invalidated", false)) | ||
.should(QueryBuilders.boolQuery() | ||
.must(QueryBuilders.termQuery("access_token.invalidated", false)) | ||
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli())) | ||
) | ||
.should(QueryBuilders.termQuery("refresh_token.invalidated", false)) | ||
); | ||
|
||
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) | ||
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) | ||
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings)) | ||
.setQuery(boolQuery) | ||
.setVersion(false) | ||
.setSize(1000) | ||
.setFetchSource(true) | ||
.request(); | ||
|
||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false); | ||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> | ||
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit)); | ||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false); | ||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> | ||
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit)); | ||
} | ||
} | ||
|
||
private Tuple<UserToken, String> parseHit(SearchHit hit) { | ||
|
@@ -926,7 +938,7 @@ private void checkIfTokenIsRevoked(UserToken userToken, ActionListener<UserToken | |
// index doesn't exist so the token is considered valid. | ||
jaymode marked this conversation as resolved.
Show resolved
Hide resolved
|
||
listener.onResponse(userToken); | ||
} else { | ||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { | ||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { | ||
MultiGetRequest mGetRequest = client.prepareMultiGet() | ||
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken)) | ||
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I read this correctly, we will retry even if the security index doesn't exist, which seems unnecessary, although unlikely in practice - why would we have a refresh token but no security index?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security index could have been deleted after generation of a refresh token?