summaryrefslogtreecommitdiff
path: root/crates/store/src/query
diff options
context:
space:
mode:
authorMauro D <mauro@stalw.art>2023-05-02 15:51:26 +0000
committerMauro D <mauro@stalw.art>2023-05-02 15:51:26 +0000
commita3a0396772cfbfe8573ad15851acd19cacdc6499 (patch)
tree95b6781bdab1941e17a0c726cdc7fa9b23690e5b /crates/store/src/query
parent9360de22b7d92b4ee92ae7034ef388f630ecad39 (diff)
Mailbox implementation before testing
Diffstat (limited to 'crates/store/src/query')
-rw-r--r--crates/store/src/query/log.rs69
-rw-r--r--crates/store/src/query/mod.rs4
-rw-r--r--crates/store/src/query/sort.rs25
3 files changed, 85 insertions, 13 deletions
diff --git a/crates/store/src/query/log.rs b/crates/store/src/query/log.rs
index 53dafcdb..d74d4993 100644
--- a/crates/store/src/query/log.rs
+++ b/crates/store/src/query/log.rs
@@ -1,5 +1,7 @@
use utils::codec::leb128::Leb128Iterator;
+use crate::{write::key::DeserializeBigEndian, Error, LogKey, Store};
+
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Change {
Insert(u64),
@@ -32,6 +34,73 @@ impl Default for Changes {
}
}
+impl Store {
+ pub async fn changes(
+ &self,
+ account_id: u32,
+ collection: impl Into<u8>,
+ query: Query,
+ ) -> crate::Result<Option<Changes>> {
+ let collection = collection.into();
+ let (is_inclusive, from_change_id, to_change_id) = match query {
+ Query::All => (true, 0, u64::MAX),
+ Query::Since(change_id) => (false, change_id, u64::MAX),
+ Query::SinceInclusive(change_id) => (true, change_id, u64::MAX),
+ Query::RangeInclusive(from_change_id, to_change_id) => {
+ (true, from_change_id, to_change_id)
+ }
+ };
+ let from_key = LogKey {
+ account_id,
+ collection,
+ change_id: from_change_id,
+ };
+ let to_key = LogKey {
+ account_id,
+ collection,
+ change_id: to_change_id,
+ };
+
+ let mut changelog = self
+ .iterate(
+ Changes::default(),
+ from_key,
+ to_key,
+ false,
+ true,
+ move |changelog, key, value| {
+ let change_id =
+ key.deserialize_be_u64(key.len() - std::mem::size_of::<u64>())?;
+ if !is_inclusive || change_id != from_change_id {
+ if changelog.changes.is_empty() {
+ changelog.from_change_id = change_id;
+ }
+ changelog.to_change_id = change_id;
+ changelog.deserialize(value).ok_or_else(|| {
+ Error::InternalError(format!(
+ "Failed to deserialize changelog for [{}/{:?}]: [{:?}]",
+ account_id, collection, query
+ ))
+ })?;
+ }
+ Ok(true)
+ },
+ )
+ .await?;
+
+ if changelog.changes.is_empty() {
+ changelog.from_change_id = from_change_id;
+ changelog.to_change_id = if to_change_id != u64::MAX {
+ to_change_id
+ } else {
+ from_change_id
+ };
+ }
+
+ Ok(Some(changelog))
+ }
+}
+
impl Changes {
pub fn deserialize(&mut self, bytes: &[u8]) -> Option<()> {
let mut bytes_it = bytes.iter();
diff --git a/crates/store/src/query/mod.rs b/crates/store/src/query/mod.rs
index 508d2158..9a6ff408 100644
--- a/crates/store/src/query/mod.rs
+++ b/crates/store/src/query/mod.rs
@@ -60,8 +60,8 @@ pub enum Comparator {
#[derive(Debug)]
pub struct ResultSet {
- account_id: u32,
- collection: u8,
+ pub account_id: u32,
+ pub collection: u8,
pub results: RoaringBitmap,
}
diff --git a/crates/store/src/query/sort.rs b/crates/store/src/query/sort.rs
index 70a314cd..b5ae79ec 100644
--- a/crates/store/src/query/sort.rs
+++ b/crates/store/src/query/sort.rs
@@ -9,7 +9,7 @@ use super::{Comparator, ResultSet, SortedResultSet};
pub struct Pagination {
requested_position: i32,
position: i32,
- limit: usize,
+ pub limit: usize,
anchor: u32,
anchor_offset: i32,
has_anchor: bool,
@@ -228,14 +228,7 @@ impl Store {
}
impl Pagination {
- pub fn new(
- limit: usize,
- position: i32,
- anchor: Option<u32>,
- anchor_offset: i32,
- prefix_key: Option<ValueKey>,
- prefix_unique: bool,
- ) -> Self {
+ pub fn new(limit: usize, position: i32, anchor: Option<u32>, anchor_offset: i32) -> Self {
let (has_anchor, anchor) = anchor.map(|anchor| (true, anchor)).unwrap_or((false, 0));
Self {
@@ -247,11 +240,21 @@ impl Pagination {
has_anchor,
anchor_found: false,
ids: Vec::with_capacity(limit),
- prefix_key,
- prefix_unique,
+ prefix_key: None,
+ prefix_unique: false,
}
}
+ pub fn with_prefix_key(mut self, prefix_key: ValueKey) -> Self {
+ self.prefix_key = Some(prefix_key);
+ self
+ }
+
+ pub fn with_prefix_unique(mut self, prefix_unique: bool) -> Self {
+ self.prefix_unique = prefix_unique;
+ self
+ }
+
pub fn add(&mut self, prefix_id: u32, document_id: u32) -> bool {
let id = ((prefix_id as u64) << 32) | document_id as u64;