
Data stores, Collections, and Buffers #2190

Open
christad92 opened this issue Jun 10, 2024 · 3 comments
christad92 commented Jun 10, 2024

We want to add the ability for Job code to be able to access a common datastore during execution.

The current proposal is to leverage our existing Postgres database, but we must remain eagle-eyed about the performance implications, as most of this data will not be indexable.

This will require work on both Lightning and the worker code.

Worker

  • Needs to provide an interface for all execution environments for:
    • building queries
    • handing over queries to the parent/other process for invocation
    • sending the Run's token to Lightning
    • receiving streaming responses from Lightning
  • TBD Write a spec for the query interface
  • TBD Gather links and plans for the worker

Lightning

  • Will have an HTTP API/Controller for handling requests for collections that:
    • Is authenticated via a run token
    • Calls functions to execute queries
    • Streams the results back out
  • A Collections context that:
    • Parses and validates queries from the worker
    • Has a query handler that can execute queries using Ecto's `dynamic`
    • Returns a stream of records
  • An interface for creating collections
    • Collections will be project scoped
    • TBD Where would this interface be and who can use it?
    • TBD Will users be able to view, query or manage the data from the get go?

Grouped Deliverables

This is a rough bounding of features we can take on individually:

Job

  1. Provide a JS interface that is available at runtime to all Jobs.
  2. Figure out how to make an async iterator over a cross process message
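The async-iterator item above could be sketched as follows. This is a hypothetical shape, not the real worker API: `subscribe` stands in for whatever cross-process message hook the worker exposes, and the message shape (`type`, `record`) is an assumption.

```javascript
// Bridge cross-process messages into an async iterator. Records are
// buffered until the consumer asks for them; an `end` message closes
// the stream.
function streamFromMessages(subscribe) {
  const buffer = [];
  let notify = null;
  let done = false;

  subscribe((msg) => {
    if (msg.type === 'end') done = true;
    else buffer.push(msg.record);
    // Wake a consumer that is waiting for the next message.
    if (notify) {
      const n = notify;
      notify = null;
      n();
    }
  });

  return {
    async *[Symbol.asyncIterator]() {
      for (;;) {
        while (buffer.length) yield buffer.shift();
        if (done) return;
        // Park until the next message arrives.
        await new Promise((resolve) => { notify = resolve; });
      }
    },
  };
}
```

Job code would then simply write `for await (const record of collections.query(...))` without knowing the records crossed a process boundary.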

Worker

  1. Make an authenticated query to Lightning
  2. Expect a streaming response so results can be dispatched to the Job before all of them arrive.
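As a sketch of the two items above, assuming Lightning streams newline-delimited JSON (an assumption; the framing is not yet specified), the worker could parse records incrementally and dispatch each one before the response completes. The URL path and `makeNdjsonParser` are illustrative names:

```javascript
// Incremental NDJSON parser: feed it chunks of text, it emits each
// complete record as soon as its closing newline arrives.
function makeNdjsonParser(onRecord) {
  let partial = '';
  return (chunk) => {
    partial += chunk;
    let idx;
    while ((idx = partial.indexOf('\n')) >= 0) {
      const line = partial.slice(0, idx).trim();
      partial = partial.slice(idx + 1);
      if (line) onRecord(JSON.parse(line));
    }
  };
}

// Hypothetical query call: authenticate with the run token and stream
// the body, dispatching records as they arrive.
async function queryCollection(baseUrl, runToken, query, onRecord) {
  const res = await fetch(`${baseUrl}/collections/query`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${runToken}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(query),
  });
  if (!res.ok) throw new Error(`Query failed: ${res.status}`);

  const push = makeNdjsonParser(onRecord);
  for await (const chunk of res.body) {
    push(Buffer.from(chunk).toString('utf8'));
  }
}
```

The key property is that `onRecord` fires per record, so the Job-side async iterator can start consuming while Lightning is still streaming.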

Controller

  1. Phoenix controller that can authenticate using a Run token
  2. Enforces the scope of the query using the token.
  3. Calls out to the Collections context for execution of the query
  4. Returns a stream of items.
  5. Ensures that the streams are batched and limited.

Collections

  1. Design a data structure
  2. Add said tables
  3. Add the Collections context module
  4. Define the shape of query parameters that the worker will use
  5. Decide what clauses we want to support
  6. Cast the params, validate them and execute a query that returns a stream.
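Purely illustrative, since deliverables 4 and 5 above are still open: one possible shape for the query parameters the worker could send. Every field and clause name here is an assumption, not the spec.

```javascript
// Hypothetical query-parameter shape for a collections query.
const exampleQuery = {
  collection: 'patients',            // project-scoped collection name
  where: [
    { key: 'district', op: 'eq', value: 'Bo' },
    { key: 'updated_at', op: 'gte', value: '2024-01-01' },
  ],
  limit: 500,                        // server-enforced batch cap
  cursor: null,                      // opaque cursor for the next batch
};
```

A flat clause list like `where` is easy to cast and validate server-side before building an Ecto query, and the `limit`/`cursor` pair maps onto the batched, limited streams the controller must enforce.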

Still TBD

  1. Limitations: we don't want massive JSONB items stored in the DB; rather, lots of smaller items.
  2. Consider expiration of records
  3. How can we build guardrails into the delete functionality?
  4. Mutation/push/insert/update
  5. Who gets to create collections
  6. What is the appetite for a UI that does one or more of:
    1. Creating and deleting Collections
    2. Creating or deleting records
    3. Viewing records
    4. Deleting records
```mermaid
flowchart TD

subgraph Collections
H([Collections<br/>Cast query into Ecto query]):::yellow
I([Collections<br/>Execute query]):::yellow
end

subgraph API
D([Controller<br/>Authenticates using Run token]):::blue
E([Controller<br/>Passes query to Collections]):::blue
F([Controller<br/>Serialize stream into JSON]):::blue
G([Controller<br/>Send streaming response<br/>back to Worker]):::blue
end

subgraph Worker
J([Job<br/>Makes a call to collections]):::green
A([Worker<br/>Receives query call]):::red
B([Worker<br/>Makes call to Lightning]):::red
C([Worker<br/>Marshal records back to Job]):::red
K([Job<br/>Receive records and present<br/>async iterator]):::green
end

J --> A
A --> B
B --> D
D --> E
E --> H
H --> I
I --> F
F --> G
G --> C
C --> K

%% Define color styles
classDef blue fill:#1E90FF,stroke:#000,stroke-width:2px;
classDef red fill:#FF6347,stroke:#000,stroke-width:2px;
classDef yellow fill:#FFD700,stroke:#000,stroke-width:2px,color:#000;
classDef green fill:#15803d,stroke:#000,stroke-width:2px;
```
@christad92 christad92 added this to v2 Jun 10, 2024
@christad92 christad92 moved this to Backlog in v2 Jun 10, 2024
@christad92 christad92 changed the title from "Data stores and Collections" to "Data stores, Collections, and Buffers" Jul 15, 2024
@stuartc stuartc self-assigned this Sep 13, 2024
stuartc commented Sep 13, 2024

@jyeshe @josephjclark I've updated the issue description for you guys to take a look and think before we chat next.


josephjclark commented Sep 20, 2024

Summary of discussion with @stuartc :

We've agreed that support for the collections API will mostly take the form of a regular adaptor: a standalone adaptor, not part of common.

To make this work, Runs will need to support two adaptors for each step. As it happens the runtime already supports this, so we just need to ensure support in the CLI, worker and engine. The Lightning contract does not need to change: the Worker will append the collections adaptor to the run spec.

Lightning can later explicitly send an array of adaptors if it wants to (this will be important if the user wants to pick a collections provider from a list).

The collections API will use the run token to authorise all incoming requests, i.e. when calling out to the collections API, you must include the run token as an API token.

The Worker will attach the run token to state.configuration and pass that into runtime. The adaptor will then read the token from state, like any adaptor does.
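The token hand-off described above might look like this inside a collections adaptor. All names here (`getRecords`, `collections_token`) are illustrative assumptions, not a confirmed API:

```javascript
// Hypothetical collections-adaptor operation that reads the run token
// the Worker attached to state.configuration.
const getRecords = (collection) => (state) => {
  const token = state.configuration && state.configuration.collections_token;
  if (!token) throw new Error('no run token on state.configuration');
  // ...use `token` as the Bearer token when calling Lightning's
  // collections API for `collection`...
  return { ...state, data: { collection, records: [] } };
};
```

From the adaptor's point of view this is unremarkable, which is the point: it reads its "credential" from state exactly like any other adaptor.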

The run token is totally safe to send to the job code. While the run token can be used to load credentials and dataclips from Lightning, it only works over a WebSocket API, and to connect to the WebSocket you need a separate worker token. So on its own it's basically useless.

We can however add further security by separating state and configuration into two objects. The signature for an operation would become (state, config) => (...args) => State. The config object would only be shared with the adaptor, not with job code. This is a worthwhile but large change to the runtime and adaptors. It doesn't necessarily need to be tied to this work, but maybe it's still a 2024 project.
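To make the proposed signature concrete, a minimal sketch, assuming the runtime injects `config` (with some token field; the name is an assumption) so that only the adaptor, never job code, sees it:

```javascript
// Illustrative operation using the proposed
// (state, config) => (...args) => State signature.
const fetchAll = (state, config) => (collectionName) => {
  if (!config || !config.token) {
    throw new Error('runtime supplied no config token');
  }
  // ...query `collectionName` against the collections API using
  // config.token; results would land on state.data...
  return { ...state, data: { ...state.data, collection: collectionName } };
};
```

Job code would keep writing `fetchAll(...)`-style calls against `state`, while the runtime curries in `config` behind the scenes.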

This approach is entirely compatible with the CLI. Users will need to explicitly specify the collections adaptor (ie, openfn workflow.json -a dhis2 -a collections). Maybe later the CLI can auto-load a collections adaptor if there's a collections API key on config (let's see).

So, here's the engineering work in JS land:

@josephjclark

@stuartc any thoughts on expiration of records? Data retention stuff? A burn after reading option?

@stuartc stuartc mentioned this issue Oct 30, 2024
Projects
Status: In review