Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[google-portability ]Add start of data portability connection #68

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions src/providers/google-portability/dataportability.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import BaseSyncHandler from "../BaseSyncHandler";
import CONFIG from "../../config";
// import Imap from 'node-imap'
// import { simpleParser } from 'mailparser'
import { google, dataportability_v1 } from "googleapis";
import { GaxiosResponse } from "gaxios";

import {
SyncResponse,
SyncSchemaPosition,
SyncHandlerStatus,
} from "../../interfaces";
import { SchemaEmail, SchemaEmailType, SchemaRecord } from "../../schemas";
import { GmailHelpers } from "../google/helpers";

const _ = require("lodash");

function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}

export default class DataPortability extends BaseSyncHandler {
public getSchemaUri(): string {
return CONFIG.verida.schemas.EMAIL;
}



public async _sync(
api: any,
syncPosition: SyncSchemaPosition
): Promise<SyncResponse> {
const dpApi = this.getApi();

Check failure on line 33 in src/providers/google-portability/dataportability.ts

View workflow job for this annotation

GitHub Actions / build-app

Property 'getApi' does not exist on type 'DataPortability'.

Check failure on line 33 in src/providers/google-portability/dataportability.ts

View workflow job for this annotation

GitHub Actions / build-app

Property 'getApi' does not exist on type 'DataPortability'.

dataportability_v1.Resource$Portabilityarchive

try {
// See https://developers.google.com/data-portability/reference/rest/v1/portabilityArchive/initiate
const initiateResponse = await dpApi.portabilityArchive.initiate({
requestBody: {
resources: ["https://www.googleapis.com/auth/dataportability.myactivity.search"]
}
})
console.log(initiateResponse.data)

//while (true) {
console.log('sleeping for 10 seconds')
await sleep(2000)
const archiveState = await dpApi.archiveJobs.getPortabilityArchiveState({
name: initiateResponse.data.archiveJobId
})

console.log(archiveState.data)
//}
} catch (err) {
console.log(err)
}

const results: SchemaRecord[] = []

return {
results,
position: syncPosition,
};
}

// protected stopSync(syncPosition: SyncSchemaPosition): SyncSchemaPosition {
// if (syncPosition.status == SyncHandlerStatus.STOPPED) {
// return syncPosition;
// }

// syncPosition.status = SyncHandlerStatus.STOPPED;
// syncPosition.thisRef = undefined;
// syncPosition.breakId = syncPosition.futureBreakId;
// syncPosition.futureBreakId = undefined;

// return syncPosition;
// }

// protected setNextPosition(
// syncPosition: SyncSchemaPosition,
// serverResponse: GaxiosResponse<gmail_v1.Schema$ListMessagesResponse>
// ): SyncSchemaPosition {
// if (!syncPosition.futureBreakId && serverResponse.data.messages.length) {
// syncPosition.futureBreakId = `${this.connection.profile.id}-${serverResponse.data.messages[0].id}`;
// }

// if (_.has(serverResponse, "data.nextPageToken")) {
// // Have more results, so set the next page ready for the next request
// syncPosition.thisRef = serverResponse.data.nextPageToken;
// } else {
// syncPosition = this.stopSync(syncPosition);
// }

// return syncPosition;
// }

// protected async buildResults(
// gmail: gmail_v1.Gmail,
// serverResponse: GaxiosResponse<gmail_v1.Schema$ListMessagesResponse>,
// breakId: string,
// messageType: SchemaEmailType,
// breakTimestamp?: string
// ): Promise<SchemaEmail[]> {
// const results: SchemaEmail[] = [];
// for (const message of serverResponse.data.messages) {
// const messageId = `${this.connection.profile.id}-${message.id}`;

// if (messageId == breakId) {
// break;
// }

// const msg = await GmailHelpers.getMessage(gmail, message.id);
// const internalDate = msg.internalDate
// ? new Date(parseInt(msg.internalDate)).toISOString()
// : "Unknown";

// if (breakTimestamp && internalDate < breakTimestamp) {
// break;
// }

// const text = GmailHelpers.getTextContent(msg.payload);
// const html = GmailHelpers.getHtmlContent(msg.payload);
// const subject = GmailHelpers.getHeader(msg.payload?.headers, "Subject");
// const from = GmailHelpers.parseEmail(
// GmailHelpers.getHeader(msg.payload?.headers, "From")
// );
// const to = GmailHelpers.parseEmail(
// GmailHelpers.getHeader(msg.payload?.headers, "To")
// );
// const threadId = msg.threadId || "Unknown";
// const attachments = await GmailHelpers.getAttachments(gmail, msg);

// results.push({
// _id: `gmail-${messageId}`,
// type: messageType,
// name: subject ? subject : 'No email subject',
// sourceApplication: "https://gmail.com/",
// sourceId: message.id,
// fromName: from.name,
// fromEmail: from.email,
// toEmail: to.name,
// messageText: text ? text : 'No email body',
// messageHTML: html ? html : 'No email body',
// sentAt: internalDate,
// insertedAt: internalDate,
// threadId,
// attachments,
// });
// }

// return results;
// }
}
134 changes: 134 additions & 0 deletions src/providers/google-portability/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import { Request, Response } from "express";
import Base from "../BaseProvider";
import { BaseProviderConfig, SyncProviderLogLevel } from "../../interfaces";
import { google, dataportability_v1 } from "googleapis";

const passport = require("passport");
const GoogleStrategy = require("passport-google-oauth20");

import GoogleDataPortability from './dataportability'

export interface GoogleDataPortabilityProviderConfig extends BaseProviderConfig {
clientId: string;
clientSecret: string;
callbackUrl: string;
}

export default class GoogleDataPortabilityProvider extends Base {
protected config: GoogleDataPortabilityProviderConfig;

public getProviderId() {
return "google-portability";
}

public getProviderLabel() {
return "Google (Data Portability)";
}

public getProviderApplicationUrl() {
return "https://google.com/";
}

public syncHandlers(): any[] {
return [GoogleDataPortability];
}

public getScopes(): string[] {
return [
"https://www.googleapis.com/auth/dataportability.myactivity.search",
];
}

public async connect(req: Request, res: Response, next: any): Promise<any> {
this.init();

const auth = await passport.authenticate("google", {
scope: this.getScopes(),
accessType: "offline",
});

return auth(req, res, next);
}

public async callback(req: Request, res: Response, next: any): Promise<any> {
this.init();

const promise = new Promise((resolve, rejects) => {
const auth = passport.authenticate(
"google",
{
failureRedirect: "/failure/google",
failureMessage: true,
},
function (err: any, data: any) {
if (err) {
rejects(err);
} else {
console.log("callback!");
console.log(data);
const connectionToken = {
id: data.profile.id,
accessToken: data.accessToken,
refreshToken: data.refreshToken,
profile: data.profile,
};

resolve(connectionToken);
}
}
);

auth(req, res, next);
});

const result = await promise;
return result;
}

public async getApi(accessToken?: string, refreshToken?: string): Promise<dataportability_v1.Dataportability> {
const TOKEN = {
access_token: accessToken ? accessToken : this.connection.accessToken,
refresh_token: refreshToken ? refreshToken : this.connection.refreshToken,
scope: "https://www.googleapis.com/auth/dataportability.myactivity.search",
token_type: "Bearer",
};

const redirectUrl = "";

const oAuth2Client = new google.auth.OAuth2(
this.config.clientId,
this.config.clientSecret,
redirectUrl
);

oAuth2Client.setCredentials(TOKEN);

return google.dataportability({version: "v1", auth: oAuth2Client})
}

public init() {
console.log(this.config);
passport.use(
new GoogleStrategy(
{
clientID: this.config.clientId,
clientSecret: this.config.clientSecret,
callbackURL: this.config.callbackUrl,
},
function (
accessToken: string,
refreshToken: string,
profile: any,
cb: any
) {
// Simply return the raw data
return cb(null, {
accessToken,
refreshToken,
profile,
});
}
)
);
}
}
73 changes: 73 additions & 0 deletions tests/providers/google-portability/dataportability.tests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
const assert = require("assert");
import {
BaseProviderConfig,
Connection,
SyncHandlerStatus,
SyncSchemaPosition,
SyncSchemaPositionType,
} from "../../../src/interfaces";
import Providers from "../../../src/providers";
import CommonUtils, { NetworkInstance } from "../../common.utils";

import DataPortability from "../../../src/providers/google-portability/dataportability";
import BaseProvider from "../../../src/providers/BaseProvider";
import { CommonTests, GenericTestConfig } from "../../common.tests";

const providerName = "google";
let network: NetworkInstance;
let connection: Connection;
let provider: BaseProvider;

describe(`${providerName} Tests`, function () {
this.timeout(100000);

this.beforeAll(async function () {
network = await CommonUtils.getNetwork();
connection = await CommonUtils.getConnection(providerName);
provider = Providers(providerName, network.context, connection);
});

describe(`Fetch ${providerName} data`, () => {
const handlerName = "google-portability";
const testConfig: GenericTestConfig = {
idPrefix: "google-portability",
timeOrderAttribute: "insertedAt",
batchSizeLimitAttribute: "batchSize",
};
const providerConfig: Omit<BaseProviderConfig, "sbtImage" | "label"> = {};

// it(`Can pass basic tests: ${handlerName}`, async () => {
// await CommonTests.runGenericTests(
// providerName,
// Gmail,
// testConfig,
// providerConfig
// );
// });

it(`Can get data`, async () => {
const syncPosition: Omit<SyncSchemaPosition, "_id" | "schemaUri"> = {
type: SyncSchemaPositionType.SYNC,
provider: providerName,
status: SyncHandlerStatus.ACTIVE,
};

providerConfig.batchSize = 10;

const syncResponse = await CommonTests.runSyncTest(
providerName,
DataPortability,
testConfig,
syncPosition,
providerConfig
);

console.log(syncResponse)
});
});

this.afterAll(async function () {
const { context } = await CommonUtils.getNetwork();
await context.close();
});
});
Loading