Add the lwt support of cachet #1

Merged
dinosaure merged 1 commit from lwt into main 2024-12-11 10:49:50 +00:00
6 changed files with 181 additions and 0 deletions

24
cachet-lwt.opam Normal file
View file

@ -0,0 +1,24 @@
opam-version: "2.0"
name: "cachet"
maintainer: [ "Romain Calascibetta <romain.calascibetta@gmail.com>"
"Reynir Björnsson <reynir@reynir.dk>" ]
authors: [ "Romain Calascibetta <romain.calascibetta@gmail.com>"
"Reynir Björnsson <reynir@reynir.dk>" ]
homepage: "https://git.robur.coop/robur/cachet"
bug-reports: "https://git.robur.coop/robur/cachet"
dev-repo: "git+https://git.robur.coop/robur/cachet"
doc: "https://robur-coop.github.io/cachet/"
license: "MIT"
synopsis: "A simple cache system for mmap and lwt"
description: """A small library that provides a simple cache system for page-by-page read access on a block device with lwt."""
build: [ "dune" "build" "-p" name "-j" jobs ]
run-test: [ "dune" "runtest" "-p" name "-j" jobs ]
depends: [
"ocaml" {>= "4.14.0"}
"dune" {>= "3.5.0"}
"lwt"
"cachet" {= version}
"alcotest" {with-test & >= "1.8.0"}
]

97
lib-lwt/cachet_lwt.ml Normal file
View file

@ -0,0 +1,97 @@
open Lwt
let load t ?len logical_address =
let cached = Cachet.is_cached t logical_address in
let res = Cachet.load t ?len logical_address in
if not cached then Lwt.pause () >|= fun () -> res else Lwt.return res
let get_uint8 t logical_address =
let res = Cachet.get_uint8 t logical_address in
Lwt.pause () >|= fun () -> res
let get_int8 t logical_address =
let res = Cachet.get_int8 t logical_address in
Lwt.pause () >|= fun () -> res
let is_aligned x = x land ((1 lsl 2) - 1) == 0
let[@inline never] out_of_bounds offset = raise (Cachet.Out_of_bounds offset)
let blit_to_bytes t ~src_off:logical_address buf ~dst_off ~len =
if len < 0 || dst_off < 0 || dst_off > Bytes.length buf - len then
invalid_arg "Cachet_lwt.blit_to_bytes";
let pagesize = Cachet.pagesize t in
let off = logical_address land ((1 lsl pagesize) - 1) in
if is_aligned off && (1 lsl pagesize) - off >= len then
load t ~len logical_address >|= function
| None -> out_of_bounds logical_address
| Some slice ->
Cachet.Bstr.blit_to_bytes slice.payload ~src_off:off buf ~dst_off:0 ~len
else
let rec go idx =
if idx >= len then Lwt.return_unit
else begin
get_uint8 t (logical_address + idx) >>= fun v ->
Bytes.set_uint8 buf (dst_off + idx) v;
go (succ idx)
end
Review

Seems a perf killer but it's not. The bind operator on lwt is not a cooperative point, so if we look about the load function, we do an Lwt.pause only if we require to do a map. Otherwise, lwt will try to go as far as it can.

Seems a perf killer but it's not. The bind operator on lwt is not a cooperative point, so if we look about the `load` function, we do an `Lwt.pause` only if we require to do a `map`. Otherwise, lwt will try to go as far as it can.
in
go 0
let get_string t ~len logical_address =
let buf = Bytes.create len in
blit_to_bytes t ~src_off:logical_address buf ~dst_off:0 ~len >|= fun () ->
Bytes.unsafe_to_string buf
open Lwt.Syntax
let get_uint16_ne t logical_address =
let+ str = get_string t ~len:2 logical_address in
String.get_uint16_ne str 0
let get_uint16_le t logical_address =
let+ str = get_string t ~len:2 logical_address in
String.get_uint16_le str 0
let get_uint16_be t logical_address =
let+ str = get_string t ~len:2 logical_address in
String.get_uint16_be str 0
let get_int16_ne t logical_address =
let+ str = get_string t ~len:2 logical_address in
String.get_int16_ne str 0
let get_int16_le t logical_address =
let+ str = get_string t ~len:2 logical_address in
String.get_int16_le str 0
let get_int16_be t logical_address =
let+ str = get_string t ~len:2 logical_address in
String.get_int16_be str 0
let get_int32_ne t logical_address =
let+ str = get_string t ~len:4 logical_address in
String.get_int32_ne str 0
let get_int32_le t logical_address =
let+ str = get_string t ~len:4 logical_address in
String.get_int32_le str 0
let get_int32_be t logical_address =
let+ str = get_string t ~len:4 logical_address in
String.get_int32_be str 0
let get_int64_ne t logical_address =
let+ str = get_string t ~len:8 logical_address in
String.get_int64_ne str 0
let get_int64_le t logical_address =
let+ str = get_string t ~len:8 logical_address in
String.get_int64_le str 0
let get_int64_be t logical_address =
let+ str = get_string t ~len:8 logical_address in
String.get_int64_be str 0
let next t slice =
let pagesize = Cachet.pagesize t in
load t (slice.Cachet.offset + (1 lsl pagesize))

43
lib-lwt/cachet_lwt.mli Normal file
View file

@ -0,0 +1,43 @@
(** The Lwt variation of Cachet implies a point of cooperation ([Lwt.pause]) as
soon as the syscall [map] is called. In other words, a task developed with
Cachet_lwt will make itself available to be rescheduled if we internally
call [map] instead of using the cache.
In the event that the functions below were to use the cache, they would
retain the exclusive right to execute and would not allow any cooperation
points to appear.
Such an approach increases the task's availability if it does I/O in
cooperation with other tasks that would also like to do I/O. *)
val load : 'fd Cachet.t -> ?len:int -> int -> Cachet.slice option Lwt.t
val get_int8 : 'fd Cachet.t -> int -> int Lwt.t
(** [get_int8 t logical_address] is [t]'s signed 8-bit integer starting at byte
index [logical_address].
@raise Out_of_bounds if [logical_address] is not accessible. *)
val get_uint8 : 'fd Cachet.t -> int -> int Lwt.t
(** [get_uint8 t logical_address] is [t]'s unsigned 8-bit integer starting at
byte index [logical_address].
@raise Out_of_bounds if [logical_address] is not accessible. *)
val get_uint16_ne : 'fd Cachet.t -> int -> int Lwt.t
val get_uint16_le : 'fd Cachet.t -> int -> int Lwt.t
val get_uint16_be : 'fd Cachet.t -> int -> int Lwt.t
val get_int16_ne : 'fd Cachet.t -> int -> int Lwt.t
val get_int16_le : 'fd Cachet.t -> int -> int Lwt.t
val get_int16_be : 'fd Cachet.t -> int -> int Lwt.t
val get_int32_ne : 'fd Cachet.t -> int -> int32 Lwt.t
val get_int32_le : 'fd Cachet.t -> int -> int32 Lwt.t
val get_int32_be : 'fd Cachet.t -> int -> int32 Lwt.t
val get_int64_ne : 'fd Cachet.t -> int -> int64 Lwt.t
val get_int64_le : 'fd Cachet.t -> int -> int64 Lwt.t
val get_int64_be : 'fd Cachet.t -> int -> int64 Lwt.t
val get_string : 'fd Cachet.t -> len:int -> int -> string Lwt.t
val next : 'fd Cachet.t -> Cachet.slice -> Cachet.slice option Lwt.t
val blit_to_bytes :
'fd Cachet.t -> src_off:int -> bytes -> dst_off:int -> len:int -> unit Lwt.t

4
lib-lwt/dune Normal file
View file

@ -0,0 +1,4 @@
(library
(name cachet_lwt)
(public_name cachet-lwt)
(libraries lwt cachet))

View file

@ -441,6 +441,7 @@ type 'fd t = {
and 'fd map = 'fd -> pos:int -> int -> bigstring
let fd { fd; _ } = fd
let pagesize { pagesize; _ } = pagesize
let copy t =
{
@ -495,6 +496,13 @@ let load t ?(len = 1) logical_address =
let slice = load t logical_address in
if slice.length - offset >= len then Some slice else none
let is_cached t logical_address =
let page = logical_address lsr t.pagesize in
let hash = hash 0l (page lsl t.pagesize) land ((1 lsl t.cachesize) - 1) in
match t.arr.(hash) with
| Some slice -> slice.offset == page lsl t.pagesize
| None -> false
let invalidate t ~off:logical_address ~len =
if logical_address < 0 || len < 0 then
invalid_arg

View file

@ -234,6 +234,7 @@ type 'fd map = 'fd -> pos:int -> int -> bigstring
type 'fd t
val fd : 'fd t -> 'fd
val pagesize : 'fd t -> int
val cache_hit : 'fd t -> int
(** [cache_hit t] is the number of times a load hit the cache. *)
@ -262,6 +263,10 @@ val load : 'fd t -> ?len:int -> int -> slice option
val invalidate : 'fd t -> off:int -> len:int -> unit
(** [invalidate t ~off ~len] invalidates the cache on [len] bytes from [off]. *)
val is_cached : 'fd t -> int -> bool
(** [is_cached t logical_address] returns [true] if the [logicial_address]
requested is available in the cache, otherwise [false]. *)
(** {2:user_friendly User friendly functions.} *)
(** {3 Binary decoding of integers.}