From 451f058c508e9724374200e664b8b009c42c601f Mon Sep 17 00:00:00 2001 From: Calascibetta Romain Date: Tue, 26 Nov 2024 17:01:50 +0100 Subject: [PATCH] Add the lwt support of cachet --- cachet-lwt.opam | 24 +++++++++++ lib-lwt/cachet_lwt.ml | 97 ++++++++++++++++++++++++++++++++++++++++++ lib-lwt/cachet_lwt.mli | 43 +++++++++++++++++++ lib-lwt/dune | 4 ++ lib/cachet.ml | 8 ++++ lib/cachet.mli | 5 +++ 6 files changed, 181 insertions(+) create mode 100644 cachet-lwt.opam create mode 100644 lib-lwt/cachet_lwt.ml create mode 100644 lib-lwt/cachet_lwt.mli create mode 100644 lib-lwt/dune diff --git a/cachet-lwt.opam b/cachet-lwt.opam new file mode 100644 index 0000000..e8346b5 --- /dev/null +++ b/cachet-lwt.opam @@ -0,0 +1,24 @@ +opam-version: "2.0" +name: "cachet" +maintainer: [ "Romain Calascibetta " + "Reynir Björnsson " ] +authors: [ "Romain Calascibetta " + "Reynir Björnsson " ] +homepage: "https://git.robur.coop/robur/cachet" +bug-reports: "https://git.robur.coop/robur/cachet" +dev-repo: "git+https://git.robur.coop/robur/cachet" +doc: "https://robur-coop.github.io/cachet/" +license: "MIT" +synopsis: "A simple cache system for mmap and lwt" +description: """A small library that provides a simple cache system for page-by-page read access on a block device with lwt.""" + +build: [ "dune" "build" "-p" name "-j" jobs ] +run-test: [ "dune" "runtest" "-p" name "-j" jobs ] + +depends: [ + "ocaml" {>= "4.14.0"} + "dune" {>= "3.5.0"} + "lwt" + "cachet" {= version} + "alcotest" {with-test & >= "1.8.0"} +] diff --git a/lib-lwt/cachet_lwt.ml b/lib-lwt/cachet_lwt.ml new file mode 100644 index 0000000..347f988 --- /dev/null +++ b/lib-lwt/cachet_lwt.ml @@ -0,0 +1,97 @@ +open Lwt + +let load t ?len logical_address = + let cached = Cachet.is_cached t logical_address in + let res = Cachet.load t ?len logical_address in + if not cached then Lwt.pause () >|= fun () -> res else Lwt.return res + +let get_uint8 t logical_address = + let res = Cachet.get_uint8 t logical_address in + Lwt.pause () >|= fun () -> res + +let get_int8 t logical_address = + let res = Cachet.get_int8 t logical_address in + Lwt.pause () >|= fun () -> res + +let is_aligned x = x land ((1 lsl 2) - 1) == 0 +let[@inline never] out_of_bounds offset = raise (Cachet.Out_of_bounds offset) + +let blit_to_bytes t ~src_off:logical_address buf ~dst_off ~len = + if len < 0 || dst_off < 0 || dst_off > Bytes.length buf - len then + invalid_arg "Cachet_lwt.blit_to_bytes"; + let pagesize = Cachet.pagesize t in + let off = logical_address land ((1 lsl pagesize) - 1) in + if is_aligned off && (1 lsl pagesize) - off >= len then + load t ~len logical_address >|= function + | None -> out_of_bounds logical_address + | Some slice -> + Cachet.Bstr.blit_to_bytes slice.payload ~src_off:off buf ~dst_off:0 ~len + else + let rec go idx = + if idx >= len then Lwt.return_unit + else begin + get_uint8 t (logical_address + idx) >>= fun v -> + Bytes.set_uint8 buf (dst_off + idx) v; + go (succ idx) + end + in + go 0 + +let get_string t ~len logical_address = + let buf = Bytes.create len in + blit_to_bytes t ~src_off:logical_address buf ~dst_off:0 ~len >|= fun () -> + Bytes.unsafe_to_string buf + +open Lwt.Syntax + +let get_uint16_ne t logical_address = + let+ str = get_string t ~len:2 logical_address in + String.get_uint16_ne str 0 + +let get_uint16_le t logical_address = + let+ str = get_string t ~len:2 logical_address in + String.get_uint16_le str 0 + +let get_uint16_be t logical_address = + let+ str = get_string t ~len:2 logical_address in + String.get_uint16_be str 0 + +let get_int16_ne t logical_address = + let+ str = get_string t ~len:2 logical_address in + String.get_int16_ne str 0 + +let get_int16_le t logical_address = + let+ str = get_string t ~len:2 logical_address in + String.get_int16_le str 0 + +let get_int16_be t logical_address = + let+ str = get_string t ~len:2 logical_address in + String.get_int16_be str 0 + +let get_int32_ne t logical_address = + let+ str = get_string t ~len:4 logical_address in + String.get_int32_ne str 0 + +let get_int32_le t logical_address = + let+ str = get_string t ~len:4 logical_address in + String.get_int32_le str 0 + +let get_int32_be t logical_address = + let+ str = get_string t ~len:4 logical_address in + String.get_int32_be str 0 + +let get_int64_ne t logical_address = + let+ str = get_string t ~len:8 logical_address in + String.get_int64_ne str 0 + +let get_int64_le t logical_address = + let+ str = get_string t ~len:8 logical_address in + String.get_int64_le str 0 + +let get_int64_be t logical_address = + let+ str = get_string t ~len:8 logical_address in + String.get_int64_be str 0 + +let next t slice = + let pagesize = Cachet.pagesize t in + load t (slice.Cachet.offset + (1 lsl pagesize)) diff --git a/lib-lwt/cachet_lwt.mli b/lib-lwt/cachet_lwt.mli new file mode 100644 index 0000000..a271226 --- /dev/null +++ b/lib-lwt/cachet_lwt.mli @@ -0,0 +1,43 @@ +(** The Lwt variation of Cachet implies a point of cooperation ([Lwt.pause]) as + soon as the syscall [map] is called. In other words, a task developed with + Cachet_lwt will make itself available to be rescheduled if we internally + call [map] instead of using the cache. + + In the event that the functions below were to use the cache, they would + retain the exclusive right to execute and would not allow any cooperation + points to appear. + + Such an approach increases the task's availability if it does I/O in + cooperation with other tasks that would also like to do I/O. *) + +val load : 'fd Cachet.t -> ?len:int -> int -> Cachet.slice option Lwt.t + +val get_int8 : 'fd Cachet.t -> int -> int Lwt.t +(** [get_int8 t logical_address] is [t]'s signed 8-bit integer starting at byte + index [logical_address]. + + @raise Out_of_bounds if [logical_address] is not accessible. *) + +val get_uint8 : 'fd Cachet.t -> int -> int Lwt.t +(** [get_uint8 t logical_address] is [t]'s unsigned 8-bit integer starting at + byte index [logical_address]. + + @raise Out_of_bounds if [logical_address] is not accessible. *) + +val get_uint16_ne : 'fd Cachet.t -> int -> int Lwt.t +val get_uint16_le : 'fd Cachet.t -> int -> int Lwt.t +val get_uint16_be : 'fd Cachet.t -> int -> int Lwt.t +val get_int16_ne : 'fd Cachet.t -> int -> int Lwt.t +val get_int16_le : 'fd Cachet.t -> int -> int Lwt.t +val get_int16_be : 'fd Cachet.t -> int -> int Lwt.t +val get_int32_ne : 'fd Cachet.t -> int -> int32 Lwt.t +val get_int32_le : 'fd Cachet.t -> int -> int32 Lwt.t +val get_int32_be : 'fd Cachet.t -> int -> int32 Lwt.t +val get_int64_ne : 'fd Cachet.t -> int -> int64 Lwt.t +val get_int64_le : 'fd Cachet.t -> int -> int64 Lwt.t +val get_int64_be : 'fd Cachet.t -> int -> int64 Lwt.t +val get_string : 'fd Cachet.t -> len:int -> int -> string Lwt.t +val next : 'fd Cachet.t -> Cachet.slice -> Cachet.slice option Lwt.t + +val blit_to_bytes : + 'fd Cachet.t -> src_off:int -> bytes -> dst_off:int -> len:int -> unit Lwt.t diff --git a/lib-lwt/dune b/lib-lwt/dune new file mode 100644 index 0000000..d145b60 --- /dev/null +++ b/lib-lwt/dune @@ -0,0 +1,4 @@ +(library + (name cachet_lwt) + (public_name cachet-lwt) + (libraries lwt cachet)) diff --git a/lib/cachet.ml b/lib/cachet.ml index 89a26aa..65f3ff3 100644 --- a/lib/cachet.ml +++ b/lib/cachet.ml @@ -441,6 +441,7 @@ type 'fd t = { and 'fd map = 'fd -> pos:int -> int -> bigstring let fd { fd; _ } = fd +let pagesize { pagesize; _ } = pagesize let copy t = { @@ -495,6 +496,13 @@ let load t ?(len = 1) logical_address = let slice = load t logical_address in if slice.length - offset >= len then Some slice else none +let is_cached t logical_address = + let page = logical_address lsr t.pagesize in + let hash = hash 0l (page lsl t.pagesize) land ((1 lsl t.cachesize) - 1) in + match t.arr.(hash) with + | Some slice -> slice.offset == page lsl t.pagesize + | None -> false + let invalidate t ~off:logical_address ~len = if logical_address < 0 || len < 0 then invalid_arg diff --git a/lib/cachet.mli b/lib/cachet.mli index e255021..fe2064b 100644 --- a/lib/cachet.mli +++ b/lib/cachet.mli @@ -234,6 +234,7 @@ type 'fd map = 'fd -> pos:int -> int -> bigstring type 'fd t val fd : 'fd t -> 'fd +val pagesize : 'fd t -> int val cache_hit : 'fd t -> int (** [cache_hit t] is the number of times a load hit the cache. *) @@ -262,6 +263,10 @@ val load : 'fd t -> ?len:int -> int -> slice option val invalidate : 'fd t -> off:int -> len:int -> unit (** [invalidate t ~off ~len] invalidates the cache on [len] bytes from [off]. *) +val is_cached : 'fd t -> int -> bool +(** [is_cached t logical_address] returns [true] if the [logicial_address] + requested is available in the cache, otherwise [false]. *) + (** {2:user_friendly User friendly functions.} *) (** {3 Binary decoding of integers.}