This repository has been archived by the owner on Jun 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
sas_definition_sample.py
170 lines (144 loc) · 10.1 KB
/
sas_definition_sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# This script expects that the following environment vars are set
#
# AZURE_TENANT_ID: with your Azure Active Directory tenant id or domain
# AZURE_CLIENT_ID: with your Azure Active Directory Service Principal AppId
# AZURE_CLIENT_SECRET: with your Azure Active Directory Application Key
# AZURE_SUBSCRIPTION_ID: with your Azure Subscription Id
#
import uuid
from util import KeyVaultSampleBase, keyvaultsample
class SasDefinitionSample(KeyVaultSampleBase):
"""
A collection of samples that demonstrate authenticating with the KeyVaultClient and KeyVaultManagementClient
"""
def __init__(self, config=None):
from azure.keyvault import KeyVaultClient, KeyVaultAuthentication, AccessToken
from msrestazure.azure_active_directory import ServicePrincipalCredentials
super(SasDefinitionSample, self).__init__(config=config)
self.keyvault_client = KeyVaultClient(ServicePrincipalCredentials(client_id=self.config.client_id,
secret=self.config.client_secret,
tenant=self.config.tenant_id))
def run_all_samples(self):
self.create_account_sas_definition()
self.create_blob_sas_defintion()
self.get_sas_definitions()
@keyvaultsample
def create_account_sas_definition(self):
"""
Creates an account sas definition, to manage storage account and its entities.
"""
from azure.storage.common import SharedAccessSignature, CloudStorageAccount
from azure.keyvault.models import SasTokenType, SasDefinitionAttributes
from azure.keyvault import SecretId
# To create an account sas definition in the vault we must first create the template. The
# template_uri for an account sas definition is the intended account sas token signed with an arbitrary key.
# Use the SharedAccessSignature class from azure.storage.common to create a account sas token
sas = SharedAccessSignature(account_name=self.config.storage_account_name,
# don't sign the template with the storage account key use key 00000000
account_key='00000000')
account_sas_template = sas.generate_account(services='bfqt', # all services blob, file, queue and table
resource_types='sco', # all resources service, template, object
permission='acdlpruw',
# all permissions add, create, list, process, read, update, write
expiry='2020-01-01') # expiry will be ignored and validity period will determine token expiry
# use the created template to create a sas definition in the vault
attributes = SasDefinitionAttributes(enabled=True)
sas_def = self.keyvault_client.set_sas_definition(vault_base_url=self.sample_vault_url,
storage_account_name=self.config.storage_account_name,
sas_definition_name='acctall',
template_uri=account_sas_template,
sas_type=SasTokenType.account,
validity_period='PT2H',
sas_definition_attributes=attributes)
# When the sas definition is created a corresponding managed secret is also created in the vault, the. This
# secret is used to provision sas tokens according to the sas definition. Users retrieve the sas token
# via the get_secret method.
# get the secret id from the returned SasDefinitionBundle
sas_secret_id = SecretId(uri=sas_def.secret_id)
# call get_secret and the value of the returned SecretBundle will be a newly issued sas token
acct_sas_token = self.keyvault_client.get_secret(vault_base_url=sas_secret_id.vault,
secret_name=sas_secret_id.name,
secret_version=sas_secret_id.version).value
# create the cloud storage account object
cloud_storage_account = CloudStorageAccount(account_name=self.config.storage_account_name,
sas_token=acct_sas_token)
# create a blob with the account sas token
blob_service = cloud_storage_account.create_block_blob_service()
blob_service.create_container('blobcontainer')
blob_service.create_blob_from_text(container_name='blobcontainer',
blob_name='blob1',
text=u'test blob1 data')
@keyvaultsample
def create_blob_sas_defintion(self):
"""
Creates a service SAS definition with access to a blob container.
"""
from azure.storage.blob import BlockBlobService, ContainerPermissions
from azure.keyvault.models import SasTokenType, SasDefinitionAttributes
from azure.keyvault import SecretId
# create the blob sas definition template
# the sas template uri for service sas definitions contains the storage entity url with the template token
# this sample demonstrates constructing the template uri for a blob container, but a similar approach can
# be used for all other storage service, i.e. File, Queue, Table
# create a template sas token for the container
service = BlockBlobService(account_name=self.config.storage_account_name,
# don't sign the template with the storage account key use key 00000000
account_key='00000000')
permissions = ContainerPermissions(read=True, write=True, delete=True, list=True)
temp_token = service.generate_container_shared_access_signature(container_name='blobcontainer',
permission=permissions,
expiry='2020-01-01')
# use the BlockBlobService to construct the template uri for the container sas definition
blob_sas_template_uri = service.make_container_url(container_name='blobcontainer',
protocol='https',
sas_token=temp_token)
# create the sas definition in the vault
attributes = SasDefinitionAttributes(enabled=True)
blob_sas_def = self.keyvault_client.set_sas_definition(vault_base_url=self.sample_vault_url,
storage_account_name=self.config.storage_account_name,
sas_definition_name='blobcontall',
template_uri=blob_sas_template_uri,
sas_type=SasTokenType.service,
validity_period='PT2H',
sas_definition_attributes=attributes)
# use the sas definition to provision a sas token and use it to create a BlockBlobClient
# which can interact with blobs in the container
# get the secret_id of the container sas definition and get the token from the vault as a secret
sas_secret_id = SecretId(uri=blob_sas_def.secret_id)
blob_sas_token = self.keyvault_client.get_secret(vault_base_url=sas_secret_id.vault,
secret_name=sas_secret_id.name,
secret_version=sas_secret_id.version).value
service = BlockBlobService(account_name=self.config.storage_account_name,
sas_token=blob_sas_token)
service.create_blob_from_text(container_name='blobcontainer',
blob_name='blob2',
text=u'test blob2 data')
blobs = list(service.list_blobs(container_name='blobcontainer'))
for blob in blobs:
service.delete_blob(container_name='blobcontainer',
blob_name=blob.name)
@keyvaultsample
def get_sas_definitions(self):
"""
List the sas definitions for the storage account, and get each.
"""
from azure.keyvault import StorageSasDefinitionId
# list the sas definitions for the storage account
print('list and get sas definitions for the managed storage account')
sas_defs = list(self.keyvault_client.get_sas_definitions(vault_base_url=self.sample_vault_url,
storage_account_name=self.config.storage_account_name,
maxresults=5))
# for each sas definition parse the id and get the SasDefinitionBundle
for s in sas_defs:
sas_def_id = StorageSasDefinitionId(uri=s.id)
sas_def = self.keyvault_client.get_sas_definition(vault_base_url=sas_def_id.vault,
storage_account_name=sas_def_id.account_name,
sas_definition_name=sas_def_id.sas_definition)
print(sas_def_id.sas_definition, sas_def.template_uri)
if __name__ == "__main__":
SasDefinitionSample().run_all_samples()