0.6.0-rc7 crashing #419

Closed
kdesjard opened this issue Sep 17, 2024 · 23 comments

Comments

@kdesjard
Contributor

kdesjard commented Sep 17, 2024

After making a number of changes to get 0.6.0-rc7 to compile, I've set the storage backend as always (which includes my request type):

.backend(storage.clone())

But now I get:

internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyType>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyType>>)

Why do I need to use .data now??

@geofmureithi
Owner

There is always something that slips through a refactor.
I will write a quick fix.

@kdesjard
Contributor Author

Ok thanks. I do like the new Request struct layout as I was doing this myself in my own struct. Overall, it's much simpler now.

@geofmureithi
Owner

geofmureithi commented Sep 22, 2024

After consideration, this will possibly be resolved externally. Here are some thoughts:

  1. Adding back an implicit Data Layer for the backend would impose a clone for all backends, which is not needed in the Redis case.
  2. We need to standardize dependency injection; we can't inject the backend sparingly.
  3. Not every job needs access to the storage; I think an extra .data(...) line would not hurt in your case.

Let me know your thoughts.

@kdesjard
Contributor Author

Ok, if I add the storage via .data, then I get:

Worker missing required context: Missing the an entry for apalis_core::task::task_id::TaskId. Did you forget to add `.data(<apalis_core::task::task_id::TaskId>)

@geofmureithi
Owner

geofmureithi commented Sep 22, 2024

I am updating documentation on this.
Since the introduction of Parts, the following are injected directly:

  • TaskId
  • Attempt
  • Namespace
  • Ctx dependent on the Backend

These are no longer available via Data(T); just inject them directly as T into your job function.
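
For illustration, a minimal sketch of a job fn written against that rule. Email/send_email are placeholder names and the import paths are assumptions; the exact set of directly-injectable types gets revisited further down this thread:

use apalis::prelude::*;
use apalis_sql::context::SqlContext;
use serde::{Deserialize, Serialize};

// Placeholder job type standing in for the reporter's `MyReq`.
#[derive(Debug, Serialize, Deserialize)]
struct Email {
    to: String,
}

// Per the list above: TaskId, Attempt and the backend's Ctx arrive as plain
// arguments rather than wrapped in Data<T>.
async fn send_email(
    job: Email,
    task_id: TaskId,
    attempt: Attempt,
    ctx: SqlContext,
) -> Result<(), Error> {
    println!("task {task_id:?} attempt {attempt:?} ctx {ctx:?} job {job:?}");
    Ok(())
}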

@geofmureithi
Owner

geofmureithi commented Sep 24, 2024

Let me know if that helps. You can also look at the fn-args example in the v0.6 tree.

@kdesjard
Contributor Author

Creating the worker pool:

let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());

let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .backend(storage.clone())
    .build_fn(execute);

The build_fn, following the fn-args example:

pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>,
    task_id: Data<TaskId>,
    _ctx: Data<SqlContext>,

then I get the crash:

thread 'tokio-runtime-worker' panicked at ..cargo/git/checkouts/apalis-2e5337d3a5750988/d62281f/packages/apalis-core/src/worker/mod.rs:468:45:                                                                                                                   
internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyReq>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyReq>>) 

@geofmureithi
Owner

geofmureithi commented Sep 26, 2024

You need to do the following:

let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());

let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .data(storage.clone())
    .backend(storage.clone())
    .build_fn(execute);

And for your job fn:

pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>, //We now include this via data
    task_id: TaskId, // TaskId Injected directly
    _ctx: SqlContext // Ctx is injected directly
)

@kdesjard
Contributor Author

With that setup I get many trait errors like:

72 | .build_fn(execute);
| ^^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>, which is required by apalis::prelude::WorkerBuilder<MyReq, SqlContext, PostgresStorage<MyReq>, Stack<Data<PostgresStorage<MyReq>>, Stack<TraceLayer, Identity>>, _>: apalis::prelude::WorkerFactory<_, _, apalis::prelude::ServiceFn<_, _, _, _>>

71 | .backend(storage.clone())
| ^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>

@geofmureithi
Owner

Well seems I fkd up somewhere.
Looking at the docs, seems I am wrong about some of those. Looks like I missed that during the refactor. I will write a patch and clarify.

@geofmureithi
Owner

Hey @kdesjard Please check out #425 and tell me if that branch works for you.

@kdesjard
Contributor Author

Compiles and runs, but now the job gets stuck in the running state.

@geofmureithi
Owner

Ok interesting. Could you try and pick a test from here, and provide a failing test?

@kdesjard
Contributor Author

Is this related to #422 ?

My build_fn finishes fine, and I see job.done:

DEBUG ThreadId(06) task: apalis::layers::tracing::on_response: job.done done_in=1ms result=OverallResult { ... }

but the status does not change to Done (attempt also equals 0 ?)

    parts: Parts {
        task_id: TaskId(
            Ulid(
                2088692579879522513577402759446946266,
            ),
        ),
        data: Extensions,
        attempt: Attempt(
            0,
        ),
        context: SqlContext {
            status: Running,
            run_at: 2024-09-30T19:53:27.660139Z,
            max_attempts: 1,
            last_error: None,
            lock_at: Some(
                1727726007,
            ),
            lock_by: Some(
                WorkerId {
                    name: "WORKER",
                    instance: None,
                },
            ),
            done_at: None,
        },

@geofmureithi
Owner

Tower consumes the request as self, so after the job is done, the Parts may need to be refreshed via fetch_by_id.
Where and when are you debugging the context?
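
As a reference point, a minimal sketch of that refresh, reusing MyReq and storage from the snippets above; the exact fetch_by_id signature and the public parts.context field are assumptions, not checked against the released 0.6 API:

use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;

// Re-fetch the task after the worker has run it: the Request handed to the
// job fn is consumed by the Tower service, so the updated SqlContext is only
// visible on a fresh fetch.
async fn refresh(
    storage: &mut PostgresStorage<MyReq>,
    task_id: &TaskId,
) -> Result<(), Box<dyn std::error::Error>> {
    if let Some(req) = storage.fetch_by_id(task_id).await? {
        // Debug-print the refreshed context (status, attempts, done_at, ...).
        println!("{:?}", req.parts.context);
    }
    Ok(())
}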

@kdesjard
Contributor Author

kdesjard commented Oct 1, 2024

I submit the job and run storage.fetch_by_id to poll the task which is where I'm debugging the context. Even without polling, the DB still has the status as Running:

             id             | status  | attempts | max_attempts |            run_at             |            done_at            
----------------------------+---------+----------+--------------+-------------------------------+-------------------------------
 01J93SDK5HAJE7KGKC17XE4P6B | Running |        0 |            1 | 2024-10-01 10:18:38.130224+00 | 

@geofmureithi
Owner

Ack on postgres is chunked and lazy. Can you try sleeping a second or two?

@kdesjard
Contributor Author

kdesjard commented Oct 1, 2024

It doesn't matter whether I check or not, as I have two modes, sync and async: the sync requests poll for completion within a timeout period, while async jobs are essentially fire and forget. So even for async jobs, the DB still shows them as Running even though they did complete.

@geofmureithi
Owner

geofmureithi commented Oct 2, 2024 via email

@geofmureithi
Owner

geofmureithi commented Oct 3, 2024

@kdesjard I found the bug: I was missing a critical implementation in the worker setup. Thanks for highlighting it and for always trying the latest version.

Basically the AckLayer was not being called, but now that's fixed with a test added for each storage. Please give it a try before I merge it.

@kdesjard
Contributor Author

kdesjard commented Oct 3, 2024

Jobs are finishing now, but I'm getting trait errors if I include the TaskId in the build_fn's args.

@geofmureithi
Owner

Could you pull the latest from the 0.6 branch and give it a try? There were some other fixes on FromRequest.

@kdesjard
Contributor Author

kdesjard commented Oct 4, 2024

All working now, thank you very much.

@kdesjard kdesjard closed this as completed Oct 4, 2024