forked from Azure-Samples/data-factory-copy-blob-to-blob
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Program.cs
169 lines (155 loc) · 7.63 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Rest;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.Management.DataFactory;
using Microsoft.Azure.Management.DataFactory.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
namespace V2Test
{
class Program
{
static void Main(string[] args)
{
// Set variables
string tenantID = "<tenant ID>";
string applicationId = "<application ID>";
string authenticationKey = "<application key>";
string subscriptionId = "<subscription ID>";
string resourceGroup = "<resource group name>";
string region = "East US";
string dataFactoryName = "<data factory name>";
string storageAccount = "<azure storage account name>";
string storageKey = "<azure storage account key>";
string inputBlobPath = "<blobcontainer/inputfolder>";
string outputBlobPath = "<blobcontainer/outputfolder>";
string storageLinkedServiceName = "AzureStorageLinkedService"; // name of the Azure Storage linked service
string blobDatasetName = "BlobDataset"; // name of the blob dataset
string pipelineName = "Adfv2QuickStartPipeline"; // name of the pipeline
// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
// Create a data factory
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
Location = region,
Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));
while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
{
System.Threading.Thread.Sleep(1000);
}
// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
new AzureStorageLinkedService
{
ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
}
);
client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
// Create a Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
new AzureBlobDataset
{
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = storageLinkedServiceName
},
FolderPath = new Expression { Value = "@{dataset().path}" },
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "path", new ParameterSpecification { Type = ParameterType.String } }
}
}
);
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "inputPath", new ParameterSpecification { Type = ParameterType.String } },
{ "outputPath", new ParameterSpecification { Type = ParameterType.String } }
},
Activities = new List<Activity>
{
new CopyActivity
{
Name = "CopyFromBlobToBlob",
Inputs = new List<DatasetReference>
{
new DatasetReference()
{
ReferenceName = blobDatasetName,
Parameters = new Dictionary<string, object>
{
{ "path", "@pipeline().parameters.inputPath" }
}
}
},
Outputs = new List<DatasetReference>
{
new DatasetReference
{
ReferenceName = blobDatasetName,
Parameters = new Dictionary<string, object>
{
{ "path", "@pipeline().parameters.outputPath" }
}
}
},
Source = new BlobSource { },
Sink = new BlobSink { }
}
}
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> parameters = new Dictionary<string, object>
{
{ "inputPath", inputBlobPath },
{ "outputPath", outputBlobPath }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, parameters).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
// Monitor the pipeline run
Console.WriteLine("Checking pipeline run status...");
PipelineRun pipelineRun;
while (true)
{
pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
Console.WriteLine("Status: " + pipelineRun.Status);
if (pipelineRun.Status == "InProgress")
System.Threading.Thread.Sleep(15000);
else
break;
}
// Check the copy activity run details
Console.WriteLine("Checking copy activity run details...");
List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10), pipelineName).ToList();
if (pipelineRun.Status == "Succeeded")
Console.WriteLine(activityRuns.First().Output);
else
Console.WriteLine(activityRuns.First().Error);
Console.WriteLine("Deleting the data factory");
client.Factories.Delete(resourceGroup, dataFactoryName);
Console.WriteLine("\nPress any key to exit...");
Console.ReadKey();
}
}
}