diff options
Diffstat (limited to 'crates/jmap/src/email/ingest.rs')
-rw-r--r-- | crates/jmap/src/email/ingest.rs | 116 |
1 files changed, 61 insertions, 55 deletions
diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs index 611b3706..71583eef 100644 --- a/crates/jmap/src/email/ingest.rs +++ b/crates/jmap/src/email/ingest.rs @@ -39,10 +39,11 @@ use store::{ ahash::AHashSet, query::Filter, write::{ - log::ChangeLogBuilder, now, BatchBuilder, BitmapClass, TagValue, ValueClass, F_BITMAP, - F_CLEAR, F_VALUE, + log::{ChangeLogBuilder, Changes, LogInsert}, + now, AssignedIds, BatchBuilder, BitmapClass, MaybeDynamicId, MaybeDynamicValue, + SerializeWithId, TagValue, ValueClass, F_BITMAP, F_CLEAR, F_VALUE, }, - BitmapKey, BlobClass, + BitmapKey, BlobClass, Serialize, }; use utils::map::vec_map::VecMap; @@ -262,20 +263,6 @@ impl JMAP { } // Obtain a documentId and changeId - let document_id = self - .core - .storage - .data - .assign_document_id(params.account_id, Collection::Email) - .await - .map_err(|err| { - tracing::error!( - event = "error", - context = "email_ingest", - error = ?err, - "Failed to assign documentId."); - IngestError::Temporary - })?; let change_id = self .assign_change_id(params.account_id) .await @@ -322,44 +309,26 @@ impl JMAP { // Prepare batch let mut batch = BatchBuilder::new(); - batch.with_account_id(params.account_id); - - // Build change log - let mut changes = ChangeLogBuilder::with_change_id(change_id); - let thread_id = if let Some(thread_id) = thread_id { - changes.log_child_update(Collection::Thread, thread_id); - thread_id + batch + .with_change_id(change_id) + .with_account_id(params.account_id) + .with_collection(Collection::Thread); + if let Some(thread_id) = thread_id { + batch.log(Changes::update([thread_id])); } else { - let thread_id = self - .core - .storage - .data - .assign_document_id(params.account_id, Collection::Thread) - .await - .map_err(|err| { - tracing::error!( - event = "error", - context = "email_ingest", - error = ?err, - "Failed to assign documentId for new thread."); - IngestError::Temporary - })?; - batch - .with_collection(Collection::Thread) - .create_document(thread_id); - changes.log_insert(Collection::Thread, thread_id); - thread_id - }; - let id = Id::from_parts(thread_id, document_id); - changes.log_insert(Collection::Email, id); - for mailbox_id in ¶ms.mailbox_ids { - changes.log_child_update(Collection::Mailbox, *mailbox_id); + batch.create_document().log(LogInsert()); } // Build write batch + let maybe_thread_id = thread_id + .map(MaybeDynamicId::Static) + .unwrap_or(MaybeDynamicId::Dynamic(0)); batch + .with_collection(Collection::Mailbox) + .log(Changes::child_update(params.mailbox_ids.iter().copied())) .with_collection(Collection::Email) - .create_document(document_id) + .create_document() + .log(LogEmailInsert(thread_id)) .index_message( message, blob_id.hash.clone(), @@ -368,16 +337,19 @@ impl JMAP { params.received_at.unwrap_or_else(now), ) .value(Property::Cid, change_id, F_VALUE) - .value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP) - .custom(changes) + .set(Property::ThreadId, maybe_thread_id) + .tag(Property::ThreadId, TagValue::Id(maybe_thread_id), 0) .set( ValueClass::IndexEmail( self.generate_snowflake_id() .map_err(|_| IngestError::Temporary)?, ), - blob_id.hash.clone(), + blob_id.hash.as_ref(), ); - self.core + + // Insert and obtain ids + let ids = self + .core .storage .data .write(batch.build()) @@ -390,6 +362,14 @@ impl JMAP { "Failed to write message to database."); IngestError::Temporary })?; + let thread_id = match thread_id { + Some(thread_id) => thread_id, + None => ids + .first_document_id() + .map_err(|_| IngestError::Temporary)?, + }; + let document_id = ids.last_document_id().map_err(|_| IngestError::Temporary)?; + let id = Id::from_parts(thread_id, document_id); // Request FTS index let _ = self.inner.housekeeper_tx.send(Event::IndexStart).await; @@ -545,7 +525,7 @@ impl JMAP { field: Property::ThreadId.into(), value: TagValue::Id(old_thread_id), }, - block_num: 0, + document_id: 0, }) .await .map_err(|err| { @@ -605,7 +585,33 @@ impl JMAP { .data .write(batch.build()) .await - .map(|v| v.expect("UID next") as u32) + .and_then(|v| v.last_counter_id().map(|id| id as u32)) + } +} + +pub struct LogEmailInsert(Option<u32>); + +impl LogEmailInsert { + pub fn new(thread_id: Option<u32>) -> Self { + Self(thread_id) + } +} + +impl SerializeWithId for LogEmailInsert { + fn serialize_with_id(&self, ids: &AssignedIds) -> store::Result<Vec<u8>> { + let thread_id = match self.0 { + Some(thread_id) => thread_id, + None => ids.first_document_id()?, + }; + let document_id = ids.last_document_id()?; + + Ok(Changes::insert([Id::from_parts(thread_id, document_id)]).serialize()) + } +} + +impl From<LogEmailInsert> for MaybeDynamicValue { + fn from(log: LogEmailInsert) -> Self { + MaybeDynamicValue::Dynamic(Box::new(log)) } } |