Add the lwt support of cachet
This commit is contained in:
parent
ae7c28adbc
commit
451f058c50
6 changed files with 181 additions and 0 deletions
24
cachet-lwt.opam
Normal file
24
cachet-lwt.opam
Normal file
|
@ -0,0 +1,24 @@
|
|||
opam-version: "2.0"
|
||||
name: "cachet"
|
||||
maintainer: [ "Romain Calascibetta <romain.calascibetta@gmail.com>"
|
||||
"Reynir Björnsson <reynir@reynir.dk>" ]
|
||||
authors: [ "Romain Calascibetta <romain.calascibetta@gmail.com>"
|
||||
"Reynir Björnsson <reynir@reynir.dk>" ]
|
||||
homepage: "https://git.robur.coop/robur/cachet"
|
||||
bug-reports: "https://git.robur.coop/robur/cachet"
|
||||
dev-repo: "git+https://git.robur.coop/robur/cachet"
|
||||
doc: "https://robur-coop.github.io/cachet/"
|
||||
license: "MIT"
|
||||
synopsis: "A simple cache system for mmap and lwt"
|
||||
description: """A small library that provides a simple cache system for page-by-page read access on a block device with lwt."""
|
||||
|
||||
build: [ "dune" "build" "-p" name "-j" jobs ]
|
||||
run-test: [ "dune" "runtest" "-p" name "-j" jobs ]
|
||||
|
||||
depends: [
|
||||
"ocaml" {>= "4.14.0"}
|
||||
"dune" {>= "3.5.0"}
|
||||
"lwt"
|
||||
"cachet" {= version}
|
||||
"alcotest" {with-test & >= "1.8.0"}
|
||||
]
|
97
lib-lwt/cachet_lwt.ml
Normal file
97
lib-lwt/cachet_lwt.ml
Normal file
|
@ -0,0 +1,97 @@
|
|||
open Lwt
|
||||
|
||||
let load t ?len logical_address =
|
||||
let cached = Cachet.is_cached t logical_address in
|
||||
let res = Cachet.load t ?len logical_address in
|
||||
if not cached then Lwt.pause () >|= fun () -> res else Lwt.return res
|
||||
|
||||
let get_uint8 t logical_address =
|
||||
let res = Cachet.get_uint8 t logical_address in
|
||||
Lwt.pause () >|= fun () -> res
|
||||
|
||||
let get_int8 t logical_address =
|
||||
let res = Cachet.get_int8 t logical_address in
|
||||
Lwt.pause () >|= fun () -> res
|
||||
|
||||
let is_aligned x = x land ((1 lsl 2) - 1) == 0
|
||||
let[@inline never] out_of_bounds offset = raise (Cachet.Out_of_bounds offset)
|
||||
|
||||
let blit_to_bytes t ~src_off:logical_address buf ~dst_off ~len =
|
||||
if len < 0 || dst_off < 0 || dst_off > Bytes.length buf - len then
|
||||
invalid_arg "Cachet_lwt.blit_to_bytes";
|
||||
let pagesize = Cachet.pagesize t in
|
||||
let off = logical_address land ((1 lsl pagesize) - 1) in
|
||||
if is_aligned off && (1 lsl pagesize) - off >= len then
|
||||
load t ~len logical_address >|= function
|
||||
| None -> out_of_bounds logical_address
|
||||
| Some slice ->
|
||||
Cachet.Bstr.blit_to_bytes slice.payload ~src_off:off buf ~dst_off:0 ~len
|
||||
else
|
||||
let rec go idx =
|
||||
if idx >= len then Lwt.return_unit
|
||||
else begin
|
||||
get_uint8 t (logical_address + idx) >>= fun v ->
|
||||
Bytes.set_uint8 buf (dst_off + idx) v;
|
||||
go (succ idx)
|
||||
end
|
||||
in
|
||||
go 0
|
||||
|
||||
let get_string t ~len logical_address =
|
||||
let buf = Bytes.create len in
|
||||
blit_to_bytes t ~src_off:logical_address buf ~dst_off:0 ~len >|= fun () ->
|
||||
Bytes.unsafe_to_string buf
|
||||
|
||||
open Lwt.Syntax
|
||||
|
||||
let get_uint16_ne t logical_address =
|
||||
let+ str = get_string t ~len:2 logical_address in
|
||||
String.get_uint16_ne str 0
|
||||
|
||||
let get_uint16_le t logical_address =
|
||||
let+ str = get_string t ~len:2 logical_address in
|
||||
String.get_uint16_le str 0
|
||||
|
||||
let get_uint16_be t logical_address =
|
||||
let+ str = get_string t ~len:2 logical_address in
|
||||
String.get_uint16_be str 0
|
||||
|
||||
let get_int16_ne t logical_address =
|
||||
let+ str = get_string t ~len:2 logical_address in
|
||||
String.get_int16_ne str 0
|
||||
|
||||
let get_int16_le t logical_address =
|
||||
let+ str = get_string t ~len:2 logical_address in
|
||||
String.get_int16_le str 0
|
||||
|
||||
let get_int16_be t logical_address =
|
||||
let+ str = get_string t ~len:2 logical_address in
|
||||
String.get_int16_be str 0
|
||||
|
||||
let get_int32_ne t logical_address =
|
||||
let+ str = get_string t ~len:4 logical_address in
|
||||
String.get_int32_ne str 0
|
||||
|
||||
let get_int32_le t logical_address =
|
||||
let+ str = get_string t ~len:4 logical_address in
|
||||
String.get_int32_le str 0
|
||||
|
||||
let get_int32_be t logical_address =
|
||||
let+ str = get_string t ~len:4 logical_address in
|
||||
String.get_int32_be str 0
|
||||
|
||||
let get_int64_ne t logical_address =
|
||||
let+ str = get_string t ~len:8 logical_address in
|
||||
String.get_int64_ne str 0
|
||||
|
||||
let get_int64_le t logical_address =
|
||||
let+ str = get_string t ~len:8 logical_address in
|
||||
String.get_int64_le str 0
|
||||
|
||||
let get_int64_be t logical_address =
|
||||
let+ str = get_string t ~len:8 logical_address in
|
||||
String.get_int64_be str 0
|
||||
|
||||
let next t slice =
|
||||
let pagesize = Cachet.pagesize t in
|
||||
load t (slice.Cachet.offset + (1 lsl pagesize))
|
43
lib-lwt/cachet_lwt.mli
Normal file
43
lib-lwt/cachet_lwt.mli
Normal file
|
@ -0,0 +1,43 @@
|
|||
(** The Lwt variation of Cachet implies a point of cooperation ([Lwt.pause]) as
|
||||
soon as the syscall [map] is called. In other words, a task developed with
|
||||
Cachet_lwt will make itself available to be rescheduled if we internally
|
||||
call [map] instead of using the cache.
|
||||
|
||||
In the event that the functions below were to use the cache, they would
|
||||
retain the exclusive right to execute and would not allow any cooperation
|
||||
points to appear.
|
||||
|
||||
Such an approach increases the task's availability if it does I/O in
|
||||
cooperation with other tasks that would also like to do I/O. *)
|
||||
|
||||
val load : 'fd Cachet.t -> ?len:int -> int -> Cachet.slice option Lwt.t
|
||||
|
||||
val get_int8 : 'fd Cachet.t -> int -> int Lwt.t
|
||||
(** [get_int8 t logical_address] is [t]'s signed 8-bit integer starting at byte
|
||||
index [logical_address].
|
||||
|
||||
@raise Out_of_bounds if [logical_address] is not accessible. *)
|
||||
|
||||
val get_uint8 : 'fd Cachet.t -> int -> int Lwt.t
|
||||
(** [get_uint8 t logical_address] is [t]'s unsigned 8-bit integer starting at
|
||||
byte index [logical_address].
|
||||
|
||||
@raise Out_of_bounds if [logical_address] is not accessible. *)
|
||||
|
||||
val get_uint16_ne : 'fd Cachet.t -> int -> int Lwt.t
|
||||
val get_uint16_le : 'fd Cachet.t -> int -> int Lwt.t
|
||||
val get_uint16_be : 'fd Cachet.t -> int -> int Lwt.t
|
||||
val get_int16_ne : 'fd Cachet.t -> int -> int Lwt.t
|
||||
val get_int16_le : 'fd Cachet.t -> int -> int Lwt.t
|
||||
val get_int16_be : 'fd Cachet.t -> int -> int Lwt.t
|
||||
val get_int32_ne : 'fd Cachet.t -> int -> int32 Lwt.t
|
||||
val get_int32_le : 'fd Cachet.t -> int -> int32 Lwt.t
|
||||
val get_int32_be : 'fd Cachet.t -> int -> int32 Lwt.t
|
||||
val get_int64_ne : 'fd Cachet.t -> int -> int64 Lwt.t
|
||||
val get_int64_le : 'fd Cachet.t -> int -> int64 Lwt.t
|
||||
val get_int64_be : 'fd Cachet.t -> int -> int64 Lwt.t
|
||||
val get_string : 'fd Cachet.t -> len:int -> int -> string Lwt.t
|
||||
val next : 'fd Cachet.t -> Cachet.slice -> Cachet.slice option Lwt.t
|
||||
|
||||
val blit_to_bytes :
|
||||
'fd Cachet.t -> src_off:int -> bytes -> dst_off:int -> len:int -> unit Lwt.t
|
4
lib-lwt/dune
Normal file
4
lib-lwt/dune
Normal file
|
@ -0,0 +1,4 @@
|
|||
(library
|
||||
(name cachet_lwt)
|
||||
(public_name cachet-lwt)
|
||||
(libraries lwt cachet))
|
|
@ -441,6 +441,7 @@ type 'fd t = {
|
|||
and 'fd map = 'fd -> pos:int -> int -> bigstring
|
||||
|
||||
let fd { fd; _ } = fd
|
||||
let pagesize { pagesize; _ } = pagesize
|
||||
|
||||
let copy t =
|
||||
{
|
||||
|
@ -495,6 +496,13 @@ let load t ?(len = 1) logical_address =
|
|||
let slice = load t logical_address in
|
||||
if slice.length - offset >= len then Some slice else none
|
||||
|
||||
let is_cached t logical_address =
|
||||
let page = logical_address lsr t.pagesize in
|
||||
let hash = hash 0l (page lsl t.pagesize) land ((1 lsl t.cachesize) - 1) in
|
||||
match t.arr.(hash) with
|
||||
| Some slice -> slice.offset == page lsl t.pagesize
|
||||
| None -> false
|
||||
|
||||
let invalidate t ~off:logical_address ~len =
|
||||
if logical_address < 0 || len < 0 then
|
||||
invalid_arg
|
||||
|
|
|
@ -234,6 +234,7 @@ type 'fd map = 'fd -> pos:int -> int -> bigstring
|
|||
type 'fd t
|
||||
|
||||
val fd : 'fd t -> 'fd
|
||||
val pagesize : 'fd t -> int
|
||||
|
||||
val cache_hit : 'fd t -> int
|
||||
(** [cache_hit t] is the number of times a load hit the cache. *)
|
||||
|
@ -262,6 +263,10 @@ val load : 'fd t -> ?len:int -> int -> slice option
|
|||
val invalidate : 'fd t -> off:int -> len:int -> unit
|
||||
(** [invalidate t ~off ~len] invalidates the cache on [len] bytes from [off]. *)
|
||||
|
||||
val is_cached : 'fd t -> int -> bool
|
||||
(** [is_cached t logical_address] returns [true] if the [logicial_address]
|
||||
requested is available in the cache, otherwise [false]. *)
|
||||
|
||||
(** {2:user_friendly User friendly functions.} *)
|
||||
|
||||
(** {3 Binary decoding of integers.}
|
||||
|
|
Loading…
Reference in a new issue