summaryrefslogtreecommitdiff
path: root/crates/smtp
diff options
context:
space:
mode:
authormdecimus <mauro@stalw.art>2024-05-09 09:09:12 +0200
committermdecimus <mauro@stalw.art>2024-05-09 09:09:12 +0200
commit7a25689288cd7d1565f1976962716565066d22a4 (patch)
tree5bace378b704fbdb88bdd6d43b26741ef6c202ec /crates/smtp
parente10083651b4bf58d4a78c49f285efac9c5bad4e2 (diff)
Use permanent links for blobs in the SMTP queue
Diffstat (limited to 'crates/smtp')
-rw-r--r--crates/smtp/src/queue/spool.rs47
-rw-r--r--crates/smtp/src/reporting/scheduler.rs10
2 files changed, 23 insertions, 34 deletions
diff --git a/crates/smtp/src/queue/spool.rs b/crates/smtp/src/queue/spool.rs
index 39ce8121..6167b5de 100644
--- a/crates/smtp/src/queue/spool.rs
+++ b/crates/smtp/src/queue/spool.rs
@@ -36,8 +36,6 @@ use super::{
};
pub const LOCK_EXPIRY: u64 = 300;
-pub const BLOB_EXPIRY: u64 = 3600;
-pub const SPOOL_ACCOUNT_ID: u32 = u32::MAX - 1;
#[derive(Debug)]
pub struct QueueEventLock {
@@ -152,17 +150,12 @@ impl SMTP {
event = "locked",
id = event.queue_id,
due = event.due,
- "Failed to lock event: Event already locked."
+ "Lock busy: Event already locked."
);
None
}
Err(err) => {
- tracing::error!(
- context = "queue",
- event = "error",
- "Failed to lock event: {}",
- err
- );
+ tracing::error!(context = "queue", event = "error", "Lock error: {}", err);
None
}
}
@@ -219,10 +212,11 @@ impl Message {
// Reserve and write blob
let mut batch = BatchBuilder::new();
- batch.with_account_id(SPOOL_ACCOUNT_ID).set(
+ let reserve_until = now() + 120;
+ batch.set(
BlobOp::Reserve {
hash: self.blob_hash.clone(),
- until: self.next_delivery_event() + BLOB_EXPIRY,
+ until: reserve_until,
},
0u32.serialize(),
);
@@ -293,6 +287,17 @@ impl Message {
})),
0u64.serialize(),
)
+ .clear(BlobOp::Reserve {
+ hash: self.blob_hash.clone(),
+ until: reserve_until,
+ })
+ .set(
+ BlobOp::LinkId {
+ hash: self.blob_hash.clone(),
+ id: self.id,
+ },
+ vec![],
+ )
.set(
BlobOp::Commit {
hash: self.blob_hash.clone(),
@@ -411,14 +416,6 @@ impl Message {
);
}
- batch.with_account_id(SPOOL_ACCOUNT_ID).set(
- BlobOp::Reserve {
- hash: self.blob_hash.clone(),
- until: self.next_delivery_event() + BLOB_EXPIRY,
- },
- 0u32.serialize(),
- );
-
batch.set(
ValueClass::Queue(QueueClass::Message(self.id)),
Bincode::new(self).serialize(),
@@ -456,18 +453,10 @@ impl Message {
}
batch
- .with_account_id(SPOOL_ACCOUNT_ID)
- .clear(BlobOp::Reserve {
+ .clear(BlobOp::LinkId {
hash: self.blob_hash.clone(),
- until: prev_event + BLOB_EXPIRY,
+ id: self.id,
})
- .set(
- BlobOp::Reserve {
- hash: self.blob_hash.clone(),
- until: now() - 1,
- },
- 0u32.serialize(),
- )
.clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: prev_event,
queue_id: self.id,
diff --git a/crates/smtp/src/reporting/scheduler.rs b/crates/smtp/src/reporting/scheduler.rs
index b2c73a6d..eba65074 100644
--- a/crates/smtp/src/reporting/scheduler.rs
+++ b/crates/smtp/src/reporting/scheduler.rs
@@ -195,7 +195,7 @@ impl SMTP {
context = "queue",
event = "locked",
key = ?lock,
- "Failed to lock report: Event already locked."
+ "Lock busy: Event already locked."
);
false
}
@@ -203,7 +203,7 @@ impl SMTP {
tracing::error!(
context = "queue",
event = "error",
- "Failed to lock report: {}",
+ "Lock busy: {}",
err
);
false
@@ -215,7 +215,7 @@ impl SMTP {
event = "locked",
key = ?lock,
expiry = expiry - now,
- "Failed to lock report: Report already locked."
+ "Lock busy: Report already locked."
);
false
}
@@ -225,7 +225,7 @@ impl SMTP {
context = "queue",
event = "locked",
key = ?lock,
- "Failed to lock report: Report lock deleted."
+ "Lock busy: Report lock deleted."
);
false
}
@@ -234,7 +234,7 @@ impl SMTP {
context = "queue",
event = "error",
key = ?lock,
- "Failed to lock report: {}",
+ "Lock error: {}",
err
);
false