Skip to content

Commit

Permalink
fix: Fix translation limits handling
Browse files Browse the repository at this point in the history
  • Loading branch information
3y3 committed Mar 5, 2024
1 parent 5526cab commit 193dbe9
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 166 deletions.
259 changes: 159 additions & 100 deletions src/cmd/translate/handler.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import axios from 'axios';
import axios, {AxiosError, AxiosResponse} from 'axios';
import {green, red} from 'chalk';
import {Arguments} from 'yargs';
import {ArgvService} from '../../services';
import {logger} from '../../utils';
import {ok} from 'assert';
import {dirname, extname, join, resolve} from 'path';
import {mkdir} from 'fs/promises';
import {getYandexAuth} from './yandex/auth';
import {asyncify, eachLimit, retry} from 'async';
import {asyncify, eachLimit} from 'async';

import {
AuthError,
Defer,
LimitExceed,
RequestError,
TranslateError,
TranslateParams,
bytes,
compose,
Expand All @@ -23,24 +28,39 @@ const REQUESTS_LIMIT = 20;
const BYTES_LIMIT = 10000;
const RETRY_LIMIT = 3;

class TranslatorError extends Error {
path: string;
type TranslatorParams = {
input: string;
output: string;
sourceLanguage: string;
targetLanguage: string;
// yandexCloudTranslateGlossaryPairs: YandexCloudTranslateGlossaryPair[];
};

constructor(message: string, path: string) {
super(message);
type RequesterParams = {
auth: string;
folderId: string | undefined;
sourceLanguage: string;
targetLanguage: string;
dryRun: boolean;
};

this.path = path;
}
}
type Request = {
(texts: string[]): () => Promise<void>;
stat: {
bytes: number;
chunks: number;
};
};

class RequestError extends Error {
code: string;
type Split = (path: string, texts: string[]) => Promise<string[]>;

constructor(error: Error) {
super(error?.message || String(error));
this.code = 'REQUEST_ERROR';
}
}
type Cache = Map<string, Defer>;

type Translations = {
translations: {
text: string;
}[];
};

export async function handler(args: Arguments<any>) {
const params = normalizeParams({
Expand Down Expand Up @@ -85,106 +105,130 @@ export async function handler(args: Arguments<any>) {
try {
await translate(file);
} catch (error: any) {
logger.error(file, error.message);
if (error instanceof TranslateError) {
logger.error(file, `${error.message}`, error.code);

if (error.fatal) {
process.exit(1);
}
} else {
logger.error(file, error.message);
}
}
}),
);

console.log('PROCESSED', `bytes: ${request.stat.bytes} chunks: ${request.stat.chunks}`);
console.log(
green('PROCESSED'),
`bytes: ${request.stat.bytes} chunks: ${request.stat.chunks}`,
);
}
} catch (error: any) {
const message = error.message;

const file = error instanceof TranslatorError ? error.path : '';
if (error instanceof TranslateError) {
console.error(red(error.code), error.message);
} else {
console.error(error);
}

logger.error(file, message);
process.exit(1);
}
}

type TranslatorParams = {
input: string;
output: string;
sourceLanguage: string;
targetLanguage: string;
// yandexCloudTranslateGlossaryPairs: YandexCloudTranslateGlossaryPair[];
};
function scheduler(limit: number, interval: number) {
const scheduled: Defer<void>[] = [];

type RequesterParams = {
auth: string;
folderId: string | undefined;
sourceLanguage: string;
targetLanguage: string;
dryRun: boolean;
};
let processing = 0;

type Request = {
(texts: string[]): () => Promise<string[]>;
stat: {
bytes: number;
chunks: number;
};
};
function idle() {
const defer = new Defer<void>();

type Split = (path: string, texts: string[]) => Promise<string>[];
scheduled.push(defer);

type Cache = Map<string, Defer>;
return defer.promise;
}

type Translations = {
translations: {
text: string;
}[];
};
async function queue() {
processing++;
await wait(interval);
processing--;
unqueue();
}

function requester(params: RequesterParams, cache: Cache) {
const {auth, folderId, sourceLanguage, targetLanguage, dryRun} = params;
const resolve = (text: string, index: number, texts: string[]) => {
const defer = cache.get(texts[index]);
if (defer) {
defer.resolve(text);
async function unqueue() {
scheduled.shift()?.resolve();
}

return async function <R>(action: Function): Promise<R> {
if (processing >= limit) {
await idle();
}
return text;

queue();

return action();
};
}

function requester(params: RequesterParams, cache: Cache) {
const {auth, folderId, sourceLanguage, targetLanguage, dryRun} = params;
const schedule = scheduler(REQUESTS_LIMIT, 1000);

const request = function request(texts: string[]) {
const resolve = (text: string, index: number) => {
const defer = cache.get(texts[index]);
if (defer) {
defer.resolve(text);
}
};

request.stat.bytes += bytes(texts);
request.stat.chunks++;

return async function () {
if (dryRun) {
return texts.map(resolve);
texts.forEach(resolve);
}

return axios({
method: 'POST',
url: 'https://translate.api.cloud.yandex.net/translate/v2/translate',
headers: {
'Content-Type': 'application/json',
Authorization: auth,
},
data: {
folderId,
texts,
sourceLanguageCode: sourceLanguage,
targetLanguageCode: targetLanguage,
format: 'HTML',
},
})
.then(({data, status}) => {
if (status === 200) {
return data;
} else {
throw new Error(data.message);
try {
const {data} = await schedule<AxiosResponse<Translations>>(() =>
axios({
method: 'POST',
url: 'https://translate.api.cloud.yandex.net/translate/v2/translate',
timeout: 5000,
maxRedirects: 0,
headers: {
Authorization: auth,
'Content-Type': 'application/json',
'User-Agent': 'github.com/diplodoc-platform/cli',
},
data: {
folderId,
texts,
sourceLanguageCode: sourceLanguage,
targetLanguageCode: targetLanguage,
format: 'HTML',
},
}),
);

return data.translations.map(({text}) => text).forEach(resolve);
} catch (error: any) {
if (error instanceof AxiosError) {
const {response} = error;
const {status, statusText, data} = response as AxiosResponse;

switch (true) {
case LimitExceed.is(data.message):
throw new LimitExceed(data.message);
case AuthError.is(data.message):
throw new AuthError(data.message);
default:
throw new RequestError(status, statusText, data);
}
})
.then((result: Translations) => {
return result.translations.map(({text}, index) => {
return resolve(text, index, texts);
});
})
.catch((error) => {
console.error(error);
throw new RequestError(error);
});
}

throw new RequestError(0, error.message, {fatal: true});
}
};
};

Expand Down Expand Up @@ -234,21 +278,22 @@ function translator(params: TranslatorParams, split: Split) {
return;
}

const parts = await Promise.all(split(path, units));
const parts = await split(path, units);
const composed = compose(skeleton, parts, {useSource: true});

await dumpFile(outputPath, composed);
};
}

function splitter(request: Request, cache: Cache): Split {
return function (path: string, texts: string[]) {
return async function (path: string, texts: string[]) {
const promises: Promise<string>[] = [];
const requests: Promise<void>[] = [];
let buffer: string[] = [];
let bufferSize = 0;

const release = () => {
backoff(request(buffer));
requests.push(backoff(request(buffer)));
buffer = [];
bufferSize = 0;
};
Expand Down Expand Up @@ -278,16 +323,30 @@ function splitter(request: Request, cache: Cache): Split {
release();
}

return promises;
await Promise.all(requests);

return Promise.all(promises);
};
}

function backoff(action: () => Promise<string[]>): Promise<string[]> {
return retry(
{
times: RETRY_LIMIT,
interval: (count: number) => Math.pow(2, count) * 1000,
},
asyncify(action),
);
function wait(interval: number) {
const defer = new Defer<void>();
setTimeout(() => defer.resolve(), interval);
return defer.promise;
}

async function backoff(action: () => Promise<void>): Promise<void> {
let retry = 0;

while (++retry < RETRY_LIMIT) {
try {
await action();
} catch (error: any) {
if (RequestError.canRetry(error)) {
await wait(Math.pow(2, retry) * 1000);
} else {
throw error;
}
}
}
}
Loading

0 comments on commit 193dbe9

Please sign in to comment.