Skip to content

Commit

Permalink
add support for force refresh in broker layer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ugonnaak1 committed Aug 28, 2024
1 parent 4afbd8d commit a60ce8d
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ src/build
docs/_build/
# Visual Studio Files
/.vs/*
.vscode/*
/tests/.vs/*

# vim files
Expand Down
15 changes: 15 additions & 0 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,21 @@ def _acquire_token_silent_from_cache_and_possibly_refresh_it(
account_was_established_by_broker = account.get(
"account_source") == _GRANT_TYPE_BROKER
broker_attempt_succeeded_just_now = "error" not in response

if (response.get("access_token") and force_refresh):
at_to_renew = response.get("access_token")
response = _acquire_token_silently(
"https://{}/{}".format(self.authority.instance, self.authority.tenant),
self.client_id,
account["local_account_id"],
scopes,
claims=_merge_claims_challenge_and_capabilities(
self._client_capabilities, claims_challenge),
correlation_id=correlation_id,
auth_scheme=auth_scheme,
at_to_renew= at_to_renew,
**data)

if account_was_established_by_broker or broker_attempt_succeeded_just_now:
return self._process_broker_response(response, scopes, data)

Expand Down
4 changes: 3 additions & 1 deletion msal/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def _signin_interactively(

def _acquire_token_silently(
authority, client_id, account_id, scopes, claims=None, correlation_id=None,
auth_scheme=None,
auth_scheme=None, at_to_renew=None,
**kwargs):
# For MSA PT scenario where you use the /organizations, yes,
# acquireTokenSilently is expected to fail. - Sam Wilson
Expand All @@ -224,6 +224,8 @@ def _acquire_token_silently(
return
params = pymsalruntime.MSALRuntimeAuthParameters(client_id, authority)
params.set_requested_scopes(scopes)
if at_to_renew:
params.set_access_token_to_renew(at_to_renew)
if claims:
params.set_decoded_claims(claims)
if auth_scheme:
Expand Down
69 changes: 69 additions & 0 deletions tests/test_force_refresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from tests import unittest
import msal
import logging
import sys

# from tests.test_e2e import LabBasedTestCase

if not sys.platform.startswith("win"):
raise unittest.SkipTest("Currently, our broker supports Windows")

SCOPE_ARM = "https://management.azure.com/.default"
_AZURE_CLI = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"
pca = msal.PublicClientApplication(
_AZURE_CLI,
authority="https://login.microsoftonline.com/organizations",
enable_broker_on_mac=True,
enable_broker_on_windows=True)


# class ForceRefreshTestCase(LabBasedTestCase):
# def test_silent_with_force_refresh(self):
# # acquire token using username and password
# print("Testing silent flow with force_refresh=True")
# config = self.get_lab_user(usertype="cloud")
# config["password"] = self.get_lab_user_secret(config["lab_name"])
# result = pca.acquire_token_by_username_password(username=config["lab_name"], password=config["password"], scopes=config["scope"])
# # assert username and password, "You need to provide a test account and its password"

# ropcToken = result.get("access_token")
# accounts = pca.get_accounts()
# account = accounts[0]
# assert account, "The logged in account should have been established by interactive flow"

# result = pca.acquire_token_silent(
# config["scope"],
# account=account,
# force_refresh=False,
# auth_scheme=None, data=None)

# assert result.get("access_token") == ropcToken, "Token should not be refreshed"


class ForceRefreshTestCase(unittest.TestCase):
def test_silent_with_force_refresh(self):
# acquire token using username and password
print("Testing silent flow with force_refresh=True")
result = pca.acquire_token_interactive(scopes=[SCOPE_ARM], prompt="select_account", parent_window_handle=pca.CONSOLE_WINDOW_HANDLE, enable_msa_passthrough=True)
accounts = pca.get_accounts()
account = accounts[0]
assert account, "The logged in account should have been established by interactive flow"
oldToken = result.get("access_token")


result = pca.acquire_token_silent(
scopes=[SCOPE_ARM],
account=account,
force_refresh=False)

# This token should be recieved from cache
assert result.get("access_token") == oldToken, "Token should not be refreshed"


result = pca.acquire_token_silent(
scopes=[SCOPE_ARM],
account=account,
force_refresh=True)

# Token will be different proving it is not from cache and was renewed
assert result.get("access_token") != oldToken, "Token should be refreshed"

0 comments on commit a60ce8d

Please sign in to comment.