Skip to content

Commit

Permalink
Merge pull request #797 from OpenFn/multi-adaptors-compiler-imports
Browse files Browse the repository at this point in the history
Support multiple adaptors in the compiler
  • Loading branch information
josephjclark authored Oct 22, 2024
2 parents 591bcc8 + ad15608 commit 9186fa8
Show file tree
Hide file tree
Showing 53 changed files with 702 additions and 500 deletions.
5 changes: 5 additions & 0 deletions .changeset/neat-lies-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/engine-multi': minor
---

Support multiple adaptors in job structures
5 changes: 5 additions & 0 deletions .changeset/sharp-needles-peel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Update worker to use adaptors as an array on xplans. Internal only change.
5 changes: 5 additions & 0 deletions .changeset/sharp-tips-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/compiler': minor
---

support multiple adaptors when adding imports
5 changes: 5 additions & 0 deletions .changeset/wild-donuts-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/cli': patch
---

Support multiple adaptors
10 changes: 6 additions & 4 deletions integration-tests/execute/src/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ const execute = async (job: string, state: any, adaptor = 'common') => {
// compile with common and dumb imports
const options = {
'add-imports': {
adaptor: {
name: `@openfn/language-${adaptor}`,
exportAll: true,
},
adaptors: [
{
name: `@openfn/language-${adaptor}`,
exportAll: true,
},
],
},
};
const compiled = compiler(job, options);
Expand Down
58 changes: 30 additions & 28 deletions packages/cli/src/compile/compile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ const compileWorkflow = async (
const job = step as Job;
const jobOpts = {
...opts,
adaptors: job.adaptors ?? opts.adaptors,
};
if (job.adaptor) {
jobOpts.adaptors = [job.adaptor];
}
if (job.expression) {
job.expression = await compileJob(
job.expression as string,
Expand Down Expand Up @@ -115,37 +113,41 @@ export const loadTransformOptions = async (
// If an adaptor is passed in, we need to look up its declared exports
// and pass them along to the compiler
if (opts.adaptors?.length && opts.ignoreImports != true) {
let exports;
const [pattern] = opts.adaptors;
const [specifier] = pattern.split('=');

// Preload exports from a path, optionally logging errors in case of a failure
log.debug(`Trying to preload types for ${specifier}`);
const path = await resolveSpecifierPath(pattern, opts.repoDir, log);
if (path) {
try {
exports = await preloadAdaptorExports(
path,
opts.useAdaptorsMonorepo,
log
);
} catch (e) {
log.error(`Failed to load adaptor typedefs from path ${path}`);
log.error(e);
const adaptorsConfig = [];
for (const adaptorInput of opts.adaptors) {
let exports;
const [specifier] = adaptorInput.split('=');

// Preload exports from a path, optionally logging errors in case of a failure
log.debug(`Trying to preload types for ${specifier}`);
const path = await resolveSpecifierPath(adaptorInput, opts.repoDir, log);
if (path) {
try {
exports = await preloadAdaptorExports(
path,
opts.useAdaptorsMonorepo,
log
);
} catch (e) {
log.error(`Failed to load adaptor typedefs from path ${path}`);
log.error(e);
}
}
}

if (!exports || exports.length === 0) {
log.debug(`No module exports found for ${pattern}`);
}
if (!exports || exports.length === 0) {
log.debug(`No module exports found for ${adaptorInput}`);
}

options['add-imports'] = {
ignore: opts.ignoreImports as string[],
adaptor: {
adaptorsConfig.push({
name: stripVersionSpecifier(specifier),
exports,
exportAll: true,
},
});
}

options['add-imports'] = {
ignore: opts.ignoreImports as string[],
adaptors: adaptorsConfig,
};
}

Expand Down
12 changes: 7 additions & 5 deletions packages/cli/src/execute/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ export function parseAdaptors(plan: ExecutionPlan) {

const adaptors: ModuleInfoMap = {};

// TODO what if there are different versions of the same adaptor?
// This structure can't handle it - we'd need to build it for every job
Object.values(plan.workflow.steps).forEach((step) => {
const job = step as Job;
if (job.adaptor) {
const { name, ...maybeVersionAndPath } = extractInfo(job.adaptor);
// Usually every job should have an adaptors array
// But there are a couple of cases mostly in test, when validation is skipped,
// when the array may not be set
// It's mostly redundant nbut harmless to optionally chain here
job.adaptors?.forEach((adaptor) => {
const { name, ...maybeVersionAndPath } = extractInfo(adaptor);
adaptors[name] = maybeVersionAndPath;
}
});
});

return adaptors;
Expand Down
8 changes: 5 additions & 3 deletions packages/cli/src/execute/get-autoinstall-targets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ const getAutoinstallTargets = (plan: ExecutionPlan) => {
Object.values(plan.workflow.steps).forEach((step) => {
const job = step as Job;
// Do not autoinstall adaptors with a path
if (job.adaptor && !/=/.test(job.adaptor)) {
adaptors[job.adaptor] = true;
}
job.adaptors
?.filter((adaptor) => !/=/.test(adaptor))
.forEach((adaptor) => {
adaptors[adaptor] = true;
});
});
return Object.keys(adaptors);
};
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/execute/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => {
if (options.start) {
customStart = matchStep(
plan,
options.start ?? plan.options.start,
options.start ?? plan.options!.start,
'start',
logger
);
Expand All @@ -95,7 +95,7 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => {
if (options.end) {
customEnd = matchStep(
plan,
options.end ?? plan.options.end,
options.end ?? plan.options!.end,
'end',
logger
);
Expand All @@ -113,8 +113,8 @@ const executeHandler = async (options: ExecuteOptions, logger: Logger) => {
const finalPlan = {
...plan,
options: {
...plan.options,
start: customStart || plan.options.start,
...plan.options!,
start: customStart || plan.options!.start,
end: customEnd,
},
workflow: plan.workflow,
Expand Down
20 changes: 15 additions & 5 deletions packages/cli/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
// the executionPLan for the CLI is a bit differnt to the runtime's perspective

import { Trigger, UUID, WorkflowOptions } from '@openfn/lexicon';

// Ie config can be a string
export type JobNodeID = string;

Expand All @@ -9,19 +12,26 @@ export type OldCLIWorkflow = {
};

export type CLIExecutionPlan = {
id?: string; // UUID for this plan
start?: JobNodeID;
jobs: CLIJobNode[];
id?: string;
options?: WorkflowOptions;
workflow: {
id?: UUID;
name?: string;
steps: Array<CLIJobNode | Trigger>;
};
};

export type CLIJobNode = {
id?: string;
adaptor?: string;
expression?: string; // path or expression
configuration?: string | object; // path or credential object
data?: any;

next?: string | Record<JobNodeID, true | CLIJobEdge>;

// We can accept a single adaptor or multiple
// The CLI will convert it to adaptors as an array
adaptor?: string;
adaptors?: string[];
};

export type CLIJobEdge = {
Expand Down
6 changes: 2 additions & 4 deletions packages/cli/src/util/expand-adaptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ const expand = (name: string) => {

type ArrayOrPlan<T> = T extends string[] ? string[] : ExecutionPlan;

// TODO typings here aren't good,I can't get this to work!
// At least this looks nice externally
export default <T extends Array<string> | ExecutionPlan>(
input: T
): ArrayOrPlan<T> => {
Expand All @@ -26,8 +24,8 @@ export default <T extends Array<string> | ExecutionPlan>(
const plan = input as ExecutionPlan;
Object.values(plan.workflow.steps).forEach((step) => {
const job = step as Job;
if (job.adaptor) {
job.adaptor = expand(job.adaptor);
if (job.adaptors) {
job.adaptors = job.adaptors.map(expand);
}
});

Expand Down
43 changes: 28 additions & 15 deletions packages/cli/src/util/load-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import mapAdaptorsToMonorepo from './map-adaptors-to-monorepo';
import type { ExecutionPlan, Job, WorkflowOptions } from '@openfn/lexicon';
import type { Opts } from '../options';
import type { Logger } from './logger';
import type { OldCLIWorkflow } from '../types';
import type { CLIExecutionPlan, CLIJobNode, OldCLIWorkflow } from '../types';

const loadPlan = async (
options: Pick<
Expand Down Expand Up @@ -36,6 +36,7 @@ const loadPlan = async (

const json = await loadJson(jsonPath!, logger);
const defaultName = path.parse(jsonPath!).name;

if (json.workflow) {
return loadXPlan(json, options, logger, defaultName);
} else {
Expand Down Expand Up @@ -94,21 +95,17 @@ const loadExpression = async (
const expression = await fs.readFile(expressionPath, 'utf8');
const name = path.parse(expressionPath).name;

const step: Job = { expression };

// The adaptor should have been expanded nicely already, so we don't need intervene here
if (options.adaptors) {
const [adaptor] = options.adaptors;
if (adaptor) {
step.adaptor = adaptor;
}
}
const step: Job = {
expression,
// The adaptor should have been expanded nicely already, so we don't need intervene here
adaptors: options.adaptors ?? [],
};

const wfOptions: WorkflowOptions = {};
// TODO support state props to remove?
maybeAssign(options, wfOptions, ['timeout']);

const plan: ExecutionPlan = {
const plan: CLIExecutionPlan = {
workflow: {
name,
steps: [step],
Expand All @@ -126,7 +123,7 @@ const loadExpression = async (
);

// This will never execute
return {} as ExecutionPlan;
return {} as CLIExecutionPlan;
}
};

Expand Down Expand Up @@ -188,7 +185,7 @@ const fetchFile = async (
// TODO this is currently untested in load-plan
// (but covered a bit in execute tests)
const importExpressions = async (
plan: ExecutionPlan,
plan: CLIExecutionPlan,
rootDir: string,
log: Logger
) => {
Expand Down Expand Up @@ -234,8 +231,22 @@ const importExpressions = async (
}
};

// Allow users to specify a single adaptor on a job,
// but convert the internal representation into an array
const ensureAdaptors = (plan: CLIExecutionPlan) => {
Object.values(plan.workflow.steps).forEach((step) => {
const job = step as CLIJobNode;
if (job.adaptor) {
job.adaptors = [job.adaptor];
delete job.adaptor;
}
// Also, ensure there is an empty adaptors array, which makes everything else easier
job.adaptors ??= [];
});
};

const loadXPlan = async (
plan: ExecutionPlan,
plan: CLIExecutionPlan,
options: Pick<Opts, 'monorepoPath' | 'baseDir' | 'expandAdaptors'>,
logger: Logger,
defaultName: string = ''
Expand All @@ -247,6 +258,8 @@ const loadXPlan = async (
if (!plan.workflow.name && defaultName) {
plan.workflow.name = defaultName;
}
ensureAdaptors(plan);

// Note that baseDir should be set up in the default function
await importExpressions(plan, options.baseDir!, logger);
// expand shorthand adaptors in the workflow jobs
Expand All @@ -261,5 +274,5 @@ const loadXPlan = async (

logger.info(`Loaded workflow ${plan.workflow.name ?? ''}`);

return plan;
return plan as ExecutionPlan;
};
6 changes: 4 additions & 2 deletions packages/cli/src/util/map-adaptors-to-monorepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ const mapAdaptorsToMonorepo = (
const plan = input as ExecutionPlan;
Object.values(plan.workflow.steps).forEach((step) => {
const job = step as Job;
if (job.adaptor) {
job.adaptor = updatePath(job.adaptor, monorepoPath, log);
if (job.adaptors) {
job.adaptors = job.adaptors.map((a) =>
updatePath(a, monorepoPath, log)
);
}
});

Expand Down
Loading

0 comments on commit 9186fa8

Please sign in to comment.