diff --git a/ocaml/xapi/xapi_xenops.ml b/ocaml/xapi/xapi_xenops.ml index ba93d37d424..28a527511c1 100644 --- a/ocaml/xapi/xapi_xenops.ml +++ b/ocaml/xapi/xapi_xenops.ml @@ -3207,7 +3207,9 @@ end (* XXX: PR-1255: we also want to only listen for events on VMs and fields we care about *) let events_from_xapi () = let open Event_types in - Server_helpers.exec_with_new_task "xapi events" (fun __context -> + Server_helpers.exec_with_new_task ~task_in_database:true "xapi events" + (fun __context -> + let task = Context.get_task_id __context |> Ref.string_of in let localhost = Helpers.get_localhost ~__context in let token = ref "" in let stop = ref false in @@ -3216,19 +3218,18 @@ let events_from_xapi () = Helpers.call_api_functions ~__context (fun rpc session_id -> (trigger_xenapi_reregister := fun () -> - (* This causes Event.next () and Event.from () to return SESSION_INVALID *) + debug + "triggering xapi event thread to re-register via event \ + injection" ; try - debug - "triggering xapi event thread to re-register via \ - session.logout" ; - XenAPI.Session.logout ~rpc ~session_id - with - | Api_errors.Server_error (code, _) - when code = Api_errors.session_invalid -> - debug "Event thread has already woken up" - | e -> - error "Waking up the xapi event thread: %s" - (string_of_exn e) + let _ = + XenAPI.Event.inject ~rpc ~session_id ~_class:"task" + ~_ref:task + in + () + with e -> + error "Waking up the xapi event thread: %s" + (string_of_exn e) ) ; (* We register for events on resident_VMs only *) let resident_VMs = @@ -3251,13 +3252,15 @@ let events_from_xapi () = error "events_from_xapi: extra items in the cache: [ %s ]" (String.concat "; " (StringSet.elements extra_in_cache)) ; let classes = - List.map - (fun x -> Printf.sprintf "VM/%s" (Ref.string_of x)) - resident_VMs + Printf.sprintf "task/%s" task + :: List.map + (fun x -> Printf.sprintf "VM/%s" (Ref.string_of x)) + resident_VMs in (* NB we re-use the old token so we don't get events we've already - received BUT we will not necessarily receive events for the new VMs *) - while true do + received BUT we will not necessarily receive events for the new VMs *) + let reregister = ref false in + while not !reregister do let api_timeout = 60. in let timeout = 30. @@ -3307,6 +3310,11 @@ let events_from_xapi () = raise e ) ) + | {ty= "task"; reference= t} when t = task -> + debug + "Woken event thread: updating list of event \ + subscriptions" ; + reregister := true | _ -> warn "Received event for something we didn't register for!" @@ -3319,7 +3327,9 @@ let events_from_xapi () = with | Api_errors.Server_error (code, _) when code = Api_errors.session_invalid -> - debug "Woken event thread: updating list of event subscriptions" + debug + "Caught SESSION_INVALID listening to events from xapi. \ + Restarting thread immediately." | Api_errors.Server_error (code, _) when code = Api_errors.xen_incompatible -> warn @@ -3327,7 +3337,10 @@ let events_from_xapi () = incompatibility" ; stop := true | e -> - debug "Caught %s listening to events from xapi" (string_of_exn e) ; + debug + "Caught %s listening to events from xapi. Restarting thread \ + after 15 seconds." + (string_of_exn e) ; (* Start from scratch *) token := "" ; Thread.delay 15.