diff --git a/packages/google-cloud-dataproc/samples/README.md b/packages/google-cloud-dataproc/samples/README.md
index 8a0a3713f9a..6aea3bcfe6d 100644
--- a/packages/google-cloud-dataproc/samples/README.md
+++ b/packages/google-cloud-dataproc/samples/README.md
@@ -14,6 +14,7 @@
 * [Samples](#samples)
   * [Create Cluster](#create-cluster)
   * [Quickstart](#quickstart)
+  * [Create Cluster](#create-cluster)
 
 ## Before you begin
 
@@ -59,8 +60,16 @@
 __Usage:__
 `node samples/quickstart.js`
 
+### Create Cluster
+
+View the [source code](https://github.com/googleapis/nodejs-dataproc/blob/master/samples/createCluster.js).
+
+[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-dataproc&page=editor&open_in_editor=samples/createCluster.js,samples/README.md)
+
+__Usage:__
+`node createCluster.js`
 
 
 
 
 [shell_img]: https://gstatic.com/cloudssh/images/open-btn.png
diff --git a/packages/google-cloud-dataproc/samples/package.json b/packages/google-cloud-dataproc/samples/package.json
index 85c8fdd5a6c..8b61e363d13 100644
--- a/packages/google-cloud-dataproc/samples/package.json
+++ b/packages/google-cloud-dataproc/samples/package.json
@@ -14,9 +14,9 @@
     "test": "mocha system-test --timeout 600000"
   },
   "dependencies": {
-    "@google-cloud/dataproc": "^1.4.4",
-    "uuid": "^3.3.3",
-    "yargs": "^15.0.0"
+    "@google-cloud/dataproc": "^1.4.1",
+    "@google-cloud/storage": "^4.1.3",
+    "sleep": "^6.1.0"
   },
   "devDependencies": {
     "chai": "^4.2.0",
diff --git a/packages/google-cloud-dataproc/samples/quickstart.js b/packages/google-cloud-dataproc/samples/quickstart.js
index 0371d123b46..0fcac7aa846 100644
--- a/packages/google-cloud-dataproc/samples/quickstart.js
+++ b/packages/google-cloud-dataproc/samples/quickstart.js
@@ -14,42 +14,135 @@
 
 'use strict';
 
-// [START dataproc_quickstart]
-const dataproc = require('@google-cloud/dataproc');
-const client = new dataproc.v1.ClusterControllerClient();
-
-async function quickstart() {
-  const projectId = await client.getProjectId();
-  const request = {
-    region: 'global',
-    projectId,
-  };
-  const [resources] = await client.listClusters(request);
-  console.log('Total resources:', resources.length);
-  for (const resource of resources) {
-    console.log(resource);
-  }
+function main(projectId, region, clusterName, jobFilePath) {
+  // [START dataproc_quickstart]
+  const dataproc = require('@google-cloud/dataproc').v1;
+  const {Storage} = require('@google-cloud/storage');
 
-  let nextRequest = request;
-  // Or obtain the paged response.
-  const options = {autoPaginate: false};
-  do {
-    const responses = await client.listClusters(nextRequest, options);
-    // The actual resources in a response.
-    const resources = responses[0];
-    // The next request if the response shows that there are more responses.
-    nextRequest = responses[1];
-    // The actual response object, if necessary.
-    // const rawResponse = responses[2];
-    for (const resource of resources) {
-      console.log(resource);
-    }
-  } while (nextRequest);
+  const sleep = require('sleep');
+
+  // Create a cluster client with the endpoint set to the desired cluster region
+  const clusterClient = new dataproc.ClusterControllerClient({
+    apiEndpoint: `${region}-dataproc.googleapis.com`,
+  });
 
-  client.listClustersStream(request).on('data', element => {
-    console.log(element);
+  // Create a job client with the endpoint set to the desired cluster region
+  const jobClient = new dataproc.JobControllerClient({
+    apiEndpoint: `${region}-dataproc.googleapis.com`,
   });
+
+  async function quickstart() {
+    // TODO(developer): Uncomment and set the following variables
+    // projectId = 'YOUR_PROJECT_ID'
+    // region = 'YOUR_CLUSTER_REGION'
+    // clusterName = 'YOUR_CLUSTER_NAME'
+    // jobFilePath = 'YOUR_JOB_FILE_PATH'
+
+    // Create the cluster config
+    const cluster = {
+      projectId: projectId,
+      region: region,
+      cluster: {
+        clusterName: clusterName,
+        config: {
+          masterConfig: {
+            numInstances: 1,
+            machineTypeUri: 'n1-standard-1',
+          },
+          workerConfig: {
+            numInstances: 2,
+            machineTypeUri: 'n1-standard-1',
+          },
+        },
+      },
+    };
+
+    // Create the cluster
+    const [operation] = await clusterClient.createCluster(cluster);
+    const [response] = await operation.promise();
+
+    // Output a success message
+    console.log(`Cluster created successfully: ${response.clusterName}`);
+
+    const job = {
+      projectId: projectId,
+      region: region,
+      job: {
+        placement: {
+          clusterName: clusterName,
+        },
+        pysparkJob: {
+          mainPythonFileUri: jobFilePath,
+        },
+      },
+    };
+
+    let [jobResp] = await jobClient.submitJob(job);
+    const jobId = jobResp.reference.jobId;
+
+    console.log(`Submitted job "${jobId}".`);
+
+    // Terminal states for a job
+    const terminalStates = new Set(['DONE', 'ERROR', 'CANCELLED']);
+
+    // Create a timeout such that the job gets cancelled if not
+    // in a terminal state after a fixed period of time.
+    const timeout = 600000;
+    const start = new Date();
+
+    // Wait for the job to finish.
+    const jobReq = {
+      projectId: projectId,
+      region: region,
+      jobId: jobId,
+    };
+
+    while (!terminalStates.has(jobResp.status.state)) {
+      if (new Date() - timeout > start) {
+        await jobClient.cancelJob(jobReq);
+        console.log(
+          `Job ${jobId} timed out after threshold of ` +
+            `${timeout / 60000} minutes.`
+        );
+        break;
+      }
+      await sleep.sleep(1);
+      [jobResp] = await jobClient.getJob(jobReq);
+    }
+
+    const clusterReq = {
+      projectId: projectId,
+      region: region,
+      clusterName: clusterName,
+    };
+
+    const [clusterResp] = await clusterClient.getCluster(clusterReq);
+
+    const storage = new Storage();
+
+    const output = await storage
+      .bucket(clusterResp.config.configBucket)
+      .file(
+        `google-cloud-dataproc-metainfo/${clusterResp.clusterUuid}/` +
+          `jobs/${jobId}/driveroutput.000000000`
+      )
+      .download();
+
+    // Output a success message.
+    console.log(
+      `Job ${jobId} finished with state ${jobResp.status.state}:\n${output}`
+    );
+
+    // Delete the cluster once the job has terminated.
+    const [deleteOperation] = await clusterClient.deleteCluster(clusterReq);
+    await deleteOperation.promise();
+
+    // Output a success message
+    console.log(`Cluster ${clusterName} successfully deleted.`);
+  }
+
+  quickstart();
+  // [END dataproc_quickstart]
 }
 
-quickstart();
-// [END dataproc_quickstart]
+main(...process.argv.slice(2));
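The README hunk above links to `samples/createCluster.js`, which is not part of this diff. For orientation only, a minimal sketch of what such a sample might look like, assuming it reuses the cluster-creation flow from the reworked `quickstart.js`; the argument handling and structure below are assumptions, not the actual sample code.

```js
// Hypothetical sketch of samples/createCluster.js, modeled on the
// cluster-creation portion of quickstart.js above (not the actual sample).
'use strict';

function main(projectId, region, clusterName) {
  const dataproc = require('@google-cloud/dataproc').v1;

  // Create a cluster client with the endpoint set to the desired cluster region
  const clusterClient = new dataproc.ClusterControllerClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
  });

  async function createCluster() {
    // Same minimal cluster config used by quickstart.js
    const cluster = {
      projectId: projectId,
      region: region,
      cluster: {
        clusterName: clusterName,
        config: {
          masterConfig: {numInstances: 1, machineTypeUri: 'n1-standard-1'},
          workerConfig: {numInstances: 2, machineTypeUri: 'n1-standard-1'},
        },
      },
    };

    // createCluster returns a long-running operation; wait for it to complete
    const [operation] = await clusterClient.createCluster(cluster);
    const [response] = await operation.promise();
    console.log(`Cluster created successfully: ${response.clusterName}`);
  }

  createCluster();
}

main(...process.argv.slice(2));
```

Following the reworked quickstart's convention, this would be invoked as `node createCluster.js <projectId> <region> <clusterName>`; the README confirms only `node createCluster.js`, so the argument names and order here are an assumption.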