From 93a4c77b793ab18aec1bc83e7ab417ed98512de5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Tue, 4 Jul 2023 15:15:59 +0100 Subject: [PATCH 01/17] [Maintenance] xapi-types does not need ocaml-migrate-parsetree MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- ocaml/xen-api-client/lib/dune | 2 +- xapi-types.opam | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/ocaml/xen-api-client/lib/dune b/ocaml/xen-api-client/lib/dune index dd26361adef..bdbd9596d43 100644 --- a/ocaml/xen-api-client/lib/dune +++ b/ocaml/xen-api-client/lib/dune @@ -8,10 +8,10 @@ cohttp re.str rpclib - xapi-rrd uri uuid xmlm + xapi-rrd xapi-client xapi-types ) diff --git a/xapi-types.opam b/xapi-types.opam index a43f486731a..c3a998e5004 100644 --- a/xapi-types.opam +++ b/xapi-types.opam @@ -13,7 +13,6 @@ depends: [ "ocaml" "dune" {build & >= "1.4"} "astring" - "ocaml-migrate-parsetree" "ppx_deriving_rpc" "rpclib" "sexpr" From 0442a0206564a1230de88bac5d6d32747f72f831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 17 Aug 2023 18:30:42 +0100 Subject: [PATCH 02/17] [benchmark] CP-43769: add named mutex benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- ocaml/tests/bench/bechamel_simple_cli.ml | 49 +++++++++++++++++++++++ ocaml/tests/bench/bench_named_mutex.ml | 50 ++++++++++++++++++++++++ ocaml/tests/bench/dune | 6 +++ 3 files changed, 105 insertions(+) create mode 100644 ocaml/tests/bench/bechamel_simple_cli.ml create mode 100644 ocaml/tests/bench/bench_named_mutex.ml create mode 100644 ocaml/tests/bench/dune diff --git a/ocaml/tests/bench/bechamel_simple_cli.ml b/ocaml/tests/bench/bechamel_simple_cli.ml new file mode 100644 index 00000000000..a32723f7c47 --- /dev/null +++ b/ocaml/tests/bench/bechamel_simple_cli.ml @@ -0,0 +1,49 @@ +(* based on bechamel example code *) +open Bechamel +open Toolkit + +let instances = Instance.[monotonic_clock; minor_allocated; major_allocated] + +let benchmark tests = + (* stabilize:true would be the default but it measures GC stabilization time as part of the function + runtime, leading to about 10x as much time measured than without. + It is also confusing for flamegraphs because the GC will show up much more frequently than in reality + due to the thousands of repeated calls. + *) + let cfg = + Benchmark.cfg + ~quota:Time.(second 5.0) + ~start:10 ~stabilize:false ~compaction:false () + in + Benchmark.all cfg instances tests + +let analyze raw_results = + let ols = + Analyze.ols ~r_square:true ~bootstrap:0 ~predictors:[|Measure.run|] + in + let results = + List.map (fun instance -> Analyze.all ols instance raw_results) instances + in + (Analyze.merge ols instances results, raw_results) + +let () = + List.iter (fun i -> Bechamel_notty.Unit.add i (Measure.unit i)) instances + +let img (window, results) = + Bechamel_notty.Multiple.image_of_ols_results ~rect:window + ~predictor:Measure.run results + +open Notty_unix + +let cli tests = + Format.printf "@,Running benchmarks@." ; + let results, _ = tests |> benchmark |> analyze in + + let window = + match winsize Unix.stdout with + | Some (w, h) -> + {Bechamel_notty.w; h} + | None -> + {Bechamel_notty.w= 80; h= 1} + in + img (window, results) |> eol |> output_image diff --git a/ocaml/tests/bench/bench_named_mutex.ml b/ocaml/tests/bench/bench_named_mutex.ml new file mode 100644 index 00000000000..4d7708efdd3 --- /dev/null +++ b/ocaml/tests/bench/bench_named_mutex.ml @@ -0,0 +1,50 @@ +open Bechamel + +let bench_tracing = true + +let test name allocate execute = + let test_mutex m = execute m ignore in + Test.make_with_resource ~name + Test.multiple (* TODO: Test.uniq segfaults here, bechamel bug *) + ~allocate ~free:ignore (Staged.stage test_mutex) + +let mutex_lock_unlock m f = Mutex.lock m ; f () ; Mutex.unlock m + +let tracing_benchmarks () = + let () = Suite_init.harness_init () in + let __context = Test_common.make_test_database () in + let observer = + Xapi_observer.create ~__context ~name_label:"test" ~name_description:"" + ~hosts:[] ~attributes:[] ~endpoints:["bugtool"] ~components:["xapi"] + ~enabled:true + in + let host = !Xapi_globs.localhost_ref in + let () = Xapi_observer.register ~__context ~self:observer ~host in + let open Locking_helpers in + let named_trace_execute m f = + Context.with_tracing __context "bench" @@ fun __context -> + Named_mutex.execute m f + in + test "NamedMutex.execute (tracing)" + (fun () -> Named_mutex.create "test") + named_trace_execute + +let benchmarks = + let open Locking_helpers in + let named_execute m f = Named_mutex.execute m f in + Test.make_grouped ~name:"Mutex" + ([ + test "Mutex.lock/unlock" Mutex.create mutex_lock_unlock + ; test "Mutex.execute" Mutex.create + Xapi_stdext_threads.Threadext.Mutex.execute + ; test "NamedMutex.execute" + (fun () -> Named_mutex.create "test") + named_execute + ] + @ if bench_tracing then [tracing_benchmarks ()] else [] + ) + +let () = + Gc.compact () ; + Memtrace.trace_if_requested () ; + Bechamel_simple_cli.cli benchmarks diff --git a/ocaml/tests/bench/dune b/ocaml/tests/bench/dune new file mode 100644 index 00000000000..375e76a7392 --- /dev/null +++ b/ocaml/tests/bench/dune @@ -0,0 +1,6 @@ +(executable + (name bench_named_mutex) + (modes exe) + (optional) + (libraries bechamel xapi_internal bechamel-notty notty.unix xapi-stdext-threads tests_common memtrace) +) From 700af3c85f33dbe9082cdc15c0c5bc43a57f03a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 29 Jun 2023 14:54:36 +0100 Subject: [PATCH 03/17] [refactor] CP-43769: locking_helpers: introduce a more efficient and type-safe waiting and acquired resources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Release and acquire must be paired: this is reflected in the types now: 'acquired_fast' returns a value that must be passed to 'released_fast'. The type is currently a unit, but will be replaced with a tracing span when instrumentation is enabled in followup commits. This also enables a more efficient implementation where the lookup is done once in 'waiting_for'. This requires changing the stunnel callbacks to pass the additional parameter. The stunnel pid 'set' function is changed to return a closure that can be used to 'unset' to avoid introducing a circular dependency between Xmlrpc_client and Locking_helpers (or trying to hide the 'a type parameter with elaborate GADT tricks). No functional change. Signed-off-by: Edwin Török --- ocaml/libs/http-lib/xmlrpc_client.ml | 25 ++++++++----------------- ocaml/libs/http-lib/xmlrpc_client.mli | 11 +++-------- ocaml/xapi/local_work_queue.ml | 14 +++++++++----- ocaml/xapi/locking_helpers.ml | 14 +++++++++----- ocaml/xapi/locking_helpers.mli | 10 +++++++--- ocaml/xapi/storage_locks.ml | 12 ++++++------ ocaml/xapi/xapi.ml | 13 +++++-------- 7 files changed, 47 insertions(+), 52 deletions(-) diff --git a/ocaml/libs/http-lib/xmlrpc_client.ml b/ocaml/libs/http-lib/xmlrpc_client.ml index bdfc63621df..ba7bc4507b3 100644 --- a/ocaml/libs/http-lib/xmlrpc_client.ml +++ b/ocaml/libs/http-lib/xmlrpc_client.ml @@ -27,11 +27,9 @@ module E = Debug.Make (struct let name = "mscgen" end) let () = Debug.disable ~level:Syslog.Debug "mscgen" module Internal = struct - let set_stunnelpid_callback : (string option -> int -> unit) option ref = - ref None + let set_unset_stunnelpid_callback = ref None - let unset_stunnelpid_callback : (string option -> int -> unit) option ref = - ref None + let set_stunnelpid_callback set = set_unset_stunnelpid_callback := Some set let destination_is_ok : (string -> bool) option ref = ref None end @@ -345,19 +343,12 @@ let with_transport ?(stunnel_wait_disconnect = true) transport f = host port ; (* Call the {,un}set_stunnelpid_callback hooks around the remote call *) let with_recorded_stunnelpid f = - ( match !Internal.set_stunnelpid_callback with - | Some f -> - f task_id s_pid - | _ -> - () - ) ; - finally f (fun () -> - match !Internal.unset_stunnelpid_callback with - | Some f -> - f task_id s_pid - | _ -> - () - ) + match !Internal.set_unset_stunnelpid_callback with + | Some set -> + let unset = set task_id s_pid in + finally f unset + | None -> + f () in with_recorded_stunnelpid (fun () -> finally diff --git a/ocaml/libs/http-lib/xmlrpc_client.mli b/ocaml/libs/http-lib/xmlrpc_client.mli index 00d77b45937..0e6c782aac5 100644 --- a/ocaml/libs/http-lib/xmlrpc_client.mli +++ b/ocaml/libs/http-lib/xmlrpc_client.mli @@ -138,14 +138,9 @@ end module Internal : sig (** Internal functions should not be used by clients directly *) - val set_stunnelpid_callback : (string option -> int -> unit) option ref - (** When invoking an XMLRPC call over HTTPS via stunnel, this callback - is called to allow us to store the association between a task and an - stunnel pid *) - - val unset_stunnelpid_callback : (string option -> int -> unit) option ref - (** After invoking an XMLRPC call over HTTPS via stunnel, this callback - is called to allow us to forget the association between a task and an + val set_stunnelpid_callback : (string option -> int -> unit -> unit) -> unit + (** When invoking an XMLRPC call over HTTPS via stunnel, these callback + are called to allow us to store and forget the association between a task and an stunnel pid *) val destination_is_ok : (string -> bool) option ref diff --git a/ocaml/xapi/local_work_queue.ml b/ocaml/xapi/local_work_queue.ml index 986c36a0895..d01f61ba8a2 100644 --- a/ocaml/xapi/local_work_queue.ml +++ b/ocaml/xapi/local_work_queue.ml @@ -62,8 +62,10 @@ let wait_in_line q description f = let m = Mutex.create () in let c = Condition.create () in let state = ref `Pending in - Locking_helpers.Thread_state.waiting_for - (Locking_helpers.Lock q.Thread_queue.name) ; + let waiting = + Locking_helpers.Thread_state.waiting_for + (Locking_helpers.Lock q.Thread_queue.name) + in let ok = q.Thread_queue.push_fn description (fun () -> (* Signal the mothership to run the computation now *) @@ -87,11 +89,13 @@ let wait_in_line q description f = Condition.wait c m done ) ; - Locking_helpers.Thread_state.acquired - (Locking_helpers.Lock q.Thread_queue.name) ; + let acquired = + Locking_helpers.Thread_state.acquired + (Locking_helpers.Lock q.Thread_queue.name) waiting + in finally f (fun () -> Locking_helpers.Thread_state.released - (Locking_helpers.Lock q.Thread_queue.name) ; + (Locking_helpers.Lock q.Thread_queue.name) acquired ; with_lock m (fun () -> state := `Finished ; Condition.signal c diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 5ec18803f63..83d7304216c 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -37,6 +37,10 @@ let kill_resource = function Unix.kill pid Sys.sigkill module Thread_state = struct + type waiting = unit + + type acquired = unit + type time = float type t = { @@ -98,7 +102,7 @@ module Thread_state = struct let waiting_for resource = update (fun ts -> {ts with waiting_for= Some (resource, now ())}) - let acquired resource = + let acquired resource (_ : unit) = update (fun ts -> { ts with @@ -107,7 +111,7 @@ module Thread_state = struct } ) - let released resource = + let released resource () = update (fun ts -> { ts with @@ -217,9 +221,9 @@ module Named_mutex = struct let execute (x : t) f = let r = Lock x.name in - Thread_state.waiting_for r ; + let waiting = Thread_state.waiting_for r in with_lock x.m (fun () -> - Thread_state.acquired r ; - finally f (fun () -> Thread_state.released r) + let acquired = Thread_state.acquired r waiting in + finally f (fun () -> Thread_state.released r acquired) ) end diff --git a/ocaml/xapi/locking_helpers.mli b/ocaml/xapi/locking_helpers.mli index b61a42bed57..cc4280c9d40 100644 --- a/ocaml/xapi/locking_helpers.mli +++ b/ocaml/xapi/locking_helpers.mli @@ -22,16 +22,20 @@ val kill_resource : resource -> unit (** Records per-thread diagnostic information *) module Thread_state : sig + type waiting + + type acquired + val with_named_thread : string -> API.ref_task -> (unit -> 'a) -> 'a (** Called when a thread becomes associated with a particular task *) - val waiting_for : resource -> unit + val waiting_for : resource -> waiting (** Called when a thread is about to block waiting for a resource to be free *) - val acquired : resource -> unit + val acquired : resource -> waiting -> acquired (** Called when a thread acquires a resource *) - val released : resource -> unit + val released : resource -> acquired -> unit (** Called when a thread releases a resource *) val get_all_acquired_resources : unit -> resource list diff --git a/ocaml/xapi/storage_locks.ml b/ocaml/xapi/storage_locks.ml index 5e003649f06..2fb9d40da6e 100644 --- a/ocaml/xapi/storage_locks.ml +++ b/ocaml/xapi/storage_locks.ml @@ -37,7 +37,7 @@ let with_instance_lock t key f = Locking_helpers.Lock ("SM/" ^ Ref.really_pretty_and_small (Ref.of_string key)) in - Locking_helpers.Thread_state.waiting_for r ; + let waiting = Locking_helpers.Thread_state.waiting_for r in with_lock t.m (fun () -> (* Wait for the lock to be free (ie the table entry to be removed and the master lock to be released *) while Hashtbl.mem t.t key || t.master_lock do @@ -45,16 +45,16 @@ let with_instance_lock t key f = done ; Hashtbl.replace t.t key () ) ; - Locking_helpers.Thread_state.acquired r ; + let acquired = Locking_helpers.Thread_state.acquired r waiting in Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> with_lock t.m (fun () -> Hashtbl.remove t.t key ; Condition.broadcast t.c) ; - Locking_helpers.Thread_state.released r + Locking_helpers.Thread_state.released r acquired ) (** Execute the function with the master_lock held and no instance locks held *) let with_master_lock t f = let r = Locking_helpers.Lock "SM" in - Locking_helpers.Thread_state.waiting_for r ; + let waiting = Locking_helpers.Thread_state.waiting_for r in with_lock t.m (fun () -> (* Wait for the master_lock to be released *) while t.master_lock do @@ -67,11 +67,11 @@ let with_master_lock t f = Condition.wait t.c t.m done ) ; - Locking_helpers.Thread_state.acquired r ; + let acquired = Locking_helpers.Thread_state.acquired r waiting in Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> with_lock t.m (fun () -> t.master_lock <- false ; Condition.broadcast t.c ) ; - Locking_helpers.Thread_state.released r + Locking_helpers.Thread_state.released r acquired ) diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml index ad23fe97975..23815c9eed4 100644 --- a/ocaml/xapi/xapi.ml +++ b/ocaml/xapi/xapi.ml @@ -165,12 +165,10 @@ let register_callback_fns () = in Xapi_cli.rpc_fun := Some fake_rpc ; let set_stunnelpid _task_opt pid = - Locking_helpers.Thread_state.acquired - (Locking_helpers.Process ("stunnel", pid)) - in - let unset_stunnelpid _task_opt pid = - Locking_helpers.Thread_state.released - (Locking_helpers.Process ("stunnel", pid)) + let resource = Locking_helpers.Process ("stunnel", pid) in + let waiting = Locking_helpers.Thread_state.waiting_for resource in + let acquired = Locking_helpers.Thread_state.acquired resource waiting in + fun () -> Locking_helpers.Thread_state.released resource acquired in let stunnel_destination_is_ok addr = Server_helpers.exec_with_new_task "check_stunnel_destination" @@ -190,8 +188,7 @@ let register_callback_fns () = true ) in - Xmlrpc_client.Internal.set_stunnelpid_callback := Some set_stunnelpid ; - Xmlrpc_client.Internal.unset_stunnelpid_callback := Some unset_stunnelpid ; + Xmlrpc_client.Internal.set_stunnelpid_callback set_stunnelpid ; Xmlrpc_client.Internal.destination_is_ok := Some stunnel_destination_is_ok ; TaskHelper.init () From 45003fac01d884a2ee1e8d020c630e0ca9803b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Tue, 22 Aug 2023 17:23:32 +0100 Subject: [PATCH 04/17] [refactor] CP-43769: Locking_helpers: do not make waiting optional MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- ocaml/xapi/locking_helpers.ml | 65 +++++++++++++++++++++++++++++----- ocaml/xapi/locking_helpers.mli | 5 +-- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 83d7304216c..d4f09996449 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -37,9 +37,9 @@ let kill_resource = function Unix.kill pid Sys.sigkill module Thread_state = struct - type waiting = unit + type waiting = (Tracing.Span.t option * Tracing.Span.t option) option - type acquired = unit + type acquired = Tracing.Span.t option type time = float @@ -99,19 +99,59 @@ module Thread_state = struct let now () = Unix.gettimeofday () - let waiting_for resource = - update (fun ts -> {ts with waiting_for= Some (resource, now ())}) + let waiting_for ?parent resource = + let span = + match (parent : Tracing.Span.t option) with + | None -> + None + | Some _ -> ( + let name = + String.concat "" + ["Thread_state.waiting_for("; string_of_resource resource; ")"] + in + let tracer = Tracing.get_tracer ~name in + match Tracing.Tracer.start ~tracer ~name ~parent () with + | Ok span -> + Some (parent, span) + | Error e -> + D.warn "Failed to start tracing: %s" (Printexc.to_string e) ; + None + ) + in + update (fun ts -> {ts with waiting_for= Some (resource, now ())}) ; + span - let acquired resource (_ : unit) = + let acquired resource parent = + let span = + match parent with + | None -> + None + | Some (parent, span) -> ( + let (_ : (_, _) result) = Tracing.Tracer.finish span in + let name = + String.concat "" + ["Thread_state.acquired("; string_of_resource resource; ")"] + in + let tracer = Tracing.get_tracer ~name in + match Tracing.Tracer.start ~tracer ~name ~parent () with + | Ok span -> + span + | Error e -> + D.warn "Failed to start tracing: %s" (Printexc.to_string e) ; + None + ) + in update (fun ts -> { ts with waiting_for= None ; acquired_resources= (resource, now ()) :: ts.acquired_resources } - ) + ) ; + span - let released resource () = + let released resource span = + let (_ : (_, _) result) = Tracing.Tracer.finish span in update (fun ts -> { ts with @@ -219,9 +259,16 @@ module Named_mutex = struct let create name = {name; m= Mutex.create ()} - let execute (x : t) f = + let execute ?__context ?parent (x : t) f = + let parent = + match parent with + | None -> + Option.bind __context Context.tracing_of + | Some _ as p -> + p + in let r = Lock x.name in - let waiting = Thread_state.waiting_for r in + let waiting = Thread_state.waiting_for ?parent r in with_lock x.m (fun () -> let acquired = Thread_state.acquired r waiting in finally f (fun () -> Thread_state.released r acquired) diff --git a/ocaml/xapi/locking_helpers.mli b/ocaml/xapi/locking_helpers.mli index cc4280c9d40..9f551919916 100644 --- a/ocaml/xapi/locking_helpers.mli +++ b/ocaml/xapi/locking_helpers.mli @@ -29,7 +29,7 @@ module Thread_state : sig val with_named_thread : string -> API.ref_task -> (unit -> 'a) -> 'a (** Called when a thread becomes associated with a particular task *) - val waiting_for : resource -> waiting + val waiting_for : ?parent:Tracing.Span.t -> resource -> waiting (** Called when a thread is about to block waiting for a resource to be free *) val acquired : resource -> waiting -> acquired @@ -50,5 +50,6 @@ module Named_mutex : sig val create : string -> t - val execute : t -> (unit -> 'a) -> 'a + val execute : + ?__context:Context.t -> ?parent:Tracing.Span.t -> t -> (unit -> 'a) -> 'a end From 228f9c7c39b331ce11c8dfd3319b4fbd1041dc27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 24 Aug 2023 14:03:53 +0000 Subject: [PATCH 05/17] [refactor] CP-43769: make Locking_helpers.t abstract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This enables changing/optimizing the implementation without affecting callers. Signed-off-by: Edwin Török --- ocaml/xapi/local_work_queue.ml | 8 +++++--- ocaml/xapi/locking_helpers.ml | 10 ++++++++++ ocaml/xapi/locking_helpers.mli | 13 ++++++++++--- ocaml/xapi/nm.ml | 9 +-------- ocaml/xapi/storage_locks.ml | 11 ++++++----- ocaml/xapi/xapi.ml | 2 +- 6 files changed, 33 insertions(+), 20 deletions(-) diff --git a/ocaml/xapi/local_work_queue.ml b/ocaml/xapi/local_work_queue.ml index d01f61ba8a2..fc2bbe6f561 100644 --- a/ocaml/xapi/local_work_queue.ml +++ b/ocaml/xapi/local_work_queue.ml @@ -64,7 +64,7 @@ let wait_in_line q description f = let state = ref `Pending in let waiting = Locking_helpers.Thread_state.waiting_for - (Locking_helpers.Lock q.Thread_queue.name) + (Locking_helpers.lock q.Thread_queue.name) in let ok = q.Thread_queue.push_fn description (fun () -> @@ -91,11 +91,13 @@ let wait_in_line q description f = ) ; let acquired = Locking_helpers.Thread_state.acquired - (Locking_helpers.Lock q.Thread_queue.name) waiting + (Locking_helpers.lock q.Thread_queue.name) + waiting in finally f (fun () -> Locking_helpers.Thread_state.released - (Locking_helpers.Lock q.Thread_queue.name) acquired ; + (Locking_helpers.lock q.Thread_queue.name) + acquired ; with_lock m (fun () -> state := `Finished ; Condition.signal c diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index d4f09996449..637ec433dd4 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -36,6 +36,16 @@ let kill_resource = function info "Sending SIGKILL to %s pid %d" name pid ; Unix.kill pid Sys.sigkill +let lock name = Lock name + +let process (name, pid) = Process (name, pid) + +let is_process name = function + | Lock _ -> + false + | Process (name', _) -> + String.equal name name' + module Thread_state = struct type waiting = (Tracing.Span.t option * Tracing.Span.t option) option diff --git a/ocaml/xapi/locking_helpers.mli b/ocaml/xapi/locking_helpers.mli index 9f551919916..b558b63b5da 100644 --- a/ocaml/xapi/locking_helpers.mli +++ b/ocaml/xapi/locking_helpers.mli @@ -13,9 +13,16 @@ *) (** Represents a type of resource a thread has either allocated or is waiting for. *) -type resource = - | Lock of string (** e.g. a per-VM lock or a queue *) - | Process of string * int (** e.g. an stunnel process with the given pid *) +type resource + +val lock : string -> resource +(** [lock name] a per-VM lock or a queue *) + +val process : string * int -> resource +(** [process (name, pid)] e.g a an stunnel process with the given pid *) + +val is_process : string -> resource -> bool +(** [is_process name resource] checks whether [resource] is a process named [name]. *) val kill_resource : resource -> unit (** Best-effort attempt to kill a resource *) diff --git a/ocaml/xapi/nm.ml b/ocaml/xapi/nm.ml index 5129b01b389..6c131eb2604 100644 --- a/ocaml/xapi/nm.ml +++ b/ocaml/xapi/nm.ml @@ -735,14 +735,7 @@ let bring_pif_up ~__context ?(management_interface = false) (pif : API.ref_PIF) Locking_helpers.Thread_state.get_all_acquired_resources () in debug "There are %d allocated resources" (List.length all) ; - List.filter - (function - | Locking_helpers.Process ("stunnel", _) -> - true - | _ -> - false - ) - all + List.filter (Locking_helpers.is_process "stunnel") all in debug "Of which %d are stunnels" (List.length stunnels) ; List.iter Locking_helpers.kill_resource stunnels diff --git a/ocaml/xapi/storage_locks.ml b/ocaml/xapi/storage_locks.ml index 2fb9d40da6e..3c557428785 100644 --- a/ocaml/xapi/storage_locks.ml +++ b/ocaml/xapi/storage_locks.ml @@ -34,7 +34,7 @@ let make () = (** Execute the function with the specified instance locked *) let with_instance_lock t key f = let r = - Locking_helpers.Lock + Locking_helpers.lock ("SM/" ^ Ref.really_pretty_and_small (Ref.of_string key)) in let waiting = Locking_helpers.Thread_state.waiting_for r in @@ -51,10 +51,11 @@ let with_instance_lock t key f = Locking_helpers.Thread_state.released r acquired ) +let sm_lock = Locking_helpers.lock "SM" + (** Execute the function with the master_lock held and no instance locks held *) let with_master_lock t f = - let r = Locking_helpers.Lock "SM" in - let waiting = Locking_helpers.Thread_state.waiting_for r in + let waiting = Locking_helpers.Thread_state.waiting_for sm_lock in with_lock t.m (fun () -> (* Wait for the master_lock to be released *) while t.master_lock do @@ -67,11 +68,11 @@ let with_master_lock t f = Condition.wait t.c t.m done ) ; - let acquired = Locking_helpers.Thread_state.acquired r waiting in + let acquired = Locking_helpers.Thread_state.acquired sm_lock waiting in Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> with_lock t.m (fun () -> t.master_lock <- false ; Condition.broadcast t.c ) ; - Locking_helpers.Thread_state.released r acquired + Locking_helpers.Thread_state.released sm_lock acquired ) diff --git a/ocaml/xapi/xapi.ml b/ocaml/xapi/xapi.ml index 23815c9eed4..2e0e84fab44 100644 --- a/ocaml/xapi/xapi.ml +++ b/ocaml/xapi/xapi.ml @@ -165,7 +165,7 @@ let register_callback_fns () = in Xapi_cli.rpc_fun := Some fake_rpc ; let set_stunnelpid _task_opt pid = - let resource = Locking_helpers.Process ("stunnel", pid) in + let resource = Locking_helpers.process ("stunnel", pid) in let waiting = Locking_helpers.Thread_state.waiting_for resource in let acquired = Locking_helpers.Thread_state.acquired resource waiting in fun () -> Locking_helpers.Thread_state.released resource acquired From 2c252fe7214f649b57dd173a9cd78efd28cdf5cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 25 Aug 2023 12:19:28 +0100 Subject: [PATCH 06/17] [test] CP-43769: named mutex and threadstate basic tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Verify basic properties: * acquired/released updates all_acquired_resources properly * works both on the main and another threads * named mutex shows up in the acquired resources * when using 64 threads only one thread at a time can successfully acquire the named mutex * when threads finish the thread table is the same as it started out with (allow nonzero for main thread remaining in table) In the future these tests could be enhanced with qcheck-stm. Signed-off-by: Edwin Török --- ocaml/tests/dune | 8 + ocaml/tests/test_locking_helpers.ml | 280 ++++++++++++++++++++++++++++ ocaml/xapi/locking_helpers.ml | 2 + ocaml/xapi/locking_helpers.mli | 5 + 4 files changed, 295 insertions(+) create mode 100644 ocaml/tests/test_locking_helpers.ml diff --git a/ocaml/tests/dune b/ocaml/tests/dune index 93bf4b66ddf..a3031671c5b 100644 --- a/ocaml/tests/dune +++ b/ocaml/tests/dune @@ -8,6 +8,7 @@ test_vm_placement test_vm_helpers test_repository test_repository_helpers test_ref test_livepatch test_rpm test_updateinfo test_storage_smapiv1_wrapper test_storage_quicktest test_observer + test_locking_helpers test_pool_periodic_update_sync)) (libraries alcotest @@ -123,6 +124,13 @@ (modules test_observer) (libraries alcotest tracing xapi_internal tests_common yojson)) +(test +(name test_locking_helpers) +(package xapi) +(modules test_locking_helpers) +(libraries alcotest tracing xapi_internal tests_common) +) + (rule (alias runtest) (package xapi) diff --git a/ocaml/tests/test_locking_helpers.ml b/ocaml/tests/test_locking_helpers.ml new file mode 100644 index 00000000000..2251fa20a2a --- /dev/null +++ b/ocaml/tests/test_locking_helpers.ml @@ -0,0 +1,280 @@ +open Locking_helpers + +let test_kill_resource () = + (* no-op *) + kill_resource (lock "foo") ; + + Alcotest.check_raises "non-existent pid" + (Unix.Unix_error (Unix.ESRCH, "kill", "")) + (fun () -> + (* non-existent pid, we cannot use max_int here as that will result in a negative overflow when converted to a C int *) + kill_resource (process ("foo", 0x7FFF_FFFF)) + ) + +let test_is_process = + [ + (lock "stunnel", false) + ; (process ("stunnel", 100), true) + ; (process ("other", 100), false) + ] + |> List.map @@ fun (resource, expected) -> + let test () = + Alcotest.( + check' ~msg:"is_process" bool ~expected + ~actual:(is_process "stunnel" resource) + ) + in + Alcotest.(test_case (string_of_resource resource) `Quick test) + +let resource = Alcotest.testable (Fmt.of_to_string string_of_resource) ( = ) + +let main = Thread.id (Thread.self ()) + +let test_acquired_resources () = + let r = lock "locktest" in + let waiting = Thread_state.waiting_for r in + let acquired = Thread_state.acquired r waiting in + + let lst = Thread_state.get_all_acquired_resources () in + Alcotest.( + check' (list resource) ~msg:"acquired resources" ~expected:[r] ~actual:lst + ) ; + + let str = Thread_state.to_graphviz () in + ( if Thread.id (Thread.self ()) = main then + let expected = + {|digraph Resources { +node [shape=Mrecord]; +t0 [label="{} | {NULL} | {Lock(locktest) | 0}"]; +node [shape=record]; +r0 [style=filled label="{lock} | {locktest}"]; +r0 -> t0 +rankdir=LR +overlap=false +label="Threads and resources" +fontsize=12 +}|} + in + Alcotest.(check' string ~msg:"graphviz" ~expected ~actual:str) + ) ; + + Thread_state.released r acquired ; + let lst = Thread_state.get_all_acquired_resources () in + Alcotest.( + check' (list resource) ~msg:"acquired resources (released)" ~expected:[] + ~actual:lst + ) + +let test_single_task () = + (* cannot create it within the test since it'll be considered a leak *) + let __context = Test_common.make_test_database () in + + let rpc, session_id = Test_common.make_client_params ~__context in + let self = + Client.Client.Task.create ~rpc ~session_id ~label:"task_label" + ~description:"task_description" + in + let r = lock "bar" in + Thread_state.with_named_thread "myname" self (fun () -> + let waiting = Thread_state.waiting_for r in + let acquired = Thread_state.acquired r waiting in + + let lst = Thread_state.get_acquired_resources_by_task self in + Alcotest.( + check' (list resource) ~msg:"acquired resources" ~expected:[r] + ~actual:lst + ) ; + + Thread_state.released r acquired ; + let lst = Thread_state.get_acquired_resources_by_task self in + Alcotest.( + check' (list resource) ~msg:"acquired resources (0)" ~expected:[] + ~actual:lst + ) + ) ; + Client.Client.Task.destroy ~rpc ~session_id ~self ; + Db_gc.single_pass () + +let test_named_mutex_simple () = + let name = "mytestmutex" in + let r = lock name in + let m = Named_mutex.create name in + Named_mutex.execute m (fun () -> + let lst = Thread_state.get_all_acquired_resources () in + Alcotest.( + check' (list resource) ~msg:"acquired resources (mutex)" ~expected:[r] + ~actual:lst + ) + ) ; + let lst = Thread_state.get_all_acquired_resources () in + Alcotest.( + check' (list resource) ~msg:"acquired resources (mutex)" ~expected:[] + ~actual:lst + ) + +let test_named_mutex_finally () = + let name = "mytestmutex2" in + let r = lock name in + let m = Named_mutex.create name in + Alcotest.check_raises "exit" Exit (fun () -> + Named_mutex.execute m (fun () -> + let lst = Thread_state.get_all_acquired_resources () in + Alcotest.( + check' (list resource) ~msg:"acquired resources (mutex)" + ~expected:[r] ~actual:lst + ) ; + raise Exit + ) + ) ; + let lst = Thread_state.get_all_acquired_resources () in + Alcotest.( + check' (list resource) ~msg:"acquired resources (mutex)" ~expected:[] + ~actual:lst + ) ; + (* check that mutex got released, no deadlock *) + Named_mutex.execute m ignore + +module ThreadWrap = struct + type t = { + thread: Thread.t + ; failure: (Printexc.raw_backtrace * exn) option Atomic.t + } + + let create f arg = + let failure = Atomic.make None in + let wrap f = + try f arg + with e -> + let bt = Printexc.get_raw_backtrace () in + Atomic.set failure (Some (bt, e)) + in + {thread= Thread.create wrap f; failure} + + let join t = + Thread.join t.thread ; + match Atomic.get t.failure with + | None -> + () + | Some (bt, e) -> + Printexc.raise_with_backtrace e bt +end + +let other_thread f () = + let thr = ThreadWrap.create f () in + ThreadWrap.join thr + +let full_gc () = + (* see comment in Gc module: finalisers may allocate and need a 2nd run *) + Gc.full_major () ; Gc.compact () + +let no_table_leak f () = + (* could run 2 full_major, look at live words, but that is difficult to get working reliably across runtime versions + (sometimes the allocated memory goes negative, e.g. if some global gets freed) + *) + full_gc () ; + let before = Thread_state.known_threads () in + f () ; + full_gc () ; + let after = Thread_state.known_threads () in + Alcotest.( + check' int ~msg:"leaked thread table entry" ~expected:before ~actual:after + ) + +let test_named_mutex_many i = + let name = "mytestmutex" ^ string_of_int i in + let r = lock name in + let m = Named_mutex.create name in + Named_mutex.execute m (fun () -> + let lst = + Thread_state.get_all_acquired_resources () |> List.filter (( = ) r) + in + Alcotest.( + check' (list resource) ~msg:"acquired resources (mutex)" ~expected:[r] + ~actual:lst + ) + ) ; + let lst = + Thread_state.get_all_acquired_resources () |> List.filter (( = ) r) + in + Alcotest.( + check' (list resource) ~msg:"acquired resources (mutex)" ~expected:[] + ~actual:lst + ) + +let shared_mutex = Named_mutex.create "shared" + +let shared_owner = Atomic.make (-1) + +let holders = Atomic.make 0 + +let test_named_mutex_many_same j = + for i = 1 to 1000 do + Named_mutex.execute shared_mutex (fun () -> + let old_holders = Atomic.fetch_and_add holders 1 in + let actual = Atomic.exchange shared_owner j in + if actual <> -1 then + Fmt.failwith + "Shared owner already set to: %d (I am %d). Old holders = %d" actual + j old_holders ; + + if i mod 10 = 0 then Thread.yield () ; + + (* try to introduce more race conditions while holding the lock *) + if not (Atomic.compare_and_set shared_owner j (-1)) then + Fmt.failwith "Failed to restore shared owner" ; + Atomic.decr holders ; + + if old_holders <> 0 then + Fmt.failwith + "Only one thread should be able to acquire the mutex at a time: %d" + old_holders + ) + done + +let many_threads f () = + let waiting = Atomic.make 0 in + let n = 64 in + let test_thread i = + (* wait for all threads to start, to maximize race conditions *) + Atomic.incr waiting ; + while Atomic.get waiting <> n do + Thread.yield () + done ; + + f i + in + + let threads = Array.init n @@ ThreadWrap.create test_thread in + Array.iter ThreadWrap.join threads + +let () = + Suite_init.harness_init () ; + Alcotest.( + run "Locking_helpers" + [ + ("is_process", test_is_process) + ; ("kill_resource", [test_case "kill_resource" `Quick test_kill_resource]) + ; ( "acquired resources" + , [ + test_case "single thread" `Quick + @@ no_table_leak test_acquired_resources + ; test_case "single task" `Quick @@ no_table_leak test_single_task + ; test_case "single thread (other)" `Quick + (no_table_leak @@ other_thread test_acquired_resources) + ; test_case "single task (other)" `Quick + (no_table_leak @@ other_thread test_single_task) + ] + ) + ; ( "named mutex" + , [ + test_case "without tracing" `Quick + @@ no_table_leak test_named_mutex_simple + ; test_case "finally" `Quick @@ no_table_leak test_named_mutex_finally + ; test_case "race (64 threads)" `Slow + (no_table_leak @@ many_threads test_named_mutex_many) + ; test_case "race same (64 threads)" `Slow + (no_table_leak @@ many_threads test_named_mutex_many_same) + ] + ) + ] + ) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 637ec433dd4..4ce8d52cb70 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -262,6 +262,8 @@ module Thread_state = struct ] in String.concat "\n" all + + let known_threads () = with_lock m (fun () -> IntMap.cardinal !thread_states) end module Named_mutex = struct diff --git a/ocaml/xapi/locking_helpers.mli b/ocaml/xapi/locking_helpers.mli index b558b63b5da..89835c7033a 100644 --- a/ocaml/xapi/locking_helpers.mli +++ b/ocaml/xapi/locking_helpers.mli @@ -27,6 +27,9 @@ val is_process : string -> resource -> bool val kill_resource : resource -> unit (** Best-effort attempt to kill a resource *) +val string_of_resource : resource -> string +(** [string_of_resource resource] a string representation of the resource for debugging *) + (** Records per-thread diagnostic information *) module Thread_state : sig type waiting @@ -50,6 +53,8 @@ module Thread_state : sig val get_acquired_resources_by_task : API.ref_task -> resource list val to_graphviz : unit -> string + + val known_threads : unit -> int end module Named_mutex : sig From ce0cf6ca70721293041f262045db36a562b337d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 29 Jun 2023 18:04:45 +0100 Subject: [PATCH 07/17] [refactor] CP-43769: propagate a Context.t to Locking_helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This enables tracing of Lock_helpers when enabled through the Observer API. Signed-off-by: Edwin Török --- ocaml/xapi/cert_refresh.ml | 2 +- ocaml/xapi/workload_balancing.ml | 4 ++-- ocaml/xapi/xapi_cluster.ml | 2 +- ocaml/xapi/xapi_cluster_host.ml | 12 ++++++------ ocaml/xapi/xapi_clustering.ml | 8 ++++---- ocaml/xapi/xapi_session.ml | 16 +++++++++------- 6 files changed, 23 insertions(+), 21 deletions(-) diff --git a/ocaml/xapi/cert_refresh.ml b/ocaml/xapi/cert_refresh.ml index 12ab75dc230..2ab092e3178 100644 --- a/ocaml/xapi/cert_refresh.ml +++ b/ocaml/xapi/cert_refresh.ml @@ -53,7 +53,7 @@ let unreachable_hosts ~__context = let maybe_update_clustering_tls_config ~__context = let open Xapi_clustering in - with_clustering_lock __LOC__ @@ fun () -> + with_clustering_lock ~__context __LOC__ @@ fun () -> let host = Helpers.get_localhost ~__context in match Xapi_clustering.find_cluster_host ~__context ~host with | None -> diff --git a/ocaml/xapi/workload_balancing.ml b/ocaml/xapi/workload_balancing.ml index 4a1f0a37b75..5122b727baa 100644 --- a/ocaml/xapi/workload_balancing.ml +++ b/ocaml/xapi/workload_balancing.ml @@ -516,7 +516,7 @@ let init_wlb ~__context ~wlb_url ~wlb_username ~wlb_password ~xenserver_username Db.Secret.destroy ~__context ~self:old_secret_ref ) in - Locking_helpers.Named_mutex.execute request_mutex + Locking_helpers.Named_mutex.execute ~__context request_mutex (perform_wlb_request ~enable_log:false ~meth:"AddXenServer" ~params ~auth:(encoded_auth wlb_username wlb_password) ~url:wlb_url ~handle_response ~__context @@ -545,7 +545,7 @@ let decon_wlb ~__context = else let params = pool_uuid_param ~__context in try - Locking_helpers.Named_mutex.execute request_mutex + Locking_helpers.Named_mutex.execute ~__context request_mutex (perform_wlb_request ~meth:"RemoveXenServer" ~params ~handle_response ~__context ) diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index 5c1c26fca84..13f6e57b3bb 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -37,7 +37,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout (* Currently we only support corosync. If we support more cluster stacks, this * should be replaced by a general function that checks the given cluster_stack *) Pool_features.assert_enabled ~__context ~f:Features.Corosync ; - with_clustering_lock __LOC__ (fun () -> + with_clustering_lock ~__context __LOC__ (fun () -> let dbg = Context.string_of_task __context in validate_params ~token_timeout ~token_timeout_coefficient ; let cluster_ref = Ref.make () in diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml index 47188506046..c13a28ef776 100644 --- a/ocaml/xapi/xapi_cluster_host.ml +++ b/ocaml/xapi/xapi_cluster_host.ml @@ -55,7 +55,7 @@ let call_api_function_with_alert ~__context ~msg ~cls ~obj_uuid ~body (* Create xapi db object for cluster_host, resync_host calls clusterd *) let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = - with_clustering_lock __LOC__ (fun () -> + with_clustering_lock ~__context __LOC__ (fun () -> assert_operation_host_target_is_localhost ~__context ~host ; assert_pif_attached_to ~host ~pIF ~__context ; assert_cluster_host_can_be_created ~__context ~host ; @@ -104,7 +104,7 @@ let set_tls_config ~__context ~self ~verify = (* Helper function atomically enables clusterd and joins the cluster_host *) let join_internal ~__context ~self = - with_clustering_lock __LOC__ (fun () -> + with_clustering_lock ~__context __LOC__ (fun () -> let pIF = Db.Cluster_host.get_PIF ~__context ~self in fix_pif_prerequisites ~__context pIF ; let dbg = Context.string_of_task __context in @@ -206,7 +206,7 @@ let create ~__context ~cluster ~host ~pif = cluster_host let destroy_op ~__context ~self ~force = - with_clustering_lock __LOC__ (fun () -> + with_clustering_lock ~__context __LOC__ (fun () -> let dbg = Context.string_of_task __context in let host = Db.Cluster_host.get_host ~__context ~self in assert_operation_host_target_is_localhost ~__context ~host ; @@ -252,7 +252,7 @@ let destroy ~__context ~self = let ip_of_str str = Cluster_interface.IPv4 str let forget ~__context ~self = - with_clustering_lock __LOC__ (fun () -> + with_clustering_lock ~__context __LOC__ (fun () -> let dbg = Context.string_of_task __context in let cluster = Db.Cluster_host.get_cluster ~__context ~self in let pif = Db.Cluster_host.get_PIF ~__context ~self in @@ -285,7 +285,7 @@ let forget ~__context ~self = ) let enable ~__context ~self = - with_clustering_lock __LOC__ (fun () -> + with_clustering_lock ~__context __LOC__ (fun () -> let dbg = Context.string_of_task __context in let host = Db.Cluster_host.get_host ~__context ~self in assert_operation_host_target_is_localhost ~__context ~host ; @@ -325,7 +325,7 @@ let enable ~__context ~self = ) let disable ~__context ~self = - with_clustering_lock __LOC__ (fun () -> + with_clustering_lock ~__context __LOC__ (fun () -> let dbg = Context.string_of_task __context in let host = Db.Cluster_host.get_host ~__context ~self in assert_operation_host_target_is_localhost ~__context ~host ; diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 4e498da91a2..728e4402634 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -27,9 +27,9 @@ let set_ha_cluster_stack ~__context = (* host-local clustering lock *) let clustering_lock_m = Locking_helpers.Named_mutex.create "clustering" -let with_clustering_lock where f = +let with_clustering_lock ~__context where f = debug "Trying to grab host-local clustering lock... (%s)" where ; - Locking_helpers.Named_mutex.execute clustering_lock_m (fun () -> + Locking_helpers.Named_mutex.execute ~__context clustering_lock_m (fun () -> Xapi_stdext_pervasives.Pervasiveext.finally (fun () -> debug "Grabbed host-local clustering lock; executing function... (%s)" @@ -155,14 +155,14 @@ let with_clustering_lock_if_needed ~__context ~sr_sm_type where f = | [] -> f () | _required_cluster_stacks -> - with_clustering_lock where f + with_clustering_lock ~__context where f let with_clustering_lock_if_cluster_exists ~__context where f = match Db.Cluster.get_all ~__context with | [] -> f () | _ -> - with_clustering_lock where f + with_clustering_lock ~__context where f let find_cluster_host ~__context ~host = match diff --git a/ocaml/xapi/xapi_session.ml b/ocaml/xapi/xapi_session.ml index 42d590bb54d..6489b603ae8 100644 --- a/ocaml/xapi/xapi_session.ml +++ b/ocaml/xapi/xapi_session.ml @@ -271,13 +271,13 @@ let wipe_params_after_fn params fn = wipe params ; r with e -> wipe params ; raise e -let do_external_auth uname pwd = +let do_external_auth ~__context uname pwd = with_lock serialize_auth (fun () -> (Ext_auth.d ()).authenticate_username_password uname (Bytes.unsafe_to_string pwd) ) -let do_local_auth uname pwd = +let do_local_auth ~__context uname pwd = with_lock serialize_auth (fun () -> try Pam.authenticate uname (Bytes.unsafe_to_string pwd) with Failure msg -> @@ -287,7 +287,7 @@ let do_local_auth uname pwd = ) ) -let do_local_change_password uname newpwd = +let do_local_change_password ~__context uname newpwd = with_lock serialize_auth (fun () -> Pam.change_password uname (Bytes.unsafe_to_string newpwd) ) @@ -707,7 +707,7 @@ let slave_local_login_with_password ~__context ~uname ~pwd = if Context.preauth ~__context <> Some `root then ( try (* CP696 - only tries to authenticate against LOCAL superuser account *) - do_local_auth uname pwd + do_local_auth ~__context uname pwd with Failure msg -> debug "Failed to authenticate user %s: %s" uname msg ; raise @@ -785,7 +785,7 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = (* makes local superuser = root only*) failwith ("Local superuser must be " ^ local_superuser) else ( - do_local_auth uname pwd ; + do_local_auth ~__context uname pwd ; debug "Success: local auth, user %s from %s" uname (Context.get_origin __context) ; login_no_password_common ~__context ~uname:(Some uname) @@ -871,7 +871,9 @@ let login_with_password ~__context ~uname ~pwd ~version:_ ~originator = (* so that we know that he/she exists there *) let subject_identifier = try - let _subject_identifier = do_external_auth uname pwd in + let _subject_identifier = + do_external_auth ~__context uname pwd + in debug "Successful external authentication user %s \ (subject_identifier, %s from %s)" @@ -1152,7 +1154,7 @@ let change_password ~__context ~old_pwd ~new_pwd = raise (Api_errors.Server_error (Api_errors.session_authentication_failed,[uname;msg])) end; *) - do_local_change_password uname new_pwd ; + do_local_change_password ~__context uname new_pwd ; info "Password changed successfully for user %s" uname ; info "Syncing password change across hosts in pool" ; (* tell all hosts (except me to sync new passwd file) *) From 3669482033202f348997c6aac6bd8d72d9303d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 29 Jun 2023 18:05:16 +0100 Subject: [PATCH 08/17] [refactor] CP-43769: propagate a parent span for SMAPIv1 wrapper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For consistency with other code that calls Thread_state: we don't have a context here, but pass a parent span (the caller does have a context). Signed-off-by: Edwin Török --- ocaml/xapi/sm.ml | 13 +++++++------ ocaml/xapi/storage_smapiv1.ml | 8 +++++++- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/ocaml/xapi/sm.ml b/ocaml/xapi/sm.ml index 2a18041f507..4b66a67d981 100644 --- a/ocaml/xapi/sm.ml +++ b/ocaml/xapi/sm.ml @@ -86,23 +86,24 @@ let sr_delete dconf driver sr = let serialize_attach_detach = Locking_helpers.Named_mutex.create "sr_attach/detach" -let sr_attach dconf driver sr = - Locking_helpers.Named_mutex.execute serialize_attach_detach (fun () -> +let sr_attach parent dconf driver sr = + Locking_helpers.Named_mutex.execute ?parent serialize_attach_detach (fun () -> debug "sr_attach" driver (sprintf "sr=%s" (Ref.string_of sr)) ; let call = Sm_exec.make_call ~sr_ref:sr dconf "sr_attach" [] in Sm_exec.parse_unit (Sm_exec.exec_xmlrpc (driver_filename driver) call) ) -let sr_detach dconf driver sr = - Locking_helpers.Named_mutex.execute serialize_attach_detach (fun () -> +let sr_detach parent dconf driver sr = + Locking_helpers.Named_mutex.execute ?parent serialize_attach_detach (fun () -> debug "sr_detach" driver (sprintf "sr=%s" (Ref.string_of sr)) ; let call = Sm_exec.make_call ~sr_ref:sr dconf "sr_detach" [] in Sm_exec.parse_unit (Sm_exec.exec_xmlrpc (driver_filename driver) call) ) -let sr_probe dconf driver sr_sm_config = +let sr_probe parent dconf driver sr_sm_config = if List.mem_assoc Sr_probe (features_of_driver driver) then - Locking_helpers.Named_mutex.execute serialize_attach_detach (fun () -> + Locking_helpers.Named_mutex.execute ?parent serialize_attach_detach + (fun () -> debug "sr_probe" driver (sprintf "sm_config=[%s]" (String.concat "; " diff --git a/ocaml/xapi/storage_smapiv1.ml b/ocaml/xapi/storage_smapiv1.ml index 8705896d719..bc0a09441b1 100644 --- a/ocaml/xapi/storage_smapiv1.ml +++ b/ocaml/xapi/storage_smapiv1.ml @@ -194,6 +194,7 @@ module SMAPIv1 : Server_impl = struct let task = Context.get_task_id __context in Storage_interface.Raw (Sm.sr_probe + (Context.tracing_of __context) (Some task, Sm.sm_master true :: device_config) _type sm_config ) @@ -271,6 +272,7 @@ module SMAPIv1 : Server_impl = struct Sm.call_sm_functions ~__context ~sR:sr (fun _ _type -> try Sm.sr_attach + (Context.tracing_of __context) (Some (Context.get_task_id __context), device_config) _type sr with @@ -291,7 +293,11 @@ module SMAPIv1 : Server_impl = struct ~uuid:(Storage_interface.Sr.string_of sr) in Sm.call_sm_functions ~__context ~sR:sr (fun device_config _type -> - try Sm.sr_detach device_config _type sr with + try + Sm.sr_detach + (Context.tracing_of __context) + device_config _type sr + with | Api_errors.Server_error (code, params) -> raise (Storage_error (Backend_error (code, params))) | e -> From dd1fd1f85e2cfb07c567ae818e7bc071a14adc7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 29 Jun 2023 18:12:02 +0100 Subject: [PATCH 09/17] [tracing] CP-43769: Local_work_queue: enable tracing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pass a context, which enables tracing when the Context itself has tracing enabled. Also optimize allocations for 'resource'. Signed-off-by: Edwin Török --- ocaml/xapi/local_work_queue.ml | 16 ++++------ ocaml/xapi/locking_helpers.ml | 57 ++++++++++++++++++++-------------- ocaml/xapi/thread_queue.ml | 4 +-- ocaml/xapi/xapi_vm.ml | 2 +- 4 files changed, 42 insertions(+), 37 deletions(-) diff --git a/ocaml/xapi/local_work_queue.ml b/ocaml/xapi/local_work_queue.ml index fc2bbe6f561..3e80b7470f8 100644 --- a/ocaml/xapi/local_work_queue.ml +++ b/ocaml/xapi/local_work_queue.ml @@ -58,13 +58,15 @@ open Xapi_stdext_pervasives.Pervasiveext (** Join a given queue and execute the function 'f' when its our turn. Actually perform the computation in this thread so we can return a result. *) -let wait_in_line q description f = +let wait_in_line ~__context q description f = let m = Mutex.create () in let c = Condition.create () in let state = ref `Pending in + let lock = q.Thread_queue.lock in let waiting = Locking_helpers.Thread_state.waiting_for - (Locking_helpers.lock q.Thread_queue.name) + ?parent:(Context.tracing_of __context) + lock in let ok = q.Thread_queue.push_fn description (fun () -> @@ -89,15 +91,9 @@ let wait_in_line q description f = Condition.wait c m done ) ; - let acquired = - Locking_helpers.Thread_state.acquired - (Locking_helpers.lock q.Thread_queue.name) - waiting - in + let acquired = Locking_helpers.Thread_state.acquired lock waiting in finally f (fun () -> - Locking_helpers.Thread_state.released - (Locking_helpers.lock q.Thread_queue.name) - acquired ; + Locking_helpers.Thread_state.released lock acquired ; with_lock m (fun () -> state := `Finished ; Condition.signal c diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 4ce8d52cb70..ebf81e231f2 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -21,14 +21,30 @@ module D = Debug.Make (struct let name = "locking_helpers" end) open D -type resource = Lock of string | Process of string * int +type resource_kind = Lock of string | Process of (string * int) -let string_of_resource = function +let string_of_resource_kind = function | Lock x -> Printf.sprintf "Lock(%s)" x | Process (name, pid) -> Printf.sprintf "Process(%s, %d)" name pid +type resource = { + kind: resource_kind + ; str: string + ; waiting_str: string + ; acquired_str: string +} + +let make kind = + let str = string_of_resource_kind kind in + let name state = String.concat "" ["Thread_state."; state; "("; str; ")"] in + {kind; str; waiting_str= name "waiting_for"; acquired_str= "acquired"} + +let lock name = make (Lock name) + +let process (name, pid) = make (Process (name, pid)) + let kill_resource = function | Lock x -> debug "There is no way to forcibly remove Lock(%s)" x @@ -36,15 +52,15 @@ let kill_resource = function info "Sending SIGKILL to %s pid %d" name pid ; Unix.kill pid Sys.sigkill -let lock name = Lock name - -let process (name, pid) = Process (name, pid) +let kill_resource r = kill_resource r.kind let is_process name = function - | Lock _ -> + | {kind= Process (p, _); _} -> + p = name + | {kind= Lock _; _} -> false - | Process (name', _) -> - String.equal name name' + +let string_of_resource r = r.str module Thread_state = struct type waiting = (Tracing.Span.t option * Tracing.Span.t option) option @@ -115,10 +131,7 @@ module Thread_state = struct | None -> None | Some _ -> ( - let name = - String.concat "" - ["Thread_state.waiting_for("; string_of_resource resource; ")"] - in + let name = resource.waiting_str in let tracer = Tracing.get_tracer ~name in match Tracing.Tracer.start ~tracer ~name ~parent () with | Ok span -> @@ -138,10 +151,7 @@ module Thread_state = struct None | Some (parent, span) -> ( let (_ : (_, _) result) = Tracing.Tracer.finish span in - let name = - String.concat "" - ["Thread_state.acquired("; string_of_resource resource; ")"] - in + let name = resource.acquired_str in let tracer = Tracing.get_tracer ~name in match Tracing.Tracer.start ~tracer ~name ~parent () with | Ok span -> @@ -201,9 +211,9 @@ module Thread_state = struct let resources_to_sll = List.map (function - | Lock x as y -> + | {kind= Lock x; _} as y -> (y, [["lock"]; [x]]) - | Process (name, pid) as y -> + | {kind= Process (name, pid); _} as y -> (y, [["process"]; [name]; [string_of_int pid]]) ) all_resources @@ -267,9 +277,9 @@ module Thread_state = struct end module Named_mutex = struct - type t = {name: string; m: Mutex.t} + type t = {name: string; m: Mutex.t; r: resource} - let create name = {name; m= Mutex.create ()} + let create name = {name; m= Mutex.create (); r= lock name} let execute ?__context ?parent (x : t) f = let parent = @@ -279,10 +289,9 @@ module Named_mutex = struct | Some _ as p -> p in - let r = Lock x.name in - let waiting = Thread_state.waiting_for ?parent r in + let waiting = Thread_state.waiting_for ?parent x.r in with_lock x.m (fun () -> - let acquired = Thread_state.acquired r waiting in - finally f (fun () -> Thread_state.released r acquired) + let acquired = Thread_state.acquired x.r waiting in + finally f (fun () -> Thread_state.released x.r acquired) ) end diff --git a/ocaml/xapi/thread_queue.ml b/ocaml/xapi/thread_queue.ml index 227b93edf98..bfcc6e710e0 100644 --- a/ocaml/xapi/thread_queue.ml +++ b/ocaml/xapi/thread_queue.ml @@ -29,7 +29,7 @@ type 'a process_fn = 'a -> unit (** The type of the function which pushes new elements into the queue *) type 'a push_fn = string -> 'a -> bool -type 'a t = {push_fn: 'a push_fn; name: string} +type 'a t = {push_fn: 'a push_fn; name: string; lock: Locking_helpers.resource} (** Given an optional maximum queue length and a function for processing elements (which will be called in a single background thread), return a function which pushes items onto the queue. *) @@ -95,4 +95,4 @@ let make ?max_q_length ?(name = "unknown") (process_fn : 'a process_fn) : 'a t = true ) in - {push_fn= push; name} + {push_fn= push; name; lock= Locking_helpers.lock name} diff --git a/ocaml/xapi/xapi_vm.ml b/ocaml/xapi/xapi_vm.ml index 879ca1ae3e2..b0926d5bd82 100644 --- a/ocaml/xapi/xapi_vm.ml +++ b/ocaml/xapi/xapi_vm.ml @@ -742,7 +742,7 @@ let revert ~__context ~snapshot = (* thread mess around with that. *) let checkpoint ~__context ~vm ~new_name = Pool_features.assert_enabled ~__context ~f:Features.Checkpoint ; - Local_work_queue.wait_in_line Local_work_queue.long_running_queue + Local_work_queue.wait_in_line ~__context Local_work_queue.long_running_queue (Printf.sprintf "VM.checkpoint %s" (Context.string_of_task __context)) (fun () -> TaskHelper.set_cancellable ~__context ; From ff970ffb6cefaac0f67616fd19d18733746ce5a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 18 Aug 2023 09:34:19 +0100 Subject: [PATCH 10/17] [opt] CP-43769: locking_helpers: further reduce allocations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No functional change. Signed-off-by: Edwin Török --- ocaml/xapi/locking_helpers.ml | 36 ++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index ebf81e231f2..c21cefc7422 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -274,12 +274,38 @@ module Thread_state = struct String.concat "\n" all let known_threads () = with_lock m (fun () -> IntMap.cardinal !thread_states) + + let with_resource resource acquire f release arg = + let acquired = acquire resource arg in + match f () with + | r -> + release resource acquired ; r + | exception e -> + let bt = Printexc.get_raw_backtrace () in + D.log_and_ignore_exn (fun () -> release resource acquired) ; + Printexc.raise_with_backtrace e bt end module Named_mutex = struct - type t = {name: string; m: Mutex.t; r: resource} + type t = { + name: string + ; m: Mutex.t + ; r: resource + ; acquire: t -> Tracing.Span.t option -> Thread_state.acquired + ; release: t -> Thread_state.acquired -> unit + } - let create name = {name; m= Mutex.create (); r= lock name} + let create name = + let acquire t parent = + let waiting = Thread_state.waiting_for ?parent t.r in + Mutex.lock t.m ; + Thread_state.acquired t.r waiting + in + let release t waiting = + Mutex.unlock t.m ; + Thread_state.released t.r waiting + in + {name; m= Mutex.create (); r= lock name; acquire; release} let execute ?__context ?parent (x : t) f = let parent = @@ -289,9 +315,5 @@ module Named_mutex = struct | Some _ as p -> p in - let waiting = Thread_state.waiting_for ?parent x.r in - with_lock x.m (fun () -> - let acquired = Thread_state.acquired x.r waiting in - finally f (fun () -> Thread_state.released x.r acquired) - ) + Thread_state.with_resource x x.acquire f x.release parent end From b361096adb419aa67fad9ba2fdaf34df88783948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 18 Aug 2023 18:11:53 +0100 Subject: [PATCH 11/17] [opt] CP-43769: locking_helpers: factor out and reduce allocations in thread local storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of continously inserting and removing entries from a global integer map use a hashtable: first time we want to store some per thread data we allocate, and after that we efficiently find and update the thread's data. When the thread exits it will be dropped from the hashtable (except the main thread). Snapshotting becomes less efficient: we need to make a copy of the hashtable, but this is in a slowpath (diagnostic API), and instead the fastpath avoids allocations. Ephemerons could've been used, but they currently always allocate an option value internally on lookup. Signed-off-by: Edwin Török --- ocaml/xapi/locking_helpers.ml | 128 ++++++++++++++++++++++++++-------- 1 file changed, 99 insertions(+), 29 deletions(-) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index c21cefc7422..60fde7b80de 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -11,7 +11,93 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. *) -let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute + +module IntMap = Map.Make (Int) + +module Thread_local_storage = struct + module Thread_key : sig + type t = private int + + val of_thread : Thread.t -> t + + val hash : t -> int + + val equal : t -> t -> bool + end = struct + type t = int + + let of_thread = Thread.id + + let hash x = x + + let equal = Int.equal + end + + module LiveThreads = Hashtbl.Make (Thread_key) + + (* While a thread is alive we keep some per-thread data, + after the thread dies the data will be GC-ed. + Ephemerons would allocate some internal options on each lookup, + so we cannot use them here. Instead we add a finaliser on the Thread.t. + *) + type 'a t = {lock: Mutex.t; tbl: 'a LiveThreads.t; init: unit -> 'a} + + let with_lock t f arg = + Mutex.lock t.lock ; + match f t arg with + | result -> + Mutex.unlock t.lock ; result + | exception e -> + let bt = Printexc.get_raw_backtrace () in + Mutex.unlock t.lock ; + Printexc.raise_with_backtrace e bt + + let on_thread_gc t thread_id () = + Mutex.lock t.lock ; + LiveThreads.remove t.tbl thread_id ; + Mutex.unlock t.lock + + let find_or_create_unlocked t self = + (* try/with avoids allocation on fast-path *) + let id = Thread_key.of_thread self in + try LiveThreads.find t.tbl id + with Not_found -> + (* slow-path: first time use on current thread *) + let v = t.init () in + LiveThreads.replace t.tbl id v ; + (* do not use a closure here, it might keep 'self' alive forver *) + Gc.finalise_last (on_thread_gc t id) self ; + v + + let get t = + let self = Thread.self () in + with_lock t find_or_create_unlocked self + + let make init : 'a t = + let lock = Mutex.create () in + let tbl = LiveThreads.create 47 in + let t = {lock; tbl; init} in + (* preallocate storage for current thread *) + let (_ : 'a) = get t in + t + + let set_unlocked t v = + let self = Thread.self () in + LiveThreads.replace t.tbl (Thread_key.of_thread self) v + + let set t v = with_lock t set_unlocked v + + let snapshot_unlocked t () = + LiveThreads.fold + (fun thr v acc -> IntMap.add (thr :> int) v acc) + t.tbl IntMap.empty + + let snapshot t = with_lock t snapshot_unlocked () + + let count_unlocked t () = LiveThreads.length t.tbl + + let count t = with_lock t count_unlocked () +end let finally = Xapi_stdext_pervasives.Pervasiveext.finally @@ -79,45 +165,29 @@ module Thread_state = struct let empty = {acquired_resources= []; task= Ref.null; name= ""; waiting_for= None} - let m = Mutex.create () + let make_empty () = empty - module IntMap = Map.Make (struct - type t = int - - let compare = compare - end) + let thread_states = Thread_local_storage.make make_empty - let thread_states = ref IntMap.empty + (* to be able to debug locking problems we need a consistent snapshot: + if we're waiting for a lock, who's holding it currently and what locks are they holding or waiting for? + *) let get_acquired_resources_by_task task = - let snapshot = with_lock m (fun () -> !thread_states) in + let snapshot = Thread_local_storage.snapshot thread_states in let all, _ = IntMap.partition (fun _ ts -> ts.task = task) snapshot in List.map fst (IntMap.fold (fun _ ts acc -> ts.acquired_resources @ acc) all []) let get_all_acquired_resources () = - let snapshot = with_lock m (fun () -> !thread_states) in + let snapshot = Thread_local_storage.snapshot thread_states in List.map fst (IntMap.fold (fun _ ts acc -> ts.acquired_resources @ acc) snapshot []) - let me () = Thread.id (Thread.self ()) - let update f = - let id = me () in - let snapshot = with_lock m (fun () -> !thread_states) in - let ts = - if IntMap.mem id snapshot then - f (IntMap.find id snapshot) - else - f empty - in - with_lock m (fun () -> - thread_states := - if ts = empty then - IntMap.remove id !thread_states - else - IntMap.add id ts !thread_states - ) + let old = Thread_local_storage.get thread_states in + let ts = f old in + Thread_local_storage.set thread_states ts let with_named_thread name task f = update (fun ts -> {ts with name; task}) ; @@ -182,7 +252,7 @@ module Thread_state = struct let to_graphviz () = let t' = now () in - let snapshot = with_lock m (fun () -> !thread_states) in + let snapshot = Thread_local_storage.snapshot thread_states in (* Map from thread ids -> record rows *) let threads = IntMap.map @@ -273,7 +343,7 @@ module Thread_state = struct in String.concat "\n" all - let known_threads () = with_lock m (fun () -> IntMap.cardinal !thread_states) + let known_threads () = Thread_local_storage.count thread_states let with_resource resource acquire f release arg = let acquired = acquire resource arg in From 466fb024fdce606161c7cdb5b1104e137cfd8f90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 18 Aug 2023 18:26:45 +0100 Subject: [PATCH 12/17] [opt] CP-43769: locking_helpers: switch to mutable record MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This further reduces allocations, although is less functional in style. Signed-off-by: Edwin Török --- ocaml/xapi/locking_helpers.ml | 85 +++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 60fde7b80de..6fe23f976ac 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -85,7 +85,7 @@ module Thread_local_storage = struct let self = Thread.self () in LiveThreads.replace t.tbl (Thread_key.of_thread self) v - let set t v = with_lock t set_unlocked v + let _set t v = with_lock t set_unlocked v let snapshot_unlocked t () = LiveThreads.fold @@ -107,14 +107,25 @@ module D = Debug.Make (struct let name = "locking_helpers" end) open D -type resource_kind = Lock of string | Process of (string * int) +type resource_kind = No_resource | Lock of string | Process of string * int let string_of_resource_kind = function + | No_resource -> + "" | Lock x -> Printf.sprintf "Lock(%s)" x | Process (name, pid) -> Printf.sprintf "Process(%s, %d)" name pid +let kill_resource = function + | No_resource -> + () + | Lock x -> + debug "There is no way to forcibly remove Lock(%s)" x + | Process (name, pid) -> + info "Sending SIGKILL to %s pid %d" name pid ; + Unix.kill pid Sys.sigkill + type resource = { kind: resource_kind ; str: string @@ -122,6 +133,8 @@ type resource = { ; acquired_str: string } +let none = {kind= No_resource; str= ""; waiting_str= ""; acquired_str= ""} + let make kind = let str = string_of_resource_kind kind in let name state = String.concat "" ["Thread_state."; state; "("; str; ")"] in @@ -131,19 +144,12 @@ let lock name = make (Lock name) let process (name, pid) = make (Process (name, pid)) -let kill_resource = function - | Lock x -> - debug "There is no way to forcibly remove Lock(%s)" x - | Process (name, pid) -> - info "Sending SIGKILL to %s pid %d" name pid ; - Unix.kill pid Sys.sigkill - let kill_resource r = kill_resource r.kind let is_process name = function | {kind= Process (p, _); _} -> p = name - | {kind= Lock _; _} -> + | {kind= No_resource | Lock _; _} -> false let string_of_resource r = r.str @@ -156,16 +162,14 @@ module Thread_state = struct type time = float type t = { - acquired_resources: (resource * time) list - ; task: API.ref_task - ; name: string - ; waiting_for: (resource * time) option + mutable acquired_resources: (resource * time) list + ; mutable task: API.ref_task + ; mutable name: string + ; mutable waiting_for: resource } - let empty = - {acquired_resources= []; task= Ref.null; name= ""; waiting_for= None} - - let make_empty () = empty + let make_empty () = + {acquired_resources= []; task= Ref.null; name= ""; waiting_for= none} let thread_states = Thread_local_storage.make make_empty @@ -186,12 +190,19 @@ module Thread_state = struct let update f = let old = Thread_local_storage.get thread_states in - let ts = f old in - Thread_local_storage.set thread_states ts + f old let with_named_thread name task f = - update (fun ts -> {ts with name; task}) ; - finally f (fun () -> update (fun ts -> {ts with name= ""; task= Ref.null})) + update (fun ts -> + ts.name <- name ; + ts.task <- task + ) ; + finally f (fun () -> + update (fun ts -> + ts.name <- "" ; + ts.task <- Ref.null + ) + ) let now () = Unix.gettimeofday () @@ -211,7 +222,7 @@ module Thread_state = struct None ) in - update (fun ts -> {ts with waiting_for= Some (resource, now ())}) ; + update (fun ts -> ts.waiting_for <- resource) ; span let acquired resource parent = @@ -232,22 +243,16 @@ module Thread_state = struct ) in update (fun ts -> - { - ts with - waiting_for= None - ; acquired_resources= (resource, now ()) :: ts.acquired_resources - } + ts.waiting_for <- none ; + ts.acquired_resources <- (resource, now ()) :: ts.acquired_resources ) ; span let released resource span = let (_ : (_, _) result) = Tracing.Tracer.finish span in update (fun ts -> - { - ts with - acquired_resources= - List.filter (fun (r, _) -> r <> resource) ts.acquired_resources - } + ts.acquired_resources <- + List.filter (fun (r, _) -> r <> resource) ts.acquired_resources ) let to_graphviz () = @@ -269,7 +274,7 @@ module Thread_state = struct in let resources_of_ts ts = List.map fst ts.acquired_resources - @ Option.fold ~none:[] ~some:(fun (r, _) -> [r]) ts.waiting_for + @ if ts.waiting_for.kind = No_resource then [] else [ts.waiting_for] in let all_resources = Xapi_stdext_std.Listext.List.setify @@ -279,12 +284,14 @@ module Thread_state = struct List.combine all_resources (List.init (List.length all_resources) Fun.id) in let resources_to_sll = - List.map + List.filter_map (function + | {kind= No_resource; _} -> + None | {kind= Lock x; _} as y -> - (y, [["lock"]; [x]]) + Some (y, [["lock"]; [x]]) | {kind= Process (name, pid); _} as y -> - (y, [["process"]; [name]; [string_of_int pid]]) + Some (y, [["process"]; [name]; [string_of_int pid]]) ) all_resources in @@ -302,9 +309,9 @@ module Thread_state = struct IntMap.fold (fun id ts acc -> match ts.waiting_for with - | None -> + | {kind= No_resource; _} -> acc - | Some (r, _) -> + | r -> (id, List.assoc r resources_to_ids) :: acc ) snapshot [] From 484a697475a1b1095d3cc64b6e2bd59cbcc58b37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 18 Aug 2023 18:30:40 +0100 Subject: [PATCH 13/17] [opt] CP-43769: locking_helpers: inline "update" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Further reduces allocations. Signed-off-by: Edwin Török --- ocaml/xapi/locking_helpers.ml | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 6fe23f976ac..30cb8233aa8 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -188,20 +188,16 @@ module Thread_state = struct List.map fst (IntMap.fold (fun _ ts acc -> ts.acquired_resources @ acc) snapshot []) - let update f = - let old = Thread_local_storage.get thread_states in - f old + let get_states () = Thread_local_storage.get thread_states let with_named_thread name task f = - update (fun ts -> - ts.name <- name ; - ts.task <- task - ) ; + let ts = get_states () in + ts.name <- name ; + ts.task <- task ; finally f (fun () -> - update (fun ts -> - ts.name <- "" ; - ts.task <- Ref.null - ) + let ts = get_states () in + ts.name <- "" ; + ts.task <- Ref.null ) let now () = Unix.gettimeofday () @@ -222,7 +218,8 @@ module Thread_state = struct None ) in - update (fun ts -> ts.waiting_for <- resource) ; + let ts = get_states () in + ts.waiting_for <- resource ; span let acquired resource parent = @@ -242,18 +239,16 @@ module Thread_state = struct None ) in - update (fun ts -> - ts.waiting_for <- none ; - ts.acquired_resources <- (resource, now ()) :: ts.acquired_resources - ) ; + let ts = get_states () in + ts.waiting_for <- none ; + ts.acquired_resources <- (resource, now ()) :: ts.acquired_resources ; span let released resource span = let (_ : (_, _) result) = Tracing.Tracer.finish span in - update (fun ts -> - ts.acquired_resources <- - List.filter (fun (r, _) -> r <> resource) ts.acquired_resources - ) + let ts = get_states () in + ts.acquired_resources <- + List.filter (fun (r, _) -> r <> resource) ts.acquired_resources let to_graphviz () = let t' = now () in From 512f3392cb4ef0813ce665c9bbd832ace45d5213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Fri, 18 Aug 2023 18:36:39 +0100 Subject: [PATCH 14/17] [opt] CP-43769: locking_helpers: fastpath for single resource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a single resource is allocated/released then don't construct a list for it: use a separate field. Also fastpath for acquiring releaseing resources with proper nesting. Signed-off-by: Edwin Török --- ocaml/xapi/locking_helpers.ml | 44 +++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 30cb8233aa8..47b715dda9c 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -162,14 +162,30 @@ module Thread_state = struct type time = float type t = { - mutable acquired_resources: (resource * time) list + mutable last_acquired_resource: resource + ; mutable last_acquired_at: time + ; mutable acquired_resources_other: (resource * time) list ; mutable task: API.ref_task ; mutable name: string ; mutable waiting_for: resource } + let acquired_resources t = + if t.last_acquired_resource.kind = No_resource then + t.acquired_resources_other + else + (t.last_acquired_resource, t.last_acquired_at) + :: t.acquired_resources_other + let make_empty () = - {acquired_resources= []; task= Ref.null; name= ""; waiting_for= none} + { + acquired_resources_other= [] + ; last_acquired_resource= none + ; last_acquired_at= Float.nan + ; task= Ref.null + ; name= "" + ; waiting_for= none + } let thread_states = Thread_local_storage.make make_empty @@ -181,12 +197,12 @@ module Thread_state = struct let snapshot = Thread_local_storage.snapshot thread_states in let all, _ = IntMap.partition (fun _ ts -> ts.task = task) snapshot in List.map fst - (IntMap.fold (fun _ ts acc -> ts.acquired_resources @ acc) all []) + (IntMap.fold (fun _ ts acc -> acquired_resources ts @ acc) all []) let get_all_acquired_resources () = let snapshot = Thread_local_storage.snapshot thread_states in List.map fst - (IntMap.fold (fun _ ts acc -> ts.acquired_resources @ acc) snapshot []) + (IntMap.fold (fun _ ts acc -> acquired_resources ts @ acc) snapshot []) let get_states () = Thread_local_storage.get thread_states @@ -241,14 +257,22 @@ module Thread_state = struct in let ts = get_states () in ts.waiting_for <- none ; - ts.acquired_resources <- (resource, now ()) :: ts.acquired_resources ; + if ts.last_acquired_resource.kind <> No_resource then + ts.acquired_resources_other <- + (ts.last_acquired_resource, ts.last_acquired_at) + :: ts.acquired_resources_other ; + ts.last_acquired_resource <- resource ; + ts.last_acquired_at <- now () ; span let released resource span = let (_ : (_, _) result) = Tracing.Tracer.finish span in let ts = get_states () in - ts.acquired_resources <- - List.filter (fun (r, _) -> r <> resource) ts.acquired_resources + if ts.last_acquired_resource = resource then + ts.last_acquired_resource <- none + else + ts.acquired_resources_other <- + List.filter (fun (r, _) -> r <> resource) ts.acquired_resources_other let to_graphviz () = let t' = now () in @@ -263,12 +287,12 @@ module Thread_state = struct (fun (r, t) -> [string_of_resource r; Printf.sprintf "%.0f" (t' -. t)] ) - ts.acquired_resources + (acquired_resources ts) ) snapshot in let resources_of_ts ts = - List.map fst ts.acquired_resources + List.map fst (acquired_resources ts) @ if ts.waiting_for.kind = No_resource then [] else [ts.waiting_for] in let all_resources = @@ -295,7 +319,7 @@ module Thread_state = struct (fun id ts acc -> List.map (fun (r, _) -> (id, List.assoc r resources_to_ids)) - ts.acquired_resources + (acquired_resources ts) @ acc ) snapshot [] From 249d59bd6725ef8fdc2a96793faf9fd4f5df7b93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 21 Aug 2023 09:36:45 +0100 Subject: [PATCH 15/17] [opt] CP-43769: Tracing.Tracer.finish: avoid overhead when tracing is off MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This function is called with `None` when tracing is off, but it still has an overhead: it allocates an `Ok None` for the return value, and it has a closure in `Option.map`. The function never returns an error, so change the return type to be the option. Manually inline Option.map to avoid the extra closure allocation. Now Tracing.Tracer.finish has 0 memory overhead when called with tracing off. Signed-off-by: Edwin Török --- ocaml/libs/tracing/tracing.ml | 28 +++++++++++++--------------- ocaml/libs/tracing/tracing.mli | 3 +-- ocaml/tests/test_observer.ml | 6 ++---- ocaml/xapi/context.ml | 16 ++++------------ ocaml/xapi/locking_helpers.ml | 4 ++-- 5 files changed, 22 insertions(+), 35 deletions(-) diff --git a/ocaml/libs/tracing/tracing.ml b/ocaml/libs/tracing/tracing.ml index d0f5eed3497..77c77dfb35b 100644 --- a/ocaml/libs/tracing/tracing.ml +++ b/ocaml/libs/tracing/tracing.ml @@ -416,21 +416,19 @@ module Tracer = struct Spans.add_to_spans ~span ; Ok (Some span) let finish ?error span = - Ok - (Option.map - (fun span -> - let span = - match error with - | Some exn_t -> - Span.set_error span exn_t - | None -> - Span.set_ok span - in - let span = Span.finish ~span () in - Spans.mark_finished span ; span - ) - span - ) + match span with + | None -> + None + | Some span -> + let span = + match error with + | Some exn_t -> + Span.set_error span exn_t + | None -> + Span.set_ok span + in + let span = Span.finish ~span () in + Spans.mark_finished span ; Some span let span_is_finished x = Spans.span_is_finished x diff --git a/ocaml/libs/tracing/tracing.mli b/ocaml/libs/tracing/tracing.mli index 86ea6ac27b7..75b9cc88f36 100644 --- a/ocaml/libs/tracing/tracing.mli +++ b/ocaml/libs/tracing/tracing.mli @@ -73,8 +73,7 @@ module Tracer : sig -> unit -> (Span.t option, exn) result - val finish : - ?error:exn * string -> Span.t option -> (Span.t option, exn) result + val finish : ?error:exn * string -> Span.t option -> Span.t option val span_is_finished : Span.t option -> bool diff --git a/ocaml/tests/test_observer.ml b/ocaml/tests/test_observer.ml index 4a1f76823b5..72a239d1370 100644 --- a/ocaml/tests/test_observer.ml +++ b/ocaml/tests/test_observer.ml @@ -445,7 +445,7 @@ let test_tracing_exn_backtraces () = let stacktrace = Printexc.get_backtrace () in let x = Tracer.finish ~error:(e, stacktrace) x in match x with - | Ok (Some span) -> + | Some span -> let span_stacktrace = Span.get_tag span "exception.stacktrace" in debug "STACKTRACE: %s" span_stacktrace ; List.iter @@ -457,10 +457,8 @@ let test_tracing_exn_backtraces () = true function_match ) ["raise_exn"; "test_b"; "test_a"] - | Ok None -> + | None -> Alcotest.failf "Span finish failed" - | Error _ -> - Alcotest.failf "Failed to fetch exception stacktrace" ) ) | Error e -> diff --git a/ocaml/xapi/context.ml b/ocaml/xapi/context.ml index e4495dff745..fecd0860ac4 100644 --- a/ocaml/xapi/context.ml +++ b/ocaml/xapi/context.ml @@ -55,21 +55,13 @@ type t = { } let complete_tracing __context = - ( match Tracing.Tracer.finish __context.tracing with - | Ok _ -> - () - | Error e -> - R.warn "Failed to complete tracing: %s" (Printexc.to_string e) - ) ; + let (_ : Tracing.Span.t option) = Tracing.Tracer.finish __context.tracing in __context.tracing <- None let complete_tracing_with_exn __context error = - ( match Tracing.Tracer.finish ~error __context.tracing with - | Ok _ -> - () - | Error e -> - R.warn "Failed to complete tracing: %s" (Printexc.to_string e) - ) ; + let (_ : Tracing.Span.t option) = + Tracing.Tracer.finish ~error __context.tracing + in __context.tracing <- None let tracing_of __context = __context.tracing diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 47b715dda9c..6fc9629c820 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -244,7 +244,7 @@ module Thread_state = struct | None -> None | Some (parent, span) -> ( - let (_ : (_, _) result) = Tracing.Tracer.finish span in + let (_ : Tracing.Span.t option) = Tracing.Tracer.finish span in let name = resource.acquired_str in let tracer = Tracing.get_tracer ~name in match Tracing.Tracer.start ~tracer ~name ~parent () with @@ -266,7 +266,7 @@ module Thread_state = struct span let released resource span = - let (_ : (_, _) result) = Tracing.Tracer.finish span in + let (_ : Tracing.Span.t option) = Tracing.Tracer.finish span in let ts = get_states () in if ts.last_acquired_resource = resource then ts.last_acquired_resource <- none From 055c287a82553d932445d573795124f0c2d5660b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Mon, 21 Aug 2023 17:13:46 +0100 Subject: [PATCH 16/17] [opt] CP-43769: Locking_helpers: optimise waiting->acquired->release by looking up the thread local var just once MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There is already a value returned from waiting_for that needs to e sent to acquired and the value returned from acquired sent to released. Initially this was introduced to pass along Span.t, but it is more efficient to stor e the Span.t inside the thread local record, and instead return the thread-local record. That way we don't need to take the global mutex and look up the current thread in the global hash table on acquire and release, it is sufficient to do that in waiting_for. Signed-off-by: Edwin Török --- ocaml/xapi/locking_helpers.ml | 54 ++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/ocaml/xapi/locking_helpers.ml b/ocaml/xapi/locking_helpers.ml index 6fc9629c820..2f9cfdcdef1 100644 --- a/ocaml/xapi/locking_helpers.ml +++ b/ocaml/xapi/locking_helpers.ml @@ -155,10 +155,6 @@ let is_process name = function let string_of_resource r = r.str module Thread_state = struct - type waiting = (Tracing.Span.t option * Tracing.Span.t option) option - - type acquired = Tracing.Span.t option - type time = float type t = { @@ -168,8 +164,14 @@ module Thread_state = struct ; mutable task: API.ref_task ; mutable name: string ; mutable waiting_for: resource + ; mutable parent: Tracing.Span.t option + ; mutable span: Tracing.Span.t option } + type waiting = t + + type acquired = t + let acquired_resources t = if t.last_acquired_resource.kind = No_resource then t.acquired_resources_other @@ -185,6 +187,8 @@ module Thread_state = struct ; task= Ref.null ; name= "" ; waiting_for= none + ; parent= None + ; span= None } let thread_states = Thread_local_storage.make make_empty @@ -219,43 +223,42 @@ module Thread_state = struct let now () = Unix.gettimeofday () let waiting_for ?parent resource = - let span = + let ts = get_states () in + let () = match (parent : Tracing.Span.t option) with | None -> - None + () | Some _ -> ( let name = resource.waiting_str in let tracer = Tracing.get_tracer ~name in match Tracing.Tracer.start ~tracer ~name ~parent () with | Ok span -> - Some (parent, span) + ts.parent <- parent ; + ts.span <- span | Error e -> - D.warn "Failed to start tracing: %s" (Printexc.to_string e) ; - None + D.warn "Failed to start tracing: %s" (Printexc.to_string e) ) in - let ts = get_states () in ts.waiting_for <- resource ; - span + ts - let acquired resource parent = - let span = - match parent with + let acquired resource ts = + let () = + match ts.parent with | None -> - None - | Some (parent, span) -> ( - let (_ : Tracing.Span.t option) = Tracing.Tracer.finish span in + () + | Some _ -> ( + let (_ : Tracing.Span.t option) = Tracing.Tracer.finish ts.span in let name = resource.acquired_str in let tracer = Tracing.get_tracer ~name in - match Tracing.Tracer.start ~tracer ~name ~parent () with + match Tracing.Tracer.start ~tracer ~name ~parent:ts.parent () with | Ok span -> - span + ts.span <- span | Error e -> D.warn "Failed to start tracing: %s" (Printexc.to_string e) ; - None + ts.span <- None ) in - let ts = get_states () in ts.waiting_for <- none ; if ts.last_acquired_resource.kind <> No_resource then ts.acquired_resources_other <- @@ -263,12 +266,11 @@ module Thread_state = struct :: ts.acquired_resources_other ; ts.last_acquired_resource <- resource ; ts.last_acquired_at <- now () ; - span + ts - let released resource span = - let (_ : Tracing.Span.t option) = Tracing.Tracer.finish span in - let ts = get_states () in - if ts.last_acquired_resource = resource then + let released resource ts = + let (_ : Tracing.Span.t option) = Tracing.Tracer.finish ts.span in + if ts.last_acquired_resource == resource then ts.last_acquired_resource <- none else ts.acquired_resources_other <- From d3a57272ac8914c5c5930c4dfc5c7771fd084dc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Tue, 27 Jun 2023 15:04:39 +0100 Subject: [PATCH 17/17] [tracing] CP-43769: instrument serialize_auth with a Named_mutex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instrument 'serialize_auth' with a Named_mutex. This function is a bottleneck both for succesful and failed authentication calls, and we need to be able to see it. Eventually we should allow authentication to proceed in parallel, once we made our PAM library thread-safe: PAM itself will apply throttling, currently both XAPI and PAM do this. The throttling might still be useful for AD to prevent brute-force attacks. Signed-off-by: Edwin Török --- ocaml/xapi/xapi_session.ml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ocaml/xapi/xapi_session.ml b/ocaml/xapi/xapi_session.ml index 6489b603ae8..a9bd9486e8b 100644 --- a/ocaml/xapi/xapi_session.ml +++ b/ocaml/xapi/xapi_session.ml @@ -255,7 +255,7 @@ let local_superuser = "root" let xapi_internal_originator = "xapi" -let serialize_auth = Mutex.create () +let serialize_auth = Locking_helpers.Named_mutex.create "serialize_auth" let wipe_string_contents str = for i = 0 to Bytes.length str - 1 do @@ -272,13 +272,13 @@ let wipe_params_after_fn params fn = with e -> wipe params ; raise e let do_external_auth ~__context uname pwd = - with_lock serialize_auth (fun () -> + Locking_helpers.Named_mutex.execute ~__context serialize_auth (fun () -> (Ext_auth.d ()).authenticate_username_password uname (Bytes.unsafe_to_string pwd) ) let do_local_auth ~__context uname pwd = - with_lock serialize_auth (fun () -> + Locking_helpers.Named_mutex.execute ~__context serialize_auth (fun () -> try Pam.authenticate uname (Bytes.unsafe_to_string pwd) with Failure msg -> raise @@ -288,7 +288,7 @@ let do_local_auth ~__context uname pwd = ) let do_local_change_password ~__context uname newpwd = - with_lock serialize_auth (fun () -> + Locking_helpers.Named_mutex.execute ~__context serialize_auth (fun () -> Pam.change_password uname (Bytes.unsafe_to_string newpwd) )