author    mdecimus <mauro@stalw.art>    2023-11-17 14:59:09 +0100
committer mdecimus <mauro@stalw.art>    2023-11-17 14:59:09 +0100
commit    bcc05340b2c15270229272e0c9738aa261b9668f (patch)
tree      86f0af4c0fd6ae643e3558df1fb3525771d5c44f
parent    a3e6d152c91fa561f7e4789783e82a699db5ef04 (diff)
FTS storage implementation and background indexing
-rw-r--r--  Cargo.lock | 3
-rw-r--r--  crates/imap-proto/Cargo.toml | 1
-rw-r--r--  crates/imap-proto/src/protocol/search.rs | 34
-rw-r--r--  crates/imap/src/op/search.rs | 716
-rw-r--r--  crates/jmap-proto/src/method/query.rs | 46
-rw-r--r--  crates/jmap-proto/src/object/index.rs | 12
-rw-r--r--  crates/jmap/src/api/admin.rs | 3
-rw-r--r--  crates/jmap/src/api/config.rs | 3
-rw-r--r--  crates/jmap/src/api/http.rs | 13
-rw-r--r--  crates/jmap/src/changes/write.rs | 32
-rw-r--r--  crates/jmap/src/email/copy.rs | 15
-rw-r--r--  crates/jmap/src/email/index.rs | 81
-rw-r--r--  crates/jmap/src/email/ingest.rs | 54
-rw-r--r--  crates/jmap/src/email/query.rs | 381
-rw-r--r--  crates/jmap/src/email/set.rs | 17
-rw-r--r--  crates/jmap/src/email/snippet.rs | 274
-rw-r--r--  crates/jmap/src/lib.rs | 86
-rw-r--r--  crates/jmap/src/services/housekeeper.rs | 63
-rw-r--r--  crates/jmap/src/services/index.rs | 224
-rw-r--r--  crates/jmap/src/services/mod.rs | 1
-rw-r--r--  crates/nlp/src/language/mod.rs | 1
-rw-r--r--  crates/nlp/src/language/search_snippet.rs (renamed from crates/store/src/fts/search_snippet.rs) | 110
-rw-r--r--  crates/nlp/src/language/stemmer.rs | 1
-rw-r--r--  crates/nlp/src/language/stopwords.rs | 1
-rw-r--r--  crates/store/Cargo.toml | 1
-rw-r--r--  crates/store/src/backend/foundationdb/blob.rs | 44
-rw-r--r--  crates/store/src/backend/foundationdb/id_assign.rs | 37
-rw-r--r--  crates/store/src/backend/foundationdb/mod.rs | 1
-rw-r--r--  crates/store/src/backend/foundationdb/read.rs | 18
-rw-r--r--  crates/store/src/backend/fs/mod.rs | 4
-rw-r--r--  crates/store/src/backend/mod.rs | 4
-rw-r--r--  crates/store/src/backend/rocksdb/mod.rs | 5
-rw-r--r--  crates/store/src/backend/sqlite/blob.rs | 83
-rw-r--r--  crates/store/src/backend/sqlite/id_assign.rs | 70
-rw-r--r--  crates/store/src/backend/sqlite/main.rs | 11
-rw-r--r--  crates/store/src/backend/sqlite/mod.rs | 1
-rw-r--r--  crates/store/src/backend/sqlite/purge.rs | 1
-rw-r--r--  crates/store/src/backend/sqlite/read.rs | 18
-rw-r--r--  crates/store/src/backend/sqlite/write.rs | 5
-rw-r--r--  crates/store/src/dispatch.rs | 82
-rw-r--r--  crates/store/src/fts/bloom.rs | 257
-rw-r--r--  crates/store/src/fts/builder.rs | 250
-rw-r--r--  crates/store/src/fts/index.rs | 372
-rw-r--r--  crates/store/src/fts/mod.rs | 211
-rw-r--r--  crates/store/src/fts/query.rs | 291
-rw-r--r--  crates/store/src/fts/term_index.rs | 2
-rw-r--r--  crates/store/src/lib.rs | 27
-rw-r--r--  crates/store/src/query/filter.rs | 97
-rw-r--r--  crates/store/src/query/log.rs | 4
-rw-r--r--  crates/store/src/query/mod.rs | 65
-rw-r--r--  crates/store/src/write/batch.rs | 4
-rw-r--r--  crates/store/src/write/hash.rs | 158
-rw-r--r--  crates/store/src/write/key.rs | 168
-rw-r--r--  crates/store/src/write/mod.rs | 32
-rw-r--r--  crates/utils/Cargo.toml | 1
-rw-r--r--  crates/utils/src/lib.rs | 1
-rw-r--r--  crates/utils/src/snowflake.rs | 69
-rw-r--r--  tests/src/imap/append.rs | 8
-rw-r--r--  tests/src/imap/mod.rs | 8
-rw-r--r--  tests/src/imap/store.rs | 7
-rw-r--r--  tests/src/jmap/auth_acl.rs | 7
-rw-r--r--  tests/src/jmap/auth_limits.rs | 7
-rw-r--r--  tests/src/jmap/auth_oauth.rs | 10
-rw-r--r--  tests/src/jmap/blob.rs | 7
-rw-r--r--  tests/src/jmap/delivery.rs | 7
-rw-r--r--  tests/src/jmap/email_changes.rs | 7
-rw-r--r--  tests/src/jmap/email_copy.rs | 7
-rw-r--r--  tests/src/jmap/email_get.rs | 8
-rw-r--r--  tests/src/jmap/email_parse.rs | 10
-rw-r--r--  tests/src/jmap/email_query.rs | 10
-rw-r--r--  tests/src/jmap/email_query_changes.rs | 6
-rw-r--r--  tests/src/jmap/email_search_snippet.rs | 8
-rw-r--r--  tests/src/jmap/email_set.rs | 8
-rw-r--r--  tests/src/jmap/email_submission.rs | 7
-rw-r--r--  tests/src/jmap/event_source.rs | 10
-rw-r--r--  tests/src/jmap/mailbox.rs | 7
-rw-r--r--  tests/src/jmap/mod.rs | 49
-rw-r--r--  tests/src/jmap/push_subscription.rs | 8
-rw-r--r--  tests/src/jmap/quota.rs | 9
-rw-r--r--  tests/src/jmap/sieve_script.rs | 6
-rw-r--r--  tests/src/jmap/stress_test.rs | 13
-rw-r--r--  tests/src/jmap/thread_get.rs | 7
-rw-r--r--  tests/src/jmap/thread_merge.rs | 7
-rw-r--r--  tests/src/jmap/vacation_response.rs | 6
-rw-r--r--  tests/src/jmap/websocket.rs | 8
-rw-r--r--  tests/src/store/assign_id.rs | 31
-rw-r--r--  tests/src/store/mod.rs | 6
-rw-r--r--  tests/src/store/query.rs | 414
88 files changed, 3085 insertions, 2194 deletions
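
In outline, the patch moves full-text search out of the generic property index and into a dedicated FTS store: protocol-level filters now implement FilterItem, into_filter_group() splits them into FilterGroup::Fts and FilterGroup::Store groups, and each FTS group is resolved through fts_filter() into a document set that is pushed as a regular query::Filter::is_in_set(..). Ingestion, copy and deletion additionally record a NamedKey::IndexEmail entry keyed by a snowflake sequence and wake the housekeeper with Event::IndexStart so the FTS index is built in the background. The standalone sketch below only mirrors the type names introduced by this patch and deliberately ignores the And/Or/Not/End operator handling of the real implementation in crates/store/src/fts/mod.rs, which is not shown in this excerpt.

// Simplified, standalone model of the FilterGroup splitting (illustrative
// only; not the crate's actual grouping rules).

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum FilterType {
    Fts,   // resolved through the full-text index
    Store, // resolved through the regular property index
}

trait FilterItem {
    fn filter_type(&self) -> FilterType;
}

#[derive(Debug)]
enum FilterGroup<T> {
    Fts(Vec<T>),
    Store(T),
}

// Toy stand-in for the IMAP/JMAP Filter enums.
#[derive(Debug)]
enum Filter {
    From(String),
    Body(String),
    Seen,
    Larger(u32),
}

impl FilterItem for Filter {
    fn filter_type(&self) -> FilterType {
        match self {
            Filter::From(_) | Filter::Body(_) => FilterType::Fts,
            _ => FilterType::Store,
        }
    }
}

// Collect consecutive FTS conditions into one group so they can be resolved
// with a single full-text query; everything else stays a store filter.
fn into_filter_group<T: FilterItem>(items: Vec<T>) -> Vec<FilterGroup<T>> {
    let mut groups = Vec::new();
    let mut fts = Vec::new();
    for item in items {
        if item.filter_type() == FilterType::Fts {
            fts.push(item);
        } else {
            if !fts.is_empty() {
                groups.push(FilterGroup::Fts(std::mem::take(&mut fts)));
            }
            groups.push(FilterGroup::Store(item));
        }
    }
    if !fts.is_empty() {
        groups.push(FilterGroup::Fts(fts));
    }
    groups
}

fn main() {
    let query = vec![
        Filter::From("alice".into()),
        Filter::Body("invoice".into()),
        Filter::Seen,
        Filter::Larger(1024),
    ];
    for group in into_filter_group(query) {
        match group {
            // The server turns these into FtsFilter values, resolves them to a
            // document set via fts_filter() and pushes is_in_set(..).
            FilterGroup::Fts(conds) => println!("fts group: {conds:?}"),
            // Non-FTS conditions map to query::Filter values as before.
            FilterGroup::Store(cond) => println!("store filter: {cond:?}"),
        }
    }
}
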
diff --git a/Cargo.lock b/Cargo.lock
index cdcb31f9..5ef3c586 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2306,6 +2306,7 @@ dependencies = [
"chrono",
"jmap_proto",
"mail-parser",
+ "store",
"tokio",
]
@@ -5131,6 +5132,7 @@ dependencies = [
"futures",
"lazy_static",
"lru-cache",
+ "lz4_flex",
"nlp",
"num_cpus",
"parking_lot",
@@ -5923,6 +5925,7 @@ dependencies = [
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"privdrop",
+ "rand 0.8.5",
"rustls 0.21.7",
"rustls-pemfile",
"serde",
diff --git a/crates/imap-proto/Cargo.toml b/crates/imap-proto/Cargo.toml
index d466bd06..e4b10faf 100644
--- a/crates/imap-proto/Cargo.toml
+++ b/crates/imap-proto/Cargo.toml
@@ -6,6 +6,7 @@ resolver = "2"
[dependencies]
jmap_proto = { path = "../jmap-proto" }
+store = { path = "../store" }
mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "serde_support", "ludicrous_mode"] }
ahash = { version = "0.8" }
chrono = { version = "0.4"}
diff --git a/crates/imap-proto/src/protocol/search.rs b/crates/imap-proto/src/protocol/search.rs
index 12898de7..01d5b57a 100644
--- a/crates/imap-proto/src/protocol/search.rs
+++ b/crates/imap-proto/src/protocol/search.rs
@@ -21,6 +21,8 @@
* for more details.
*/
+use store::fts::{FilterItem, FilterType};
+
use super::{quoted_string, serialize_sequence, Flag, Sequence};
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -129,6 +131,38 @@ pub enum Filter {
ThreadId(String),
}
+impl FilterItem for Filter {
+ fn filter_type(&self) -> FilterType {
+ match self {
+ Filter::From(_)
+ | Filter::To(_)
+ | Filter::Cc(_)
+ | Filter::Bcc(_)
+ | Filter::Subject(_)
+ | Filter::Body(_)
+ | Filter::Text(_)
+ | Filter::Header(_, _) => FilterType::Fts,
+ Filter::And => FilterType::And,
+ Filter::Or => FilterType::Or,
+ Filter::Not => FilterType::Not,
+ Filter::End => FilterType::End,
+ _ => FilterType::Store,
+ }
+ }
+}
+
+impl From<FilterType> for Filter {
+ fn from(value: FilterType) -> Self {
+ match value {
+ FilterType::And => Filter::And,
+ FilterType::Or => Filter::Or,
+ FilterType::Not => Filter::Not,
+ FilterType::End => Filter::End,
+ _ => unreachable!(),
+ }
+ }
+}
+
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ModSeqEntry {
Shared(Flag),
diff --git a/crates/imap/src/op/search.rs b/crates/imap/src/op/search.rs
index 3f8ec0df..270536d2 100644
--- a/crates/imap/src/op/search.rs
+++ b/crates/imap/src/op/search.rs
@@ -36,6 +36,7 @@ use jmap_proto::types::{collection::Collection, id::Id, keyword::Keyword, proper
use mail_parser::HeaderName;
use nlp::language::Language;
use store::{
+ fts::{Field, FilterGroup, FtsFilter, IntoFilterGroup},
query::{self, log::Query, sort::Pagination, ResultSet},
roaring::RoaringBitmap,
write::now,
@@ -275,371 +276,396 @@ impl SessionData {
// Convert query
let mut include_highest_modseq = false;
- for filter in imap_filter {
- match filter {
- search::Filter::Sequence(sequence, uid_filter) => {
- let mut set = RoaringBitmap::new();
- if let (Sequence::SavedSearch, Some(prev_saved_search)) =
- (&sequence, &prev_saved_search)
- {
- if let Some(prev_saved_search) = prev_saved_search {
- let state = mailbox.state.lock();
- for imap_id in prev_saved_search.iter() {
- if let Some(id) = state.uid_to_id.get(&imap_id.uid) {
- set.insert(*id);
- }
+ for filter_group in imap_filter.into_filter_group() {
+ match filter_group {
+ FilterGroup::Fts(conds) => {
+ let mut fts_filters = Vec::with_capacity(filters.len());
+ for cond in conds {
+ match cond {
+ search::Filter::Bcc(text) => {
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Bcc),
+ text,
+ Language::None,
+ ));
}
- } else {
- return Err(StatusResponse::no("No saved search found."));
- }
- } else {
- for id in mailbox
- .sequence_to_ids(&sequence, is_uid || uid_filter)
- .await?
- .keys()
- {
- set.insert(*id);
- }
- }
- filters.push(query::Filter::is_in_set(set));
- }
- search::Filter::All => {
- filters.push(query::Filter::is_in_set(message_ids.clone()));
- }
- search::Filter::Answered => {
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Answered,
- ));
- }
- /*search::Filter::Bcc(text) => {
- filters.push(query::Filter::has_text(Property::Bcc, text, Language::None));
- }
- search::Filter::Before(date) => {
- filters.push(query::Filter::lt(Property::ReceivedAt, date as u64));
- }
- search::Filter::Body(text) => {
- filters.push(query::Filter::has_text_detect(
- Property::TextBody,
- text,
- self.jmap.config.default_language,
- ));
- }
- search::Filter::Cc(text) => {
- filters.push(query::Filter::has_text(Property::Cc, text, Language::None));
- }
- search::Filter::Deleted => {
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Deleted,
- ));
- }
- search::Filter::Draft => {
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Draft,
- ));
- }
- search::Filter::Flagged => {
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Flagged,
- ));
- }
- search::Filter::From(text) => {
- filters.push(query::Filter::has_text(
- Property::From,
- text,
- Language::None,
- ));
- }
- search::Filter::Header(header, value) => match HeaderName::parse(&header) {
- Some(HeaderName::Other(_)) | None => {
- return Err(StatusResponse::no(format!(
- "Querying non-RFC header '{header}' is not allowed.",
- )));
- }
- Some(header_name) => {
- let is_id = matches!(
- header_name,
- HeaderName::MessageId
- | HeaderName::InReplyTo
- | HeaderName::References
- | HeaderName::ResentMessageId
- );
- let tokens = if !value.is_empty() {
- let header_num = header_name.id().to_string();
- value
- .split_ascii_whitespace()
- .filter_map(|token| {
- if token.len() < MAX_TOKEN_LENGTH {
- if is_id {
- format!("{header_num}{token}")
+ search::Filter::Body(text) => {
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Body,
+ text,
+ self.jmap.config.default_language,
+ ));
+ }
+ search::Filter::Cc(text) => {
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Cc),
+ text,
+ Language::None,
+ ));
+ }
+ search::Filter::From(text) => {
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::From),
+ text,
+ Language::None,
+ ));
+ }
+ search::Filter::Header(header, value) => {
+ match HeaderName::parse(header) {
+ Some(HeaderName::Other(header_name)) => {
+ return Err(StatusResponse::no(format!(
+ "Querying header '{header_name}' is not supported.",
+ )));
+ }
+ Some(header_name) => {
+ if !value.is_empty() {
+ if matches!(
+ header_name,
+ HeaderName::MessageId
+ | HeaderName::InReplyTo
+ | HeaderName::References
+ | HeaderName::ResentMessageId
+ ) {
+ fts_filters.push(FtsFilter::has_keyword(
+ Field::Header(header_name),
+ value,
+ ));
+ } else {
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(header_name),
+ value,
+ Language::None,
+ ));
+ }
} else {
- format!("{header_num}{}", token.to_lowercase())
+ fts_filters.push(FtsFilter::has_keyword(
+ Field::Keyword,
+ header_name.as_str().to_lowercase(),
+ ));
}
- .into()
- } else {
- None
}
- })
- .collect::<Vec<_>>()
- } else {
- vec![]
- };
- match tokens.len() {
- 0 => {
- filters.push(query::Filter::has_raw_text(
- Property::Headers,
- header_name.id().to_string(),
+ None => (),
+ }
+ }
+ search::Filter::Subject(text) => {
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Header(HeaderName::Subject),
+ text,
+ self.jmap.config.default_language,
));
}
- 1 => {
- filters.push(query::Filter::has_raw_text(
- Property::Headers,
- tokens.into_iter().next().unwrap(),
+ search::Filter::Text(text) => {
+ fts_filters.push(FtsFilter::Or);
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::From),
+ &text,
+ Language::None,
+ ));
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::To),
+ &text,
+ Language::None,
+ ));
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Cc),
+ &text,
+ Language::None,
+ ));
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Bcc),
+ &text,
+ Language::None,
+ ));
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Header(HeaderName::Subject),
+ &text,
+ self.jmap.config.default_language,
));
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Body,
+ &text,
+ self.jmap.config.default_language,
+ ));
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Attachment,
+ text,
+ self.jmap.config.default_language,
+ ));
+ fts_filters.push(FtsFilter::End);
}
- _ => {
- filters.push(query::Filter::And);
- for token in tokens {
- filters.push(query::Filter::has_raw_text(
- Property::Headers,
- token,
- ));
- }
- filters.push(query::Filter::End);
+ search::Filter::To(text) => {
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::To),
+ text,
+ Language::None,
+ ));
+ }
+ search::Filter::And => {
+ fts_filters.push(FtsFilter::And);
}
+ search::Filter::Or => {
+ fts_filters.push(FtsFilter::Or);
+ }
+ search::Filter::Not => {
+ fts_filters.push(FtsFilter::Not);
+ }
+ search::Filter::End => {
+ fts_filters.push(FtsFilter::End);
+ }
+ _ => (),
}
}
- },
- search::Filter::Keyword(keyword) => {
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::from(keyword),
- ));
- }
- search::Filter::Larger(size) => {
- filters.push(query::Filter::gt(Property::Size, size));
- }
- search::Filter::On(date) => {
- filters.push(query::Filter::And);
- filters.push(query::Filter::ge(Property::ReceivedAt, date as u64));
- filters.push(query::Filter::lt(
- Property::ReceivedAt,
- (date + 86400) as u64,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::Seen => {
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Seen,
- ));
- }
- search::Filter::SentBefore(date) => {
- filters.push(query::Filter::lt(Property::SentAt, date as u64));
- }
- search::Filter::SentOn(date) => {
- filters.push(query::Filter::And);
- filters.push(query::Filter::ge(Property::SentAt, date as u64));
- filters.push(query::Filter::lt(Property::SentAt, (date + 86400) as u64));
- filters.push(query::Filter::End);
- }
- search::Filter::SentSince(date) => {
- filters.push(query::Filter::ge(Property::SentAt, date as u64));
- }
- search::Filter::Since(date) => {
- filters.push(query::Filter::ge(Property::ReceivedAt, date as u64));
- }
- search::Filter::Smaller(size) => {
- filters.push(query::Filter::lt(Property::Size, size));
- }
- search::Filter::Subject(text) => {
- filters.push(query::Filter::has_text_detect(
- Property::Subject,
- text,
- self.jmap.config.default_language,
- ));
- }
- search::Filter::Text(text) => {
- filters.push(query::Filter::Or);
- filters.push(query::Filter::has_text(
- Property::From,
- &text,
- Language::None,
- ));
- filters.push(query::Filter::has_text(Property::To, &text, Language::None));
- filters.push(query::Filter::has_text(Property::Cc, &text, Language::None));
- filters.push(query::Filter::has_text(
- Property::Bcc,
- &text,
- Language::None,
- ));
- filters.push(query::Filter::has_text_detect(
- Property::Subject,
- &text,
- self.jmap.config.default_language,
- ));
- filters.push(query::Filter::has_text_detect(
- Property::TextBody,
- &text,
- self.jmap.config.default_language,
- ));
- filters.push(query::Filter::has_text_detect(
- Property::Attachments,
- text,
- self.jmap.config.default_language,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::To(text) => {
- filters.push(query::Filter::has_text(Property::To, text, Language::None));
- }*/
- search::Filter::Unanswered => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Answered,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::Undeleted => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Deleted,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::Undraft => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Draft,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::Unflagged => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Flagged,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::Unkeyword(keyword) => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::from(keyword),
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::Unseen => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Seen,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::And => {
- filters.push(query::Filter::And);
- }
- search::Filter::Or => {
- filters.push(query::Filter::Or);
- }
- search::Filter::Not => {
- filters.push(query::Filter::Not);
- }
- search::Filter::End => {
- filters.push(query::Filter::End);
- }
- search::Filter::Recent => {
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Recent,
- ));
- }
- search::Filter::New => {
- filters.push(query::Filter::And);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Recent,
- ));
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Seen,
- ));
- filters.push(query::Filter::End);
- filters.push(query::Filter::End);
- }
- search::Filter::Old => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(
- Property::Keywords,
- Keyword::Seen,
- ));
- filters.push(query::Filter::End);
- }
- search::Filter::Older(secs) => {
- filters.push(query::Filter::le(
- Property::ReceivedAt,
- now().saturating_sub(secs as u64),
- ));
- }
- search::Filter::Younger(secs) => {
- filters.push(query::Filter::ge(
- Property::ReceivedAt,
- now().saturating_sub(secs as u64),
+
+ filters.push(query::Filter::is_in_set(
+ self.jmap
+ .fts_filter(mailbox.id.account_id, Collection::Email, fts_filters)
+ .await?,
));
}
- search::Filter::ModSeq((modseq, _)) => {
- let mut set = RoaringBitmap::new();
- for change in self
- .jmap
- .changes_(
- mailbox.id.account_id,
- Collection::Email,
- Query::from_modseq(modseq),
- )
- .await?
- .changes
- {
- let id = (change.unwrap_id() & u32::MAX as u64) as u32;
- if message_ids.contains(id) {
- set.insert(id);
+ FilterGroup::Store(cond) => match cond {
+ search::Filter::Sequence(sequence, uid_filter) => {
+ let mut set = RoaringBitmap::new();
+ if let (Sequence::SavedSearch, Some(prev_saved_search)) =
+ (&sequence, &prev_saved_search)
+ {
+ if let Some(prev_saved_search) = prev_saved_search {
+ let state = mailbox.state.lock();
+ for imap_id in prev_saved_search.iter() {
+ if let Some(id) = state.uid_to_id.get(&imap_id.uid) {
+ set.insert(*id);
+ }
+ }
+ } else {
+ return Err(StatusResponse::no("No saved search found."));
+ }
+ } else {
+ for id in mailbox
+ .sequence_to_ids(&sequence, is_uid || uid_filter)
+ .await?
+ .keys()
+ {
+ set.insert(*id);
+ }
}
+ filters.push(query::Filter::is_in_set(set));
}
- filters.push(query::Filter::is_in_set(set));
- include_highest_modseq = true;
- }
- search::Filter::EmailId(id) => {
- if let Some(id) = Id::from_bytes(id.as_bytes()) {
- filters.push(query::Filter::is_in_set(
- RoaringBitmap::from_sorted_iter([id.document_id()]).unwrap(),
+ search::Filter::All => {
+ filters.push(query::Filter::is_in_set(message_ids.clone()));
+ }
+ search::Filter::Answered => {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Answered,
));
- } else {
- return Err(StatusResponse::no(format!(
- "Failed to parse email id '{id}'.",
- )));
}
- }
- search::Filter::ThreadId(id) => {
- if let Some(id) = Id::from_bytes(id.as_bytes()) {
+ search::Filter::Before(date) => {
+ filters.push(query::Filter::lt(Property::ReceivedAt, date as u64));
+ }
+ search::Filter::Deleted => {
filters.push(query::Filter::is_in_bitmap(
- Property::ThreadId,
- id.document_id(),
+ Property::Keywords,
+ Keyword::Deleted,
));
- } else {
- return Err(StatusResponse::no(format!(
- "Failed to parse thread id '{id}'.",
- )));
}
- }
- _ => (),
+ search::Filter::Draft => {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Draft,
+ ));
+ }
+ search::Filter::Flagged => {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Flagged,
+ ));
+ }
+ search::Filter::Keyword(keyword) => {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::from(keyword),
+ ));
+ }
+ search::Filter::Larger(size) => {
+ filters.push(query::Filter::gt(Property::Size, size));
+ }
+ search::Filter::On(date) => {
+ filters.push(query::Filter::And);
+ filters.push(query::Filter::ge(Property::ReceivedAt, date as u64));
+ filters.push(query::Filter::lt(
+ Property::ReceivedAt,
+ (date + 86400) as u64,
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Seen => {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Seen,
+ ));
+ }
+ search::Filter::SentBefore(date) => {
+ filters.push(query::Filter::lt(Property::SentAt, date as u64));
+ }
+ search::Filter::SentOn(date) => {
+ filters.push(query::Filter::And);
+ filters.push(query::Filter::ge(Property::SentAt, date as u64));
+ filters.push(query::Filter::lt(Property::SentAt, (date + 86400) as u64));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::SentSince(date) => {
+ filters.push(query::Filter::ge(Property::SentAt, date as u64));
+ }
+ search::Filter::Since(date) => {
+ filters.push(query::Filter::ge(Property::ReceivedAt, date as u64));
+ }
+ search::Filter::Smaller(size) => {
+ filters.push(query::Filter::lt(Property::Size, size));
+ }
+ search::Filter::Unanswered => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Answered,
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Undeleted => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Deleted,
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Undraft => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Draft,
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Unflagged => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Flagged,
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Unkeyword(keyword) => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::from(keyword),
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Unseen => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Seen,
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::And => {
+ filters.push(query::Filter::And);
+ }
+ search::Filter::Or => {
+ filters.push(query::Filter::Or);
+ }
+ search::Filter::Not => {
+ filters.push(query::Filter::Not);
+ }
+ search::Filter::End => {
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Recent => {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Recent,
+ ));
+ }
+ search::Filter::New => {
+ filters.push(query::Filter::And);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Recent,
+ ));
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Seen,
+ ));
+ filters.push(query::Filter::End);
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Old => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(
+ Property::Keywords,
+ Keyword::Seen,
+ ));
+ filters.push(query::Filter::End);
+ }
+ search::Filter::Older(secs) => {
+ filters.push(query::Filter::le(
+ Property::ReceivedAt,
+ now().saturating_sub(secs as u64),
+ ));
+ }
+ search::Filter::Younger(secs) => {
+ filters.push(query::Filter::ge(
+ Property::ReceivedAt,
+ now().saturating_sub(secs as u64),
+ ));
+ }
+ search::Filter::ModSeq((modseq, _)) => {
+ let mut set = RoaringBitmap::new();
+ for change in self
+ .jmap
+ .changes_(
+ mailbox.id.account_id,
+ Collection::Email,
+ Query::from_modseq(modseq),
+ )
+ .await?
+ .changes
+ {
+ let id = (change.unwrap_id() & u32::MAX as u64) as u32;
+ if message_ids.contains(id) {
+ set.insert(id);
+ }
+ }
+ filters.push(query::Filter::is_in_set(set));
+ include_highest_modseq = true;
+ }
+ search::Filter::EmailId(id) => {
+ if let Some(id) = Id::from_bytes(id.as_bytes()) {
+ filters.push(query::Filter::is_in_set(
+ RoaringBitmap::from_sorted_iter([id.document_id()]).unwrap(),
+ ));
+ } else {
+ return Err(StatusResponse::no(format!(
+ "Failed to parse email id '{id}'.",
+ )));
+ }
+ }
+ search::Filter::ThreadId(id) => {
+ if let Some(id) = Id::from_bytes(id.as_bytes()) {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::ThreadId,
+ id.document_id(),
+ ));
+ } else {
+ return Err(StatusResponse::no(format!(
+ "Failed to parse thread id '{id}'.",
+ )));
+ }
+ }
+ _ => (),
+ },
}
}
diff --git a/crates/jmap-proto/src/method/query.rs b/crates/jmap-proto/src/method/query.rs
index b5eee2ea..d32e75f0 100644
--- a/crates/jmap-proto/src/method/query.rs
+++ b/crates/jmap-proto/src/method/query.rs
@@ -23,6 +23,8 @@
use std::fmt::Display;
+use store::fts::{FilterItem, FilterType, FtsFilter};
+
use crate::{
error::method::MethodError,
object::{email, mailbox},
@@ -785,3 +787,47 @@ impl From<Filter> for store::query::Filter {
}
}
}
+
+impl<T: Into<u8> + Display + Clone + std::fmt::Debug> From<Filter> for FtsFilter<T> {
+ fn from(value: Filter) -> Self {
+ match value {
+ Filter::And => Self::And,
+ Filter::Or => Self::Or,
+ Filter::Not => Self::Not,
+ Filter::Close => Self::End,
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl FilterItem for Filter {
+ fn filter_type(&self) -> FilterType {
+ match self {
+ Filter::Text(_)
+ | Filter::From(_)
+ | Filter::To(_)
+ | Filter::Cc(_)
+ | Filter::Bcc(_)
+ | Filter::Subject(_)
+ | Filter::Body(_)
+ | Filter::Header(_) => FilterType::Fts,
+ Filter::And => FilterType::And,
+ Filter::Or => FilterType::Or,
+ Filter::Not => FilterType::Not,
+ Filter::Close => FilterType::End,
+ _ => FilterType::Store,
+ }
+ }
+}
+
+impl From<FilterType> for Filter {
+ fn from(value: FilterType) -> Self {
+ match value {
+ FilterType::And => Filter::And,
+ FilterType::Or => Filter::Or,
+ FilterType::Not => Filter::Not,
+ FilterType::End => Filter::Close,
+ _ => unreachable!(),
+ }
+ }
+}
diff --git a/crates/jmap-proto/src/object/index.rs b/crates/jmap-proto/src/object/index.rs
index 8b8a546d..50a98c0b 100644
--- a/crates/jmap-proto/src/object/index.rs
+++ b/crates/jmap-proto/src/object/index.rs
@@ -25,8 +25,8 @@ use std::{borrow::Cow, collections::HashSet};
use store::{
write::{
- assert::HashedValue, BatchBuilder, BitmapClass, IntoOperations, Operation, TagValue,
- TokenizeText, ValueClass, ValueOp,
+ assert::HashedValue, BatchBuilder, BitmapClass, BitmapHash, IntoOperations, Operation,
+ TagValue, TokenizeText, ValueClass, ValueOp,
},
Serialize,
};
@@ -238,7 +238,7 @@ fn merge_batch(
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
- token: token.into(),
+ token: BitmapHash::new(token),
},
set,
});
@@ -301,7 +301,7 @@ fn merge_batch(
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
- token: token.into_bytes(),
+ token: BitmapHash::new(token),
},
set,
});
@@ -480,7 +480,7 @@ fn build_batch(
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
- token: token.into_bytes(),
+ token: BitmapHash::new(token),
},
set,
});
@@ -512,7 +512,7 @@ fn build_batch(
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
- token: token.into_bytes(),
+ token: BitmapHash::new(token),
},
set,
});
diff --git a/crates/jmap/src/api/admin.rs b/crates/jmap/src/api/admin.rs
index aeebbacf..9f0cd4a5 100644
--- a/crates/jmap/src/api/admin.rs
+++ b/crates/jmap/src/api/admin.rs
@@ -39,6 +39,9 @@ impl JMAP {
// Delete account data
self.store.purge_account(account_id).await?;
+ // Remove FTS index
+ let todo = 1;
+
// Delete account
let mut batch = BatchBuilder::new();
batch
diff --git a/crates/jmap/src/api/config.rs b/crates/jmap/src/api/config.rs
index 23085e33..ed35893e 100644
--- a/crates/jmap/src/api/config.rs
+++ b/crates/jmap/src/api/config.rs
@@ -41,6 +41,9 @@ impl crate::Config {
changes_max_results: settings
.property("jmap.protocol.changes.max-results")?
.unwrap_or(5000),
+ snippet_max_results: settings
+ .property("jmap.protocol.search-snippet.max-results")?
+ .unwrap_or(100),
request_max_size: settings
.property("jmap.protocol.request.max-size")?
.unwrap_or(10000000),
diff --git a/crates/jmap/src/api/http.rs b/crates/jmap/src/api/http.rs
index 56806f90..03ff7dac 100644
--- a/crates/jmap/src/api/http.rs
+++ b/crates/jmap/src/api/http.rs
@@ -377,6 +377,19 @@ pub async fn parse_jmap_request(
.into_http_response(),
};
}
+ ("db", "purge", &Method::GET) => {
+ return match jmap.store.purge_bitmaps().await {
+ Ok(_) => {
+ JsonResponse::new(Value::String("success".into())).into_http_response()
+ }
+ Err(err) => RequestError::blank(
+ StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
+ "Purge database failed",
+ err.to_string(),
+ )
+ .into_http_response(),
+ };
+ }
(path_1 @ ("queue" | "report"), path_2, &Method::GET) => {
return jmap
.smtp
diff --git a/crates/jmap/src/changes/write.rs b/crates/jmap/src/changes/write.rs
index 70e9b791..d2baa0c7 100644
--- a/crates/jmap/src/changes/write.rs
+++ b/crates/jmap/src/changes/write.rs
@@ -33,18 +33,30 @@ impl JMAP {
.map(ChangeLogBuilder::with_change_id)
}
- pub async fn assign_change_id(&self, account_id: u32) -> Result<u64, MethodError> {
- self.store
- .assign_change_id(account_id)
- .await
- .map_err(|err| {
- tracing::error!(
+ pub async fn assign_change_id(&self, _: u32) -> Result<u64, MethodError> {
+ self.generate_snowflake_id()
+ /*self.store
+ .assign_change_id(account_id)
+ .await
+ .map_err(|err| {
+ tracing::error!(
+ event = "error",
+ context = "change_log",
+ error = ?err,
+ "Failed to assign changeId.");
+ MethodError::ServerPartialFail
+ })*/
+ }
+
+ pub fn generate_snowflake_id(&self) -> Result<u64, MethodError> {
+ self.snowflake_id.generate().ok_or_else(|| {
+ tracing::error!(
event = "error",
context = "change_log",
- error = ?err,
- "Failed to assign changeId.");
- MethodError::ServerPartialFail
- })
+ "Failed to generate snowflake id."
+ );
+ MethodError::ServerPartialFail
+ })
}
pub async fn commit_changes(
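
The change id now comes from an in-process snowflake generator (crates/utils/src/snowflake.rs, added by this patch but not shown in this excerpt) instead of a store round-trip, which is why assign_change_id no longer needs the account id and why generation can fail. A minimal sketch of the general snowflake scheme follows, with assumed bit widths; the crate's actual layout may differ.

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

pub struct SnowflakeIdGenerator {
    node_id: u64,        // identifies this server instance
    sequence: AtomicU64, // disambiguates ids generated in the same millisecond
}

impl SnowflakeIdGenerator {
    pub fn new(node_id: u64) -> Self {
        Self { node_id, sequence: AtomicU64::new(0) }
    }

    // Returns None when a valid timestamp cannot be obtained, mirroring the
    // Option<u64> returned by generate() in the patch above.
    pub fn generate(&self) -> Option<u64> {
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .ok()?
            .as_millis() as u64;
        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
        // Assumed layout: 42-bit timestamp | 10-bit node id | 12-bit sequence.
        Some((millis << 22) | ((self.node_id & 0x3ff) << 12) | (seq & 0xfff))
    }
}

Because the timestamp occupies the high bits, these ids sort roughly by creation time, which presumably is also why they are reused as the seq component of the NamedKey::IndexEmail entries written further down in this patch.
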
diff --git a/crates/jmap/src/email/copy.rs b/crates/jmap/src/email/copy.rs
index 0d31b94e..de34b465 100644
--- a/crates/jmap/src/email/copy.rs
+++ b/crates/jmap/src/email/copy.rs
@@ -53,7 +53,7 @@ use store::{
};
use utils::map::vec_map::VecMap;
-use crate::{auth::AccessToken, Bincode, JMAP};
+use crate::{auth::AccessToken, services::housekeeper::Event, Bincode, NamedKey, JMAP};
use super::{
index::{EmailIndexBuilder, TrimTextValue, MAX_SORT_FIELD_LENGTH},
@@ -291,7 +291,7 @@ impl JMAP {
keywords: Vec<Keyword>,
received_at: Option<UTCDate>,
) -> Result<Result<IngestedEmail, SetError>, MethodError> {
- // Obtain term index and metadata
+ // Obtain metadata
let mut metadata = if let Some(metadata) = self
.get_property::<Bincode<MessageMetadata>>(
from_account_id,
@@ -405,6 +405,14 @@ impl JMAP {
.value(Property::MailboxIds, mailboxes, F_VALUE | F_BITMAP)
.value(Property::Keywords, keywords, F_VALUE | F_BITMAP)
.value(Property::Cid, changes.change_id, F_VALUE)
+ .set(
+ NamedKey::IndexEmail::<&[u8]> {
+ account_id,
+ document_id: message_id,
+ seq: self.generate_snowflake_id()?,
+ },
+ metadata.blob_hash.clone(),
+ )
.custom(EmailIndexBuilder::set(metadata))
.custom(changes);
@@ -417,6 +425,9 @@ impl JMAP {
MethodError::ServerPartialFail
})?;
+ // Request FTS index
+ let _ = self.housekeeper_tx.send(Event::IndexStart).await;
+
Ok(Ok(email))
}
}
diff --git a/crates/jmap/src/email/index.rs b/crates/jmap/src/email/index.rs
index db352ad0..8dac003c 100644
--- a/crates/jmap/src/email/index.rs
+++ b/crates/jmap/src/email/index.rs
@@ -32,6 +32,8 @@ use mail_parser::{
};
use nlp::language::Language;
use store::{
+ backend::MAX_TOKEN_LENGTH,
+ fts::{index::FtsDocument, Field},
write::{BatchBuilder, BlobOp, IntoOperations, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE},
BlobHash,
};
@@ -60,13 +62,13 @@ pub(super) trait IndexMessage {
keywords: Vec<Keyword>,
mailbox_ids: Vec<u32>,
received_at: u64,
- ) -> store::Result<&mut Self>;
+ ) -> &mut Self;
fn index_headers(&mut self, headers: &[Header<'_>], options: u32);
}
-pub(super) trait IndexMessageText<'x> {
- fn index_message(&mut self, message: &'x Message<'x>);
+pub trait IndexMessageText<'x>: Sized {
+ fn index_message(self, message: &'x Message<'x>) -> Self;
}
impl IndexMessage for BatchBuilder {
@@ -77,7 +79,7 @@ impl IndexMessage for BatchBuilder {
keywords: Vec<Keyword>,
mailbox_ids: Vec<u32>,
received_at: u64,
- ) -> store::Result<&mut Self> {
+ ) -> &mut Self {
// Index keywords
self.value(Property::Keywords, keywords, F_VALUE | F_BITMAP);
@@ -164,7 +166,7 @@ impl IndexMessage for BatchBuilder {
F_VALUE,
);
- Ok(self)
+ self
}
fn index_headers(&mut self, headers: &[Header<'_>], options: u32) {
@@ -262,9 +264,8 @@ impl IndexMessage for BatchBuilder {
}
}
-/*
-impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
- fn index_message(&mut self, message: &'x Message<'x>) {
+impl<'x> IndexMessageText<'x> for FtsDocument<'x, HeaderName<'x>> {
+ fn index_message(mut self, message: &'x Message<'x>) -> Self {
let mut language = Language::Unknown;
for (part_id, part) in message.parts.iter().take(MAX_MESSAGE_PARTS).enumerate() {
@@ -277,9 +278,9 @@ impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
continue;
}
// Index hasHeader property
- self.index_raw_token(Property::Headers, header.name.as_str());
+ self.index_keyword(Field::Keyword, header.name.as_str().to_ascii_lowercase());
- match header.name {
+ match &header.name {
HeaderName::MessageId
| HeaderName::InReplyTo
| HeaderName::References
@@ -287,45 +288,35 @@ impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
header.value.visit_text(|id| {
// Index ids without stemming
if id.len() < MAX_TOKEN_LENGTH {
- let fix = "true";
- self.index_raw_token(Property::MessageId, id.to_string());
+ self.index_keyword(
+ Field::Header(header.name.clone()),
+ id.to_string(),
+ );
}
});
}
HeaderName::From | HeaderName::To | HeaderName::Cc | HeaderName::Bcc => {
- let property = Property::from_header(&header.name);
-
header.value.visit_addresses(|_, value| {
// Index an address name or email without stemming
- self.index_raw(property.clone(), value.to_string());
+ self.index_tokenized(
+ Field::Header(header.name.clone()),
+ value.to_string(),
+ );
});
}
HeaderName::Subject => {
// Index subject for FTS
- self.index(
- Property::Subject,
- match &header.value {
- HeaderValue::Text(text) => text.clone(),
- HeaderValue::TextList(list) if !list.is_empty() => {
- list.first().unwrap().clone()
- }
- _ => "".into(),
- },
- language,
- );
+ if let Some(subject) = header.value.as_text() {
+ self.index(Field::Header(HeaderName::Subject), subject, language);
+ }
}
HeaderName::Comments | HeaderName::Keywords | HeaderName::ListId => {
// Index headers
header.value.visit_text(|text| {
- for token in text.split_ascii_whitespace() {
- if token.len() < MAX_TOKEN_LENGTH {
- let fix = "true";
- self.index_raw_token(
- Property::Headers,
- token.to_lowercase(),
- );
- }
- }
+ self.index_tokenized(
+ Field::Header(header.name.clone()),
+ text.to_string(),
+ );
});
}
_ => (),
@@ -337,9 +328,9 @@ impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
PartType::Text(text) => {
if message.text_body.contains(&part_id) || message.html_body.contains(&part_id)
{
- self.index(Property::TextBody, text.as_ref(), part_language);
+ self.index(Field::Body, text.as_ref(), part_language);
} else {
- self.index(Property::Attachments, text.as_ref(), part_language);
+ self.index(Field::Attachment, text.as_ref(), part_language);
}
}
PartType::Html(html) => {
@@ -347,9 +338,9 @@ impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
if message.text_body.contains(&part_id) || message.html_body.contains(&part_id)
{
- self.index(Property::TextBody, text, part_language);
+ self.index(Field::Body, text, part_language);
} else {
- self.index(Property::Attachments, text, part_language);
+ self.index(Field::Attachment, text, part_language);
}
}
PartType::Message(nested_message) => {
@@ -360,21 +351,17 @@ impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
if let Some(HeaderValue::Text(subject)) =
nested_message.header(HeaderName::Subject)
{
- self.index(
- Property::Attachments,
- subject.as_ref(),
- nested_message_language,
- );
+ self.index(Field::Attachment, subject.as_ref(), nested_message_language);
}
for sub_part in nested_message.parts.iter().take(MAX_MESSAGE_PARTS) {
let language = sub_part.language().unwrap_or(nested_message_language);
match &sub_part.body {
PartType::Text(text) => {
- self.index(Property::Attachments, text.as_ref(), language);
+ self.index(Field::Attachment, text.as_ref(), language);
}
PartType::Html(html) => {
- self.index(Property::Attachments, html_to_text(html), language);
+ self.index(Field::Attachment, html_to_text(html), language);
}
_ => (),
}
@@ -383,9 +370,9 @@ impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
_ => {}
}
}
+ self
}
}
-*/
pub struct EmailIndexBuilder<'x> {
inner: Bincode<MessageMetadata<'x>>,
diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs
index 72f92136..9fda9e88 100644
--- a/crates/jmap/src/email/ingest.rs
+++ b/crates/jmap/src/email/ingest.rs
@@ -33,6 +33,7 @@ use jmap_proto::{
use mail_parser::{
parsers::fields::thread::thread_name, HeaderName, HeaderValue, Message, PartType,
};
+
use store::{
ahash::AHashSet,
query::Filter,
@@ -46,7 +47,8 @@ use utils::map::vec_map::VecMap;
use crate::{
email::index::{IndexMessage, MAX_ID_LENGTH},
- IngestError, JMAP,
+ services::housekeeper::Event,
+ IngestError, NamedKey, JMAP,
};
use super::{
@@ -237,15 +239,14 @@ impl JMAP {
IngestError::Temporary
})?;
let change_id = self
- .store
.assign_change_id(params.account_id)
.await
- .map_err(|err| {
+ .map_err(|_| {
tracing::error!(
event = "error",
context = "email_ingest",
- error = ?err,
- "Failed to assign changeId.");
+ "Failed to assign changeId."
+ );
IngestError::Temporary
})?;
@@ -307,17 +308,19 @@ impl JMAP {
params.mailbox_ids,
params.received_at.unwrap_or_else(now),
)
- .map_err(|err| {
- tracing::error!(
- event = "error",
- context = "email_ingest",
- error = ?err,
- "Failed to index message.");
- IngestError::Temporary
- })?
.value(Property::Cid, change_id, F_VALUE)
.value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP)
- .custom(changes);
+ .custom(changes)
+ .set(
+ NamedKey::IndexEmail::<&[u8]> {
+ account_id: params.account_id,
+ document_id,
+ seq: self
+ .generate_snowflake_id()
+ .map_err(|_| IngestError::Temporary)?,
+ },
+ blob_id.hash.clone(),
+ );
self.store.write(batch.build()).await.map_err(|err| {
tracing::error!(
event = "error",
@@ -327,6 +330,9 @@ impl JMAP {
IngestError::Temporary
})?;
+ // Request FTS index
+ let _ = self.housekeeper_tx.send(Event::IndexStart).await;
+
Ok(IngestedEmail {
id,
change_id,
@@ -434,18 +440,14 @@ impl JMAP {
// Delete all but the most common threadId
let mut batch = BatchBuilder::new();
- let change_id = self
- .store
- .assign_change_id(account_id)
- .await
- .map_err(|err| {
- tracing::error!(
- event = "error",
- context = "find_or_merge_thread",
- error = ?err,
- "Failed to assign changeId for thread merge.");
- IngestError::Temporary
- })?;
+ let change_id = self.assign_change_id(account_id).await.map_err(|_| {
+ tracing::error!(
+ event = "error",
+ context = "find_or_merge_thread",
+ "Failed to assign changeId for thread merge."
+ );
+ IngestError::Temporary
+ })?;
let mut changes = ChangeLogBuilder::with_change_id(change_id);
batch
.with_account_id(account_id)
diff --git a/crates/jmap/src/email/query.rs b/crates/jmap/src/email/query.rs
index 68499cbd..b6aac6a5 100644
--- a/crates/jmap/src/email/query.rs
+++ b/crates/jmap/src/email/query.rs
@@ -27,7 +27,10 @@ use jmap_proto::{
object::email::QueryArguments,
types::{acl::Acl, collection::Collection, keyword::Keyword, property::Property},
};
+use mail_parser::HeaderName;
+use nlp::language::Language;
use store::{
+ fts::{Field, FilterGroup, FtsFilter, IntoFilterGroup},
query::{self},
roaring::RoaringBitmap,
write::ValueClass,
@@ -45,200 +48,226 @@ impl JMAP {
let account_id = request.account_id.document_id();
let mut filters = Vec::with_capacity(request.filter.len());
- for cond in std::mem::take(&mut request.filter) {
- match cond {
- Filter::InMailbox(mailbox) => filters.push(query::Filter::is_in_bitmap(
- Property::MailboxIds,
- mailbox.document_id(),
- )),
- Filter::InMailboxOtherThan(mailboxes) => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::Or);
- for mailbox in mailboxes {
- filters.push(query::Filter::is_in_bitmap(
- Property::MailboxIds,
- mailbox.document_id(),
- ));
- }
- filters.push(query::Filter::End);
- filters.push(query::Filter::End);
- }
- Filter::Before(date) => filters.push(query::Filter::lt(Property::ReceivedAt, date)),
- Filter::After(date) => filters.push(query::Filter::gt(Property::ReceivedAt, date)),
- Filter::MinSize(size) => filters.push(query::Filter::ge(Property::Size, size)),
- Filter::MaxSize(size) => filters.push(query::Filter::lt(Property::Size, size)),
- Filter::AllInThreadHaveKeyword(keyword) => filters.push(query::Filter::is_in_set(
- self.thread_keywords(account_id, keyword, true).await?,
- )),
- Filter::SomeInThreadHaveKeyword(keyword) => filters.push(query::Filter::is_in_set(
- self.thread_keywords(account_id, keyword, false).await?,
- )),
- Filter::NoneInThreadHaveKeyword(keyword) => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_set(
- self.thread_keywords(account_id, keyword, false).await?,
- ));
- filters.push(query::Filter::End);
- }
- Filter::HasKeyword(keyword) => {
- filters.push(query::Filter::is_in_bitmap(Property::Keywords, keyword))
- }
- Filter::NotKeyword(keyword) => {
- filters.push(query::Filter::Not);
- filters.push(query::Filter::is_in_bitmap(Property::Keywords, keyword));
- filters.push(query::Filter::End);
- }
- Filter::HasAttachment(has_attach) => {
- if !has_attach {
- filters.push(query::Filter::Not);
- }
- filters.push(query::Filter::is_in_bitmap(Property::HasAttachment, ()));
- if !has_attach {
- filters.push(query::Filter::End);
- }
- }
- /*Filter::Text(text) => {
- filters.push(query::Filter::Or);
- filters.push(query::Filter::has_text(
- Property::From,
- &text,
- Language::None,
- ));
- filters.push(query::Filter::has_text(Property::To, &text, Language::None));
- filters.push(query::Filter::has_text(Property::Cc, &text, Language::None));
- filters.push(query::Filter::has_text(
- Property::Bcc,
- &text,
- Language::None,
- ));
- filters.push(query::Filter::has_text_detect(
- Property::Subject,
- &text,
- self.config.default_language,
- ));
- filters.push(query::Filter::has_text_detect(
- Property::TextBody,
- &text,
- self.config.default_language,
- ));
- filters.push(query::Filter::has_text_detect(
- Property::Attachments,
- text,
- self.config.default_language,
- ));
- filters.push(query::Filter::End);
- }
- Filter::From(text) => filters.push(query::Filter::has_text(
- Property::From,
- text,
+ for cond_group in std::mem::take(&mut request.filter).into_filter_group() {
+ match cond_group {
+ FilterGroup::Fts(conds) => {
+ let mut fts_filters = Vec::with_capacity(filters.len());
+ for cond in conds {
+ match cond {
+ Filter::Text(text) => {
+ fts_filters.push(FtsFilter::Or);
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::From),
+ &text,
Language::None,
- )),
- Filter::To(text) => {
- filters.push(query::Filter::has_text(Property::To, text, Language::None))
- }
- Filter::Cc(text) => {
- filters.push(query::Filter::has_text(Property::Cc, text, Language::None))
- }
- Filter::Bcc(text) => {
- filters.push(query::Filter::has_text(Property::Bcc, text, Language::None))
- }
- Filter::Subject(text) => filters.push(query::Filter::has_text_detect(
- Property::Subject,
- text,
+ ));
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::To),
+ &text,
+ Language::None,
+ ));
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Cc),
+ &text,
+ Language::None,
+ ));
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Bcc),
+ &text,
+ Language::None,
+ ));
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Header(HeaderName::Subject),
+ &text,
+ self.config.default_language,
+ ));
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Body,
+ &text,
self.config.default_language,
- )),
- Filter::Body(text) => filters.push(query::Filter::has_text_detect(
- Property::TextBody,
+ ));
+ fts_filters.push(FtsFilter::has_text_detect(
+ Field::Attachment,
text,
self.config.default_language,
- )),
- Filter::Header(header) => {
- let mut header = header.into_iter();
- let header_name = header.next().ok_or_else(|| {
- MethodError::InvalidArguments("Header name is missing.".to_string())
- })?;
+ ));
+ fts_filters.push(FtsFilter::End);
+ }
+ Filter::From(text) => fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::From),
+ text,
+ Language::None,
+ )),
+ Filter::To(text) => fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::To),
+ text,
+ Language::None,
+ )),
+ Filter::Cc(text) => fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Cc),
+ text,
+ Language::None,
+ )),
+ Filter::Bcc(text) => fts_filters.push(FtsFilter::has_text(
+ Field::Header(HeaderName::Bcc),
+ text,
+ Language::None,
+ )),
+ Filter::Subject(text) => fts_filters.push(FtsFilter::has_text_detect(
+ Field::Header(HeaderName::Subject),
+ text,
+ self.config.default_language,
+ )),
+ Filter::Body(text) => fts_filters.push(FtsFilter::has_text_detect(
+ Field::Body,
+ text,
+ self.config.default_language,
+ )),
+ Filter::Header(header) => {
+ let mut header = header.into_iter();
+ let header_name = header.next().ok_or_else(|| {
+ MethodError::InvalidArguments(
+ "Header name is missing.".to_string(),
+ )
+ })?;
- match HeaderName::parse(&header_name) {
- Some(HeaderName::Other(_)) | None => {
- return Err(MethodError::InvalidArguments(format!(
- "Querying non-RFC header '{header_name}' is not allowed.",
- )));
- }
- Some(header_name) => {
- let is_id = matches!(
+ match HeaderName::parse(header_name) {
+ Some(HeaderName::Other(header_name)) => {
+ return Err(MethodError::InvalidArguments(format!(
+ "Querying header '{header_name}' is not supported.",
+ )));
+ }
+ Some(header_name) => {
+ if let Some(header_value) = header.next() {
+ if matches!(
header_name,
HeaderName::MessageId
| HeaderName::InReplyTo
| HeaderName::References
| HeaderName::ResentMessageId
- );
- let tokens = if let Some(header_value) = header.next() {
- let header_num = header_name.id().to_string();
- header_value
- .split_ascii_whitespace()
- .filter_map(|token| {
- if token.len() < MAX_TOKEN_LENGTH {
- if is_id {
- format!("{header_num}{token}")
- } else {
- format!("{header_num}{}", token.to_lowercase())
- }
- .into()
- } else {
- None
- }
- })
- .collect::<Vec<_>>()
+ ) {
+ fts_filters.push(FtsFilter::has_keyword(
+ Field::Header(header_name),
+ header_value,
+ ));
} else {
- vec![]
- };
- match tokens.len() {
- 0 => {
- filters.push(query::Filter::has_raw_text(
- Property::Headers,
- header_name.id().to_string(),
- ));
- }
- 1 => {
- filters.push(query::Filter::has_raw_text(
- Property::Headers,
- tokens.into_iter().next().unwrap(),
- ));
- }
- _ => {
- filters.push(query::Filter::And);
- for token in tokens {
- filters.push(query::Filter::has_raw_text(
- Property::Headers,
- token,
- ));
- }
- filters.push(query::Filter::End);
- }
+ fts_filters.push(FtsFilter::has_text(
+ Field::Header(header_name),
+ header_value,
+ Language::None,
+ ));
}
+ } else {
+ fts_filters.push(FtsFilter::has_keyword(
+ Field::Keyword,
+ header_name.as_str().to_lowercase(),
+ ));
}
}
+ None => (),
}
- */
- // Non-standard
- Filter::Id(ids) => {
- let mut set = RoaringBitmap::new();
- for id in ids {
- set.insert(id.document_id());
+ }
+ Filter::And | Filter::Or | Filter::Not | Filter::Close => {
+ fts_filters.push(cond.into());
+ }
+ other => return Err(MethodError::UnsupportedFilter(other.to_string())),
+ }
}
- filters.push(query::Filter::is_in_set(set));
- }
- Filter::SentBefore(date) => filters.push(query::Filter::lt(Property::SentAt, date)),
- Filter::SentAfter(date) => filters.push(query::Filter::gt(Property::SentAt, date)),
- Filter::InThread(id) => filters.push(query::Filter::is_in_bitmap(
- Property::ThreadId,
- id.document_id(),
- )),
- Filter::And | Filter::Or | Filter::Not | Filter::Close => {
- filters.push(cond.into());
+ filters.push(query::Filter::is_in_set(
+ self.fts_filter(account_id, Collection::Email, fts_filters)
+ .await?,
+ ));
}
+ FilterGroup::Store(cond) => {
+ match cond {
+ Filter::InMailbox(mailbox) => filters.push(query::Filter::is_in_bitmap(
+ Property::MailboxIds,
+ mailbox.document_id(),
+ )),
+ Filter::InMailboxOtherThan(mailboxes) => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::Or);
+ for mailbox in mailboxes {
+ filters.push(query::Filter::is_in_bitmap(
+ Property::MailboxIds,
+ mailbox.document_id(),
+ ));
+ }
+ filters.push(query::Filter::End);
+ filters.push(query::Filter::End);
+ }
+ Filter::Before(date) => {
+ filters.push(query::Filter::lt(Property::ReceivedAt, date))
+ }
+ Filter::After(date) => {
+ filters.push(query::Filter::gt(Property::ReceivedAt, date))
+ }
+ Filter::MinSize(size) => {
+ filters.push(query::Filter::ge(Property::Size, size))
+ }
+ Filter::MaxSize(size) => {
+ filters.push(query::Filter::lt(Property::Size, size))
+ }
+ Filter::AllInThreadHaveKeyword(keyword) => {
+ filters.push(query::Filter::is_in_set(
+ self.thread_keywords(account_id, keyword, true).await?,
+ ))
+ }
+ Filter::SomeInThreadHaveKeyword(keyword) => {
+ filters.push(query::Filter::is_in_set(
+ self.thread_keywords(account_id, keyword, false).await?,
+ ))
+ }
+ Filter::NoneInThreadHaveKeyword(keyword) => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_set(
+ self.thread_keywords(account_id, keyword, false).await?,
+ ));
+ filters.push(query::Filter::End);
+ }
+ Filter::HasKeyword(keyword) => {
+ filters.push(query::Filter::is_in_bitmap(Property::Keywords, keyword))
+ }
+ Filter::NotKeyword(keyword) => {
+ filters.push(query::Filter::Not);
+ filters.push(query::Filter::is_in_bitmap(Property::Keywords, keyword));
+ filters.push(query::Filter::End);
+ }
+ Filter::HasAttachment(has_attach) => {
+ if !has_attach {
+ filters.push(query::Filter::Not);
+ }
+ filters.push(query::Filter::is_in_bitmap(Property::HasAttachment, ()));
+ if !has_attach {
+ filters.push(query::Filter::End);
+ }
+ }
+
+ // Non-standard
+ Filter::Id(ids) => {
+ let mut set = RoaringBitmap::new();
+ for id in ids {
+ set.insert(id.document_id());
+ }
+ filters.push(query::Filter::is_in_set(set));
+ }
+ Filter::SentBefore(date) => {
+ filters.push(query::Filter::lt(Property::SentAt, date))
+ }
+ Filter::SentAfter(date) => {
+ filters.push(query::Filter::gt(Property::SentAt, date))
+ }
+ Filter::InThread(id) => filters.push(query::Filter::is_in_bitmap(
+ Property::ThreadId,
+ id.document_id(),
+ )),
+ Filter::And | Filter::Or | Filter::Not | Filter::Close => {
+ filters.push(cond.into());
+ }
- other => return Err(MethodError::UnsupportedFilter(other.to_string())),
+ other => return Err(MethodError::UnsupportedFilter(other.to_string())),
+ }
+ }
}
}
diff --git a/crates/jmap/src/email/set.rs b/crates/jmap/src/email/set.rs
index af367159..b0a53637 100644
--- a/crates/jmap/src/email/set.rs
+++ b/crates/jmap/src/email/set.rs
@@ -59,7 +59,9 @@ use store::{
Serialize,
};
-use crate::{auth::AccessToken, Bincode, IngestError, JMAP};
+use crate::{
+ auth::AccessToken, services::housekeeper::Event, Bincode, IngestError, NamedKey, JMAP,
+};
use super::{
headers::{BuildHeader, ValueToHeader},
@@ -1208,6 +1210,16 @@ impl JMAP {
.delete_document(thread_id);
}
+ // Remove message from FTS index
+ batch.set(
+ NamedKey::IndexEmail::<&[u8]> {
+ account_id,
+ document_id,
+ seq: self.generate_snowflake_id()?,
+ },
+ vec![],
+ );
+
// Commit batch
match self.store.write(batch.build()).await {
Ok(_) => (),
@@ -1226,6 +1238,9 @@ impl JMAP {
}
}
+ // Request FTS index
+ let _ = self.housekeeper_tx.send(Event::IndexStart).await;
+
Ok(Ok(changes))
}
}
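
Across copy.rs, ingest.rs and set.rs the pattern is the same: the message write (or delete) and a NamedKey::IndexEmail entry are committed in one batch, with an empty value acting as a deletion marker, and the housekeeper is then nudged with Event::IndexStart to process the queue asynchronously (crates/jmap/src/services/index.rs, not shown in this excerpt). A rough standalone sketch of that queue-then-wake pattern, using stand-in types rather than the crate's API:

use std::collections::BTreeMap;
use std::sync::mpsc;

#[derive(Debug)]
struct IndexEmailKey {
    account_id: u32,
    document_id: u32,
}

enum Event {
    IndexStart,
    Exit,
}

fn main() {
    // Stand-in for the persisted queue: snowflake seq -> (key, blob hash).
    // An empty blob hash marks a deletion, as in email/set.rs above.
    let mut pending: BTreeMap<u64, (IndexEmailKey, Vec<u8>)> = BTreeMap::new();
    let (tx, rx) = mpsc::channel();

    // "Ingest" one message and "delete" another: persist the index requests
    // together with the data write, then wake the background worker.
    pending.insert(1, (IndexEmailKey { account_id: 1, document_id: 42 }, b"blob-hash".to_vec()));
    pending.insert(2, (IndexEmailKey { account_id: 1, document_id: 7 }, Vec::new()));
    tx.send(Event::IndexStart).unwrap();
    tx.send(Event::Exit).unwrap();

    // Housekeeper loop: drain pending entries in seq order whenever woken.
    while let Ok(event) = rx.recv() {
        match event {
            Event::IndexStart => {
                for (seq, (key, blob_hash)) in std::mem::take(&mut pending) {
                    if blob_hash.is_empty() {
                        println!("seq {seq}: remove {key:?} from the FTS index");
                    } else {
                        println!("seq {seq}: build FTS index for {key:?}");
                    }
                }
            }
            Event::Exit => break,
        }
    }
}
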
diff --git a/crates/jmap/src/email/snippet.rs b/crates/jmap/src/email/snippet.rs
index d14fab01..0e5b4ede 100644
--- a/crates/jmap/src/email/snippet.rs
+++ b/crates/jmap/src/email/snippet.rs
@@ -27,15 +27,15 @@ use jmap_proto::{
query::Filter,
search_snippet::{GetSearchSnippetRequest, GetSearchSnippetResponse, SearchSnippet},
},
- types::{acl::Acl, collection::Collection},
+ types::{acl::Acl, collection::Collection, property::Property},
};
-use mail_parser::{decoders::html::html_to_text, MessageParser, PartType};
-use nlp::language::{stemmer::Stemmer, Language};
-use store::BlobHash;
+use mail_parser::{decoders::html::html_to_text, GetHeader, HeaderName, PartType};
+use nlp::language::{search_snippet::generate_snippet, stemmer::Stemmer, Language};
+use store::backend::MAX_TOKEN_LENGTH;
-use crate::{auth::AccessToken, JMAP};
+use crate::{auth::AccessToken, Bincode, JMAP};
-use super::index::MAX_MESSAGE_PARTS;
+use super::metadata::{MessageMetadata, MetadataPartType};
impl JMAP {
pub async fn email_search_snippet(
@@ -45,37 +45,33 @@ impl JMAP {
) -> Result<GetSearchSnippetResponse, MethodError> {
let mut filter_stack = vec![];
let mut include_term = true;
- //let mut terms = vec![];
- let mut match_phrase = false;
+ let mut terms = vec![];
+ let mut is_exact = false;
+ let mut language = self.config.default_language;
for cond in request.filter {
match cond {
Filter::Text(text) | Filter::Subject(text) | Filter::Body(text) => {
- /*if include_term {
- let (text, language) = Language::detect(text, self.config.default_language);
+ if include_term {
+ let (text, language_) =
+ Language::detect(text, self.config.default_language);
+ language = language_;
if (text.starts_with('"') && text.ends_with('"'))
|| (text.starts_with('\'') && text.ends_with('\''))
{
- terms.push(
- language
- .tokenize_text(&text, MAX_TOKEN_LENGTH)
- .map(|token| (token.word.into_owned(), None))
- .collect::<Vec<_>>(),
- );
- match_phrase = true;
+ for token in language.tokenize_text(&text, MAX_TOKEN_LENGTH) {
+ terms.push(token.word.into_owned());
+ }
+ is_exact = true;
} else {
- terms.push(
- Stemmer::new(&text, language, MAX_TOKEN_LENGTH)
- .map(|token| {
- (
- token.word.into_owned(),
- token.stemmed_word.map(|w| w.into_owned()),
- )
- })
- .collect::<Vec<_>>(),
- );
+ for token in Stemmer::new(&text, language, MAX_TOKEN_LENGTH) {
+ terms.push(token.word.into_owned());
+ if let Some(stemmed_word) = token.stemmed_word {
+ terms.push(stemmed_word.into_owned());
+ }
+ }
}
- }*/
+ }
}
Filter::And | Filter::Or => {
filter_stack.push(cond);
@@ -103,150 +99,112 @@ impl JMAP {
not_found: vec![],
};
- if email_ids.len() > self.config.get_max_objects {
+ if email_ids.len() > self.config.snippet_max_results {
return Err(MethodError::RequestTooLarge);
}
- /*
- for email_id in email_ids {
- let document_id = email_id.document_id();
- let mut snippet = SearchSnippet {
- email_id,
- subject: None,
- preview: None,
- };
- if !document_ids.contains(document_id) {
- response.not_found.push(email_id);
- continue;
- } else if terms.is_empty() {
- response.list.push(snippet);
- continue;
- }
+ for email_id in email_ids {
+ let document_id = email_id.document_id();
+ let mut snippet = SearchSnippet {
+ email_id,
+ subject: None,
+ preview: None,
+ };
+ if !document_ids.contains(document_id) {
+ response.not_found.push(email_id);
+ continue;
+ } else if terms.is_empty() {
+ response.list.push(snippet);
+ continue;
+ }
+ let metadata = match self
+ .get_property::<Bincode<MessageMetadata>>(
+ account_id,
+ Collection::Email,
+ document_id,
+ &Property::BodyStructure,
+ )
+ .await?
+ {
+ Some(metadata) => metadata.inner,
+ None => {
+ response.not_found.push(email_id);
+ continue;
+ }
+ };
+
+ // Add subject snippet
+ if let Some(subject) = metadata
+ .contents
+ .root_part()
+ .headers
+ .header_value(&HeaderName::Subject)
+ .and_then(|v| v.as_text())
+ .and_then(|v| generate_snippet(v, &terms, language, is_exact))
+ {
+ snippet.subject = subject.into();
+ }
- // Obtain the term index and raw message
- let (term_index, raw_message) = if let (Some(term_index), Some(raw_message)) = (
- self.get_term_index::<TermIndex>(account_id, Collection::Email, document_id)
- .await?,
- self.get_blob(
- &BlobHash::LinkedMaildir {
- account_id,
- document_id,
- },
- 0..u32::MAX,
- )
- .await?,
- ) {
- (term_index, raw_message)
- } else {
- response.not_found.push(email_id);
- continue;
- };
+ // Check if the snippet can be generated from the preview
+ /*if let Some(body) = generate_snippet(&metadata.preview, &terms) {
+ snippet.preview = body.into();
+ } else {*/
+ // Download message
+ let raw_message =
+ if let Some(raw_message) = self.get_blob(&metadata.blob_hash, 0..u32::MAX).await? {
+ raw_message
+ } else {
+ tracing::warn!(event = "not-found",
+ account_id = account_id,
+ collection = ?Collection::Email,
+ document_id = email_id.document_id(),
+ blob_id = ?metadata.blob_hash,
+ "Blob not found");
+ response.not_found.push(email_id);
+ continue;
+ };
- // Parse message
- let message = if let Some(message) = MessageParser::new().parse(&raw_message) {
- message
- } else {
- response.not_found.push(email_id);
- continue;
- };
+ // Find a matching part
+ 'outer: for part in &metadata.contents.parts {
+ match &part.body {
+ MetadataPartType::Text | MetadataPartType::Html => {
+ let text = match part.decode_contents(&raw_message) {
+ PartType::Text(text) => text,
+ PartType::Html(html) => html_to_text(&html).into(),
+ _ => unreachable!(),
+ };
- // Build the match terms
- let mut match_terms = Vec::new();
- for term in &terms {
- for (word, stemmed_word) in term {
- match_terms.push(term_index.get_match_term(word, stemmed_word.as_deref()));
+ if let Some(body) = generate_snippet(&text, &terms, language, is_exact) {
+ snippet.preview = body.into();
+ break;
}
}
+ MetadataPartType::Message(message) => {
+ for part in &message.parts {
+ if let MetadataPartType::Text | MetadataPartType::Html = part.body {
+ let text = match part.decode_contents(&raw_message) {
+ PartType::Text(text) => text,
+ PartType::Html(html) => html_to_text(&html).into(),
+ _ => unreachable!(),
+ };
- 'outer: for term_group in term_index
- .match_terms(&match_terms, None, match_phrase, true, true)
- .map_err(|err| match err {
- term_index::Error::InvalidArgument => {
- MethodError::UnsupportedFilter("Too many search terms.".to_string())
- }
- err => {
- tracing::error!(
- account_id = account_id,
- document_id = document_id,
- reason = ?err,
- "Failed to generate search snippet.");
- MethodError::UnsupportedFilter(
- "Failed to generate search snippet.".to_string(),
- )
- }
- })?
- .unwrap_or_default()
- {
- if term_group.part_id == 0 {
- // Generate subject snippent
- snippet.subject =
- generate_snippet(&term_group.terms, message.subject().unwrap_or_default());
- } else {
- let mut part_num = 1;
- for part in &message.parts {
- match &part.body {
- PartType::Text(text) => {
- if part_num == term_group.part_id {
- snippet.preview = generate_snippet(&term_group.terms, text);
- break 'outer;
- } else {
- part_num += 1;
- }
- }
- PartType::Html(html) => {
- if part_num == term_group.part_id {
- snippet.preview =
- generate_snippet(&term_group.terms, &html_to_text(html));
- break 'outer;
- } else {
- part_num += 1;
- }
- }
- PartType::Message(message) => {
- if let Some(subject) = message.subject() {
- if part_num == term_group.part_id {
- snippet.preview =
- generate_snippet(&term_group.terms, subject);
- break 'outer;
- } else {
- part_num += 1;
- }
- }
- for sub_part in message.parts.iter().take(MAX_MESSAGE_PARTS) {
- match &sub_part.body {
- PartType::Text(text) => {
- if part_num == term_group.part_id {
- snippet.preview =
- generate_snippet(&term_group.terms, text);
- break 'outer;
- } else {
- part_num += 1;
- }
- }
- PartType::Html(html) => {
- if part_num == term_group.part_id {
- snippet.preview = generate_snippet(
- &term_group.terms,
- &html_to_text(html),
- );
- break 'outer;
- } else {
- part_num += 1;
- }
- }
- _ => (),
- }
- }
- }
- _ => (),
+ if let Some(body) =
+ generate_snippet(&text, &terms, language, is_exact)
+ {
+ snippet.preview = body.into();
+ break 'outer;
}
}
}
}
-
- response.list.push(snippet);
+ _ => (),
}
- */
+ }
+ //}
+
+ response.list.push(snippet);
+ }
+
Ok(response)
}
}
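
For orientation, here is a minimal sketch (not part of the patch) of how the rewritten handler above derives the `terms` list and the `is_exact` flag from a single text filter. It only uses the `nlp` APIs already imported by this file (`Language::detect`, `tokenize_text`, `Stemmer::new`); the free function and its return shape are illustrative.

```rust
use nlp::language::{stemmer::Stemmer, Language};

// Mirrors store::backend::MAX_TOKEN_LENGTH, which this patch defines as (u8::MAX >> 1).
const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize;

// Quoted input ("foo bar" or 'foo bar') is treated as an exact phrase; otherwise both the
// original and the stemmed form of every token become search needles.
fn extract_terms(text: String, default_language: Language) -> (Vec<String>, bool, Language) {
    let (text, language) = Language::detect(text, default_language);
    let is_exact = (text.starts_with('"') && text.ends_with('"'))
        || (text.starts_with('\'') && text.ends_with('\''));
    let mut terms = Vec::new();
    if is_exact {
        for token in language.tokenize_text(&text, MAX_TOKEN_LENGTH) {
            terms.push(token.word.into_owned());
        }
    } else {
        for token in Stemmer::new(&text, language, MAX_TOKEN_LENGTH) {
            terms.push(token.word.into_owned());
            if let Some(stemmed_word) = token.stemmed_word {
                terms.push(stemmed_word.into_owned());
            }
        }
    }
    (terms, is_exact, language)
}
```
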
diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs
index f8fe0b21..dfc803e2 100644
--- a/crates/jmap/src/lib.rs
+++ b/crates/jmap/src/lib.rs
@@ -21,7 +21,7 @@
* for more details.
*/
-use std::{collections::hash_map::RandomState, sync::Arc, time::Duration};
+use std::{collections::hash_map::RandomState, fmt::Display, sync::Arc, time::Duration};
use ::sieve::{Compiler, Runtime};
use api::session::BaseCapabilities;
@@ -49,17 +49,23 @@ use services::{
use smtp::core::SMTP;
use store::{
backend::{fs::FsStore, sqlite::SqliteStore},
+ fts::FtsFilter,
parking_lot::Mutex,
query::{sort::Pagination, Comparator, Filter, ResultSet, SortedResultSet},
roaring::RoaringBitmap,
- write::{key::KeySerializer, BatchBuilder, BitmapClass, TagValue, ToBitmaps, ValueClass},
- BitmapKey, BlobStore, Deserialize, Key, Serialize, Store, ValueKey, SUBSPACE_VALUES,
+ write::{
+ key::{DeserializeBigEndian, KeySerializer},
+ BatchBuilder, BitmapClass, TagValue, ToBitmaps, ValueClass,
+ },
+ BitmapKey, BlobStore, Deserialize, FtsStore, Key, Serialize, Store, ValueKey, SUBSPACE_VALUES,
+ U32_LEN, U64_LEN,
};
use tokio::sync::mpsc;
use utils::{
config::Rate,
ipc::DeliveryEvent,
map::ttl_dashmap::{TtlDashMap, TtlMap},
+ snowflake::SnowflakeIdGenerator,
UnwrapFailure,
};
@@ -85,11 +91,13 @@ pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24);
pub struct JMAP {
pub store: Store,
pub blob_store: BlobStore,
+ pub fts_store: FtsStore,
pub config: Config,
pub directory: Arc<dyn Directory>,
pub sessions: TtlDashMap<String, u32>,
pub access_tokens: TtlDashMap<u32, Arc<AccessToken>>,
+ pub snowflake_id: SnowflakeIdGenerator,
pub rate_limit_auth: DashMap<u32, Arc<Mutex<AuthenticatedLimiter>>>,
pub rate_limit_unauth: DashMap<RemoteAddress, Arc<Mutex<AnonymousLimiter>>>,
@@ -108,6 +116,7 @@ pub struct Config {
pub default_language: Language,
pub query_max_results: usize,
pub changes_max_results: usize,
+ pub snippet_max_results: usize,
pub request_max_size: usize,
pub request_max_calls: usize,
@@ -187,6 +196,11 @@ impl JMAP {
.property::<u64>("global.shared-map.shard")?
.unwrap_or(32)
.next_power_of_two() as usize;
+ let store = Store::SQLite(Arc::new(
+ SqliteStore::open(config)
+ .await
+ .failed("Unable to open database"),
+ ));
let jmap_server = Arc::new(JMAP {
directory: directory_config
@@ -197,11 +211,12 @@ impl JMAP {
config.value_require("jmap.directory")?
))
.clone(),
- store: Store::SQLite(Arc::new(
- SqliteStore::open(config)
- .await
- .failed("Unable to open database"),
- )),
+ snowflake_id: config
+ .property::<u64>("global.node-id")?
+ .map(SnowflakeIdGenerator::with_node_id)
+ .unwrap_or_else(SnowflakeIdGenerator::new),
+ fts_store: FtsStore::Store(store.clone()),
+ store,
blob_store: BlobStore::Fs(Arc::new(
FsStore::open(config)
.await
@@ -618,7 +633,28 @@ impl JMAP {
.await
.map_err(|err| {
tracing::error!(event = "error",
- context = "mailbox_set",
+ context = "filter",
+ account_id = account_id,
+ collection = ?collection,
+ error = ?err,
+ "Failed to execute filter.");
+
+ MethodError::ServerPartialFail
+ })
+ }
+
+ pub async fn fts_filter<T: Into<u8> + Display + Clone + std::fmt::Debug>(
+ &self,
+ account_id: u32,
+ collection: Collection,
+ filters: Vec<FtsFilter<T>>,
+ ) -> Result<RoaringBitmap, MethodError> {
+ self.fts_store
+ .query(account_id, collection, filters)
+ .await
+ .map_err(|err| {
+ tracing::error!(event = "error",
+ context = "fts-filter",
account_id = account_id,
collection = ?collection,
error = ?err,
@@ -805,6 +841,11 @@ pub enum NamedKey<T: AsRef<[u8]>> {
Name(T),
Id(u32),
Quota(u32),
+ IndexEmail {
+ account_id: u32,
+ document_id: u32,
+ seq: u64,
+ },
}
impl<T: AsRef<[u8]>> From<&NamedKey<T>> for ValueClass {
@@ -817,21 +858,44 @@ impl<T: AsRef<[u8]>> From<&NamedKey<T>> for ValueClass {
.finalize(),
),
NamedKey::Id(id) => ValueClass::Named(
- KeySerializer::new(std::mem::size_of::<u32>())
+ KeySerializer::new(std::mem::size_of::<u32>() + 1)
.write(1u8)
.write_leb128(*id)
.finalize(),
),
NamedKey::Quota(id) => ValueClass::Named(
- KeySerializer::new(std::mem::size_of::<u32>())
+ KeySerializer::new(std::mem::size_of::<u32>() + 1)
.write(2u8)
.write_leb128(*id)
.finalize(),
),
+ NamedKey::IndexEmail {
+ account_id,
+ document_id,
+ seq,
+ } => ValueClass::Named(
+ KeySerializer::new(std::mem::size_of::<u32>() * 4 + 1)
+ .write(3u8)
+ .write(*seq)
+ .write(*account_id)
+ .write(*document_id)
+ .finalize(),
+ ),
}
}
}
+impl<T: AsRef<[u8]>> NamedKey<T> {
+ pub fn deserialize_index_email(bytes: &[u8]) -> store::Result<Self> {
+ let len = bytes.len();
+ Ok(NamedKey::IndexEmail {
+ seq: bytes.deserialize_be_u64(len - U64_LEN - (U32_LEN * 2))?,
+ account_id: bytes.deserialize_be_u32(len - U32_LEN * 2)?,
+ document_id: bytes.deserialize_be_u32(len - U32_LEN)?,
+ })
+ }
+}
+
impl<T: AsRef<[u8]>> From<NamedKey<T>> for ValueClass {
fn from(key: NamedKey<T>) -> Self {
(&key).into()
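
As a reading aid (not part of the patch), the following self-contained sketch spells out the byte layout that `NamedKey::IndexEmail` serializes to and that `deserialize_index_email` reads back from the tail of the key. It packs the integers by hand rather than through `KeySerializer`, so the helpers are illustrative; the big-endian encoding is inferred from the `deserialize_be_*` calls above.

```rust
const U32_LEN: usize = std::mem::size_of::<u32>();
const U64_LEN: usize = std::mem::size_of::<u64>();

// Layout: [0x03 tag][seq: u64 BE][account_id: u32 BE][document_id: u32 BE] = 17 bytes.
// Writing seq first makes lexicographic key order follow the sequence number.
fn index_email_key(account_id: u32, document_id: u32, seq: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(1 + U64_LEN + 2 * U32_LEN);
    key.push(3u8);
    key.extend_from_slice(&seq.to_be_bytes());
    key.extend_from_slice(&account_id.to_be_bytes());
    key.extend_from_slice(&document_id.to_be_bytes());
    key
}

// Like deserialize_index_email, all offsets are taken from the end of the key, so any
// subspace prefix added by the storage backend is ignored.
fn parse_index_email_key(key: &[u8]) -> Option<(u32, u32, u64)> {
    let len = key.len();
    if len < 1 + U64_LEN + 2 * U32_LEN {
        return None;
    }
    let seq =
        u64::from_be_bytes(key[len - U64_LEN - 2 * U32_LEN..len - 2 * U32_LEN].try_into().ok()?);
    let account_id = u32::from_be_bytes(key[len - 2 * U32_LEN..len - U32_LEN].try_into().ok()?);
    let document_id = u32::from_be_bytes(key[len - U32_LEN..].try_into().ok()?);
    Some((account_id, document_id, seq))
}
```
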
diff --git a/crates/jmap/src/services/housekeeper.rs b/crates/jmap/src/services/housekeeper.rs
index 8d5504dd..e173d233 100644
--- a/crates/jmap/src/services/housekeeper.rs
+++ b/crates/jmap/src/services/housekeeper.rs
@@ -36,43 +36,73 @@ use super::IPC_CHANNEL_BUFFER;
pub enum Event {
PurgeDb,
- PurgeBlobs,
PurgeSessions,
+ IndexStart,
+ IndexDone,
+ #[cfg(feature = "test_mode")]
+ IndexIsActive(tokio::sync::oneshot::Sender<bool>),
Exit,
}
const TASK_PURGE_DB: usize = 0;
-const TASK_PURGE_BLOBS: usize = 1;
-const TASK_PURGE_SESSIONS: usize = 2;
+const TASK_PURGE_SESSIONS: usize = 1;
pub fn spawn_housekeeper(core: Arc<JMAP>, settings: &Config, mut rx: mpsc::Receiver<Event>) {
let purge_db_at = settings
.property_or_static::<SimpleCron>("jmap.purge.schedule.db", "0 3 *")
.failed("Initialize housekeeper");
- let purge_blobs_at = settings
- .property_or_static::<SimpleCron>("jmap.purge.schedule.blobs", "30 3 *")
- .failed("Initialize housekeeper");
let purge_cache = settings
.property_or_static::<SimpleCron>("jmap.purge.schedule.sessions", "15 * *")
.failed("Initialize housekeeper");
tokio::spawn(async move {
tracing::debug!("Housekeeper task started.");
+
+ let mut index_busy = true;
+ let mut index_pending = false;
+
+ // Index any queued messages
+ let core_ = core.clone();
+ tokio::spawn(async move {
+ core_.fts_index_queued().await;
+ });
+
loop {
- let time_to_next = [
- purge_db_at.time_to_next(),
- purge_blobs_at.time_to_next(),
- purge_cache.time_to_next(),
- ];
- let mut tasks_to_run = [false, false, false];
+ let time_to_next = [purge_db_at.time_to_next(), purge_cache.time_to_next()];
+ let mut tasks_to_run = [false, false];
let start_time = Instant::now();
match tokio::time::timeout(time_to_next.iter().min().copied().unwrap(), rx.recv()).await
{
Ok(Some(event)) => match event {
Event::PurgeDb => tasks_to_run[TASK_PURGE_DB] = true,
- Event::PurgeBlobs => tasks_to_run[TASK_PURGE_BLOBS] = true,
Event::PurgeSessions => tasks_to_run[TASK_PURGE_SESSIONS] = true,
+ Event::IndexStart => {
+ if !index_busy {
+ index_busy = true;
+ let core = core.clone();
+ tokio::spawn(async move {
+ core.fts_index_queued().await;
+ });
+ } else {
+ index_pending = true;
+ }
+ }
+ Event::IndexDone => {
+ if index_pending {
+ index_pending = false;
+ let core = core.clone();
+ tokio::spawn(async move {
+ core.fts_index_queued().await;
+ });
+ } else {
+ index_busy = false;
+ }
+ }
+ #[cfg(feature = "test_mode")]
+ Event::IndexIsActive(tx) => {
+ tx.send(index_busy).ok();
+ }
Event::Exit => {
tracing::debug!("Housekeeper task exiting.");
return;
@@ -104,13 +134,12 @@ pub fn spawn_housekeeper(core: Arc<JMAP>, settings: &Config, mut rx: mpsc::Recei
tokio::spawn(async move {
match task_id {
TASK_PURGE_DB => {
- tracing::info!("Purging database.");
+ tracing::info!("Purging database...");
if let Err(err) = core.store.purge_bitmaps().await {
tracing::error!("Error while purging bitmaps: {}", err);
}
- }
- TASK_PURGE_BLOBS => {
- tracing::info!("Purging temporary blobs.",);
+
+ tracing::info!("Purging blobs...",);
if let Err(err) =
core.store.blob_hash_purge(core.blob_store.clone()).await
{
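
A compact sketch (not part of the patch) of the coalescing behaviour the housekeeper implements around the background indexer: an `IndexStart` received while a run is active only records that more work arrived, and `IndexDone` either immediately starts another run to drain that backlog or marks the indexer idle. The type and method names are illustrative; note that the housekeeper starts with `busy = true` because it spawns an initial `fts_index_queued()` run.

```rust
// Illustrative state machine for the index_busy / index_pending flags above.
struct IndexerState {
    busy: bool,
    pending: bool,
}

impl IndexerState {
    // Returns true when a new fts_index_queued() run should be spawned.
    fn on_index_start(&mut self) -> bool {
        if !self.busy {
            self.busy = true;
            true
        } else {
            self.pending = true; // coalesce: remember that more work arrived mid-run
            false
        }
    }

    // Returns true when the run that just finished should immediately be re-run.
    fn on_index_done(&mut self) -> bool {
        if self.pending {
            self.pending = false;
            true
        } else {
            self.busy = false;
            false
        }
    }
}
```
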
diff --git a/crates/jmap/src/services/index.rs b/crates/jmap/src/services/index.rs
new file mode 100644
index 00000000..1549c993
--- /dev/null
+++ b/crates/jmap/src/services/index.rs
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use jmap_proto::types::{collection::Collection, property::Property};
+use store::{
+ fts::index::FtsDocument,
+ write::{BatchBuilder, ValueClass},
+ IterateParams, ValueKey,
+};
+
+use crate::{
+ email::{index::IndexMessageText, metadata::MessageMetadata},
+ Bincode, NamedKey, JMAP,
+};
+
+use super::housekeeper::Event;
+
+impl JMAP {
+ pub async fn fts_index_queued(&self) {
+ let from_key = ValueKey::<ValueClass> {
+ account_id: 0,
+ collection: 0,
+ document_id: 0,
+ class: NamedKey::IndexEmail::<&[u8]> {
+ account_id: 0,
+ document_id: 0,
+ seq: 0,
+ }
+ .into(),
+ };
+ let to_key = ValueKey::<ValueClass> {
+ account_id: u32::MAX,
+ collection: u8::MAX,
+ document_id: u32::MAX,
+ class: NamedKey::IndexEmail::<&[u8]> {
+ account_id: u32::MAX,
+ document_id: u32::MAX,
+ seq: u64::MAX,
+ }
+ .into(),
+ };
+
+ // Retrieve entries pending to be indexed
+ // TODO: Support indexing from multiple nodes
+ let mut entries = Vec::new();
+ let _ = self
+ .store
+ .iterate(
+ IterateParams::new(from_key, to_key).ascending(),
+ |key, value| {
+ entries.push((
+ NamedKey::<Vec<u8>>::deserialize_index_email(key)?,
+ value.to_vec(),
+ ));
+ Ok(true)
+ },
+ )
+ .await
+ .map_err(|err| {
+ tracing::error!(
+ context = "fts_index_queued",
+ event = "error",
+ reason = ?err,
+ "Failed to iterate over index emails"
+ );
+ });
+
+ // Index entries
+ for (key, blob_hash) in entries {
+ if let NamedKey::IndexEmail {
+ account_id,
+ document_id,
+ ..
+ } = &key
+ {
+ if !blob_hash.is_empty() {
+ match self
+ .get_property::<Bincode<MessageMetadata>>(
+ *account_id,
+ Collection::Email,
+ *document_id,
+ Property::BodyStructure,
+ )
+ .await
+ {
+ Ok(Some(metadata))
+ if metadata.inner.blob_hash.as_slice() == blob_hash.as_slice() =>
+ {
+ // Obtain raw message
+ let raw_message = if let Ok(Some(raw_message)) =
+ self.get_blob(&metadata.inner.blob_hash, 0..u32::MAX).await
+ {
+ raw_message
+ } else {
+ tracing::warn!(
+ context = "fts_index_queued",
+ event = "error",
+ account_id = *account_id,
+ document_id = *document_id,
+ blob_hash = ?metadata.inner.blob_hash,
+ "Message blob not found"
+ );
+ continue;
+ };
+ let message = metadata.inner.contents.into_message(&raw_message);
+
+ // Index message
+ let document =
+ FtsDocument::with_default_language(self.config.default_language)
+ .with_account_id(*account_id)
+ .with_collection(Collection::Email)
+ .with_document_id(*document_id)
+ .index_message(&message);
+ if let Err(err) = self.fts_store.index(document).await {
+ tracing::error!(
+ context = "fts_index_queued",
+ event = "error",
+ account_id = *account_id,
+ document_id = *document_id,
+ reason = ?err,
+ "Failed to index email in FTS index"
+ );
+ continue;
+ }
+
+ tracing::debug!(
+ context = "fts_index_queued",
+ event = "index",
+ account_id = *account_id,
+ document_id = *document_id,
+ "Indexed document in FTS index"
+ );
+ }
+
+ Err(err) => {
+ tracing::error!(
+ context = "fts_index_queued",
+ event = "error",
+ account_id = *account_id,
+ document_id = *document_id,
+ reason = ?err,
+ "Failed to retrieve email metadata"
+ );
+ break;
+ }
+ _ => {
+ // The message was probably deleted or overwritten
+ tracing::debug!(
+ context = "fts_index_queued",
+ event = "error",
+ account_id = *account_id,
+ document_id = *document_id,
+ "Email metadata not found"
+ );
+ }
+ }
+ } else {
+ if let Err(err) = self
+ .fts_store
+ .remove(*account_id, Collection::Email.into(), *document_id)
+ .await
+ {
+ tracing::error!(
+ context = "fts_index_queued",
+ event = "error",
+ account_id = *account_id,
+ document_id = *document_id,
+ reason = ?err,
+ "Failed to remove document from FTS index"
+ );
+ continue;
+ }
+
+ tracing::debug!(
+ context = "fts_index_queued",
+ event = "delete",
+ account_id = *account_id,
+ document_id = *document_id,
+ "Deleted document from FTS index"
+ );
+ }
+ }
+
+ // Remove entry from queue
+ if let Err(err) = self
+ .store
+ .write(BatchBuilder::new().clear(key).build_batch())
+ .await
+ {
+ tracing::error!(
+ context = "fts_index_queued",
+ event = "error",
+ reason = ?err,
+ "Failed to remove index email from queue"
+ );
+ break;
+ }
+ }
+
+ if let Err(err) = self.housekeeper_tx.send(Event::IndexDone).await {
+ tracing::warn!("Failed to send index done event to housekeeper: {}", err);
+ }
+ }
+}
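
To summarise the happy path of `fts_index_queued` above, here is a hedged, condensed sketch of the per-message indexing step as a standalone async fn. It reuses only calls that appear in the new file (`into_message`, the `FtsDocument` builder chain, `FtsStore::index`); the function itself and the `jmap` parameter are illustrative.

```rust
use jmap_proto::types::collection::Collection;
use store::fts::index::FtsDocument;

use crate::{
    email::{index::IndexMessageText, metadata::MessageMetadata},
    JMAP,
};

// Illustrative condensation of the indexing branch of fts_index_queued(): decode the stored
// metadata into a message, build an FTS document for it and hand it to the FTS store.
async fn index_one(
    jmap: &JMAP,
    account_id: u32,
    document_id: u32,
    raw_message: &[u8],
    metadata: MessageMetadata,
) -> store::Result<()> {
    let message = metadata.contents.into_message(raw_message);
    let document = FtsDocument::with_default_language(jmap.config.default_language)
        .with_account_id(account_id)
        .with_collection(Collection::Email)
        .with_document_id(document_id)
        .index_message(&message);
    jmap.fts_store.index(document).await
}
```
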
diff --git a/crates/jmap/src/services/mod.rs b/crates/jmap/src/services/mod.rs
index 899d1397..405cfc47 100644
--- a/crates/jmap/src/services/mod.rs
+++ b/crates/jmap/src/services/mod.rs
@@ -23,6 +23,7 @@
pub mod delivery;
pub mod housekeeper;
+pub mod index;
pub mod ingest;
pub mod state;
diff --git a/crates/nlp/src/language/mod.rs b/crates/nlp/src/language/mod.rs
index 50e1a38f..6b578d50 100644
--- a/crates/nlp/src/language/mod.rs
+++ b/crates/nlp/src/language/mod.rs
@@ -22,6 +22,7 @@
*/
pub mod detect;
+pub mod search_snippet;
pub mod stemmer;
pub mod stopwords;
diff --git a/crates/store/src/fts/search_snippet.rs b/crates/nlp/src/language/search_snippet.rs
index 55d6b6b7..9c428fef 100644
--- a/crates/store/src/fts/search_snippet.rs
+++ b/crates/nlp/src/language/search_snippet.rs
@@ -21,7 +21,7 @@
* for more details.
*/
-use super::term_index::Term;
+use super::Language;
fn escape_char(c: char, string: &mut String) {
match c {
@@ -45,9 +45,53 @@ fn escape_char_len(c: char) -> usize {
}
}
-pub fn generate_snippet(terms: &[Term], text: &str) -> Option<String> {
+pub struct Term {
+ offset: usize,
+ len: usize,
+}
+
+pub fn generate_snippet(
+ text: &str,
+ needles: &[impl AsRef<str>],
+ language: Language,
+ is_exact: bool,
+) -> Option<String> {
+ let mut terms = Vec::new();
+ if is_exact {
+ let tokens = language.tokenize_text(text, 200).collect::<Vec<_>>();
+ for tokens in tokens.windows(needles.len()) {
+ if needles
+ .iter()
+ .zip(tokens)
+ .all(|(needle, token)| needle.as_ref() == token.word.as_ref())
+ {
+ for token in tokens {
+ terms.push(Term {
+ offset: token.from,
+ len: token.to - token.from,
+ });
+ }
+ }
+ }
+ } else {
+ for token in language.tokenize_text(text, 200) {
+ if needles.iter().any(|needle| {
+ let needle = needle.as_ref();
+ needle == token.word.as_ref() || needle.len() > 2 && token.word.contains(needle)
+ }) {
+ terms.push(Term {
+ offset: token.from,
+ len: token.to - token.from,
+ });
+ }
+ }
+ }
+ if terms.is_empty() {
+ return None;
+ }
+
let mut snippet = String::with_capacity(text.len());
- let start_offset = terms.get(0)?.offset as usize;
+ let start_offset = terms.get(0)?.offset;
if start_offset > 0 {
let mut word_count = 0;
@@ -92,25 +136,22 @@ pub fn generate_snippet(terms: &[Term], text: &str) -> Option<String> {
let mut terms = terms.iter().peekable();
'outer: while let Some(term) = terms.next() {
- if snippet.len() + ("<mark>".len() * 2) + term.len as usize + 1 > 255 {
+ if snippet.len() + ("<mark>".len() * 2) + term.len + 1 > 255 {
break;
}
snippet.push_str("<mark>");
- snippet.push_str(text.get(term.offset as usize..term.offset as usize + term.len as usize)?);
+ snippet.push_str(text.get(term.offset..term.offset + term.len)?);
snippet.push_str("</mark>");
let next_offset = if let Some(next_term) = terms.peek() {
- next_term.offset as usize
+ next_term.offset
} else {
text.len()
};
let mut last_is_space = false;
- for char in text
- .get(term.offset as usize + term.len as usize..next_offset)?
- .chars()
- {
+ for char in text.get(term.offset + term.len..next_offset)?.chars() {
if !char.is_whitespace() {
last_is_space = false;
} else {
@@ -133,15 +174,7 @@ pub fn generate_snippet(terms: &[Term], text: &str) -> Option<String> {
#[cfg(test)]
mod tests {
-
- use nlp::language::Language;
-
- use crate::{
- fts::term_index::{TermIndex, TermIndexBuilder},
- Deserialize, Serialize,
- };
-
- use super::*;
+ use crate::language::{search_snippet::generate_snippet, Language};
#[test]
fn search_snippets() {
@@ -236,39 +269,18 @@ mod tests {
];
for (parts, tests) in inputs {
- let mut builder = TermIndexBuilder::new();
-
- for (field_num, part) in parts.iter().enumerate() {
- let mut terms = Vec::new();
- for token in Language::English.tokenize_text(part, 40) {
- terms.push(builder.add_token(token));
- }
- builder.add_terms(field_num as u8, 0, terms);
- }
+ for (needles, snippets) in tests {
+ let mut results = Vec::new();
- let compressed_term_index = builder.serialize();
- let term_index = TermIndex::deserialize(&compressed_term_index[..]).unwrap();
-
- for (match_words, snippets) in tests {
- let mut match_terms = Vec::new();
- for word in &match_words {
- match_terms.push(term_index.get_match_term(word, None));
+ for part in &parts {
+ if let Some(matched) =
+ generate_snippet(part, &needles, Language::English, false)
+ {
+ results.push(matched);
+ }
}
- let term_groups = term_index
- .match_terms(&match_terms, None, false, true, true)
- .unwrap()
- .unwrap();
-
- assert_eq!(term_groups.len(), snippets.len());
-
- for (term_group, snippet) in term_groups.iter().zip(snippets.iter()) {
- assert_eq!(
- snippet,
- &generate_snippet(&term_group.terms, parts[term_group.field_id as usize])
- .unwrap()
- );
- }
+ assert_eq!(snippets, results);
}
}
}
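
Since the tests above exercise `generate_snippet` only in fuzzy mode, a short illustrative usage sketch of both modes may help (not part of the patch; the expected results in the comments assume the tokenizer lowercases words, as the original tests do):

```rust
use nlp::language::{search_snippet::generate_snippet, Language};

fn main() {
    let text = "Hello world, welcome to the world of Rust.";

    // Fuzzy mode: any token equal to (or, for needles longer than two chars, containing)
    // a needle is wrapped in <mark> tags; expect both occurrences of "world" highlighted.
    println!("{:?}", generate_snippet(text, &["world"], Language::English, false));

    // Exact mode: the needles must appear as a consecutive phrase to match.
    println!("{:?}", generate_snippet(text, &["welcome", "to"], Language::English, true));

    // No matching token yields None.
    println!("{:?}", generate_snippet(text, &["absent"], Language::English, false));
}
```
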
diff --git a/crates/nlp/src/language/stemmer.rs b/crates/nlp/src/language/stemmer.rs
index 9dfc30b9..6732bb56 100644
--- a/crates/nlp/src/language/stemmer.rs
+++ b/crates/nlp/src/language/stemmer.rs
@@ -141,6 +141,7 @@ pub static STEMMER_MAP: &[Option<Algorithm>] = &[
None, // Tagalog = 67,
None, // Armenian = 68,
None, // Unknown = 69,
+ None, // None = 70,
];
#[cfg(test)]
diff --git a/crates/nlp/src/language/stopwords.rs b/crates/nlp/src/language/stopwords.rs
index 3c661ae3..a07e180d 100644
--- a/crates/nlp/src/language/stopwords.rs
+++ b/crates/nlp/src/language/stopwords.rs
@@ -93,6 +93,7 @@ pub static STOP_WORDS: &[Option<&Set<&'static str>>] = &[
None, // Tagalog = 67,
None, // Armenian = 68,
None, // Unknown = 69,
+ None, // None = 70,
];
static ARABIC: Set<&'static str> = phf_set! {
diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml
index 06ec094c..d67c8df3 100644
--- a/crates/store/Cargo.toml
+++ b/crates/store/Cargo.toml
@@ -30,6 +30,7 @@ num_cpus = { version = "1.15.0", optional = true }
blake3 = "1.3.3"
tracing = "0.1"
async-trait = "0.1.68"
+lz4_flex = { version = "0.11" }
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
diff --git a/crates/store/src/backend/foundationdb/blob.rs b/crates/store/src/backend/foundationdb/blob.rs
new file mode 100644
index 00000000..f416a373
--- /dev/null
+++ b/crates/store/src/backend/foundationdb/blob.rs
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use std::ops::Range;
+
+use super::FdbStore;
+
+impl FdbStore {
+ pub(crate) async fn get_blob(
+ &self,
+ key: &[u8],
+ range: Range<u32>,
+ ) -> crate::Result<Option<Vec<u8>>> {
+ todo!()
+ }
+
+ pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
+ todo!()
+ }
+
+ pub(crate) async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> {
+ todo!()
+ }
+}
diff --git a/crates/store/src/backend/foundationdb/id_assign.rs b/crates/store/src/backend/foundationdb/id_assign.rs
index 9a12886f..8e469efd 100644
--- a/crates/store/src/backend/foundationdb/id_assign.rs
+++ b/crates/store/src/backend/foundationdb/id_assign.rs
@@ -28,10 +28,7 @@ use futures::StreamExt;
use rand::Rng;
use std::time::Instant;
-use crate::{
- write::{key::KeySerializer, now},
- BitmapKey, IndexKey, SUBSPACE_VALUES,
-};
+use crate::{write::now, BitmapKey, IndexKey};
use super::{
bitmap::{next_available_index, BITS_PER_BLOCK},
@@ -183,36 +180,4 @@ impl FdbStore {
}
}
}
-
- pub(crate) async fn assign_change_id(&self, account_id: u32) -> crate::Result<u64> {
- let start = Instant::now();
- let counter = KeySerializer::new(U32_LEN + 2)
- .write(SUBSPACE_VALUES)
- .write(account_id)
- .finalize();
-
- loop {
- // Read id
- let trx = self.db.create_trx()?;
- let id = if let Some(bytes) = trx.get(&counter, false).await? {
- u64::deserialize(&bytes)? + 1
- } else {
- 0
- };
- trx.set(&counter, &id.serialize());
-
- match trx.commit().await {
- Ok(_) => {
- return Ok(id);
- }
- Err(err) => {
- if start.elapsed() < MAX_COMMIT_TIME {
- err.on_error().await?;
- } else {
- return Err(FdbError::from(err).into());
- }
- }
- }
- }
- }
}
diff --git a/crates/store/src/backend/foundationdb/mod.rs b/crates/store/src/backend/foundationdb/mod.rs
index aa051600..48236d3d 100644
--- a/crates/store/src/backend/foundationdb/mod.rs
+++ b/crates/store/src/backend/foundationdb/mod.rs
@@ -26,6 +26,7 @@ use foundationdb::{api::NetworkAutoStop, Database, FdbError};
use crate::Error;
pub mod bitmap;
+pub mod blob;
pub mod id_assign;
pub mod main;
pub mod purge;
diff --git a/crates/store/src/backend/foundationdb/read.rs b/crates/store/src/backend/foundationdb/read.rs
index ab54cd59..3f0e72ea 100644
--- a/crates/store/src/backend/foundationdb/read.rs
+++ b/crates/store/src/backend/foundationdb/read.rs
@@ -95,7 +95,7 @@ impl FdbStore {
account_id: u32,
collection: u8,
field: u8,
- value: Vec<u8>,
+ value: &[u8],
op: query::Operator,
) -> crate::Result<Option<RoaringBitmap>> {
let k1 =
@@ -116,27 +116,23 @@ impl FdbStore {
let (begin, end) = match op {
Operator::LowerThan => (
KeySelector::first_greater_or_equal(k1.finalize()),
- KeySelector::first_greater_or_equal(k2.write(&value[..]).write(0u32).finalize()),
+ KeySelector::first_greater_or_equal(k2.write(value).write(0u32).finalize()),
),
Operator::LowerEqualThan => (
KeySelector::first_greater_or_equal(k1.finalize()),
- KeySelector::first_greater_or_equal(
- k2.write(&value[..]).write(u32::MAX).finalize(),
- ),
+ KeySelector::first_greater_or_equal(k2.write(value).write(u32::MAX).finalize()),
),
Operator::GreaterThan => (
- KeySelector::first_greater_than(k1.write(&value[..]).write(u32::MAX).finalize()),
+ KeySelector::first_greater_than(k1.write(value).write(u32::MAX).finalize()),
KeySelector::first_greater_or_equal(k2.finalize()),
),
Operator::GreaterEqualThan => (
- KeySelector::first_greater_or_equal(k1.write(&value[..]).write(0u32).finalize()),
+ KeySelector::first_greater_or_equal(k1.write(value).write(0u32).finalize()),
KeySelector::first_greater_or_equal(k2.finalize()),
),
Operator::Equal => (
- KeySelector::first_greater_or_equal(k1.write(&value[..]).write(0u32).finalize()),
- KeySelector::first_greater_or_equal(
- k2.write(&value[..]).write(u32::MAX).finalize(),
- ),
+ KeySelector::first_greater_or_equal(k1.write(value).write(0u32).finalize()),
+ KeySelector::first_greater_or_equal(k2.write(value).write(u32::MAX).finalize()),
),
};
let key_len = begin.key().len();
diff --git a/crates/store/src/backend/fs/mod.rs b/crates/store/src/backend/fs/mod.rs
index 5c633a0c..fca25bbb 100644
--- a/crates/store/src/backend/fs/mod.rs
+++ b/crates/store/src/backend/fs/mod.rs
@@ -52,9 +52,7 @@ impl FsStore {
)))
}
}
-}
-impl FsStore {
pub(crate) async fn get_blob(
&self,
key: &[u8],
@@ -113,9 +111,7 @@ impl FsStore {
Ok(false)
}
}
-}
-impl FsStore {
fn build_path(&self, key: &[u8]) -> PathBuf {
let mut path = self.path.clone();
diff --git a/crates/store/src/backend/mod.rs b/crates/store/src/backend/mod.rs
index 050c5751..9cd8cf94 100644
--- a/crates/store/src/backend/mod.rs
+++ b/crates/store/src/backend/mod.rs
@@ -30,8 +30,8 @@ pub mod s3;
#[cfg(feature = "sqlite")]
pub mod sqlite;
-pub(crate) const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 2) as usize;
-pub(crate) const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;
+pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize;
+pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;
#[cfg(feature = "test_mode")]
pub static ID_ASSIGNMENT_EXPIRY: std::sync::atomic::AtomicU64 =
diff --git a/crates/store/src/backend/rocksdb/mod.rs b/crates/store/src/backend/rocksdb/mod.rs
index 7d6b8a9f..2750e7e3 100644
--- a/crates/store/src/backend/rocksdb/mod.rs
+++ b/crates/store/src/backend/rocksdb/mod.rs
@@ -140,3 +140,8 @@ impl From<rocksdb::Error> for crate::Error {
Self::InternalError(format!("RocksDB error: {}", value))
}
}
+
+#[cfg(feature = "rocks")]
+pub struct Store {
+ db: rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded>,
+}
diff --git a/crates/store/src/backend/sqlite/blob.rs b/crates/store/src/backend/sqlite/blob.rs
new file mode 100644
index 00000000..a098b075
--- /dev/null
+++ b/crates/store/src/backend/sqlite/blob.rs
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use std::ops::Range;
+
+use rusqlite::OptionalExtension;
+
+use super::SqliteStore;
+
+impl SqliteStore {
+ pub(crate) async fn get_blob(
+ &self,
+ key: &[u8],
+ range: Range<u32>,
+ ) -> crate::Result<Option<Vec<u8>>> {
+ let conn = self.conn_pool.get()?;
+ self.spawn_worker(move || {
+ let mut result = conn.prepare_cached("SELECT v FROM t WHERE k = ?")?;
+ result
+ .query_row([&key], |row| {
+ Ok({
+ let bytes = row.get_ref(0)?.as_bytes()?;
+ if range.start == 0 && range.end == u32::MAX {
+ bytes.to_vec()
+ } else {
+ bytes
+ .get(
+ range.start as usize
+ ..std::cmp::min(bytes.len(), range.end as usize),
+ )
+ .unwrap_or_default()
+ .to_vec()
+ }
+ })
+ })
+ .optional()
+ .map_err(Into::into)
+ })
+ .await
+ }
+
+ pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
+ let conn = self.conn_pool.get()?;
+ self.spawn_worker(move || {
+ conn.prepare_cached("INSERT OR REPLACE INTO t (k, v) VALUES (?, ?)")?
+ .execute([key, data])
+ .map_err(|e| crate::Error::InternalError(format!("Failed to insert blob: {}", e)))
+ .map(|_| ())
+ })
+ .await
+ }
+
+ pub(crate) async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> {
+ let conn = self.conn_pool.get()?;
+ self.spawn_worker(move || {
+ conn.prepare_cached("DELETE FROM t WHERE k = ?")?
+ .execute([key])
+ .map_err(|e| crate::Error::InternalError(format!("Failed to delete blob: {}", e)))
+ .map(|_| true)
+ })
+ .await
+ }
+}
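
One detail of `get_blob` above worth highlighting: a `0..u32::MAX` range returns the stored value unchanged, any other range is clamped to the value's length, and a range that starts past the end yields an empty result. A tiny standalone sketch of that slicing rule (illustrative, outside the patch):

```rust
use std::ops::Range;

// Mirrors the slicing performed by SqliteStore::get_blob above.
fn slice_blob(bytes: &[u8], range: Range<u32>) -> Vec<u8> {
    if range.start == 0 && range.end == u32::MAX {
        bytes.to_vec()
    } else {
        bytes
            .get(range.start as usize..std::cmp::min(bytes.len(), range.end as usize))
            .unwrap_or_default()
            .to_vec()
    }
}

// slice_blob(b"hello world", 0..u32::MAX) == b"hello world"
// slice_blob(b"hello world", 6..100)      == b"world"
// slice_blob(b"hello world", 20..30)      == b"" (start past the end of the value)
```
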
diff --git a/crates/store/src/backend/sqlite/id_assign.rs b/crates/store/src/backend/sqlite/id_assign.rs
index a38378f9..037b5645 100644
--- a/crates/store/src/backend/sqlite/id_assign.rs
+++ b/crates/store/src/backend/sqlite/id_assign.rs
@@ -23,7 +23,7 @@
use roaring::RoaringBitmap;
-use crate::{write::key::DeserializeBigEndian, BitmapKey, IterateParams, LogKey, U64_LEN};
+use crate::BitmapKey;
use super::SqliteStore;
@@ -46,15 +46,13 @@ impl IdCacheKey {
pub struct IdAssigner {
pub freed_document_ids: Option<RoaringBitmap>,
pub next_document_id: u32,
- pub next_change_id: u64,
}
impl IdAssigner {
- pub fn new(used_ids: Option<RoaringBitmap>, next_change_id: u64) -> Self {
+ pub fn new(used_ids: Option<RoaringBitmap>) -> Self {
let mut assigner = IdAssigner {
freed_document_ids: None,
next_document_id: 0,
- next_change_id,
};
if let Some(used_ids) = used_ids {
if let Some(max) = used_ids.max() {
@@ -85,28 +83,9 @@ impl IdAssigner {
id
}
}
-
- pub fn assign_change_id(&mut self) -> u64 {
- let id = self.next_change_id;
- self.next_change_id += 1;
- id
- }
}
impl SqliteStore {
- pub(crate) async fn assign_change_id(&self, account_id: u32) -> crate::Result<u64> {
- let collection = u8::MAX;
- let key = IdCacheKey::new(account_id, collection);
- for _ in 0..2 {
- if let Some(assigner) = self.id_assigner.lock().get_mut(&key) {
- return Ok(assigner.assign_change_id());
- }
- self.build_id_assigner(key).await?;
- }
-
- unreachable!()
- }
-
pub(crate) async fn assign_document_id(
&self,
account_id: u32,
@@ -128,56 +107,16 @@ impl SqliteStore {
let used_ids = self
.get_bitmap(BitmapKey::document_ids(key.account_id, key.collection))
.await?;
- let next_change_id = self
- .get_last_change_id(key.account_id, key.collection)
- .await?
- .map(|id| id + 1)
- .unwrap_or(0);
let id_assigner = self.id_assigner.clone();
let mut id_assigner = id_assigner.lock();
// Make sure id assigner was not added by another thread
if id_assigner.get_mut(&key).is_none() {
- id_assigner.insert(key, IdAssigner::new(used_ids, next_change_id));
+ id_assigner.insert(key, IdAssigner::new(used_ids));
}
Ok(())
}
-
- async fn get_last_change_id(
- &self,
- account_id: u32,
- collection: impl Into<u8> + Sync + Send,
- ) -> crate::Result<Option<u64>> {
- let collection = collection.into();
-
- let from_key = LogKey {
- account_id,
- collection,
- change_id: u64::MAX,
- };
- let to_key = LogKey {
- account_id,
- collection,
- change_id: 0,
- };
-
- let mut last_change_id = None;
-
- self.iterate(
- IterateParams::new(from_key, to_key)
- .descending()
- .no_values()
- .only_first(),
- |key, _| {
- last_change_id = key.deserialize_be_u64(key.len() - U64_LEN)?.into();
- Ok(false)
- },
- )
- .await?;
-
- Ok(last_change_id)
- }
}
#[cfg(test)]
@@ -188,7 +127,7 @@ mod tests {
#[test]
fn id_assigner() {
- let mut assigner = IdAssigner::new(None, 0);
+ let mut assigner = IdAssigner::new(None);
assert_eq!(assigner.assign_document_id(), 0);
assert_eq!(assigner.assign_document_id(), 1);
assert_eq!(assigner.assign_document_id(), 2);
@@ -197,7 +136,6 @@ mod tests {
RoaringBitmap::from_sorted_iter([0, 2, 4, 6])
.unwrap()
.into(),
- 0,
);
assert_eq!(assigner.assign_document_id(), 1);
assert_eq!(assigner.assign_document_id(), 3);
diff --git a/crates/store/src/backend/sqlite/main.rs b/crates/store/src/backend/sqlite/main.rs
index 8b9d21a5..b308a408 100644
--- a/crates/store/src/backend/sqlite/main.rs
+++ b/crates/store/src/backend/sqlite/main.rs
@@ -30,8 +30,8 @@ use tokio::sync::oneshot;
use utils::{config::Config, UnwrapFailure};
use crate::{
- SUBSPACE_ACLS, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES,
- SUBSPACE_LOGS, SUBSPACE_VALUES,
+ SUBSPACE_ACLS, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS,
+ SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES,
};
use super::{pool::SqliteConnectionManager, SqliteStore};
@@ -78,7 +78,12 @@ impl SqliteStore {
pub(super) fn create_tables(&self) -> crate::Result<()> {
let conn = self.conn_pool.get()?;
- for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_ACLS] {
+ for table in [
+ SUBSPACE_VALUES,
+ SUBSPACE_LOGS,
+ SUBSPACE_ACLS,
+ SUBSPACE_BLOB_DATA,
+ ] {
let table = char::from(table);
conn.execute(
&format!(
diff --git a/crates/store/src/backend/sqlite/mod.rs b/crates/store/src/backend/sqlite/mod.rs
index 216ca473..f11f3ab0 100644
--- a/crates/store/src/backend/sqlite/mod.rs
+++ b/crates/store/src/backend/sqlite/mod.rs
@@ -34,6 +34,7 @@ use self::{
pool::SqliteConnectionManager,
};
+pub mod blob;
pub mod id_assign;
pub mod main;
pub mod pool;
diff --git a/crates/store/src/backend/sqlite/purge.rs b/crates/store/src/backend/sqlite/purge.rs
index 61e686ae..04e8c131 100644
--- a/crates/store/src/backend/sqlite/purge.rs
+++ b/crates/store/src/backend/sqlite/purge.rs
@@ -32,7 +32,6 @@ impl SqliteStore {
pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> {
let conn = self.conn_pool.get()?;
self.spawn_worker(move || {
- //Todo
conn.prepare_cached(concat!(
"DELETE FROM b WHERE ",
"a = 0 AND ",
diff --git a/crates/store/src/backend/sqlite/read.rs b/crates/store/src/backend/sqlite/read.rs
index a9bfd3cf..8614d03c 100644
--- a/crates/store/src/backend/sqlite/read.rs
+++ b/crates/store/src/backend/sqlite/read.rs
@@ -110,7 +110,7 @@ impl SqliteStore {
account_id: u32,
collection: u8,
field: u8,
- value: Vec<u8>,
+ value: &[u8],
op: query::Operator,
) -> crate::Result<Option<RoaringBitmap>> {
let conn = self.conn_pool.get()?;
@@ -132,27 +132,27 @@ impl SqliteStore {
Operator::LowerThan => (
("SELECT k FROM i WHERE k >= ? AND k < ?"),
(k1.finalize()),
- (k2.write(&value[..]).write(0u32).finalize()),
+ (k2.write(value).write(0u32).finalize()),
),
Operator::LowerEqualThan => (
("SELECT k FROM i WHERE k >= ? AND k <= ?"),
(k1.finalize()),
- (k2.write(&value[..]).write(u32::MAX).finalize()),
+ (k2.write(value).write(u32::MAX).finalize()),
),
Operator::GreaterThan => (
("SELECT k FROM i WHERE k > ? AND k <= ?"),
- (k1.write(&value[..]).write(u32::MAX).finalize()),
+ (k1.write(value).write(u32::MAX).finalize()),
(k2.finalize()),
),
Operator::GreaterEqualThan => (
("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.write(&value[..]).write(0u32).finalize()),
+ (k1.write(value).write(0u32).finalize()),
(k2.finalize()),
),
Operator::Equal => (
("SELECT k FROM i WHERE k >= ? AND k <= ?"),
- (k1.write(&value[..]).write(0u32).finalize()),
- (k2.write(&value[..]).write(u32::MAX).finalize()),
+ (k1.write(value).write(0u32).finalize()),
+ (k2.write(value).write(u32::MAX).finalize()),
),
};
@@ -314,7 +314,7 @@ impl SqliteStore {
// Values
let mut has_errors = false;
- for table in [crate::SUBSPACE_VALUES, crate::SUBSPACE_ACLS, crate::SUBSPACE_COUNTERS] {
+ for table in [crate::SUBSPACE_VALUES, crate::SUBSPACE_ACLS, crate::SUBSPACE_COUNTERS, crate::SUBSPACE_BLOB_DATA] {
let table = char::from(table);
let mut query = conn.prepare_cached(&format!("SELECT k, v FROM {table}")).unwrap();
let mut rows = query.query([]).unwrap();
@@ -370,7 +370,7 @@ impl SqliteStore {
// Bitmaps
let mut query = conn
- .prepare_cached("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM b")
+ .prepare_cached(&format!("SELECT z, a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p FROM {}", char::from(crate::SUBSPACE_BITMAPS)))
.unwrap();
let mut rows = query.query([]).unwrap();
diff --git a/crates/store/src/backend/sqlite/write.rs b/crates/store/src/backend/sqlite/write.rs
index 39ea01ef..4c759045 100644
--- a/crates/store/src/backend/sqlite/write.rs
+++ b/crates/store/src/backend/sqlite/write.rs
@@ -274,8 +274,8 @@ impl SqliteStore {
#[cfg(feature = "test_mode")]
pub(crate) async fn destroy(&self) {
use crate::{
- SUBSPACE_ACLS, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES,
- SUBSPACE_LOGS, SUBSPACE_VALUES,
+ SUBSPACE_ACLS, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_BLOB_DATA, SUBSPACE_COUNTERS,
+ SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES,
};
let conn = self.conn_pool.get().unwrap();
@@ -287,6 +287,7 @@ impl SqliteStore {
SUBSPACE_BLOBS,
SUBSPACE_ACLS,
SUBSPACE_COUNTERS,
+ SUBSPACE_BLOB_DATA,
] {
conn.execute(&format!("DROP TABLE {}", char::from(table)), [])
.unwrap();
diff --git a/crates/store/src/dispatch.rs b/crates/store/src/dispatch.rs
index e7e59761..a5a69394 100644
--- a/crates/store/src/dispatch.rs
+++ b/crates/store/src/dispatch.rs
@@ -21,23 +21,27 @@
* for more details.
*/
-use std::ops::{BitAndAssign, Range};
+use std::{
+ fmt::Display,
+ ops::{BitAndAssign, Range},
+};
use roaring::RoaringBitmap;
use crate::{
+ fts::{index::FtsDocument, FtsFilter},
query,
write::{Batch, BitmapClass, ValueClass},
- BitmapKey, BlobStore, Deserialize, IterateParams, Key, Store, ValueKey,
+ BitmapKey, BlobStore, Deserialize, FtsStore, IterateParams, Key, Store, ValueKey,
};
impl Store {
- pub async fn assign_change_id(&self, account_id: u32) -> crate::Result<u64> {
+ /*pub async fn assign_change_id(&self, account_id: u32) -> crate::Result<u64> {
match self {
Self::SQLite(store) => store.assign_change_id(account_id).await,
Self::FoundationDb(store) => store.assign_change_id(account_id).await,
}
- }
+ }*/
pub async fn assign_document_id(
&self,
@@ -110,7 +114,7 @@ impl Store {
account_id: u32,
collection: u8,
field: u8,
- value: Vec<u8>,
+ value: &[u8],
op: query::Operator,
) -> crate::Result<Option<RoaringBitmap>> {
match self {
@@ -149,7 +153,7 @@ impl Store {
}
}
- pub(crate) async fn iterate<T: Key>(
+ pub async fn iterate<T: Key>(
&self,
params: IterateParams<T>,
cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result<bool> + Sync + Send,
@@ -190,6 +194,27 @@ impl Store {
}
}
+ pub async fn get_blob(&self, key: &[u8], range: Range<u32>) -> crate::Result<Option<Vec<u8>>> {
+ match self {
+ Self::SQLite(store) => store.get_blob(key, range).await,
+ Self::FoundationDb(store) => store.get_blob(key, range).await,
+ }
+ }
+
+ pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
+ match self {
+ Self::SQLite(store) => store.put_blob(key, data).await,
+ Self::FoundationDb(store) => store.put_blob(key, data).await,
+ }
+ }
+
+ pub async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> {
+ match self {
+ Self::SQLite(store) => store.delete_blob(key).await,
+ Self::FoundationDb(store) => store.delete_blob(key).await,
+ }
+ }
+
#[cfg(feature = "test_mode")]
pub async fn destroy(&self) {
match self {
@@ -269,6 +294,8 @@ impl BlobStore {
match self {
Self::Fs(store) => store.get_blob(key, range).await,
Self::S3(store) => store.get_blob(key, range).await,
+ Self::Sqlite(store) => store.get_blob(key, range).await,
+ Self::FoundationDb(store) => store.get_blob(key, range).await,
}
}
@@ -276,6 +303,8 @@ impl BlobStore {
match self {
Self::Fs(store) => store.put_blob(key, data).await,
Self::S3(store) => store.put_blob(key, data).await,
+ Self::Sqlite(store) => store.put_blob(key, data).await,
+ Self::FoundationDb(store) => store.put_blob(key, data).await,
}
}
@@ -283,6 +312,47 @@ impl BlobStore {
match self {
Self::Fs(store) => store.delete_blob(key).await,
Self::S3(store) => store.delete_blob(key).await,
+ Self::Sqlite(store) => store.delete_blob(key).await,
+ Self::FoundationDb(store) => store.delete_blob(key).await,
+ }
+ }
+}
+
+impl FtsStore {
+ pub async fn index<T: Into<u8> + Display + Clone + std::fmt::Debug>(
+ &self,
+ document: FtsDocument<'_, T>,
+ ) -> crate::Result<()> {
+ match self {
+ FtsStore::Store(store) => store.fts_index(document).await,
+ }
+ }
+
+ pub async fn query<T: Into<u8> + Display + Clone + std::fmt::Debug>(
+ &self,
+ account_id: u32,
+ collection: impl Into<u8>,
+ filters: Vec<FtsFilter<T>>,
+ ) -> crate::Result<RoaringBitmap> {
+ match self {
+ FtsStore::Store(store) => store.fts_query(account_id, collection, filters).await,
+ }
+ }
+
+ pub async fn remove(
+ &self,
+ account_id: u32,
+ collection: u8,
+ document_id: u32,
+ ) -> crate::Result<bool> {
+ match self {
+ FtsStore::Store(store) => store.fts_remove(account_id, collection, document_id).await,
+ }
+ }
+
+ pub async fn remove_all(&self, account_id: u32) -> crate::Result<()> {
+ match self {
+ FtsStore::Store(store) => store.fts_remove_all(account_id).await,
}
}
}
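
For context, a hedged usage sketch of the extended dispatch layer: blob operations now route through `BlobStore` to the filesystem, S3, SQLite or FoundationDB backends behind the same three calls. The key and contents below are placeholders, and the commented result assumes the SQLite blob backend's range handling shown earlier.

```rust
use store::BlobStore;

// Illustrative round trip through the blob dispatch API added above.
async fn blob_roundtrip(blob_store: &BlobStore) -> store::Result<()> {
    let key = b"example-blob-key";
    blob_store.put_blob(key, b"hello world").await?;

    // Partial read; with the SQLite backend this returns Some(b"world".to_vec()).
    let _partial = blob_store.get_blob(key, 6..u32::MAX).await?;

    // delete_blob reports whether a value was removed.
    let _deleted: bool = blob_store.delete_blob(key).await?;
    Ok(())
}
```
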
diff --git a/crates/store/src/fts/bloom.rs b/crates/store/src/fts/bloom.rs
deleted file mode 100644
index 6145a637..00000000
--- a/crates/store/src/fts/bloom.rs
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Copyright (c) 2023 Stalwart Labs Ltd.
- *
- * This file is part of the Stalwart Mail Server.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- * in the LICENSE file at the top-level directory of this distribution.
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * You can be released from the requirements of the AGPLv3 license by
- * purchasing a commercial license. Please contact licensing@stalw.art
- * for more details.
-*/
-
-use std::{
- borrow::Cow,
- f64::consts::LN_2,
- hash::{Hash, Hasher},
-};
-
-use nlp::{language::stemmer::StemmedToken, tokenizers::Token};
-use roaring::RoaringBitmap;
-use utils::codec::leb128::{Leb128Reader, Leb128Vec};
-
-use crate::{Deserialize, Error, Serialize};
-
-pub struct BloomFilter {
- m: u64,
- b: RoaringBitmap,
-}
-
-#[derive(Debug)]
-pub struct BloomHash {
- pub h: [u64; 7],
-}
-
-#[derive(Debug)]
-pub struct BloomHashGroup {
- pub h1: BloomHash,
- pub h2: Option<BloomHash>,
-}
-
-const AHASHER: ahash::RandomState = ahash::RandomState::with_seeds(
- 0xaf1f2242106c64b3,
- 0x60ca4cfb4b3ed0ce,
- 0xc7dbc0bb615e82b3,
- 0x520ad065378daf88,
-);
-lazy_static::lazy_static! {
- static ref SIPHASHER: siphasher::sip::SipHasher13 =
- siphasher::sip::SipHasher13::new_with_keys(0x56205cbdba8f02a6, 0xbd0dbc4bb06d687b);
-}
-
-const P: f64 = 0.01;
-
-impl BloomFilter {
- pub fn new(items: usize) -> Self {
- Self {
- m: if items > 0 {
- std::cmp::max(Self::estimate_m(items, P), 10240)
- } else {
- 0
- },
- b: RoaringBitmap::new(),
- }
- }
-
- fn from_params(m: u64, b: RoaringBitmap) -> Self {
- Self { m, b }
- }
-
- fn estimate_m(n: usize, p: f64) -> u64 {
- (((n as f64) * f64::ln(p) / (-8.0 * LN_2.powi(2))).ceil() as u64) * 8
- }
-
- #[allow(dead_code)]
- fn estimate_k(m: u64, n: usize) -> u32 {
- std::cmp::max(((m as f64) / (n as f64) * f64::ln(2.0f64)).ceil() as u32, 1)
- }
-
- pub fn insert(&mut self, hash: &BloomHash) {
- self.b.insert((hash.h[0] % self.m) as u32);
- self.b.insert((hash.h[1] % self.m) as u32);
- self.b.insert((hash.h[2] % self.m) as u32);
- self.b.insert((hash.h[3] % self.m) as u32);
- self.b.insert((hash.h[4] % self.m) as u32);
- self.b.insert((hash.h[5] % self.m) as u32);
- self.b.insert((hash.h[6] % self.m) as u32);
- }
-
- pub fn contains(&self, hash: &BloomHash) -> bool {
- self.b.contains((hash.h[0] % self.m) as u32)
- && self.b.contains((hash.h[1] % self.m) as u32)
- && self.b.contains((hash.h[2] % self.m) as u32)
- && self.b.contains((hash.h[3] % self.m) as u32)
- && self.b.contains((hash.h[4] % self.m) as u32)
- && self.b.contains((hash.h[5] % self.m) as u32)
- && self.b.contains((hash.h[6] % self.m) as u32)
- }
-
- pub fn is_subset(&self, other: &Self) -> bool {
- self.b.is_subset(&other.b)
- }
-
- pub fn is_empty(&self) -> bool {
- self.m == 0 || self.b.is_empty()
- }
-}
-
-pub trait BloomHasher {
- fn hash<T: Hash + AsRef<[u8]> + ?Sized>(item: &T) -> Self;
-}
-
-impl BloomHash {
- pub fn hash<T: Hash + AsRef<[u8]> + ?Sized>(item: &T) -> Self {
- let h1 = xxhash_rust::xxh3::xxh3_64(item.as_ref());
- let h2 = farmhash::hash64(item.as_ref());
- let h3 = AHASHER.hash_one(item);
- let mut sh = *SIPHASHER;
- sh.write(item.as_ref());
- let h4 = sh.finish();
-
- Self {
- h: [h1, h2, h3, h4, h1 ^ h2, h2 ^ h3, h3 ^ h4],
- }
- }
-}
-
-pub fn hash_token(item: &str) -> Vec<u8> {
- let h1 = xxhash_rust::xxh3::xxh3_64(item.as_ref()).to_le_bytes();
- let h2 = farmhash::hash64(item.as_ref()).to_le_bytes();
- let h3 = AHASHER.hash_one(item).to_le_bytes();
- let mut sh = *SIPHASHER;
- sh.write(item.as_ref());
- let h4 = sh.finish().to_le_bytes();
-
- match item.len() {
- 0..=8 => {
- let mut hash = Vec::with_capacity(6);
- hash.extend_from_slice(&h1[..2]);
- hash.extend_from_slice(&h2[..2]);
- hash.push(h3[0]);
- hash.push(h4[0]);
- hash
- }
- 9..=16 => {
- let mut hash = Vec::with_capacity(8);
- hash.extend_from_slice(&h1[..2]);
- hash.extend_from_slice(&h2[..2]);
- hash.extend_from_slice(&h3[..2]);
- hash.extend_from_slice(&h4[..2]);
- hash
- }
- 17..=32 => {
- let mut hash = Vec::with_capacity(12);
- hash.extend_from_slice(&h1[..3]);
- hash.extend_from_slice(&h2[..3]);
- hash.extend_from_slice(&h3[..3]);
- hash.extend_from_slice(&h4[..3]);
- hash
- }
- _ => {
- let mut hash = Vec::with_capacity(16);
- hash.extend_from_slice(&h1[..4]);
- hash.extend_from_slice(&h2[..4]);
- hash.extend_from_slice(&h3[..4]);
- hash.extend_from_slice(&h4[..4]);
- hash
- }
- }
-}
-
-impl From<&str> for BloomHash {
- fn from(s: &str) -> Self {
- Self::hash(&s)
- }
-}
-
-impl From<String> for BloomHash {
- fn from(s: String) -> Self {
- Self::hash(&s)
- }
-}
-
-impl From<&String> for BloomHash {
- fn from(s: &String) -> Self {
- Self::hash(&s)
- }
-}
-
-impl From<Cow<'_, str>> for BloomHash {
- fn from(s: Cow<'_, str>) -> Self {
- Self::hash(s.as_ref())
- }
-}
-
-impl From<Token<Cow<'_, str>>> for BloomHashGroup {
- fn from(t: Token<Cow<'_, str>>) -> Self {
- Self {
- h1: BloomHash::hash(t.word.as_ref()),
- h2: None,
- }
- }
-}
-
-impl From<StemmedToken<'_>> for BloomHashGroup {
- fn from(t: StemmedToken<'_>) -> Self {
- Self {
- h1: BloomHash::hash(t.word.as_ref()),
- h2: t.stemmed_word.map(|w| BloomHash::hash(&format!("{w}_"))),
- }
- }
-}
-
-impl From<Cow<'_, str>> for BloomHashGroup {
- fn from(t: Cow<'_, str>) -> Self {
- Self {
- h1: BloomHash::hash(t.as_ref()),
- h2: None,
- }
- }
-}
-
-impl Serialize for BloomFilter {
- fn serialize(self) -> Vec<u8> {
- let mut buf = Vec::with_capacity(U64_LEN + self.b.serialized_size());
- buf.push_leb128(self.m);
- let _ = self.b.serialize_into(&mut buf);
- buf
- }
-}
-
-impl Deserialize for BloomFilter {
- fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
- let (m, pos) = bytes.read_leb128().ok_or_else(|| {
- Error::InternalError(
- "Failed to read 'm' value while deserializing bloom filter.".to_string(),
- )
- })?;
- RoaringBitmap::deserialize_unchecked_from(bytes.get(pos..).ok_or_else(|| {
- Error::InternalError(
- "Failed to read bitmap while deserializing bloom filter.".to_string(),
- )
- })?)
- .map_err(|err| Error::InternalError(format!("Failed to deserialize bloom filter: {err}.")))
- .map(|b| Self::from_params(m, b))
- }
-}
diff --git a/crates/store/src/fts/builder.rs b/crates/store/src/fts/builder.rs
deleted file mode 100644
index f4a8422d..00000000
--- a/crates/store/src/fts/builder.rs
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright (c) 2023 Stalwart Labs Ltd.
- *
- * This file is part of the Stalwart Mail Server.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- * in the LICENSE file at the top-level directory of this distribution.
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * You can be released from the requirements of the AGPLv3 license by
- * purchasing a commercial license. Please contact licensing@stalw.art
- * for more details.
-*/
-
-use std::{borrow::Cow, collections::HashSet, fmt::Display};
-
-use ahash::AHashSet;
-use nlp::{
- language::{
- detect::{LanguageDetector, MIN_LANGUAGE_SCORE},
- stemmer::Stemmer,
- Language,
- },
- tokenizers::{space::SpaceTokenizer, Token},
-};
-use utils::map::vec_map::VecMap;
-
-use crate::{
- query::RawValue,
- write::{BatchBuilder, IntoOperations, Operation, ValueClass},
- Serialize, HASH_EXACT, HASH_STEMMED,
-};
-
-use super::term_index::{TermIndexBuilder, TokenIndex};
-
-pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 2) as usize;
-pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;
-
-struct Text<'x, T: Into<u8> + Display> {
- field: T,
- text: Cow<'x, str>,
- language: Type,
-}
-
-enum Type {
- Stem(Language),
- Tokenize,
- Static,
-}
-
-pub struct FtsIndexBuilder<'x, T: Into<u8> + Display> {
- parts: Vec<Text<'x, T>>,
- default_language: Language,
-}
-
-impl<'x, T: Into<u8> + Display> FtsIndexBuilder<'x, T> {
- pub fn with_default_language(default_language: Language) -> FtsIndexBuilder<'x, T> {
- FtsIndexBuilder {
- parts: vec![],
- default_language,
- }
- }
-
- pub fn index(&mut self, field: T, text: impl Into<Cow<'x, str>>, language: Language) {
- self.parts.push(Text {
- field,
- text: text.into(),
- language: Type::Stem(language),
- });
- }
-
- pub fn index_raw(&mut self, field: T, text: impl Into<Cow<'x, str>>) {
- self.parts.push(Text {
- field,
- text: text.into(),
- language: Type::Tokenize,
- });
- }
-
- pub fn index_raw_token(&mut self, field: T, text: impl Into<Cow<'x, str>>) {
- self.parts.push(Text {
- field,
- text: text.into(),
- language: Type::Static,
- });
- }
-}
-
-impl<'x, T: Into<u8> + Display> IntoOperations for FtsIndexBuilder<'x, T> {
- fn build(self, batch: &mut BatchBuilder) {
- let mut detect = LanguageDetector::new();
- let mut tokens: VecMap<u8, AHashSet<String>> = VecMap::new();
- let mut parts = Vec::new();
-
- for text in self.parts {
- match text.language {
- Type::Stem(language) => {
- let language = if language == Language::Unknown {
- detect.detect(&text.text, MIN_LANGUAGE_SCORE)
- } else {
- language
- };
- parts.push((text.field, language, text.text));
- }
- Type::Tokenize => {
- let tokens = tokens.get_mut_or_insert(text.field.into());
- for token in SpaceTokenizer::new(text.text.as_ref(), MAX_TOKEN_LENGTH) {
- tokens.insert(token);
- }
- }
- Type::Static => {
- tokens
- .get_mut_or_insert(text.field.into())
- .insert(text.text.into_owned());
- }
- }
- }
-
- let default_language = detect
- .most_frequent_language()
- .unwrap_or(self.default_language);
- let mut term_index = TermIndexBuilder::new();
- let mut ops = AHashSet::new();
-
- for (part_id, (field, language, text)) in parts.into_iter().enumerate() {
- let language = if language != Language::Unknown {
- language
- } else {
- default_language
- };
- let mut terms = Vec::new();
- let field: u8 = field.into();
-
- for token in Stemmer::new(&text, language, MAX_TOKEN_LENGTH).collect::<Vec<_>>() {
- ops.insert(Operation::hash(&token.word, HASH_EXACT, field, true));
- if let Some(stemmed_word) = &token.stemmed_word {
- ops.insert(Operation::hash(stemmed_word, HASH_STEMMED, field, true));
- }
- terms.push(term_index.add_stemmed_token(token));
- }
-
- if !terms.is_empty() {
- term_index.add_terms(field, part_id as u32, terms);
- }
- }
-
- for (field, tokens) in tokens {
- let mut terms = Vec::with_capacity(tokens.len());
- for token in tokens {
- ops.insert(Operation::hash(&token, HASH_EXACT, field, true));
- terms.push(term_index.add_token(Token {
- word: token.into(),
- from: 0,
- to: 0,
- }));
- }
- term_index.add_terms(field, 0, terms);
- }
-
- for op in ops {
- batch.ops.push(op);
- }
-
- batch.ops.push(Operation::Value {
- class: ValueClass::Property {
- field: u8::MAX,
- family: u8::MAX,
- },
- set: term_index.serialize().into(),
- });
- }
-}
-
-impl TokenIndex {
- fn build_index(self, batch: &mut BatchBuilder, set: bool) {
- let mut ops = AHashSet::with_capacity(self.tokens.len() * 2);
- for term in self.terms {
- for (term_ids, is_exact) in [(term.exact_terms, true), (term.stemmed_terms, false)] {
- for term_id in term_ids {
- if let Some(word) = self.tokens.get(term_id as usize) {
- ops.insert(Operation::hash(
- word,
- if is_exact { HASH_EXACT } else { HASH_STEMMED },
- term.field_id,
- set,
- ));
- }
- }
- }
- }
- for op in ops {
- batch.ops.push(op);
- }
- }
-}
-
-impl IntoOperations for TokenIndex {
- fn build(self, batch: &mut BatchBuilder) {
- self.build_index(batch, false);
- batch.ops.push(Operation::Value {
- class: ValueClass::Property {
- field: u8::MAX,
- family: u8::MAX,
- },
- set: None,
- });
- }
-}
-
-impl IntoOperations for RawValue<TokenIndex> {
- fn build(self, batch: &mut BatchBuilder) {
- self.inner.build_index(batch, true);
- batch.ops.push(Operation::Value {
- class: ValueClass::Property {
- field: u8::MAX,
- family: u8::MAX,
- },
- set: self.raw.into(),
- });
- }
-}
-
-pub trait ToTokens {
- fn to_tokens(&self) -> HashSet<String>;
-}
-
-impl ToTokens for &str {
- fn to_tokens(&self) -> HashSet<String> {
- let mut tokens = HashSet::new();
- for token in SpaceTokenizer::new(self, MAX_TOKEN_LENGTH) {
- tokens.insert(token);
- }
- tokens
- }
-}
-
-impl ToTokens for &String {
- fn to_tokens(&self) -> HashSet<String> {
- self.as_str().to_tokens()
- }
-}
diff --git a/crates/store/src/fts/index.rs b/crates/store/src/fts/index.rs
new file mode 100644
index 00000000..1493fdcd
--- /dev/null
+++ b/crates/store/src/fts/index.rs
@@ -0,0 +1,372 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details, which is also available
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use std::{borrow::Cow, fmt::Display};
+
+use ahash::{AHashMap, AHashSet};
+use nlp::{
+ language::{
+ detect::{LanguageDetector, MIN_LANGUAGE_SCORE},
+ stemmer::Stemmer,
+ Language,
+ },
+ tokenizers::word::WordTokenizer,
+};
+
+use crate::{
+ backend::MAX_TOKEN_LENGTH,
+ write::{
+ hash::TokenType, key::KeySerializer, BatchBuilder, BitmapClass, BitmapHash, Operation,
+ ValueClass,
+ },
+ Deserialize, Error, Store, ValueKey, U64_LEN,
+};
+
+use super::Field;
+
+#[derive(Debug)]
+struct Text<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> {
+ field: Field<T>,
+ text: Cow<'x, str>,
+ typ: Type,
+}
+
+#[derive(Debug)]
+enum Type {
+ Text(Language),
+ Tokenize,
+ Keyword,
+}
+
+#[derive(Debug)]
+pub struct FtsDocument<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> {
+ parts: Vec<Text<'x, T>>,
+ default_language: Language,
+ account_id: u32,
+ collection: u8,
+ document_id: u32,
+}
+
+impl<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> FtsDocument<'x, T> {
+ pub fn with_default_language(default_language: Language) -> FtsDocument<'x, T> {
+ FtsDocument {
+ parts: vec![],
+ default_language,
+ account_id: 0,
+ document_id: 0,
+ collection: 0,
+ }
+ }
+
+ pub fn with_account_id(mut self, account_id: u32) -> Self {
+ self.account_id = account_id;
+ self
+ }
+
+ pub fn with_document_id(mut self, document_id: u32) -> Self {
+ self.document_id = document_id;
+ self
+ }
+
+ pub fn with_collection(mut self, collection: impl Into<u8>) -> Self {
+ self.collection = collection.into();
+ self
+ }
+
+ pub fn index(&mut self, field: Field<T>, text: impl Into<Cow<'x, str>>, language: Language) {
+ self.parts.push(Text {
+ field,
+ text: text.into(),
+ typ: Type::Text(language),
+ });
+ }
+
+ pub fn index_tokenized(&mut self, field: Field<T>, text: impl Into<Cow<'x, str>>) {
+ self.parts.push(Text {
+ field,
+ text: text.into(),
+ typ: Type::Tokenize,
+ });
+ }
+
+ pub fn index_keyword(&mut self, field: Field<T>, text: impl Into<Cow<'x, str>>) {
+ self.parts.push(Text {
+ field,
+ text: text.into(),
+ typ: Type::Keyword,
+ });
+ }
+}
+
+impl<T: Into<u8> + Display + Clone + std::fmt::Debug> From<Field<T>> for u8 {
+ fn from(value: Field<T>) -> Self {
+ match value {
+ Field::Body => 0,
+ Field::Attachment => 1,
+ Field::Keyword => 2,
+ Field::Header(value) => 3 + value.into(),
+ }
+ }
+}
+
+impl Store {
+ pub async fn fts_index<T: Into<u8> + Display + Clone + std::fmt::Debug>(
+ &self,
+ document: FtsDocument<'_, T>,
+ ) -> crate::Result<()> {
+ let mut detect = LanguageDetector::new();
+ let mut tokens: AHashMap<BitmapHash, AHashSet<u8>> = AHashMap::new();
+ let mut parts = Vec::new();
+
+ for text in document.parts {
+ match text.typ {
+ Type::Text(language) => {
+ let language = if language == Language::Unknown {
+ detect.detect(&text.text, MIN_LANGUAGE_SCORE)
+ } else {
+ language
+ };
+ parts.push((text.field, language, text.text));
+ }
+ Type::Tokenize => {
+ let field = u8::from(text.field);
+ for token in WordTokenizer::new(text.text.as_ref(), MAX_TOKEN_LENGTH) {
+ tokens
+ .entry(BitmapHash::new(token.word.as_ref()))
+ .or_default()
+ .insert(TokenType::word(field));
+ }
+ }
+ Type::Keyword => {
+ let field = u8::from(text.field);
+ tokens
+ .entry(BitmapHash::new(text.text.as_ref()))
+ .or_default()
+ .insert(TokenType::word(field));
+ }
+ }
+ }
+
+ let default_language = detect
+ .most_frequent_language()
+ .unwrap_or(document.default_language);
+
+ for (field, language, text) in parts.into_iter() {
+ let language = if language != Language::Unknown {
+ language
+ } else {
+ default_language
+ };
+ let field: u8 = field.into();
+
+ let mut last_token = Cow::Borrowed("");
+ for token in Stemmer::new(&text, language, MAX_TOKEN_LENGTH) {
+ if !last_token.is_empty() {
+ tokens
+ .entry(BitmapHash::new(&format!("{} {}", last_token, token.word)))
+ .or_default()
+ .insert(TokenType::bigram(field));
+ }
+
+ tokens
+ .entry(BitmapHash::new(token.word.as_ref()))
+ .or_default()
+ .insert(TokenType::word(field));
+
+ if let Some(stemmed_word) = token.stemmed_word {
+ tokens
+ .entry(BitmapHash::new(stemmed_word.as_ref()))
+ .or_default()
+ .insert(TokenType::stemmed(field));
+ }
+
+ last_token = token.word;
+ }
+ }
+
+ if tokens.is_empty() {
+ return Ok(());
+ }
+
+ // Serialize tokens
+ let mut serializer = KeySerializer::new(tokens.len() * U64_LEN * 2);
+ let mut keys = Vec::with_capacity(tokens.len());
+
+ for (hash, fields) in tokens.into_iter() {
+ serializer = serializer
+ .write(hash.hash.as_slice())
+ .write(hash.len)
+ .write(fields.len() as u8);
+ for field in fields.into_iter() {
+ serializer = serializer.write(field);
+ keys.push(Operation::Bitmap {
+ class: BitmapClass::Text { field, token: hash },
+ set: true,
+ });
+ }
+ }
+
+ // Write term index
+ let mut batch = BatchBuilder::new();
+ batch
+ .with_account_id(document.account_id)
+ .with_collection(document.collection)
+ .update_document(document.document_id)
+ .set(
+ ValueClass::TermIndex,
+ lz4_flex::compress_prepend_size(&serializer.finalize()),
+ );
+ self.write(batch.build()).await?;
+ let mut batch = BatchBuilder::new();
+ batch
+ .with_account_id(document.account_id)
+ .with_collection(document.collection)
+ .update_document(document.document_id);
+
+ for (pos, key) in keys.into_iter().enumerate() {
+ if pos > 0 && pos & 1023 == 0 {
+ self.write(batch.build()).await?;
+ batch = BatchBuilder::new();
+ batch
+ .with_account_id(document.account_id)
+ .with_collection(document.collection)
+ .update_document(document.document_id);
+ }
+ batch.ops.push(key);
+ }
+
+ if !batch.is_empty() {
+ self.write(batch.build()).await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn fts_remove(
+ &self,
+ account_id: u32,
+ collection: u8,
+ document_id: u32,
+ ) -> crate::Result<bool> {
+ // Obtain term index
+ let term_index = if let Some(term_index) = self
+ .get_value::<TermIndex>(ValueKey {
+ account_id,
+ collection,
+ document_id,
+ class: ValueClass::TermIndex,
+ })
+ .await?
+ {
+ term_index
+ } else {
+ return Ok(false);
+ };
+
+ // Remove keys
+ let mut batch = BatchBuilder::new();
+ batch
+ .with_account_id(account_id)
+ .with_collection(collection)
+ .update_document(document_id);
+
+ for (pos, key) in term_index.ops.into_iter().enumerate() {
+ if pos > 0 && pos & 1023 == 0 {
+ self.write(batch.build()).await?;
+ batch = BatchBuilder::new();
+ batch
+ .with_account_id(account_id)
+ .with_collection(collection)
+ .update_document(document_id);
+ }
+ batch.ops.push(key);
+ }
+
+ if !batch.is_empty() {
+ self.write(batch.build()).await?;
+ }
+
+ // Remove term index
+ let mut batch = BatchBuilder::new();
+ batch
+ .with_account_id(account_id)
+ .with_collection(collection)
+ .update_document(document_id)
+ .clear(ValueClass::TermIndex);
+
+ self.write(batch.build()).await?;
+
+ Ok(true)
+ }
+
+ pub async fn fts_remove_all(&self, _: u32) -> crate::Result<()> {
+ // No-op: term indexes are stored in the same key range as the document.
+
+ Ok(())
+ }
+}
+
+struct TermIndex {
+ ops: Vec<Operation>,
+}
+
+impl Deserialize for TermIndex {
+ fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
+ let bytes = lz4_flex::decompress_size_prepended(bytes)
+ .map_err(|_| Error::InternalError("Failed to decompress term index".to_string()))?;
+ let mut ops = Vec::new();
+ let mut bytes = bytes.iter().peekable();
+
+ while bytes.peek().is_some() {
+ let mut hash = BitmapHash {
+ hash: [0; 8],
+ len: 0,
+ };
+
+ for byte in hash.hash.iter_mut() {
+ *byte = *bytes.next().ok_or(Error::InternalError(
+ "Unexpected EOF reading term index".to_string(),
+ ))?;
+ }
+
+ hash.len = *bytes.next().ok_or(Error::InternalError(
+ "Unexpected EOF reading term index".to_string(),
+ ))?;
+ let num_fields = *bytes.next().ok_or(Error::InternalError(
+ "Unexpected EOF reading term index".to_string(),
+ ))?;
+ for _ in 0..num_fields {
+ let field = *bytes.next().ok_or(Error::InternalError(
+ "Unexpected EOF reading term index".to_string(),
+ ))?;
+ ops.push(Operation::Bitmap {
+ class: BitmapClass::Text { field, token: hash },
+ set: false,
+ });
+ }
+ }
+
+ Ok(Self { ops })
+ }
+}
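
A minimal sketch of how a caller might feed the new indexer; the account id, collection id, header discriminant type (u8) and sample texts are placeholders rather than values taken from this commit:

    use nlp::language::Language;
    use store::fts::{index::FtsDocument, Field};
    use store::Store;

    async fn index_message(store: &Store) -> Result<(), store::Error> {
        // `u8` stands in for the caller's header-field enum.
        let mut document = FtsDocument::<u8>::with_default_language(Language::English)
            .with_account_id(1)
            .with_document_id(42)
            .with_collection(3u8);

        // Body text is stemmed and bigram-indexed; Unknown triggers language detection.
        document.index(Field::Body, "Hello world, this is a test", Language::Unknown);
        // Header text is word-tokenized only (no stemming, no bigrams).
        document.index_tokenized(Field::Header(0u8), "jdoe@example.org");
        // Keywords are hashed verbatim as a single token.
        document.index_keyword(Field::Keyword, "$important");

        // Writes the compressed term index plus one bitmap operation per (token, field) pair.
        store.fts_index(document).await
    }
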
diff --git a/crates/store/src/fts/mod.rs b/crates/store/src/fts/mod.rs
index 8761f076..cb38f1e9 100644
--- a/crates/store/src/fts/mod.rs
+++ b/crates/store/src/fts/mod.rs
@@ -21,55 +21,188 @@
* for more details.
*/
-use crate::{
- write::{BitmapFamily, Operation},
- BitmapKey, Serialize, BM_HASH,
-};
+use std::fmt::Display;
-use self::{bloom::hash_token, builder::MAX_TOKEN_MASK};
+use nlp::language::Language;
-pub mod bloom;
-pub mod builder;
+pub mod index;
pub mod query;
-pub mod search_snippet;
-pub mod term_index;
-
-impl BitmapKey<Vec<u8>> {
- pub fn hash(word: &str, account_id: u32, collection: u8, family: u8, field: u8) -> Self {
- BitmapKey {
- account_id,
- collection,
- family: BM_HASH | family | (word.len() & MAX_TOKEN_MASK) as u8,
- field,
- block_num: 0,
- key: hash_token(word),
+
+#[derive(Clone, Debug)]
+pub enum Field<T: Into<u8> + Display + Clone + std::fmt::Debug> {
+ Header(T),
+ Body,
+ Attachment,
+ Keyword,
+}
+
+#[derive(Debug)]
+pub enum FtsFilter<T: Into<u8> + Display + Clone + std::fmt::Debug> {
+ Exact {
+ field: Field<T>,
+ text: String,
+ language: Language,
+ },
+ Contains {
+ field: Field<T>,
+ text: String,
+ language: Language,
+ },
+ Keyword {
+ field: Field<T>,
+ text: String,
+ },
+ And,
+ Or,
+ Not,
+ End,
+}
+
+impl<T: Into<u8> + Display + Clone + std::fmt::Debug> FtsFilter<T> {
+ pub fn has_text_detect(
+ field: Field<T>,
+ text: impl Into<String>,
+ default_language: Language,
+ ) -> Self {
+ let (text, language) = Language::detect(text.into(), default_language);
+ Self::has_text(field, text, language)
+ }
+
+ pub fn has_text(field: Field<T>, text: impl Into<String>, language: Language) -> Self {
+ let text = text.into();
+ if !matches!(language, Language::None)
+ && ((text.starts_with('"') && text.ends_with('"'))
+ || (text.starts_with('\'') && text.ends_with('\'')))
+ {
+ FtsFilter::Exact {
+ field,
+ text,
+ language,
+ }
+ } else {
+ FtsFilter::Contains {
+ field,
+ text,
+ language,
+ }
}
}
- pub fn value(
- account_id: u32,
- collection: impl Into<u8>,
- field: impl Into<u8>,
- value: impl BitmapFamily + Serialize,
- ) -> Self {
- BitmapKey {
- account_id,
- collection: collection.into(),
- family: value.family(),
- field: field.into(),
- block_num: 0,
- key: value.serialize(),
+ pub fn has_keyword(field: Field<T>, text: impl Into<String>) -> Self {
+ FtsFilter::Keyword {
+ field,
+ text: text.into(),
}
}
+
+ pub fn has_english_text(field: Field<T>, text: impl Into<String>) -> Self {
+ Self::has_text(field, text, Language::English)
+ }
}
-impl Operation {
- pub fn hash(word: &str, family: u8, field: u8, set: bool) -> Self {
- Operation::Bitmap {
- family: BM_HASH | family | (word.len() & MAX_TOKEN_MASK) as u8,
- field,
- key: hash_token(word),
- set,
+#[derive(Clone, Copy)]
+pub enum FilterType {
+ And,
+ Or,
+ Not,
+ End,
+ Store,
+ Fts,
+}
+
+pub enum FilterGroup<T: FilterItem> {
+ Fts(Vec<T>),
+ Store(T),
+}
+
+pub trait FilterItem: Clone {
+ fn filter_type(&self) -> FilterType;
+}
+
+pub trait IntoFilterGroup<T: FilterItem + From<FilterType>> {
+ fn into_filter_group(self) -> Vec<FilterGroup<T>>;
+}
+
+impl<T: FilterItem + From<FilterType>> IntoFilterGroup<T> for Vec<T> {
+ fn into_filter_group(self) -> Vec<FilterGroup<T>> {
+ let mut filter = Vec::with_capacity(self.len());
+ let mut iter = self.into_iter();
+ let mut logical_op = None;
+
+ while let Some(item) = iter.next() {
+ if matches!(item.filter_type(), FilterType::Fts) {
+ let mut store_item = None;
+ let mut depth = 0;
+ let mut fts = Vec::with_capacity(5);
+
+ // Add the logical operator if there is one
+ let in_logical_op = if let Some(op) = logical_op.take() {
+ fts.push(op);
+ true
+ } else {
+ false
+ };
+ fts.push(item);
+
+ for item in iter.by_ref() {
+ match item.filter_type() {
+ FilterType::And | FilterType::Or | FilterType::Not => {
+ depth += 1;
+ fts.push(item);
+ }
+ FilterType::End if depth > 0 => {
+ depth -= 1;
+ fts.push(item);
+ }
+ FilterType::Fts => {
+ fts.push(item);
+ }
+ _ => {
+ store_item = Some(item);
+ break;
+ }
+ }
+ }
+
+ if in_logical_op {
+ fts.push(T::from(FilterType::End));
+ }
+
+ if depth > 0 {
+ let mut store = Vec::with_capacity(depth * 2);
+ while depth > 0 {
+ let item = fts.pop().unwrap();
+ if matches!(
+ item.filter_type(),
+ FilterType::And | FilterType::Or | FilterType::Not
+ ) {
+ depth -= 1;
+ }
+ store.push(FilterGroup::Store(item));
+ }
+
+ filter.push(FilterGroup::Fts(fts));
+ filter.extend(store);
+ } else {
+ filter.push(FilterGroup::Fts(fts));
+ }
+
+ if let Some(item) = store_item {
+ filter.push(FilterGroup::Store(item));
+ }
+ } else {
+ match item.filter_type() {
+ FilterType::And | FilterType::Or => {
+ logical_op = Some(item.clone());
+ }
+ FilterType::Not => {
+ logical_op = Some(T::from(FilterType::And));
+ }
+ _ => {}
+ }
+ filter.push(FilterGroup::Store(item));
+ }
}
+
+ filter
}
}
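
The splitting performed by into_filter_group() is easiest to see with a toy filter type. Everything below is illustrative only: MyFilter is a hypothetical stand-in for the query filter enums that implement FilterItem in the calling crates, and is not part of this commit.

    use store::fts::{FilterGroup, FilterItem, FilterType, IntoFilterGroup};

    // Hypothetical filter type mixing FTS and store-evaluated conditions.
    #[derive(Clone, Debug)]
    enum MyFilter {
        Text(&'static str), // evaluated by the FTS store
        InMailbox(u32),     // evaluated by the regular store
        And,
        Or,
        Not,
        End,
    }

    impl FilterItem for MyFilter {
        fn filter_type(&self) -> FilterType {
            match self {
                MyFilter::Text(_) => FilterType::Fts,
                MyFilter::InMailbox(_) => FilterType::Store,
                MyFilter::And => FilterType::And,
                MyFilter::Or => FilterType::Or,
                MyFilter::Not => FilterType::Not,
                MyFilter::End => FilterType::End,
            }
        }
    }

    impl From<FilterType> for MyFilter {
        fn from(value: FilterType) -> Self {
            match value {
                FilterType::And => MyFilter::And,
                FilterType::Or => MyFilter::Or,
                FilterType::Not => MyFilter::Not,
                _ => MyFilter::End,
            }
        }
    }

    fn main() {
        // "body contains ducks" OR "in mailbox 7"
        let query = vec![
            MyFilter::Or,
            MyFilter::Text("ducks"),
            MyFilter::InMailbox(7),
            MyFilter::End,
        ];

        // FTS-capable filters are grouped together (with the enclosing logical
        // operator duplicated into the group) while store-only filters pass
        // through, so each backend only receives conditions it can evaluate.
        for group in query.into_filter_group() {
            match group {
                FilterGroup::Fts(fts) => println!("fts group: {fts:?}"),
                FilterGroup::Store(item) => println!("store: {item:?}"),
            }
        }
    }
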
diff --git a/crates/store/src/fts/query.rs b/crates/store/src/fts/query.rs
index 37938e3f..6be67a75 100644
--- a/crates/store/src/fts/query.rs
+++ b/crates/store/src/fts/query.rs
@@ -21,138 +21,210 @@
* for more details.
*/
-use std::ops::BitOrAssign;
+use std::{
+ fmt::Display,
+ ops::{BitAndAssign, BitOrAssign, BitXorAssign},
+};
-use nlp::language::{stemmer::Stemmer, Language};
+use nlp::language::stemmer::Stemmer;
use roaring::RoaringBitmap;
-use crate::{fts::builder::MAX_TOKEN_LENGTH, BitmapKey, ValueKey, HASH_EXACT, HASH_STEMMED};
+use crate::{backend::MAX_TOKEN_LENGTH, fts::FtsFilter, write::BitmapClass, BitmapKey, Store};
-use super::term_index::TermIndex;
+struct State<T: Into<u8> + Display + Clone + std::fmt::Debug> {
+ pub op: FtsFilter<T>,
+ pub bm: Option<RoaringBitmap>,
+}
-#[async_trait::async_trait]
-pub trait StoreFts: StoreRead {
- async fn fts_query(
- &mut self,
+impl Store {
+ pub async fn fts_query<T: Into<u8> + Display + Clone + std::fmt::Debug>(
+ &self,
account_id: u32,
- collection: u8,
- field: u8,
- text: &str,
- language: Language,
- match_phrase: bool,
- ) -> crate::Result<Option<RoaringBitmap>> {
- if match_phrase {
- let mut phrase = Vec::new();
- let mut bit_keys = Vec::new();
- for token in language.tokenize_text(text, MAX_TOKEN_LENGTH) {
- let key = BitmapKey::hash(
- token.word.as_ref(),
- account_id,
- collection,
- HASH_EXACT,
+ collection: impl Into<u8>,
+ filters: Vec<FtsFilter<T>>,
+ ) -> crate::Result<RoaringBitmap> {
+ let collection = collection.into();
+ let mut not_mask = RoaringBitmap::new();
+ let mut not_fetch = false;
+
+ let mut state: State<T> = FtsFilter::And.into();
+ let mut stack = Vec::new();
+ let mut filters = filters.into_iter().peekable();
+
+ while let Some(filter) = filters.next() {
+ let mut result = match filter {
+ FtsFilter::Exact {
field,
- );
- if !bit_keys.contains(&key) {
- bit_keys.push(key);
+ text,
+ language,
+ } => {
+ let field: u8 = field.clone().into();
+
+ let tokens = language
+ .tokenize_text(text.as_ref(), MAX_TOKEN_LENGTH)
+ .map(|t| t.word)
+ .collect::<Vec<_>>();
+ let keys = if tokens.len() > 1 {
+ tokens
+ .windows(2)
+ .map(|bg| BitmapKey {
+ account_id,
+ collection,
+ class: BitmapClass::bigram(format!("{} {}", bg[0], bg[1]), field),
+ block_num: 0,
+ })
+ .collect::<Vec<_>>()
+ } else {
+ tokens
+ .into_iter()
+ .map(|word| BitmapKey {
+ account_id,
+ collection,
+ class: BitmapClass::word(word.as_ref(), field),
+ block_num: 0,
+ })
+ .collect::<Vec<_>>()
+ };
+
+ self.get_bitmaps_intersection(keys).await?
}
+ FtsFilter::Contains {
+ field,
+ text,
+ language,
+ } => {
+ let mut result = RoaringBitmap::new();
+ let field: u8 = field.clone().into();
- phrase.push(token.word);
- }
- let bitmaps = match self.get_bitmaps_intersection(bit_keys).await? {
- Some(b) if !b.is_empty() => b,
- _ => return Ok(None),
- };
+ for token in Stemmer::new(text.as_ref(), language, MAX_TOKEN_LENGTH) {
+ let token1 = BitmapKey {
+ account_id,
+ collection,
+ class: BitmapClass::word(token.word.as_ref(), field),
+ block_num: 0,
+ };
+ let token2 = BitmapKey {
+ account_id,
+ collection,
+ class: BitmapClass::stemmed(
+ if let Some(stemmed_word) = token.stemmed_word {
+ stemmed_word
+ } else {
+ token.word
+ }
+ .as_ref(),
+ field,
+ ),
+ block_num: 0,
+ };
- match phrase.len() {
- 0 => return Ok(None),
- 1 => return Ok(Some(bitmaps)),
- _ => (),
- }
+ match self.get_bitmaps_union(vec![token1, token2]).await? {
+ Some(b) if !b.is_empty() => {
+ if !result.is_empty() {
+ result &= b;
+ if result.is_empty() {
+ break;
+ }
+ } else {
+ result = b;
+ }
+ }
+ _ => break,
+ }
+ }
- let mut results = RoaringBitmap::new();
- for document_id in bitmaps {
- if let Some(term_index) = self
- .get_value::<TermIndex>(ValueKey::term_index(
+ if !result.is_empty() {
+ Some(result)
+ } else {
+ None
+ }
+ }
+ FtsFilter::Keyword { field, text } => {
+ self.get_bitmap(BitmapKey {
account_id,
collection,
- document_id,
- ))
+ class: BitmapClass::word(text, field),
+ block_num: 0,
+ })
.await?
- {
- if term_index
- .match_terms(
- &phrase
- .iter()
- .map(|w| term_index.get_match_term(w, None))
- .collect::<Vec<_>>(),
- field.into(),
- true,
- false,
- false,
- )
- .map_err(|e| {
- crate::Error::InternalError(format!(
- "TermIndex match_terms failed for {account_id}/{collection}/{document_id}: {e:?}"
- ))
- })?
- .is_some()
- {
- results.insert(document_id);
+ }
+ op @ (FtsFilter::And | FtsFilter::Or | FtsFilter::Not) => {
+ stack.push(state);
+ state = op.into();
+ continue;
+ }
+ FtsFilter::End => {
+ if let Some(prev_state) = stack.pop() {
+ let bm = state.bm;
+ state = prev_state;
+ bm
+ } else {
+ break;
}
- } else {
- tracing::debug!(
- event = "error",
- context = "fts_query",
- account_id = account_id,
- collection = collection,
- document_id = document_id,
- "Document is missing a term index",
- );
}
- }
+ };
- if !results.is_empty() {
- Ok(Some(results))
- } else {
- Ok(None)
+ // Only fetch not mask if we need it
+ if matches!(state.op, FtsFilter::Not) && !not_fetch {
+ not_mask = self
+ .get_bitmap(BitmapKey::document_ids(account_id, collection))
+ .await?
+ .unwrap_or_else(RoaringBitmap::new);
+ not_fetch = true;
}
- } else {
- let mut bitmaps = RoaringBitmap::new();
-
- for token in Stemmer::new(text, language, MAX_TOKEN_LENGTH) {
- let token1 =
- BitmapKey::hash(&token.word, account_id, collection, HASH_EXACT, field);
- let token2 = if let Some(stemmed_word) = token.stemmed_word {
- BitmapKey::hash(&stemmed_word, account_id, collection, HASH_STEMMED, field)
- } else {
- let mut token2 = token1.clone();
- token2.family &= !HASH_EXACT;
- token2.family |= HASH_STEMMED;
- token2
- };
-
- match self.get_bitmaps_union(vec![token1, token2]).await? {
- Some(b) if !b.is_empty() => {
- if !bitmaps.is_empty() {
- bitmaps &= b;
- if bitmaps.is_empty() {
- return Ok(None);
- }
+
+ // Apply logical operation
+ if let Some(dest) = &mut state.bm {
+ match state.op {
+ FtsFilter::And => {
+ if let Some(result) = result {
+ dest.bitand_assign(result);
} else {
- bitmaps = b;
+ dest.clear();
+ }
+ }
+ FtsFilter::Or => {
+ if let Some(result) = result {
+ dest.bitor_assign(result);
+ }
+ }
+ FtsFilter::Not => {
+ if let Some(mut result) = result {
+ result.bitxor_assign(&not_mask);
+ dest.bitand_assign(result);
}
}
- _ => return Ok(None),
- };
+ _ => unreachable!(),
+ }
+ } else if let Some(ref mut result_) = result {
+ if let FtsFilter::Not = state.op {
+ result_.bitxor_assign(&not_mask);
+ }
+ state.bm = result;
+ } else if let FtsFilter::Not = state.op {
+ state.bm = Some(not_mask.clone());
+ } else {
+ state.bm = Some(RoaringBitmap::new());
}
- Ok(Some(bitmaps))
+ // And short-circuit
+ if matches!(state.op, FtsFilter::And) && state.bm.as_ref().unwrap().is_empty() {
+ while let Some(filter) = filters.peek() {
+ if matches!(filter, FtsFilter::End) {
+ break;
+ } else {
+ filters.next();
+ }
+ }
+ }
}
+
+ Ok(state.bm.unwrap_or_default())
}
- async fn get_bitmaps_union<T: AsRef<[u8]> + Sync + Send>(
+ async fn get_bitmaps_union(
&self,
- keys: Vec<BitmapKey<T>>,
+ keys: Vec<BitmapKey<BitmapClass>>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut bm = RoaringBitmap::new();
@@ -165,3 +237,12 @@ pub trait StoreFts: StoreRead {
Ok(if !bm.is_empty() { Some(bm) } else { None })
}
}
+
+impl<T: Into<u8> + Display + Clone + std::fmt::Debug> From<FtsFilter<T>> for State<T> {
+ fn from(value: FtsFilter<T>) -> Self {
+ Self {
+ op: value,
+ bm: None,
+ }
+ }
+}
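
And a rough sketch of driving the new query entry point end to end (the ids, field values and search terms are placeholders): top-level filters are AND-ed implicitly, And/Or/Not open a nested group that must be closed with End, and quoted text takes the Exact branch above, which intersects bigram bitmaps.

    use store::{
        fts::{Field, FtsFilter},
        Store,
    };

    async fn search(store: &Store) -> Result<store::roaring::RoaringBitmap, store::Error> {
        let filters: Vec<FtsFilter<u8>> = vec![
            FtsFilter::has_english_text(Field::Body, "sightseeing"),
            FtsFilter::Or,
            // Quoted text is matched as a phrase via bigram bitmaps.
            FtsFilter::has_english_text(Field::Body, "\"guided tour\""),
            FtsFilter::has_keyword(Field::Keyword, "travel"),
            FtsFilter::End,
        ];

        // Returns the document ids in the collection matching:
        // body:sightseeing AND (body:"guided tour" OR keyword:travel)
        store.fts_query(1u32, 0u8, filters).await
    }
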
diff --git a/crates/store/src/fts/term_index.rs b/crates/store/src/fts/term_index.rs
index 2b876578..3ec6e7e8 100644
--- a/crates/store/src/fts/term_index.rs
+++ b/crates/store/src/fts/term_index.rs
@@ -23,7 +23,7 @@
use std::{borrow::Cow, convert::TryInto};
-use crate::{Deserialize, Serialize};
+use crate::{Deserialize, Serialize, U32_LEN, U64_LEN};
use ahash::{AHashMap, AHashSet};
use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs
index 5dd85549..698d6886 100644
--- a/crates/store/src/lib.rs
+++ b/crates/store/src/lib.rs
@@ -24,8 +24,8 @@
use std::{fmt::Display, sync::Arc};
pub mod backend;
-//pub mod fts;
pub mod dispatch;
+pub mod fts;
pub mod query;
pub mod write;
@@ -37,11 +37,6 @@ pub use rand;
pub use roaring;
use write::{BitmapClass, BlobOp, ValueClass};
-#[cfg(feature = "rocks")]
-pub struct Store {
- db: rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded>,
-}
-
pub trait Deserialize: Sized + Sync + Send {
fn deserialize(bytes: &[u8]) -> crate::Result<Self>;
}
@@ -103,9 +98,9 @@ pub struct LogKey {
pub change_id: u64,
}
-const BLOB_HASH_LEN: usize = 32;
-const U64_LEN: usize = std::mem::size_of::<u64>();
-const U32_LEN: usize = std::mem::size_of::<u32>();
+pub const BLOB_HASH_LEN: usize = 32;
+pub const U64_LEN: usize = std::mem::size_of::<u64>();
+pub const U32_LEN: usize = std::mem::size_of::<u32>();
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct BlobHash([u8; BLOB_HASH_LEN]);
@@ -158,6 +153,7 @@ pub const SUBSPACE_VALUES: u8 = b'v';
pub const SUBSPACE_LOGS: u8 = b'l';
pub const SUBSPACE_INDEXES: u8 = b'i';
pub const SUBSPACE_BLOBS: u8 = b'o';
+pub const SUBSPACE_BLOB_DATA: u8 = b't';
pub const SUBSPACE_ACLS: u8 = b'a';
pub const SUBSPACE_COUNTERS: u8 = b'c';
@@ -179,6 +175,13 @@ pub enum Store {
pub enum BlobStore {
Fs(Arc<FsStore>),
S3(Arc<S3Store>),
+ Sqlite(Arc<SqliteStore>),
+ FoundationDb(Arc<FdbStore>),
+}
+
+#[derive(Clone)]
+pub enum FtsStore {
+ Store(Store),
}
impl From<SqliteStore> for Store {
@@ -204,3 +207,9 @@ impl From<S3Store> for BlobStore {
Self::S3(Arc::new(store))
}
}
+
+impl From<Store> for FtsStore {
+ fn from(store: Store) -> Self {
+ Self::Store(store)
+ }
+}
diff --git a/crates/store/src/query/filter.rs b/crates/store/src/query/filter.rs
index 7dfa0043..6c356e93 100644
--- a/crates/store/src/query/filter.rs
+++ b/crates/store/src/query/filter.rs
@@ -24,7 +24,7 @@
use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign};
use ahash::HashSet;
-use nlp::tokenizers::space::SpaceTokenizer;
+use nlp::tokenizers::word::WordTokenizer;
use roaring::RoaringBitmap;
use crate::{backend::MAX_TOKEN_LENGTH, BitmapKey, Store};
@@ -32,8 +32,8 @@ use crate::{backend::MAX_TOKEN_LENGTH, BitmapKey, Store};
use super::{Filter, ResultSet};
struct State {
- op: Filter,
- bm: Option<RoaringBitmap>,
+ pub op: Filter,
+ pub bm: Option<RoaringBitmap>,
}
impl Store {
@@ -44,8 +44,6 @@ impl Store {
filters: Vec<Filter>,
) -> crate::Result<ResultSet> {
let collection = collection.into();
- let mut not_mask = RoaringBitmap::new();
- let mut not_fetch = false;
if filters.is_empty() {
return Ok(ResultSet {
account_id,
@@ -61,10 +59,13 @@ impl Store {
let mut stack = Vec::new();
let mut filters = filters.into_iter().peekable();
+ let mut not_mask = RoaringBitmap::new();
+ let mut not_fetch = false;
+
while let Some(filter) = filters.next() {
- let result = match filter {
+ let mut result = match filter {
Filter::MatchValue { field, op, value } => {
- self.range_to_bitmap(account_id, collection, field, value, op)
+ self.range_to_bitmap(account_id, collection, field, &value, op)
.await?
}
Filter::HasText {
@@ -74,7 +75,8 @@ impl Store {
} => {
if tokenize {
self.get_bitmaps_intersection(
- SpaceTokenizer::new(&text, MAX_TOKEN_LENGTH)
+ WordTokenizer::new(&text, MAX_TOKEN_LENGTH)
+ .map(|token| token.word.into_owned())
.collect::<HashSet<String>>()
.into_iter()
.map(|word| {
@@ -114,6 +116,7 @@ impl Store {
}
};
+ // Only fetch not mask if we need it
if matches!(state.op, Filter::Not) && !not_fetch {
not_mask = self
.get_bitmap(BitmapKey::document_ids(account_id, collection))
@@ -122,8 +125,41 @@ impl Store {
not_fetch = true;
}
- state.op.apply(&mut state.bm, result, &not_mask);
+ // Apply logical operation
+ if let Some(dest) = &mut state.bm {
+ match state.op {
+ Filter::And => {
+ if let Some(result) = result {
+ dest.bitand_assign(result);
+ } else {
+ dest.clear();
+ }
+ }
+ Filter::Or => {
+ if let Some(result) = result {
+ dest.bitor_assign(result);
+ }
+ }
+ Filter::Not => {
+ if let Some(mut result) = result {
+ result.bitxor_assign(&not_mask);
+ dest.bitand_assign(result);
+ }
+ }
+ _ => unreachable!(),
+ }
+ } else if let Some(ref mut result_) = result {
+ if let Filter::Not = state.op {
+ result_.bitxor_assign(&not_mask);
+ }
+ state.bm = result;
+ } else if let Filter::Not = state.op {
+ state.bm = Some(not_mask.clone());
+ } else {
+ state.bm = Some(RoaringBitmap::new());
+ }
+ // And short-circuit
if matches!(state.op, Filter::And) && state.bm.as_ref().unwrap().is_empty() {
while let Some(filter) = filters.peek() {
if matches!(filter, Filter::End) {
@@ -143,49 +179,6 @@ impl Store {
}
}
-impl Filter {
- #[inline(always)]
- pub fn apply(
- &self,
- dest: &mut Option<RoaringBitmap>,
- mut src: Option<RoaringBitmap>,
- not_mask: &RoaringBitmap,
- ) {
- if let Some(dest) = dest {
- match self {
- Filter::And => {
- if let Some(src) = src {
- dest.bitand_assign(src);
- } else {
- dest.clear();
- }
- }
- Filter::Or => {
- if let Some(src) = src {
- dest.bitor_assign(src);
- }
- }
- Filter::Not => {
- if let Some(mut src) = src {
- src.bitxor_assign(not_mask);
- dest.bitand_assign(src);
- }
- }
- _ => unreachable!(),
- }
- } else if let Some(ref mut src_) = src {
- if let Filter::Not = self {
- src_.bitxor_assign(not_mask);
- }
- *dest = src;
- } else if let Filter::Not = self {
- *dest = Some(not_mask.clone());
- } else {
- *dest = Some(RoaringBitmap::new());
- }
- }
-}
-
impl From<Filter> for State {
fn from(value: Filter) -> Self {
Self {
diff --git a/crates/store/src/query/log.rs b/crates/store/src/query/log.rs
index 20e8c896..750bc08c 100644
--- a/crates/store/src/query/log.rs
+++ b/crates/store/src/query/log.rs
@@ -130,12 +130,12 @@ impl Store {
let from_key = LogKey {
account_id,
collection,
- change_id: u64::MAX,
+ change_id: 0,
};
let to_key = LogKey {
account_id,
collection,
- change_id: 0,
+ change_id: u64::MAX,
};
let mut last_change_id = None;
diff --git a/crates/store/src/query/mod.rs b/crates/store/src/query/mod.rs
index d5a9814e..298e309d 100644
--- a/crates/store/src/query/mod.rs
+++ b/crates/store/src/query/mod.rs
@@ -29,7 +29,7 @@ pub mod sort;
use roaring::RoaringBitmap;
use crate::{
- write::{BitmapClass, TagValue},
+ write::{BitmapClass, BitmapHash, TagValue},
BitmapKey, IterateParams, Key, Serialize,
};
@@ -144,48 +144,6 @@ impl Filter {
}
}
- /*pub fn has_text_detect(
- field: impl Into<u8>,
- text: impl Into<String>,
- default_language: Language,
- ) -> Self {
- let (text, language) = Language::detect(text.into(), default_language);
- Self::has_text(field, text, language)
- }
-
- pub fn has_text(field: impl Into<u8>, text: impl Into<String>, language: Language) -> Self {
- let text = text.into();
- let op = if !matches!(language, Language::None) {
- if (text.starts_with('"') && text.ends_with('"'))
- || (text.starts_with('\'') && text.ends_with('\''))
- {
- TextMatch::Exact(language)
- } else {
- TextMatch::Stemmed(language)
- }
- } else {
- TextMatch::Tokenized
- };
-
- Filter::HasText {
- field: field.into(),
- text,
- op,
- }
- }
-
- pub fn has_raw_text(field: impl Into<u8>, text: impl Into<String>) -> Self {
- Filter::HasText {
- field: field.into(),
- text: text.into(),
- op: TextMatch::Raw,
- }
- }
-
- pub fn has_english_text(field: impl Into<u8>, text: impl Into<String>) -> Self {
- Self::has_text(field, text, Language::English)
- }*/
-
pub fn has_text(field: impl Into<u8>, text: impl Into<String>) -> Self {
Filter::HasText {
field: field.into(),
@@ -255,14 +213,14 @@ impl BitmapKey<BitmapClass> {
account_id: u32,
collection: impl Into<u8>,
field: impl Into<u8>,
- token: impl Into<Vec<u8>>,
+ token: impl AsRef<[u8]>,
) -> Self {
BitmapKey {
account_id,
collection: collection.into(),
class: BitmapClass::Text {
field: field.into(),
- token: token.into(),
+ token: BitmapHash::new(token),
},
block_num: 0,
}
@@ -317,20 +275,3 @@ impl<T: Key> IterateParams<T> {
self
}
}
-
-/*
-#[derive(Debug)]
-pub struct RawValue<T: Deserialize> {
- pub raw: Vec<u8>,
- pub inner: T,
-}
-
-impl<T: Deserialize> Deserialize for RawValue<T> {
- fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
- Ok(RawValue {
- inner: T::deserialize(bytes)?,
- raw: bytes.to_vec(),
- })
- }
-}
-*/
diff --git a/crates/store/src/write/batch.rs b/crates/store/src/write/batch.rs
index 0e0241c7..ecd4a11d 100644
--- a/crates/store/src/write/batch.rs
+++ b/crates/store/src/write/batch.rs
@@ -160,10 +160,10 @@ impl BatchBuilder {
self
}
- pub fn set(&mut self, class: impl Into<ValueClass>, value: Vec<u8>) -> &mut Self {
+ pub fn set(&mut self, class: impl Into<ValueClass>, value: impl Into<Vec<u8>>) -> &mut Self {
self.ops.push(Operation::Value {
class: class.into(),
- op: ValueOp::Set(value),
+ op: ValueOp::Set(value.into()),
});
self
}
diff --git a/crates/store/src/write/hash.rs b/crates/store/src/write/hash.rs
new file mode 100644
index 00000000..e1e13723
--- /dev/null
+++ b/crates/store/src/write/hash.rs
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details, which is also available
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use crate::backend::MAX_TOKEN_LENGTH;
+
+use super::{BitmapClass, BitmapHash};
+
+impl BitmapClass {
+ pub fn word(token: impl AsRef<[u8]>, field: impl Into<u8>) -> Self {
+ BitmapClass::Text {
+ field: field.into(),
+ token: BitmapHash::new(token),
+ }
+ }
+
+ pub fn stemmed(token: impl AsRef<[u8]>, field: impl Into<u8>) -> Self {
+ BitmapClass::Text {
+ field: field.into() | 1 << 6,
+ token: BitmapHash::new(token),
+ }
+ }
+
+ pub fn bigram(token: impl AsRef<[u8]>, field: impl Into<u8>) -> Self {
+ BitmapClass::Text {
+ field: field.into() | 1 << 7,
+ token: BitmapHash::new(token),
+ }
+ }
+}
+
+impl BitmapHash {
+ pub fn new(item: impl AsRef<[u8]>) -> Self {
+ Self {
+ len: std::cmp::min(item.as_ref().len(), MAX_TOKEN_LENGTH) as u8,
+ hash: hash(item),
+ }
+ }
+
+ pub fn to_u64(&self) -> u64 {
+ u64::from_be_bytes(self.hash)
+ }
+}
+
+fn hash(item: impl AsRef<[u8]>) -> [u8; 8] {
+ let item = item.as_ref();
+ let mut result = [0u8; 8];
+
+ if item.len() <= 8 {
+ result[..item.len()].copy_from_slice(item);
+ } else {
+ result[..4].copy_from_slice(&xxhash_rust::xxh3::xxh3_64(item).to_le_bytes()[..4]);
+ result[4..8].copy_from_slice(&farmhash::hash64(item).to_le_bytes()[..4]);
+ }
+
+ result
+}
+
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct TokenType {}
+
+impl TokenType {
+ pub fn word(field: u8) -> u8 {
+ field
+ }
+
+ pub fn stemmed(field: u8) -> u8 {
+ 1 << 6 | field
+ }
+
+ pub fn bigram(field: u8) -> u8 {
+ 1 << 7 | field
+ }
+}
+
+/*
+
+const AHASHER: ahash::RandomState = ahash::RandomState::with_seeds(
+ 0xaf1f2242106c64b3,
+ 0x60ca4cfb4b3ed0ce,
+ 0xc7dbc0bb615e82b3,
+ 0x520ad065378daf88,
+);
+lazy_static::lazy_static! {
+ static ref SIPHASHER: siphasher::sip::SipHasher13 =
+ siphasher::sip::SipHasher13::new_with_keys(0x56205cbdba8f02a6, 0xbd0dbc4bb06d687b);
+}
+
+ let h1 = xxhash_rust::xxh3::xxh3_64(item).to_le_bytes();
+ let h2 = farmhash::hash64(item).to_le_bytes();
+ let h3 = AHASHER.hash_one(item).to_le_bytes();
+ let mut sh = *SIPHASHER;
+ sh.write(item.as_ref());
+ let h4 = sh.finish().to_le_bytes();
+
+ result[..2].copy_from_slice(&h1[..2]);
+ result[2..4].copy_from_slice(&h2[..2]);
+ result[4..6].copy_from_slice(&h3[..2]);
+ result[6..8].copy_from_slice(&h4[..2]);
+
+impl KeySerializer {
+ pub fn hash_text(mut self, item: impl AsRef<[u8]>) -> Self {
+ let item = item.as_ref();
+
+ if item.len() <= 8 {
+ self.buf.extend_from_slice(item);
+ } else {
+ let h1 = xxhash_rust::xxh3::xxh3_64(item).to_le_bytes();
+ let h2 = farmhash::hash64(item).to_le_bytes();
+ let h3 = AHASHER.hash_one(item).to_le_bytes();
+ let mut sh = *SIPHASHER;
+ sh.write(item.as_ref());
+ let h4 = sh.finish().to_le_bytes();
+
+ match item.len() {
+ 9..=16 => {
+ self.buf.extend_from_slice(&h1[..2]);
+ self.buf.extend_from_slice(&h2[..2]);
+ self.buf.extend_from_slice(&h3[..2]);
+ self.buf.extend_from_slice(&h4[..2]);
+ }
+ 17..=32 => {
+ self.buf.extend_from_slice(&h1[..3]);
+ self.buf.extend_from_slice(&h2[..3]);
+ self.buf.extend_from_slice(&h3[..3]);
+ self.buf.extend_from_slice(&h4[..3]);
+ }
+ _ => {
+ self.buf.extend_from_slice(&h1[..4]);
+ self.buf.extend_from_slice(&h2[..4]);
+ self.buf.extend_from_slice(&h3[..4]);
+ self.buf.extend_from_slice(&h4[..4]);
+ }
+ }
+ }
+ self
+ }
+}
+*/
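
A few illustrative assertions against the hashing scheme above (written as if called from outside the crate): tokens of eight bytes or less are embedded verbatim, longer tokens combine xxh3 and farmhash halves, and the token kind rides in the top two bits of the field byte.

    use store::write::{hash::TokenType, BitmapHash};

    fn main() {
        // Short tokens are stored as-is in the 8-byte hash.
        let short = BitmapHash::new("cat");
        assert_eq!(&short.hash[..3], b"cat");
        assert_eq!(short.len, 3);

        // Longer tokens get 4 bytes of xxh3_64 followed by 4 bytes of farmhash64.
        let long = BitmapHash::new("internationalization");
        assert_ne!(long.hash, [0u8; 8]);

        // The field byte doubles as the token kind, mirroring
        // BitmapClass::word / stemmed / bigram.
        assert_eq!(TokenType::word(5), 5);
        assert_eq!(TokenType::stemmed(5), (1 << 6) | 5);
        assert_eq!(TokenType::bigram(5), (1 << 7) | 5);
    }
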
diff --git a/crates/store/src/write/key.rs b/crates/store/src/write/key.rs
index 31eb1b41..e4309ac5 100644
--- a/crates/store/src/write/key.rs
+++ b/crates/store/src/write/key.rs
@@ -21,19 +21,19 @@
* for more details.
*/
-use std::{convert::TryInto, hash::Hasher};
+use std::convert::TryInto;
use utils::codec::leb128::Leb128_;
use crate::{
- backend::MAX_TOKEN_MASK, BitmapKey, BlobHash, BlobKey, IndexKey, IndexKeyPrefix, Key, LogKey,
- ValueKey, BLOB_HASH_LEN, SUBSPACE_ACLS, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS,
- SUBSPACE_VALUES, U32_LEN, U64_LEN,
+ BitmapKey, BlobHash, BlobKey, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, BLOB_HASH_LEN,
+ SUBSPACE_ACLS, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN,
+ U64_LEN,
};
use super::{BitmapClass, BlobOp, TagValue, ValueClass};
pub struct KeySerializer {
- buf: Vec<u8>,
+ pub buf: Vec<u8>,
}
pub trait KeySerialize {
@@ -241,6 +241,15 @@ impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> {
}
.write(u32::MAX)
.write(name.as_slice()),
+ ValueClass::TermIndex => if include_subspace {
+ KeySerializer::new(U32_LEN * 2 + 3).write(crate::SUBSPACE_VALUES)
+ } else {
+ KeySerializer::new(U32_LEN * 2 + 2)
+ }
+ .write(self.account_id)
+ .write(self.collection)
+ .write_leb128(self.document_id)
+ .write(u8::MAX),
}
.finalize()
}
@@ -277,35 +286,64 @@ impl<T: AsRef<BitmapClass> + Sync + Send> Key for BitmapKey<T> {
fn serialize(&self, include_subspace: bool) -> Vec<u8> {
const BM_DOCUMENT_IDS: u8 = 0;
- const BM_TAG: u8 = 1 << 5;
- const BM_TEXT: u8 = 1 << 6;
+ const BM_TAG: u8 = 1 << 6;
+ const BM_TEXT: u8 = 1 << 7;
const TAG_ID: u8 = 0;
const TAG_TEXT: u8 = 1 << 0;
const TAG_STATIC: u8 = 1 << 1;
- let ks = if include_subspace {
- KeySerializer::new(self.len() + 1).write(crate::SUBSPACE_BITMAPS)
- } else {
- KeySerializer::new(self.len())
- }
- .write(self.account_id)
- .write(self.collection);
-
match self.class.as_ref() {
- BitmapClass::DocumentIds => ks.write(BM_DOCUMENT_IDS),
+ BitmapClass::DocumentIds => if include_subspace {
+ KeySerializer::new(U32_LEN + 3).write(SUBSPACE_BITMAPS)
+ } else {
+ KeySerializer::new(U32_LEN + 2)
+ }
+ .write(self.account_id)
+ .write(self.collection)
+ .write(BM_DOCUMENT_IDS),
BitmapClass::Tag { field, value } => match value {
- TagValue::Id(id) => ks.write(BM_TAG | TAG_ID).write(*field).write_leb128(*id),
- TagValue::Text(text) => ks
- .write(BM_TAG | TAG_TEXT)
- .write(*field)
- .write(text.as_slice()),
- TagValue::Static(id) => ks.write(BM_TAG | TAG_STATIC).write(*field).write(*id),
- },
- BitmapClass::Text { field, token } => ks
- .write(BM_TEXT | (token.len() & MAX_TOKEN_MASK) as u8)
+ TagValue::Id(id) => if include_subspace {
+ KeySerializer::new((U32_LEN * 2) + 4).write(SUBSPACE_BITMAPS)
+ } else {
+ KeySerializer::new((U32_LEN * 2) + 3)
+ }
+ .write(self.account_id)
+ .write(self.collection)
+ .write(BM_TAG | TAG_ID)
+ .write(*field)
+ .write_leb128(*id),
+ TagValue::Text(text) => if include_subspace {
+ KeySerializer::new(U32_LEN + 4 + text.len()).write(SUBSPACE_BITMAPS)
+ } else {
+ KeySerializer::new(U32_LEN + 3 + text.len())
+ }
+ .write(self.account_id)
+ .write(self.collection)
+ .write(BM_TAG | TAG_TEXT)
.write(*field)
- .hash_text(token),
+ .write(text.as_slice()),
+ TagValue::Static(id) => if include_subspace {
+ KeySerializer::new(U32_LEN + 5).write(SUBSPACE_BITMAPS)
+ } else {
+ KeySerializer::new(U32_LEN + 4)
+ }
+ .write(self.account_id)
+ .write(self.collection)
+ .write(BM_TAG | TAG_STATIC)
+ .write(*field)
+ .write(*id),
+ },
+ BitmapClass::Text { field, token } => if include_subspace {
+ KeySerializer::new(U32_LEN + 16 + 3 + 1).write(SUBSPACE_BITMAPS)
+ } else {
+ KeySerializer::new(U32_LEN + 16 + 3)
+ }
+ .write(self.account_id)
+ .write(self.collection)
+ .write(BM_TEXT | token.len)
+ .write(*field)
+ .write(token.hash.as_slice()),
}
.write(self.block_num)
.finalize()
@@ -349,81 +387,3 @@ impl<T: AsRef<BlobHash> + Sync + Send> Key for BlobKey<T> {
crate::SUBSPACE_BLOBS
}
}
-
-const AHASHER: ahash::RandomState = ahash::RandomState::with_seeds(
- 0xaf1f2242106c64b3,
- 0x60ca4cfb4b3ed0ce,
- 0xc7dbc0bb615e82b3,
- 0x520ad065378daf88,
-);
-lazy_static::lazy_static! {
- static ref SIPHASHER: siphasher::sip::SipHasher13 =
- siphasher::sip::SipHasher13::new_with_keys(0x56205cbdba8f02a6, 0xbd0dbc4bb06d687b);
-}
-
-impl KeySerializer {
- fn hash_text(mut self, item: impl AsRef<[u8]>) -> Self {
- let item = item.as_ref();
-
- if item.len() <= 8 {
- self.buf.extend_from_slice(item);
- } else {
- let h1 = xxhash_rust::xxh3::xxh3_64(item).to_le_bytes();
- let h2 = farmhash::hash64(item).to_le_bytes();
- let h3 = AHASHER.hash_one(item).to_le_bytes();
- let mut sh = *SIPHASHER;
- sh.write(item.as_ref());
- let h4 = sh.finish().to_le_bytes();
-
- match item.len() {
- 9..=16 => {
- self.buf.extend_from_slice(&h1[..2]);
- self.buf.extend_from_slice(&h2[..2]);
- self.buf.extend_from_slice(&h3[..2]);
- self.buf.extend_from_slice(&h4[..2]);
- }
- 17..=32 => {
- self.buf.extend_from_slice(&h1[..3]);
- self.buf.extend_from_slice(&h2[..3]);
- self.buf.extend_from_slice(&h3[..3]);
- self.buf.extend_from_slice(&h4[..3]);
- }
- _ => {
- self.buf.extend_from_slice(&h1[..4]);
- self.buf.extend_from_slice(&h2[..4]);
- self.buf.extend_from_slice(&h3[..4]);
- self.buf.extend_from_slice(&h4[..4]);
- }
- }
- }
- self
- }
-}
-
-impl<T: AsRef<BitmapClass>> BitmapKey<T> {
- #[allow(clippy::len_without_is_empty)]
- pub fn len(&self) -> usize {
- std::mem::size_of::<BitmapKey<BitmapClass>>()
- + match self.class.as_ref() {
- BitmapClass::DocumentIds => 0,
- BitmapClass::Tag { value, .. } => match value {
- TagValue::Id(_) => U32_LEN,
- TagValue::Text(v) => v.len(),
- TagValue::Static(_) => 1,
- },
- BitmapClass::Text { token, .. } => token.len(),
- }
- }
-}
-
-impl<T: AsRef<ValueClass>> ValueKey<T> {
- #[allow(clippy::len_without_is_empty)]
- pub fn len(&self) -> usize {
- std::mem::size_of::<ValueKey<ValueClass>>()
- + match self.class.as_ref() {
- ValueClass::Property(_) => 1,
- ValueClass::Acl(_) => U32_LEN,
- ValueClass::Named(v) => v.len(),
- }
- }
-}
diff --git a/crates/store/src/write/mod.rs b/crates/store/src/write/mod.rs
index 82f73221..7d21b4b1 100644
--- a/crates/store/src/write/mod.rs
+++ b/crates/store/src/write/mod.rs
@@ -23,7 +23,7 @@
use std::{collections::HashSet, hash::Hash, slice::Iter, time::SystemTime};
-use nlp::tokenizers::space::SpaceTokenizer;
+use nlp::tokenizers::word::WordTokenizer;
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
use crate::{
@@ -35,6 +35,7 @@ use self::assert::AssertValue;
pub mod assert;
pub mod batch;
pub mod blob;
+pub mod hash;
pub mod key;
pub mod log;
@@ -92,14 +93,20 @@ pub enum Operation {
},
}
-#[derive(Debug, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BitmapClass {
DocumentIds,
Tag { field: u8, value: TagValue },
- Text { field: u8, token: Vec<u8> },
+ Text { field: u8, token: BitmapHash },
}
-#[derive(Debug, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct BitmapHash {
+ pub hash: [u8; 8],
+ pub len: u8,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TagValue {
Id(u32),
Text(Vec<u8>),
@@ -111,6 +118,7 @@ pub enum ValueClass {
Property(u8),
Acl(u32),
Named(Vec<u8>),
+ TermIndex,
}
#[derive(Debug, PartialEq, Eq, Hash, Default)]
@@ -352,7 +360,7 @@ impl ToBitmaps for &str {
ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
- token: token.into_bytes(),
+ token: BitmapHash::new(token),
},
set,
});
@@ -362,8 +370,8 @@ impl ToBitmaps for &str {
impl TokenizeText for &str {
fn tokenize_into(&self, tokens: &mut HashSet<String>) {
- for token in SpaceTokenizer::new(self, MAX_TOKEN_LENGTH) {
- tokens.insert(token);
+ for token in WordTokenizer::new(self, MAX_TOKEN_LENGTH) {
+ tokens.insert(token.word.into_owned());
}
}
@@ -479,6 +487,10 @@ impl BlobHash {
pub fn try_from_hash_slice(value: &[u8]) -> Result<BlobHash, std::array::TryFromSliceError> {
value.try_into().map(BlobHash)
}
+
+ pub fn as_slice(&self) -> &[u8] {
+ self.0.as_ref()
+ }
}
impl From<&[u8]> for BlobHash {
@@ -523,6 +535,12 @@ impl AsRef<BlobClass> for BlobClass {
}
}
+impl From<BlobHash> for Vec<u8> {
+ fn from(value: BlobHash) -> Self {
+ value.0.to_vec()
+ }
+}
+
impl BlobClass {
pub fn account_id(&self) -> u32 {
match self {
diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml
index 75070c85..840e839a 100644
--- a/crates/utils/Cargo.toml
+++ b/crates/utils/Cargo.toml
@@ -23,6 +23,7 @@ opentelemetry-semantic-conventions = { version = "0.12.0" }
dashmap = "5.4"
ahash = { version = "0.8" }
chrono = "0.4"
+rand = "0.8.5"
[target.'cfg(unix)'.dependencies]
privdrop = "0.5.3"
diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs
index 871af2a1..7a99997d 100644
--- a/crates/utils/src/lib.rs
+++ b/crates/utils/src/lib.rs
@@ -30,6 +30,7 @@ pub mod config;
pub mod ipc;
pub mod listener;
pub mod map;
+pub mod snowflake;
pub mod suffixlist;
use opentelemetry::{
diff --git a/crates/utils/src/snowflake.rs b/crates/utils/src/snowflake.rs
new file mode 100644
index 00000000..9be79fb9
--- /dev/null
+++ b/crates/utils/src/snowflake.rs
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details, which is also available
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use std::{
+ sync::atomic::{AtomicU64, Ordering},
+ time::{Duration, SystemTime},
+};
+
+pub struct SnowflakeIdGenerator {
+ epoch: SystemTime,
+ node_id: u64,
+ sequence: AtomicU64,
+}
+
+const SEQUENCE_LEN: u64 = 12;
+const NODE_ID_LEN: u64 = 9;
+
+const SEQUENCE_MASK: u64 = (1 << SEQUENCE_LEN) - 1;
+const NODE_ID_MASK: u64 = (1 << NODE_ID_LEN) - 1;
+
+impl SnowflakeIdGenerator {
+ pub fn new() -> Self {
+ Self::with_node_id(rand::random::<u64>())
+ }
+
+ pub fn with_node_id(node_id: u64) -> Self {
+ Self {
+ epoch: SystemTime::UNIX_EPOCH + Duration::from_secs(1632280000), // September 2021, ~52 years after UNIX_EPOCH
+ node_id,
+ sequence: 0.into(),
+ }
+ }
+
+ pub fn generate(&self) -> Option<u64> {
+ let elapsed = self.epoch.elapsed().ok()?.as_millis() as u64;
+ let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
+
+ (elapsed << (SEQUENCE_LEN + NODE_ID_LEN)
+ | (self.node_id & NODE_ID_MASK) << SEQUENCE_LEN
+ | (sequence & SEQUENCE_MASK))
+ .into()
+ }
+}
+
+impl Default for SnowflakeIdGenerator {
+ fn default() -> Self {
+ Self::new()
+ }
+}
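
To make the packing concrete, a small round trip that undoes generate() using the constants above: 12 bits of sequence, 9 bits of node id, and the remaining high bits holding milliseconds since the custom epoch (September 2021).

    use utils::snowflake::SnowflakeIdGenerator;

    fn main() {
        let generator = SnowflakeIdGenerator::with_node_id(3);
        let id = generator.generate().expect("system clock before the custom epoch");

        // Reverse the bit packing performed by generate().
        let sequence = id & ((1 << 12) - 1);
        let node_id = (id >> 12) & ((1 << 9) - 1);
        let elapsed_ms = id >> (12 + 9);

        assert_eq!(node_id, 3);
        assert_eq!(sequence, 0); // first id handed out by this generator
        println!("{elapsed_ms} ms since the epoch, node {node_id}, sequence {sequence}");
    }
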
diff --git a/tests/src/imap/append.rs b/tests/src/imap/append.rs
index bb856aef..ede7bed8 100644
--- a/tests/src/imap/append.rs
+++ b/tests/src/imap/append.rs
@@ -25,9 +25,11 @@ use std::{fs, io};
use imap_proto::ResponseType;
-use super::{resources_dir, AssertResult, ImapConnection, Type};
+use crate::jmap::wait_for_index;
-pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) {
+use super::{resources_dir, AssertResult, IMAPTest, ImapConnection, Type};
+
+pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection, handle: &IMAPTest) {
// Invalid APPEND commands
imap.send("APPEND \"All Mail\" {1+}\r\na").await;
imap.assert_read(Type::Tagged, ResponseType::No)
@@ -80,6 +82,8 @@ pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) {
assert_eq!(code.next(), Some(expected_uid.to_string().as_str()));
expected_uid += 1;
}
+
+ wait_for_index(&handle.jmap).await;
}
pub async fn assert_append_message(
diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs
index d3989889..55cc21d2 100644
--- a/tests/src/imap/mod.rs
+++ b/tests/src/imap/mod.rs
@@ -225,7 +225,7 @@ refresh-token-renew = "2s"
"#;
#[allow(dead_code)]
-struct IMAPTest {
+pub struct IMAPTest {
jmap: Arc<JMAP>,
imap: Arc<IMAP>,
temp_dir: TempDir,
@@ -331,7 +331,7 @@ async fn init_imap_tests(delete_if_exists: bool) -> IMAPTest {
pub async fn imap_tests() {
/*tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
- .with_max_level(tracing::Level::TRACE)
+ .with_max_level(tracing::Level::DEBUG)
.finish(),
)
.unwrap();*/
@@ -364,10 +364,10 @@ pub async fn imap_tests() {
}
mailbox::test(&mut imap, &mut imap_check).await;
- append::test(&mut imap, &mut imap_check).await;
+ append::test(&mut imap, &mut imap_check, &handle).await;
search::test(&mut imap, &mut imap_check).await;
fetch::test(&mut imap, &mut imap_check).await;
- store::test(&mut imap, &mut imap_check).await;
+ store::test(&mut imap, &mut imap_check, &handle).await;
copy_move::test(&mut imap, &mut imap_check).await;
thread::test(&mut imap, &mut imap_check).await;
idle::test(&mut imap, &mut imap_check).await;
diff --git a/tests/src/imap/store.rs b/tests/src/imap/store.rs
index 851f2d56..c4c4993f 100644
--- a/tests/src/imap/store.rs
+++ b/tests/src/imap/store.rs
@@ -23,9 +23,11 @@
use imap_proto::ResponseType;
-use super::{AssertResult, ImapConnection, Type};
+use crate::jmap::wait_for_index;
-pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) {
+use super::{AssertResult, IMAPTest, ImapConnection, Type};
+
+pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection, handle: &IMAPTest) {
// Select INBOX
imap.send("SELECT INBOX").await;
imap.assert_read(Type::Tagged, ResponseType::Ok)
@@ -73,6 +75,7 @@ pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) {
.assert_contains("UIDNEXT 11");
// Store using saved searches
+ wait_for_index(&handle.jmap).await;
imap.send("SEARCH RETURN (SAVE) FROM nathaniel").await;
imap.assert_read(Type::Tagged, ResponseType::Ok).await;
imap.send("UID STORE $ +FLAGS (\\Answered)").await;
diff --git a/tests/src/jmap/auth_acl.rs b/tests/src/jmap/auth_acl.rs
index e9598beb..1acbc2b6 100644
--- a/tests/src/jmap/auth_acl.rs
+++ b/tests/src/jmap/auth_acl.rs
@@ -45,7 +45,7 @@ use crate::{
directory::sql::{
add_to_group, create_test_group_with_email, create_test_user_with_email, remove_from_group,
},
- jmap::{mailbox::destroy_all_mailboxes, test_account_login},
+ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, test_account_login},
};
pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
@@ -777,10 +777,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
admin_client.set_default_account_id(&id.to_string());
destroy_all_mailboxes(admin_client).await;
}
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
pub fn assert_forbidden<T: Debug>(result: Result<T, jmap_client::Error>) {
diff --git a/tests/src/jmap/auth_limits.rs b/tests/src/jmap/auth_limits.rs
index 8bede987..0d1ca20f 100644
--- a/tests/src/jmap/auth_limits.rs
+++ b/tests/src/jmap/auth_limits.rs
@@ -33,7 +33,7 @@ use jmap_proto::types::id::Id;
use crate::{
directory::sql::{create_test_user_with_email, link_test_address},
- jmap::mailbox::destroy_all_mailboxes,
+ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes},
};
pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
@@ -202,8 +202,5 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
// Destroy test accounts
admin_client.set_default_account_id(&account_id);
destroy_all_mailboxes(admin_client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
diff --git a/tests/src/jmap/auth_oauth.rs b/tests/src/jmap/auth_oauth.rs
index 2353bbf0..ea36702d 100644
--- a/tests/src/jmap/auth_oauth.rs
+++ b/tests/src/jmap/auth_oauth.rs
@@ -40,7 +40,10 @@ use reqwest::{header, redirect::Policy};
use serde::de::DeserializeOwned;
use store::ahash::AHashMap;
-use crate::{directory::sql::create_test_user_with_email, jmap::mailbox::destroy_all_mailboxes};
+use crate::{
+ directory::sql::create_test_user_with_email,
+ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes},
+};
pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
println!("Running OAuth tests...");
@@ -307,10 +310,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
// Destroy test accounts
admin_client.set_default_account_id(john_id);
destroy_all_mailboxes(admin_client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
async fn post_bytes(url: &str, params: &AHashMap<String, String>) -> Bytes {
diff --git a/tests/src/jmap/blob.rs b/tests/src/jmap/blob.rs
index d6693893..a40bb937 100644
--- a/tests/src/jmap/blob.rs
+++ b/tests/src/jmap/blob.rs
@@ -30,7 +30,7 @@ use serde_json::Value;
use crate::{
directory::sql::create_test_user_with_email,
- jmap::{jmap_json_request, mailbox::destroy_all_mailboxes},
+ jmap::{assert_is_empty, jmap_json_request, mailbox::destroy_all_mailboxes},
};
pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
@@ -489,8 +489,5 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
// Remove test data
admin_client.set_default_account_id(account_id.to_string());
destroy_all_mailboxes(admin_client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
diff --git a/tests/src/jmap/delivery.rs b/tests/src/jmap/delivery.rs
index 9ec1e8a6..33e0873d 100644
--- a/tests/src/jmap/delivery.rs
+++ b/tests/src/jmap/delivery.rs
@@ -34,7 +34,7 @@ use tokio::{
use crate::{
directory::sql::{create_test_user_with_email, link_test_address, remove_test_alias},
- jmap::mailbox::destroy_all_mailboxes,
+ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes},
};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
@@ -248,10 +248,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
client.set_default_account_id(account_id);
destroy_all_mailboxes(client).await;
}
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
pub struct SmtpConnection {
diff --git a/tests/src/jmap/email_changes.rs b/tests/src/jmap/email_changes.rs
index d7947b90..1fd6d943 100644
--- a/tests/src/jmap/email_changes.rs
+++ b/tests/src/jmap/email_changes.rs
@@ -34,6 +34,8 @@ use store::{
write::{log::ChangeLogBuilder, BatchBuilder},
};
+use crate::jmap::assert_is_empty;
+
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Changes tests...");
@@ -315,10 +317,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
assert_eq!(created, vec![2, 3, 11, 12]);
assert_eq!(changes.updated(), Vec::<String>::new());
assert_eq!(changes.destroyed(), Vec::<String>::new());
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
#[derive(Debug, Clone, Copy)]
diff --git a/tests/src/jmap/email_copy.rs b/tests/src/jmap/email_copy.rs
index 585a5a8d..6eb3beaa 100644
--- a/tests/src/jmap/email_copy.rs
+++ b/tests/src/jmap/email_copy.rs
@@ -27,7 +27,7 @@ use jmap::JMAP;
use jmap_client::{client::Client, mailbox::Role};
use jmap_proto::types::id::Id;
-use crate::jmap::mailbox::destroy_all_mailboxes;
+use crate::jmap::{assert_is_empty, mailbox::destroy_all_mailboxes};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Copy tests...");
@@ -116,8 +116,5 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
destroy_all_mailboxes(client).await;
client.set_default_account_id(Id::new(2).to_string());
destroy_all_mailboxes(client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
diff --git a/tests/src/jmap/email_get.rs b/tests/src/jmap/email_get.rs
index 366c9034..2902f949 100644
--- a/tests/src/jmap/email_get.rs
+++ b/tests/src/jmap/email_get.rs
@@ -31,7 +31,7 @@ use jmap_client::{
use jmap_proto::types::id::Id;
use mail_parser::HeaderName;
-use crate::jmap::{mailbox::destroy_all_mailboxes, replace_blob_ids};
+use crate::jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, replace_blob_ids};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Get tests...");
@@ -177,11 +177,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
}
destroy_all_mailboxes(client).await;
-
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
pub fn all_headers() -> Vec<email::Property> {
diff --git a/tests/src/jmap/email_parse.rs b/tests/src/jmap/email_parse.rs
index aacdaa2e..4d85e15b 100644
--- a/tests/src/jmap/email_parse.rs
+++ b/tests/src/jmap/email_parse.rs
@@ -31,7 +31,9 @@ use jmap_client::{
};
use jmap_proto::types::id::Id;
-use crate::jmap::{email_get::all_headers, mailbox::destroy_all_mailboxes, replace_blob_ids};
+use crate::jmap::{
+ assert_is_empty, email_get::all_headers, mailbox::destroy_all_mailboxes, replace_blob_ids,
+};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Email Parse tests...");
@@ -243,9 +245,5 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
}
destroy_all_mailboxes(client).await;
-
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
diff --git a/tests/src/jmap/email_query.rs b/tests/src/jmap/email_query.rs
index a993030e..e69e76f5 100644
--- a/tests/src/jmap/email_query.rs
+++ b/tests/src/jmap/email_query.rs
@@ -24,7 +24,7 @@
use std::{collections::hash_map::Entry, sync::Arc, time::Instant};
use crate::{
- jmap::mailbox::destroy_all_mailboxes,
+ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, wait_for_index},
store::{deflate_artwork_data, query::FIELDS},
};
use jmap::JMAP;
@@ -94,6 +94,9 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client, insert: bool) {
"thread {} found",
MAX_THREADS
);
+
+ // Wait for indexing to complete
+ wait_for_index(&server).await;
}
println!("Running JMAP Mail query tests...");
@@ -115,10 +118,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client, insert: bool) {
.unwrap();
destroy_all_mailboxes(client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
pub async fn query(client: &mut Client) {
diff --git a/tests/src/jmap/email_query_changes.rs b/tests/src/jmap/email_query_changes.rs
index a77acf21..54612a00 100644
--- a/tests/src/jmap/email_query_changes.rs
+++ b/tests/src/jmap/email_query_changes.rs
@@ -37,6 +37,7 @@ use store::{
};
use crate::jmap::{
+ assert_is_empty,
email_changes::{LogAction, ParseState},
mailbox::destroy_all_mailboxes,
};
@@ -287,10 +288,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
}
server.store.write(batch.build_batch()).await.unwrap();
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
#[derive(Debug, Clone)]
diff --git a/tests/src/jmap/email_search_snippet.rs b/tests/src/jmap/email_search_snippet.rs
index 91baacfd..e32e823a 100644
--- a/tests/src/jmap/email_search_snippet.rs
+++ b/tests/src/jmap/email_search_snippet.rs
@@ -23,7 +23,7 @@
use std::{fs, path::PathBuf, sync::Arc};
-use crate::jmap::mailbox::destroy_all_mailboxes;
+use crate::jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, wait_for_index};
use jmap::{mailbox::INBOX_ID, JMAP};
use jmap_client::{client::Client, core::query, email::query::Filter};
use jmap_proto::types::id::Id;
@@ -64,6 +64,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
.take_id();
email_ids.insert(email_name, email_id);
}
+ wait_for_index(&server).await;
// Run tests
for (filter, email_name, snippet_subject, snippet_preview) in [
@@ -179,8 +180,5 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
// Destroy test data
destroy_all_mailboxes(client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
diff --git a/tests/src/jmap/email_set.rs b/tests/src/jmap/email_set.rs
index 7cf4a078..fceb47eb 100644
--- a/tests/src/jmap/email_set.rs
+++ b/tests/src/jmap/email_set.rs
@@ -23,7 +23,7 @@
use std::{fs, path::PathBuf, sync::Arc};
-use crate::jmap::mailbox::destroy_all_mailboxes;
+use crate::jmap::{assert_is_empty, mailbox::destroy_all_mailboxes};
use jmap::{mailbox::INBOX_ID, JMAP};
use jmap_client::{
client::Client,
@@ -46,11 +46,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
update(client, &mailbox_id).await;
destroy_all_mailboxes(client).await;
-
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
async fn create(client: &mut Client, mailbox_id: &str) {
diff --git a/tests/src/jmap/email_submission.rs b/tests/src/jmap/email_submission.rs
index d38e8814..8915b1e0 100644
--- a/tests/src/jmap/email_submission.rs
+++ b/tests/src/jmap/email_submission.rs
@@ -46,7 +46,7 @@ use tokio::{
use crate::{
directory::sql::create_test_user_with_email,
- jmap::{email_set::assert_email_properties, mailbox::destroy_all_mailboxes},
+ jmap::{assert_is_empty, email_set::assert_email_properties, mailbox::destroy_all_mailboxes},
};
#[derive(Default, Debug, PartialEq, Eq)]
@@ -471,10 +471,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
client.email_submission_destroy(&id).await.unwrap();
}
destroy_all_mailboxes(client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
pub fn spawn_mock_smtp_server() -> (mpsc::Receiver<MockMessage>, Arc<Mutex<MockSMTPSettings>>) {
diff --git a/tests/src/jmap/event_source.rs b/tests/src/jmap/event_source.rs
index 2edbadf5..ace050c7 100644
--- a/tests/src/jmap/event_source.rs
+++ b/tests/src/jmap/event_source.rs
@@ -25,7 +25,10 @@ use std::{sync::Arc, time::Duration};
use crate::{
directory::sql::create_test_user_with_email,
- jmap::{delivery::SmtpConnection, mailbox::destroy_all_mailboxes, test_account_login},
+ jmap::{
+ assert_is_empty, delivery::SmtpConnection, mailbox::destroy_all_mailboxes,
+ test_account_login,
+ },
};
use futures::StreamExt;
use jmap::{mailbox::INBOX_ID, JMAP};
@@ -130,10 +133,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
assert_ping(&mut event_rx).await;
destroy_all_mailboxes(admin_client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
async fn assert_state(
diff --git a/tests/src/jmap/mailbox.rs b/tests/src/jmap/mailbox.rs
index c6479a6f..37501bd4 100644
--- a/tests/src/jmap/mailbox.rs
+++ b/tests/src/jmap/mailbox.rs
@@ -37,6 +37,8 @@ use jmap_proto::types::{id::Id, state::State};
use serde::{Deserialize, Serialize};
use store::ahash::AHashMap;
+use crate::jmap::assert_is_empty;
+
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running Mailbox tests...");
@@ -606,10 +608,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
destroy_all_mailboxes(client).await;
client.set_default_account_id(Id::from(1u64));
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
async fn create_test_mailboxes(client: &mut Client) -> AHashMap<String, String> {
diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs
index fc695c39..73064c16 100644
--- a/tests/src/jmap/mod.rs
+++ b/tests/src/jmap/mod.rs
@@ -25,7 +25,11 @@ use std::{sync::Arc, time::Duration};
use base64::{engine::general_purpose, Engine};
use directory::config::ConfigDirectory;
-use jmap::{api::JmapSessionManager, services::IPC_CHANNEL_BUFFER, JMAP};
+use jmap::{
+ api::JmapSessionManager,
+ services::{housekeeper::Event, IPC_CHANNEL_BUFFER},
+ JMAP,
+};
use jmap_client::client::{Client, Credentials};
use jmap_proto::types::id::Id;
use reqwest::header;
@@ -222,17 +226,23 @@ refresh-token-renew = "2s"
#[tokio::test]
pub async fn jmap_tests() {
- let coco = 1;
+ /*let level = "warn";
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
- .with_max_level(tracing::Level::WARN)
+ .with_env_filter(
+ tracing_subscriber::EnvFilter::builder()
+ .parse(
+ format!("smtp={level},imap={level},jmap={level},store={level},utils={level},directory={level}"),
+ )
+ .unwrap(),
+ )
.finish(),
)
- .unwrap();
+ .unwrap();*/
let delete = true;
let mut params = init_jmap_tests(delete).await;
- /*email_query::test(params.server.clone(), &mut params.client, delete).await;
+ email_query::test(params.server.clone(), &mut params.client, delete).await;
email_get::test(params.server.clone(), &mut params.client).await;
email_set::test(params.server.clone(), &mut params.client).await;
email_parse::test(params.server.clone(), &mut params.client).await;
@@ -254,7 +264,7 @@ pub async fn jmap_tests() {
email_submission::test(params.server.clone(), &mut params.client).await;
websocket::test(params.server.clone(), &mut params.client).await;
quota::test(params.server.clone(), &mut params.client).await;
- crypto::test(params.server.clone(), &mut params.client).await;*/
+ crypto::test(params.server.clone(), &mut params.client).await;
blob::test(params.server.clone(), &mut params.client).await;
if delete {
@@ -285,6 +295,33 @@ struct JMAPTest {
shutdown_tx: watch::Sender<bool>,
}
+pub async fn wait_for_index(server: &JMAP) {
+ loop {
+ let (tx, rx) = tokio::sync::oneshot::channel();
+ server
+ .housekeeper_tx
+ .send(Event::IndexIsActive(tx))
+ .await
+ .unwrap();
+ if rx.await.unwrap() {
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ } else {
+ break;
+ }
+ }
+}
+
+pub async fn assert_is_empty(server: Arc<JMAP>) {
+ // Wait for pending FTS index tasks
+ wait_for_index(&server).await;
+
+ // Assert that the store and blob store are empty
+ server
+ .store
+ .assert_is_empty(server.blob_store.clone())
+ .await;
+}
+
async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
// Load and parse config
let temp_dir = TempDir::new("jmap_tests", delete_if_exists);
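Note: the wait_for_index and assert_is_empty helpers added above centralize the teardown pattern repeated across the JMAP test modules. A minimal usage sketch, assuming only the helper signatures introduced in this diff (the test body itself is a placeholder):

    // Hypothetical test skeleton; only wait_for_index and assert_is_empty come from this commit.
    pub async fn test(server: Arc<JMAP>, client: &mut Client) {
        // ... ingest test messages; FTS indexing runs in the background ...
        wait_for_index(&server).await; // poll the housekeeper until the indexer reports idle
        // ... run search assertions against the indexed messages ...
        destroy_all_mailboxes(client).await;
        assert_is_empty(server).await; // waits for pending index tasks, then checks the store and blob store
    }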
diff --git a/tests/src/jmap/push_subscription.rs b/tests/src/jmap/push_subscription.rs
index 3fd6c450..04260786 100644
--- a/tests/src/jmap/push_subscription.rs
+++ b/tests/src/jmap/push_subscription.rs
@@ -53,7 +53,7 @@ use utils::listener::SessionData;
use crate::{
add_test_certs,
directory::sql::create_test_user_with_email,
- jmap::{mailbox::destroy_all_mailboxes, test_account_login},
+ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, test_account_login},
};
const SERVER: &str = "
@@ -218,11 +218,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
expect_nothing(&mut event_rx).await;
destroy_all_mailboxes(admin_client).await;
-
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
#[derive(Clone)]
diff --git a/tests/src/jmap/quota.rs b/tests/src/jmap/quota.rs
index a70910c6..6de8bd75 100644
--- a/tests/src/jmap/quota.rs
+++ b/tests/src/jmap/quota.rs
@@ -26,8 +26,8 @@ use std::sync::Arc;
use crate::{
directory::sql::{add_to_group, create_test_user_with_email, set_test_quota},
jmap::{
- delivery::SmtpConnection, jmap_raw_request, mailbox::destroy_all_mailboxes,
- test_account_login,
+ assert_is_empty, delivery::SmtpConnection, jmap_raw_request,
+ mailbox::destroy_all_mailboxes, test_account_login,
},
};
use jmap::{blob::upload::DISABLE_UPLOAD_QUOTA, mailbox::INBOX_ID, JMAP};
@@ -320,10 +320,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
admin_client.set_default_account_id(account_id.to_string());
destroy_all_mailboxes(admin_client).await;
}
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
fn assert_over_quota<T: std::fmt::Debug>(result: Result<T, jmap_client::Error>) {
diff --git a/tests/src/jmap/sieve_script.rs b/tests/src/jmap/sieve_script.rs
index 1f466a25..faa56943 100644
--- a/tests/src/jmap/sieve_script.rs
+++ b/tests/src/jmap/sieve_script.rs
@@ -40,6 +40,7 @@ use std::{
use crate::{
directory::sql::create_test_user_with_email,
jmap::{
+ assert_is_empty,
delivery::SmtpConnection,
email_submission::{assert_message_delivery, spawn_mock_smtp_server, MockMessage},
mailbox::destroy_all_mailboxes,
@@ -486,10 +487,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
client.sieve_script_destroy(&id).await.unwrap();
}
destroy_all_mailboxes(client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
fn get_script(name: &str) -> Vec<u8> {
diff --git a/tests/src/jmap/stress_test.rs b/tests/src/jmap/stress_test.rs
index 0fbbdc01..ac1dffdf 100644
--- a/tests/src/jmap/stress_test.rs
+++ b/tests/src/jmap/stress_test.rs
@@ -34,6 +34,8 @@ use jmap_client::{
use jmap_proto::types::{collection::Collection, id::Id, property::Property};
use store::rand::{self, Rng};
+use super::assert_is_empty;
+
const TEST_USER_ID: u32 = 1;
const NUM_PASSES: usize = 1;
@@ -254,11 +256,7 @@ async fn email_tests(server: Arc<JMAP>, client: Arc<Client>) {
}
destroy_all_mailboxes(&client).await;
-
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server.clone()).await;
}
}
@@ -331,10 +329,7 @@ async fn mailbox_tests(server: Arc<JMAP>, client: Arc<Client>) {
join_all(futures).await;
destroy_all_mailboxes(&client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
async fn create_mailbox(client: &Client, mailbox: &str) -> Vec<String> {
diff --git a/tests/src/jmap/thread_get.rs b/tests/src/jmap/thread_get.rs
index 68a6e202..c5397a05 100644
--- a/tests/src/jmap/thread_get.rs
+++ b/tests/src/jmap/thread_get.rs
@@ -23,7 +23,7 @@
use std::sync::Arc;
-use crate::jmap::mailbox::destroy_all_mailboxes;
+use crate::jmap::{assert_is_empty, mailbox::destroy_all_mailboxes};
use jmap::JMAP;
use jmap_client::{client::Client, mailbox::Role};
use jmap_proto::types::id::Id;
@@ -66,8 +66,5 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
);
destroy_all_mailboxes(client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
diff --git a/tests/src/jmap/thread_merge.rs b/tests/src/jmap/thread_merge.rs
index b31b0b27..b867dcc2 100644
--- a/tests/src/jmap/thread_merge.rs
+++ b/tests/src/jmap/thread_merge.rs
@@ -23,7 +23,7 @@
use std::sync::Arc;
-use crate::jmap::mailbox::destroy_all_mailboxes;
+use crate::jmap::{assert_is_empty, mailbox::destroy_all_mailboxes};
use jmap::JMAP;
use jmap_client::{client::Client, email, mailbox::Role};
use jmap_proto::types::id::Id;
@@ -203,10 +203,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
}
}
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
fn build_message(message: usize, in_reply_to: Option<usize>, thread_num: usize) -> String {
diff --git a/tests/src/jmap/vacation_response.rs b/tests/src/jmap/vacation_response.rs
index d9801e63..c9947074 100644
--- a/tests/src/jmap/vacation_response.rs
+++ b/tests/src/jmap/vacation_response.rs
@@ -30,6 +30,7 @@ use std::{sync::Arc, time::Instant};
use crate::{
directory::sql::create_test_user_with_email,
jmap::{
+ assert_is_empty,
delivery::SmtpConnection,
email_submission::{
assert_message_delivery, expect_nothing, spawn_mock_smtp_server, MockMessage,
@@ -173,8 +174,5 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
// Remove test data
client.vacation_response_destroy().await.unwrap();
destroy_all_mailboxes(client).await;
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
diff --git a/tests/src/jmap/websocket.rs b/tests/src/jmap/websocket.rs
index e908afcd..599f7f9c 100644
--- a/tests/src/jmap/websocket.rs
+++ b/tests/src/jmap/websocket.rs
@@ -40,7 +40,7 @@ use tokio::sync::mpsc;
use crate::{
directory::sql::create_test_user_with_email,
- jmap::{mailbox::destroy_all_mailboxes, test_account_login},
+ jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, test_account_login},
};
pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
@@ -125,11 +125,7 @@ pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
admin_client.set_default_account_id(account_id);
destroy_all_mailboxes(admin_client).await;
-
- server
- .store
- .assert_is_empty(server.blob_store.clone())
- .await;
+ assert_is_empty(server).await;
}
async fn expect_response(
diff --git a/tests/src/store/assign_id.rs b/tests/src/store/assign_id.rs
index 15382a17..7cb36b1e 100644
--- a/tests/src/store/assign_id.rs
+++ b/tests/src/store/assign_id.rs
@@ -35,37 +35,12 @@ pub async fn test(db: Store) {
test_1(db.clone()).await;
test_2(db.clone()).await;
- test_3(db.clone()).await;
- test_4(db).await;
+ test_3(db).await;
ID_ASSIGNMENT_EXPIRY.store(60 * 60, std::sync::atomic::Ordering::Relaxed);
}
async fn test_1(db: Store) {
- // Test change id assignment
- let mut handles = Vec::new();
- let mut expected_ids = HashSet::new();
-
- // Create 100 change ids concurrently
- for id in 0..100 {
- handles.push({
- let db = db.clone();
- tokio::spawn(async move { db.assign_change_id(0).await })
- });
- expected_ids.insert(id);
- }
-
- for handle in handles {
- let assigned_id = handle.await.unwrap().unwrap();
- assert!(
- expected_ids.remove(&assigned_id),
- "already assigned or invalid: {assigned_id} "
- );
- }
- db.destroy().await;
-}
-
-async fn test_2(db: Store) {
// Test document id assignment
for wait_for_expiry in [true, false] {
let mut handles = Vec::new();
@@ -101,7 +76,7 @@ async fn test_2(db: Store) {
db.destroy().await;
}
-async fn test_3(db: Store) {
+async fn test_2(db: Store) {
// Create document ids and try reassigning
let mut expected_ids = AHashSet::new();
let mut batch = BatchBuilder::new();
@@ -132,7 +107,7 @@ async fn test_3(db: Store) {
db.destroy().await;
}
-async fn test_4(db: Store) {
+async fn test_3(db: Store) {
// Try reassigning deleted ids
let mut expected_ids = AHashSet::new();
let mut batch = BatchBuilder::new();
diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs
index bcf10172..23fc146b 100644
--- a/tests/src/store/mod.rs
+++ b/tests/src/store/mod.rs
@@ -25,7 +25,7 @@ pub mod assign_id;
pub mod blob;
pub mod query;
-use std::{io::Read, sync::Arc};
+use std::io::Read;
use ::store::Store;
@@ -56,8 +56,8 @@ pub async fn store_tests() {
if insert {
db.destroy().await;
}
- assign_id::test(db.clone()).await;
- query::test(db, insert).await;
+ query::test(db.clone(), insert).await;
+ assign_id::test(db).await;
temp_dir.delete();
}
diff --git a/tests/src/store/query.rs b/tests/src/store/query.rs
index 0dd352da..37df0f7a 100644
--- a/tests/src/store/query.rs
+++ b/tests/src/store/query.rs
@@ -22,13 +22,20 @@
*/
use std::{
+ fmt::Display,
sync::{Arc, Mutex},
time::Instant,
};
use jmap_proto::types::keyword::Keyword;
use nlp::language::Language;
-use store::{ahash::AHashMap, query::sort::Pagination, write::ValueClass};
+use store::{
+ ahash::AHashMap,
+ fts::{index::FtsDocument, Field, FtsFilter},
+ query::sort::Pagination,
+ write::ValueClass,
+ FtsStore,
+};
use store::{
query::{Comparator, Filter},
@@ -93,9 +100,34 @@ const FIELDS_OPTIONS: [FieldType; 20] = [
FieldType::Text, // "url",
];
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
+pub struct FieldId(u8);
+
+impl From<FieldId> for u8 {
+ fn from(field_id: FieldId) -> Self {
+ field_id.0
+ }
+}
+impl Display for FieldId {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{} ({})", FIELDS[self.0 as usize], self.0)
+ }
+}
+
+impl FieldId {
+ pub fn new(field_id: u8) -> Field<FieldId> {
+ Field::Header(Self(field_id))
+ }
+
+ pub fn inner(&self) -> u8 {
+ self.0
+ }
+}
+
#[allow(clippy::mutex_atomic)]
pub async fn test(db: Store, do_insert: bool) {
println!("Running Store query tests...");
+ let fts_store = FtsStore::from(db.clone());
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(8)
@@ -116,7 +148,10 @@ pub async fn test(db: Store, do_insert: bool) {
let documents = documents.clone();
s.spawn_fifo(move |_| {
- /*let mut fts_builder = FtsIndexBuilder::with_default_language(Language::English);
+ let mut fts_builder = FtsDocument::with_default_language(Language::English)
+ .with_account_id(0)
+ .with_collection(COLLECTION_ID)
+ .with_document_id(document_id as u32);
let mut builder = BatchBuilder::new();
builder
.with_account_id(0)
@@ -137,7 +172,7 @@ pub async fn test(db: Store, do_insert: bool) {
FieldType::FullText => {
if !field.is_empty() {
fts_builder.index(
- field_id,
+ FieldId::new(field_id),
field.to_lowercase(),
Language::English,
);
@@ -165,8 +200,10 @@ pub async fn test(db: Store, do_insert: bool) {
}
}
- builder.custom(fts_builder);
- documents.lock().unwrap().push(builder.build());*/
+ documents
+ .lock()
+ .unwrap()
+ .push((builder.build(), fts_builder));
});
}
});
@@ -180,22 +217,31 @@ pub async fn test(db: Store, do_insert: bool) {
let now = Instant::now();
let batches = documents.lock().unwrap().drain(..).collect::<Vec<_>>();
let mut chunk = Vec::new();
+ let mut fts_chunk = Vec::new();
- for batch in batches {
+ for (batch, fts_batch) in batches {
let chunk_instance = Instant::now();
chunk.push({
let db = db.clone();
tokio::spawn(async move { db.write(batch).await })
});
+ fts_chunk.push({
+ let fts_store = fts_store.clone();
+ tokio::spawn(async move { fts_store.index(fts_batch).await })
+ });
if chunk.len() == 1000 {
for handle in chunk {
handle.await.unwrap().unwrap();
}
+ for handle in fts_chunk {
+ handle.await.unwrap().unwrap();
+ }
println!(
- "Chunk insert took {} ms.",
+ "Store insert took {} ms.",
chunk_instance.elapsed().as_millis()
);
chunk = Vec::new();
+ fts_chunk = Vec::new();
}
}
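The hunk above replaces the old in-batch FtsIndexBuilder with an FtsDocument that is written separately through the FtsStore. A condensed sketch of that flow, using only the calls visible in this diff (the document id, field id, and text are placeholder values):

    // Sketch only: mirrors the indexing pattern in the test above, not a definitive API reference.
    let fts_store = FtsStore::from(db.clone());
    let mut doc = FtsDocument::with_default_language(Language::English)
        .with_account_id(0)
        .with_collection(COLLECTION_ID)
        .with_document_id(42);
    // FieldId::new wraps the raw field number in a Field::Header, as defined earlier in this file.
    doc.index(FieldId::new(1), "some full-text value".to_lowercase(), Language::English);
    fts_store.index(doc).await.unwrap();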
@@ -209,156 +255,232 @@ pub async fn test(db: Store, do_insert: bool) {
}
println!("Running filter tests...");
- test_filter(db.clone()).await;
+ test_filter(db.clone(), fts_store).await;
println!("Running sort tests...");
test_sort(db).await;
}
-pub async fn test_filter(db: Store) {
- /*
- let mut fields = AHashMap::default();
- for (field_num, field) in FIELDS.iter().enumerate() {
- fields.insert(field.to_string(), field_num as u8);
- }
+pub async fn test_filter(db: Store, fts: FtsStore) {
+ let mut fields = AHashMap::default();
+ let mut fields_u8 = AHashMap::default();
+ for (field_num, field) in FIELDS.iter().enumerate() {
+ fields.insert(field.to_string(), FieldId::new(field_num as u8));
+ fields_u8.insert(field.to_string(), field_num as u8);
+ }
- let tests = [
- (
- vec![
- Filter::has_english_text(fields["title"], "water"),
- Filter::eq(fields["year"], 1979u32),
- ],
- vec!["p11293"],
- ),
- (
- vec![
- Filter::has_english_text(fields["medium"], "gelatin"),
- Filter::gt(fields["year"], 2000u32),
- Filter::lt(fields["width"], 180u32),
- Filter::gt(fields["width"], 0u32),
- ],
- vec!["p79426", "p79427", "p79428", "p79429", "p79430"],
- ),
- (
- vec![Filter::has_english_text(fields["title"], "'rustic bridge'")],
- vec!["d05503"],
- ),
- (
- vec![
- Filter::has_english_text(fields["title"], "'rustic'"),
- Filter::has_english_text(fields["title"], "study"),
- ],
- vec!["d00399", "d05352"],
- ),
- (
- vec![
- Filter::has_text(fields["artist"], "mauro kunst", Language::None),
- Filter::is_in_bitmap(fields["artistRole"], Keyword::Other("artist".to_string())),
- Filter::Or,
- Filter::eq(fields["year"], 1969u32),
- Filter::eq(fields["year"], 1971u32),
- Filter::End,
- ],
- vec!["p01764", "t05843"],
- ),
- (
- vec![
- Filter::Not,
- Filter::has_english_text(fields["medium"], "oil"),
- Filter::End,
- Filter::has_english_text(fields["creditLine"], "bequeath"),
- Filter::Or,
- Filter::And,
- Filter::ge(fields["year"], 1900u32),
- Filter::lt(fields["year"], 1910u32),
- Filter::End,
- Filter::And,
- Filter::ge(fields["year"], 2000u32),
- Filter::lt(fields["year"], 2010u32),
- Filter::End,
- Filter::End,
- ],
- vec![
- "n02478", "n02479", "n03568", "n03658", "n04327", "n04328", "n04721", "n04739",
- "n05095", "n05096", "n05145", "n05157", "n05158", "n05159", "n05298", "n05303",
- "n06070", "t01181", "t03571", "t05805", "t05806", "t12147", "t12154", "t12155",
- ],
- ),
- (
- vec![
- Filter::And,
- Filter::has_text(fields["artist"], "warhol", Language::None),
- Filter::Not,
- Filter::has_english_text(fields["title"], "'campbell'"),
- Filter::End,
- Filter::Not,
- Filter::Or,
- Filter::gt(fields["year"], 1980u32),
- Filter::And,
- Filter::gt(fields["width"], 500u32),
- Filter::gt(fields["height"], 500u32),
- Filter::End,
- Filter::End,
- Filter::End,
- Filter::eq(fields["acquisitionYear"], 2008u32),
- Filter::End,
- ],
- vec!["ar00039", "t12600"],
- ),
- (
- vec![
- Filter::has_english_text(fields["title"], "study"),
- Filter::has_english_text(fields["medium"], "paper"),
- Filter::has_english_text(fields["creditLine"], "'purchased'"),
- Filter::Not,
- Filter::has_english_text(fields["title"], "'anatomical'"),
- Filter::has_english_text(fields["title"], "'for'"),
- Filter::End,
- Filter::gt(fields["year"], 1900u32),
- Filter::gt(fields["acquisitionYear"], 2000u32),
- ],
- vec![
- "p80042", "p80043", "p80044", "p80045", "p80203", "t11937", "t12172",
- ],
- ),
- ];
-
- for (filter, expected_results) in tests {
- //println!("Running test: {:?}", filter);
- let docset = db.filter(0, COLLECTION_ID, filter).await.unwrap();
- let sorted_docset = db
- .sort(
- docset,
- vec![Comparator::ascending(fields["accession_number"])],
- Pagination::new(0, 0, None, 0),
+ let tests = [
+ (
+ vec![
+ Filter::is_in_set(
+ fts.query(
+ 0,
+ COLLECTION_ID,
+ vec![FtsFilter::has_english_text(
+ fields["title"].clone(),
+ "water",
+ )],
+ )
+ .await
+ .unwrap(),
+ ),
+ Filter::eq(fields_u8["year"], 1979u32),
+ ],
+ vec!["p11293"],
+ ),
+ (
+ vec![
+ Filter::is_in_set(
+ fts.query(
+ 0,
+ COLLECTION_ID,
+ vec![FtsFilter::has_english_text(
+ fields["medium"].clone(),
+ "gelatin",
+ )],
+ )
+ .await
+ .unwrap(),
+ ),
+ Filter::gt(fields_u8["year"], 2000u32),
+ Filter::lt(fields_u8["width"], 180u32),
+ Filter::gt(fields_u8["width"], 0u32),
+ ],
+ vec!["p79426", "p79427", "p79428", "p79429", "p79430"],
+ ),
+ (
+ vec![Filter::is_in_set(
+ fts.query(
+ 0,
+ COLLECTION_ID,
+ vec![FtsFilter::has_english_text(
+ fields["title"].clone(),
+ "'rustic bridge'",
+ )],
)
.await
- .unwrap();
-
- assert_eq!(
- db.get_values::<String>(
- sorted_docset
- .ids
- .into_iter()
- .map(|document_id| ValueKey {
- account_id: 0,
- collection: COLLECTION_ID,
- document_id: document_id as u32,
- family: 0,
- field: fields["accession_number"],
- })
- .collect()
+ .unwrap(),
+ )],
+ vec!["d05503"],
+ ),
+ (
+ vec![Filter::is_in_set(
+ fts.query(
+ 0,
+ COLLECTION_ID,
+ vec![
+ FtsFilter::has_english_text(fields["title"].clone(), "'rustic'"),
+ FtsFilter::has_english_text(fields["title"].clone(), "study"),
+ ],
)
.await
- .unwrap()
- .into_iter()
- .flatten()
- .collect::<Vec<_>>(),
- expected_results
- );
- }
+ .unwrap(),
+ )],
+ vec!["d00399", "d05352"],
+ ),
+ (
+ vec![
+ Filter::has_text(fields_u8["artist"], "mauro kunst"),
+ Filter::is_in_bitmap(
+ fields_u8["artistRole"],
+ Keyword::Other("artist".to_string()),
+ ),
+ Filter::Or,
+ Filter::eq(fields_u8["year"], 1969u32),
+ Filter::eq(fields_u8["year"], 1971u32),
+ Filter::End,
+ ],
+ vec!["p01764", "t05843"],
+ ),
+ (
+ vec![
+ Filter::is_in_set(
+ fts.query(
+ 0,
+ COLLECTION_ID,
+ vec![
+ FtsFilter::Not,
+ FtsFilter::has_english_text(fields["medium"].clone(), "oil"),
+ FtsFilter::End,
+ FtsFilter::has_english_text(fields["creditLine"].clone(), "bequeath"),
+ ],
+ )
+ .await
+ .unwrap(),
+ ),
+ Filter::Or,
+ Filter::And,
+ Filter::ge(fields_u8["year"], 1900u32),
+ Filter::lt(fields_u8["year"], 1910u32),
+ Filter::End,
+ Filter::And,
+ Filter::ge(fields_u8["year"], 2000u32),
+ Filter::lt(fields_u8["year"], 2010u32),
+ Filter::End,
+ Filter::End,
+ ],
+ vec![
+ "n02478", "n02479", "n03568", "n03658", "n04327", "n04328", "n04721", "n04739",
+ "n05095", "n05096", "n05145", "n05157", "n05158", "n05159", "n05298", "n05303",
+ "n06070", "t01181", "t03571", "t05805", "t05806", "t12147", "t12154", "t12155",
+ ],
+ ),
+ (
+ vec![
+ Filter::And,
+ Filter::has_text(fields_u8["artist"], "warhol"),
+ Filter::Not,
+ Filter::is_in_set(
+ fts.query(
+ 0,
+ COLLECTION_ID,
+ vec![FtsFilter::has_english_text(
+ fields["title"].clone(),
+ "'campbell'",
+ )],
+ )
+ .await
+ .unwrap(),
+ ),
+ Filter::End,
+ Filter::Not,
+ Filter::Or,
+ Filter::gt(fields_u8["year"], 1980u32),
+ Filter::And,
+ Filter::gt(fields_u8["width"], 500u32),
+ Filter::gt(fields_u8["height"], 500u32),
+ Filter::End,
+ Filter::End,
+ Filter::End,
+ Filter::eq(fields_u8["acquisitionYear"], 2008u32),
+ Filter::End,
+ ],
+ vec!["ar00039", "t12600"],
+ ),
+ (
+ vec![
+ Filter::is_in_set(
+ fts.query(
+ 0,
+ COLLECTION_ID,
+ vec![
+ FtsFilter::has_english_text(fields["title"].clone(), "study"),
+ FtsFilter::has_english_text(fields["medium"].clone(), "paper"),
+ FtsFilter::has_english_text(
+ fields["creditLine"].clone(),
+ "'purchased'",
+ ),
+ FtsFilter::Not,
+ FtsFilter::has_english_text(fields["title"].clone(), "'anatomical'"),
+ FtsFilter::has_english_text(fields["title"].clone(), "'for'"),
+ FtsFilter::End,
+ ],
+ )
+ .await
+ .unwrap(),
+ ),
+ Filter::gt(fields_u8["year"], 1900u32),
+ Filter::gt(fields_u8["acquisitionYear"], 2000u32),
+ ],
+ vec![
+ "p80042", "p80043", "p80044", "p80045", "p80203", "t11937", "t12172",
+ ],
+ ),
+ ];
- */
+ for (filter, expected_results) in tests {
+ //println!("Running test: {:?}", filter);
+ let docset = db.filter(0, COLLECTION_ID, filter).await.unwrap();
+ let sorted_docset = db
+ .sort(
+ docset,
+ vec![Comparator::ascending(fields_u8["accession_number"])],
+ Pagination::new(0, 0, None, 0),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ db.get_values::<String>(
+ sorted_docset
+ .ids
+ .into_iter()
+ .map(|document_id| ValueKey {
+ account_id: 0,
+ collection: COLLECTION_ID,
+ document_id: document_id as u32,
+ class: ValueClass::Property(fields_u8["accession_number"])
+ })
+ .collect()
+ )
+ .await
+ .unwrap()
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>(),
+ expected_results
+ );
+ }
}
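Full-text conditions are now resolved against the FtsStore first and merged into the regular store query through Filter::is_in_set, as exercised by test_filter above. A condensed sketch of that two-step pattern, taken from the first test case (field names and the search term are just the values used there):

    // Sketch of the combined FTS + store query used by test_filter above.
    let fts_hits = fts
        .query(
            0,             // account id
            COLLECTION_ID, // collection
            vec![FtsFilter::has_english_text(fields["title"].clone(), "water")],
        )
        .await
        .unwrap();
    let result_set = db
        .filter(
            0,
            COLLECTION_ID,
            vec![
                Filter::is_in_set(fts_hits),            // documents matched by the FTS query
                Filter::eq(fields_u8["year"], 1979u32), // intersected with a regular index filter
            ],
        )
        .await
        .unwrap();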
pub async fn test_sort(db: Store) {