diff options
author | Mauro D <mauro@stalw.art> | 2023-05-02 15:51:26 +0000 |
---|---|---|
committer | Mauro D <mauro@stalw.art> | 2023-05-02 15:51:26 +0000 |
commit | a3a0396772cfbfe8573ad15851acd19cacdc6499 (patch) | |
tree | 95b6781bdab1941e17a0c726cdc7fa9b23690e5b /crates/store/src/query | |
parent | 9360de22b7d92b4ee92ae7034ef388f630ecad39 (diff) |
Mailbox implementation before testing
Diffstat (limited to 'crates/store/src/query')
-rw-r--r-- | crates/store/src/query/log.rs | 69 | ||||
-rw-r--r-- | crates/store/src/query/mod.rs | 4 | ||||
-rw-r--r-- | crates/store/src/query/sort.rs | 25 |
3 files changed, 85 insertions, 13 deletions
diff --git a/crates/store/src/query/log.rs b/crates/store/src/query/log.rs index 53dafcdb..d74d4993 100644 --- a/crates/store/src/query/log.rs +++ b/crates/store/src/query/log.rs @@ -1,5 +1,7 @@ use utils::codec::leb128::Leb128Iterator; +use crate::{write::key::DeserializeBigEndian, Error, LogKey, Store}; + #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum Change { Insert(u64), @@ -32,6 +34,73 @@ impl Default for Changes { } } +impl Store { + pub async fn changes( + &self, + account_id: u32, + collection: impl Into<u8>, + query: Query, + ) -> crate::Result<Option<Changes>> { + let collection = collection.into(); + let (is_inclusive, from_change_id, to_change_id) = match query { + Query::All => (true, 0, u64::MAX), + Query::Since(change_id) => (false, change_id, u64::MAX), + Query::SinceInclusive(change_id) => (true, change_id, u64::MAX), + Query::RangeInclusive(from_change_id, to_change_id) => { + (true, from_change_id, to_change_id) + } + }; + let from_key = LogKey { + account_id, + collection, + change_id: from_change_id, + }; + let to_key = LogKey { + account_id, + collection, + change_id: to_change_id, + }; + + let mut changelog = self + .iterate( + Changes::default(), + from_key, + to_key, + false, + true, + move |changelog, key, value| { + let change_id = + key.deserialize_be_u64(key.len() - std::mem::size_of::<u64>())?; + if !is_inclusive || change_id != from_change_id { + if changelog.changes.is_empty() { + changelog.from_change_id = change_id; + } + changelog.to_change_id = change_id; + changelog.deserialize(value).ok_or_else(|| { + Error::InternalError(format!( + "Failed to deserialize changelog for [{}/{:?}]: [{:?}]", + account_id, collection, query + )) + })?; + } + Ok(true) + }, + ) + .await?; + + if changelog.changes.is_empty() { + changelog.from_change_id = from_change_id; + changelog.to_change_id = if to_change_id != u64::MAX { + to_change_id + } else { + from_change_id + }; + } + + Ok(Some(changelog)) + } +} + impl Changes { pub fn deserialize(&mut self, bytes: &[u8]) -> Option<()> { let mut bytes_it = bytes.iter(); diff --git a/crates/store/src/query/mod.rs b/crates/store/src/query/mod.rs index 508d2158..9a6ff408 100644 --- a/crates/store/src/query/mod.rs +++ b/crates/store/src/query/mod.rs @@ -60,8 +60,8 @@ pub enum Comparator { #[derive(Debug)] pub struct ResultSet { - account_id: u32, - collection: u8, + pub account_id: u32, + pub collection: u8, pub results: RoaringBitmap, } diff --git a/crates/store/src/query/sort.rs b/crates/store/src/query/sort.rs index 70a314cd..b5ae79ec 100644 --- a/crates/store/src/query/sort.rs +++ b/crates/store/src/query/sort.rs @@ -9,7 +9,7 @@ use super::{Comparator, ResultSet, SortedResultSet}; pub struct Pagination { requested_position: i32, position: i32, - limit: usize, + pub limit: usize, anchor: u32, anchor_offset: i32, has_anchor: bool, @@ -228,14 +228,7 @@ impl Store { } impl Pagination { - pub fn new( - limit: usize, - position: i32, - anchor: Option<u32>, - anchor_offset: i32, - prefix_key: Option<ValueKey>, - prefix_unique: bool, - ) -> Self { + pub fn new(limit: usize, position: i32, anchor: Option<u32>, anchor_offset: i32) -> Self { let (has_anchor, anchor) = anchor.map(|anchor| (true, anchor)).unwrap_or((false, 0)); Self { @@ -247,11 +240,21 @@ impl Pagination { has_anchor, anchor_found: false, ids: Vec::with_capacity(limit), - prefix_key, - prefix_unique, + prefix_key: None, + prefix_unique: false, } } + pub fn with_prefix_key(mut self, prefix_key: ValueKey) -> Self { + self.prefix_key = Some(prefix_key); + self + } + + pub fn with_prefix_unique(mut self, prefix_unique: bool) -> Self { + self.prefix_unique = prefix_unique; + self + } + pub fn add(&mut self, prefix_id: u32, document_id: u32) -> bool { let id = ((prefix_id as u64) << 32) | document_id as u64; |