summaryrefslogtreecommitdiff
path: root/crates/jmap/src/email/ingest.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/jmap/src/email/ingest.rs')
-rw-r--r--crates/jmap/src/email/ingest.rs116
1 files changed, 61 insertions, 55 deletions
diff --git a/crates/jmap/src/email/ingest.rs b/crates/jmap/src/email/ingest.rs
index 611b3706..71583eef 100644
--- a/crates/jmap/src/email/ingest.rs
+++ b/crates/jmap/src/email/ingest.rs
@@ -39,10 +39,11 @@ use store::{
ahash::AHashSet,
query::Filter,
write::{
- log::ChangeLogBuilder, now, BatchBuilder, BitmapClass, TagValue, ValueClass, F_BITMAP,
- F_CLEAR, F_VALUE,
+ log::{ChangeLogBuilder, Changes, LogInsert},
+ now, AssignedIds, BatchBuilder, BitmapClass, MaybeDynamicId, MaybeDynamicValue,
+ SerializeWithId, TagValue, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
- BitmapKey, BlobClass,
+ BitmapKey, BlobClass, Serialize,
};
use utils::map::vec_map::VecMap;
@@ -262,20 +263,6 @@ impl JMAP {
}
// Obtain a documentId and changeId
- let document_id = self
- .core
- .storage
- .data
- .assign_document_id(params.account_id, Collection::Email)
- .await
- .map_err(|err| {
- tracing::error!(
- event = "error",
- context = "email_ingest",
- error = ?err,
- "Failed to assign documentId.");
- IngestError::Temporary
- })?;
let change_id = self
.assign_change_id(params.account_id)
.await
@@ -322,44 +309,26 @@ impl JMAP {
// Prepare batch
let mut batch = BatchBuilder::new();
- batch.with_account_id(params.account_id);
-
- // Build change log
- let mut changes = ChangeLogBuilder::with_change_id(change_id);
- let thread_id = if let Some(thread_id) = thread_id {
- changes.log_child_update(Collection::Thread, thread_id);
- thread_id
+ batch
+ .with_change_id(change_id)
+ .with_account_id(params.account_id)
+ .with_collection(Collection::Thread);
+ if let Some(thread_id) = thread_id {
+ batch.log(Changes::update([thread_id]));
} else {
- let thread_id = self
- .core
- .storage
- .data
- .assign_document_id(params.account_id, Collection::Thread)
- .await
- .map_err(|err| {
- tracing::error!(
- event = "error",
- context = "email_ingest",
- error = ?err,
- "Failed to assign documentId for new thread.");
- IngestError::Temporary
- })?;
- batch
- .with_collection(Collection::Thread)
- .create_document(thread_id);
- changes.log_insert(Collection::Thread, thread_id);
- thread_id
- };
- let id = Id::from_parts(thread_id, document_id);
- changes.log_insert(Collection::Email, id);
- for mailbox_id in &params.mailbox_ids {
- changes.log_child_update(Collection::Mailbox, *mailbox_id);
+ batch.create_document().log(LogInsert());
}
// Build write batch
+ let maybe_thread_id = thread_id
+ .map(MaybeDynamicId::Static)
+ .unwrap_or(MaybeDynamicId::Dynamic(0));
batch
+ .with_collection(Collection::Mailbox)
+ .log(Changes::child_update(params.mailbox_ids.iter().copied()))
.with_collection(Collection::Email)
- .create_document(document_id)
+ .create_document()
+ .log(LogEmailInsert(thread_id))
.index_message(
message,
blob_id.hash.clone(),
@@ -368,16 +337,19 @@ impl JMAP {
params.received_at.unwrap_or_else(now),
)
.value(Property::Cid, change_id, F_VALUE)
- .value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP)
- .custom(changes)
+ .set(Property::ThreadId, maybe_thread_id)
+ .tag(Property::ThreadId, TagValue::Id(maybe_thread_id), 0)
.set(
ValueClass::IndexEmail(
self.generate_snowflake_id()
.map_err(|_| IngestError::Temporary)?,
),
- blob_id.hash.clone(),
+ blob_id.hash.as_ref(),
);
- self.core
+
+ // Insert and obtain ids
+ let ids = self
+ .core
.storage
.data
.write(batch.build())
@@ -390,6 +362,14 @@ impl JMAP {
"Failed to write message to database.");
IngestError::Temporary
})?;
+ let thread_id = match thread_id {
+ Some(thread_id) => thread_id,
+ None => ids
+ .first_document_id()
+ .map_err(|_| IngestError::Temporary)?,
+ };
+ let document_id = ids.last_document_id().map_err(|_| IngestError::Temporary)?;
+ let id = Id::from_parts(thread_id, document_id);
// Request FTS index
let _ = self.inner.housekeeper_tx.send(Event::IndexStart).await;
@@ -545,7 +525,7 @@ impl JMAP {
field: Property::ThreadId.into(),
value: TagValue::Id(old_thread_id),
},
- block_num: 0,
+ document_id: 0,
})
.await
.map_err(|err| {
@@ -605,7 +585,33 @@ impl JMAP {
.data
.write(batch.build())
.await
- .map(|v| v.expect("UID next") as u32)
+ .and_then(|v| v.last_counter_id().map(|id| id as u32))
+ }
+}
+
+pub struct LogEmailInsert(Option<u32>);
+
+impl LogEmailInsert {
+ pub fn new(thread_id: Option<u32>) -> Self {
+ Self(thread_id)
+ }
+}
+
+impl SerializeWithId for LogEmailInsert {
+ fn serialize_with_id(&self, ids: &AssignedIds) -> store::Result<Vec<u8>> {
+ let thread_id = match self.0 {
+ Some(thread_id) => thread_id,
+ None => ids.first_document_id()?,
+ };
+ let document_id = ids.last_document_id()?;
+
+ Ok(Changes::insert([Id::from_parts(thread_id, document_id)]).serialize())
+ }
+}
+
+impl From<LogEmailInsert> for MaybeDynamicValue {
+ fn from(log: LogEmailInsert) -> Self {
+ MaybeDynamicValue::Dynamic(Box::new(log))
}
}