author | mdecimus <mauro@stalw.art> | 2024-04-30 19:52:05 +0200
committer | mdecimus <mauro@stalw.art> | 2024-04-30 19:52:05 +0200
commit | 1950366629cb82a7b11104d60584bc511ca97f53 (patch)
tree | 01539a936d90ae98df61a2c40ad546f0a8d9af45
parent | 592ca3ccc58dd5459a30c4516c4e9220abb48881 (diff)
v0.7.3
-rw-r--r-- | CHANGELOG.md | 18
-rw-r--r-- | Cargo.lock | 22
-rw-r--r-- | crates/cli/Cargo.toml | 2
-rw-r--r-- | crates/common/Cargo.toml | 2
-rw-r--r-- | crates/common/src/manager/backup.rs | 1169
-rw-r--r-- | crates/common/src/manager/boot.rs | 101
-rw-r--r-- | crates/common/src/manager/mod.rs | 2
-rw-r--r-- | crates/common/src/manager/restore.rs | 434
-rw-r--r-- | crates/directory/Cargo.toml | 2
-rw-r--r-- | crates/imap/Cargo.toml | 2
-rw-r--r-- | crates/jmap/Cargo.toml | 2
-rw-r--r-- | crates/main/Cargo.toml | 2
-rw-r--r-- | crates/managesieve/Cargo.toml | 2
-rw-r--r-- | crates/nlp/Cargo.toml | 2
-rw-r--r-- | crates/smtp/Cargo.toml | 2
-rw-r--r-- | crates/store/Cargo.toml | 2
-rw-r--r-- | crates/store/src/dispatch/mod.rs | 20
-rw-r--r-- | crates/utils/Cargo.toml | 2
-rw-r--r-- | tests/src/store/import_export.rs | 375
-rw-r--r-- | tests/src/store/mod.rs | 3
20 files changed, 2093 insertions, 73 deletions
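
Editor's note: the bulk of this commit is the new export/import machinery in `crates/common/src/manager/backup.rs` and `restore.rs`, wired into the boot sequence via the new `--export`/`--import` command-line arguments. One structural detail worth keeping in mind while reading the diff below: each `backup_*` method spawns an async tokio task that streams `Op` values over a bounded `std::sync::mpsc::sync_channel` to a plain OS thread created by `spawn_writer`, which owns the `BufWriter` and performs the blocking file I/O. The following is a minimal sketch of that producer/consumer shape using only the standard library; it is illustrative, not part of the patch, and the `Record` type and file path are hypothetical stand-ins for the real `Op` enum and per-family export files.

```rust
// Sketch of the writer-thread pattern used by spawn_writer() in backup.rs:
// producers send records over a bounded channel, one thread serializes them.
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::JoinHandle;

// Illustrative record type; the real code sends the `Op` enum.
enum Record {
    Marker(u8),
    KeyValue(Vec<u8>, Vec<u8>),
}

fn spawn_writer(path: &str) -> (JoinHandle<()>, SyncSender<Record>) {
    // A small bound (10 in the patch) applies back-pressure to the producers.
    let (tx, rx) = sync_channel::<Record>(10);
    let path = path.to_string();
    let handle = std::thread::spawn(move || {
        let mut file = BufWriter::new(File::create(path).expect("create backup file"));
        while let Ok(record) = rx.recv() {
            match record {
                Record::Marker(m) => file.write_all(&[m]).expect("write marker"),
                Record::KeyValue(k, v) => {
                    file.write_all(&(k.len() as u32).to_be_bytes()).expect("write key len");
                    file.write_all(&k).expect("write key");
                    file.write_all(&(v.len() as u32).to_be_bytes()).expect("write value len");
                    file.write_all(&v).expect("write value");
                }
            }
        }
        // Channel closed: all senders dropped, so flush and exit.
        file.flush().expect("flush backup file");
    });
    (handle, tx)
}

fn main() {
    let (handle, tx) = spawn_writer("/tmp/example-export");
    tx.send(Record::Marker(123)).unwrap();
    tx.send(Record::KeyValue(b"key".to_vec(), b"value".to_vec())).unwrap();
    drop(tx); // closing the channel lets the writer thread finish
    handle.join().unwrap();
}
```

This split keeps async store iteration off the blocking file-write path while the bounded channel prevents a slow disk from letting queued key/value pairs pile up in memory.
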
diff --git a/CHANGELOG.md b/CHANGELOG.md index e679223d..40412b6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## [0.7.3] - 2024-05-01 + +To upgrade replace the `stalwart-mail` binary and then upgrade to the latest web-admin version. + +## Added +- Full database export and import functionality +- Add --help and --version command line arguments (#365) +- Allow catch-all addresses when validating must match sender + +### Changed +- Add `groupOfUniqueNames` to the list of LDAP object classes + +### Fixed +- Trim spaces in DNS-01 ACME secrets (#382) +- Allow only one journald tracer (#375) +- `authenticated_as` variable not usable for must-match-sender (#372) +- Fixed `BOGUS_ENCRYPTED_AND_TEXT` spam filter rule +- Fixed parsing of IPv6 DNS server addresses ## [0.7.2] - 2024-04-17 @@ -986,7 +986,7 @@ dependencies = [ [[package]] name = "common" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "arc-swap", @@ -1498,7 +1498,7 @@ dependencies = [ [[package]] name = "directory" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "argon2", @@ -2702,7 +2702,7 @@ checksum = "029d73f573d8e8d63e6d5020011d3255b28c3ba85d6cf870a07184ed23de9284" [[package]] name = "imap" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "common", @@ -2898,7 +2898,7 @@ dependencies = [ [[package]] name = "jmap" -version = "0.7.2" +version = "0.7.3" dependencies = [ "aes", "aes-gcm", @@ -3312,7 +3312,7 @@ dependencies = [ [[package]] name = "mail-server" -version = "0.7.2" +version = "0.7.3" dependencies = [ "common", "directory", @@ -3330,7 +3330,7 @@ dependencies = [ [[package]] name = "managesieve" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "bincode", @@ -3607,7 +3607,7 @@ dependencies = [ [[package]] name = "nlp" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "bincode", @@ -5645,7 +5645,7 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smtp" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "bincode", @@ -5761,7 +5761,7 @@ dependencies = [ [[package]] name = "stalwart-cli" -version = "0.7.2" +version = "0.7.3" dependencies = [ "clap", "console", @@ -5792,7 +5792,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "store" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "arc-swap", @@ -6635,7 +6635,7 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "utils" -version = "0.7.2" +version = "0.7.3" dependencies = [ "ahash 0.8.11", "base64 0.22.0", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 46e2302f..19b1f930 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Stalwart Labs Ltd. 
<hello@stalw.art>"] license = "AGPL-3.0-only" repository = "https://github.com/stalwartlabs/cli" homepage = "https://github.com/stalwartlabs/cli" -version = "0.7.2" +version = "0.7.3" edition = "2021" readme = "README.md" resolver = "2" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index a481c20a..7a442b94 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "common" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/common/src/manager/backup.rs b/crates/common/src/manager/backup.rs new file mode 100644 index 00000000..5fd10349 --- /dev/null +++ b/crates/common/src/manager/backup.rs @@ -0,0 +1,1169 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{ + collections::BTreeSet, + io::{BufWriter, Write}, + ops::Range, + path::{Path, PathBuf}, + sync::mpsc::{self, SyncSender}, +}; + +use ahash::{AHashMap, AHashSet}; +use jmap_proto::types::{collection::Collection, property::Property}; +use store::{ + write::{ + key::DeserializeBigEndian, AnyKey, BitmapClass, BitmapHash, BlobOp, DirectoryClass, + LookupClass, QueueClass, QueueEvent, TagValue, ValueClass, + }, + BitmapKey, Deserialize, IndexKey, IterateParams, LogKey, Serialize, ValueKey, SUBSPACE_BITMAPS, + U32_LEN, U64_LEN, +}; + +use utils::{ + codec::leb128::{Leb128Reader, Leb128_}, + failed, BlobHash, UnwrapFailure, BLOB_HASH_LEN, +}; + +use crate::Core; + +const KEY_OFFSET: usize = 1; +pub(super) const MAGIC_MARKER: u8 = 123; +pub(super) const FILE_VERSION: u8 = 1; + +#[derive(Debug)] +pub(super) enum Op { + Family(Family), + AccountId(u32), + Collection(u8), + DocumentId(u32), + KeyValue((Vec<u8>, Vec<u8>)), +} + +#[derive(Debug, Clone, Copy)] +pub(super) enum Family { + Property = 0, + TermIndex = 1, + Acl = 2, + Blob = 3, + Config = 4, + LookupValue = 5, + LookupCounter = 6, + Directory = 7, + Queue = 8, + Index = 9, + Bitmap = 10, + Log = 11, + None = 255, +} + +type TaskHandle = (tokio::task::JoinHandle<()>, std::thread::JoinHandle<()>); + +impl Core { + pub async fn backup(&self, dest: PathBuf) { + if !dest.exists() { + std::fs::create_dir_all(&dest).failed("Failed to create backup directory"); + } else if !dest.is_dir() { + eprintln!("Backup destination {dest:?} is not a directory."); + std::process::exit(1); + } + + let mut sync_handles = Vec::new(); + + for (async_handle, sync_handle) in [ + self.backup_properties(&dest), + self.backup_term_index(&dest), + self.backup_acl(&dest), + self.backup_blob(&dest), + self.backup_config(&dest), + 
self.backup_lookup(&dest), + self.backup_directory(&dest), + self.backup_queue(&dest), + self.backup_index(&dest), + self.backup_bitmaps(&dest), + self.backup_logs(&dest), + ] { + async_handle.await.failed("Task failed"); + sync_handles.push(sync_handle); + } + + for handle in sync_handles { + handle.join().expect("Failed to join thread"); + } + } + + fn backup_properties(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("property")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Property)) + .failed("Failed to send family"); + + let mut keys = BTreeSet::new(); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Property(0), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Property(u8::MAX), + }, + ) + .no_values(), + |key, _| { + let account_id = key.deserialize_be_u32(KEY_OFFSET)?; + let collection = key.deserialize_u8(KEY_OFFSET + U32_LEN)?; + let field = key.deserialize_u8(KEY_OFFSET + U32_LEN + 1)?; + let document_id = key.deserialize_be_u32(KEY_OFFSET + U32_LEN + 2)?; + + keys.insert((account_id, collection, document_id, field)); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + let mut last_account_id = u32::MAX; + let mut last_collection = u8::MAX; + let mut last_document_id = u32::MAX; + + for (account_id, collection, document_id, field) in keys { + if account_id != last_account_id { + writer + .send(Op::AccountId(account_id)) + .failed("Failed to send account id"); + last_account_id = account_id; + } + + if collection != last_collection { + writer + .send(Op::Collection(collection)) + .failed("Failed to send collection"); + last_collection = collection; + } + + if document_id != last_document_id { + writer + .send(Op::DocumentId(document_id)) + .failed("Failed to send document id"); + last_document_id = document_id; + } + + // Obtain UID counter + if collection == u8::from(Collection::Mailbox) + && u8::from(Property::Value) == field + { + let value = store + .get_counter(ValueKey { + account_id, + collection, + document_id, + class: ValueClass::Property(Property::EmailIds.into()), + }) + .await + .failed("Failed to get counter"); + if value != 0 { + writer + .send(Op::KeyValue(( + vec![u8::from(Property::EmailIds)], + value.serialize(), + ))) + .failed("Failed to send key value"); + } + } + + // Write value + let value = store + .get_value::<RawBytes>(ValueKey { + account_id, + collection, + document_id, + class: ValueClass::Property(field), + }) + .await + .failed("Failed to get value") + .failed("Expected value") + .0; + writer + .send(Op::KeyValue((vec![field], value))) + .failed("Failed to send key value"); + } + }), + handle, + ) + } + + fn backup_term_index(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("term_index")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::TermIndex)) + .failed("Failed to send family"); + + let mut keys = BTreeSet::new(); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::TermIndex, + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::TermIndex, + }, + ) + .no_values(), + |key, _| { + let account_id = key.deserialize_be_u32(KEY_OFFSET)?; + let 
collection = key.deserialize_u8(KEY_OFFSET + U32_LEN)?; + let document_id = key + .range(KEY_OFFSET + U32_LEN + 1..usize::MAX)? + .deserialize_leb128()?; + + keys.insert((account_id, collection, document_id)); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + let mut last_account_id = u32::MAX; + let mut last_collection = u8::MAX; + + for (account_id, collection, document_id) in keys { + if account_id != last_account_id { + writer + .send(Op::AccountId(account_id)) + .failed("Failed to send account id"); + last_account_id = account_id; + } + + if collection != last_collection { + writer + .send(Op::Collection(collection)) + .failed("Failed to send collection"); + last_collection = collection; + } + + writer + .send(Op::DocumentId(document_id)) + .failed("Failed to send document id"); + + let value = store + .get_value::<RawBytes>(ValueKey { + account_id, + collection, + document_id, + class: ValueClass::TermIndex, + }) + .await + .failed("Failed to get value") + .failed("Expected value") + .0; + + writer + .send(Op::KeyValue((value.to_vec(), vec![]))) + .failed("Failed to send key value"); + } + }), + handle, + ) + } + + fn backup_acl(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("acl")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Acl)) + .failed("Failed to send family"); + + let mut last_account_id = u32::MAX; + let mut last_collection = u8::MAX; + let mut last_document_id = u32::MAX; + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Acl(0), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Acl(u32::MAX), + }, + ), + |key, value| { + let grant_account_id = key.deserialize_be_u32(KEY_OFFSET)?; + let account_id = key.deserialize_be_u32(KEY_OFFSET + U32_LEN)?; + let collection = key.deserialize_u8(KEY_OFFSET + (U32_LEN * 2))?; + let document_id = + key.deserialize_be_u32(KEY_OFFSET + (U32_LEN * 2) + 1)?; + + if account_id != last_account_id { + writer + .send(Op::AccountId(account_id)) + .failed("Failed to send account id"); + last_account_id = account_id; + } + + if collection != last_collection { + writer + .send(Op::Collection(collection)) + .failed("Failed to send collection"); + last_collection = collection; + } + + if document_id != last_document_id { + writer + .send(Op::DocumentId(document_id)) + .failed("Failed to send document id"); + last_document_id = document_id; + } + + writer + .send(Op::KeyValue(( + grant_account_id.to_be_bytes().to_vec(), + value.to_vec(), + ))) + .failed("Failed to send key value"); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + }), + handle, + ) + } + + fn backup_blob(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let blob_store = self.storage.blob.clone(); + let (handle, writer) = spawn_writer(dest.join("blob")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Blob)) + .failed("Failed to send family"); + + let mut hashes = Vec::new(); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Blob(BlobOp::Link { + hash: Default::default(), + }), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Blob(BlobOp::Link { + hash: BlobHash::new_max(), + }), + }, + ), + |key, _| 
{ + let account_id = key.deserialize_be_u32(KEY_OFFSET + BLOB_HASH_LEN)?; + let collection = + key.deserialize_u8(KEY_OFFSET + BLOB_HASH_LEN + U32_LEN)?; + let document_id = + key.deserialize_be_u32(KEY_OFFSET + BLOB_HASH_LEN + U32_LEN + 1)?; + + let hash = key.range(KEY_OFFSET..KEY_OFFSET + BLOB_HASH_LEN)?.to_vec(); + + if account_id != u32::MAX && document_id != u32::MAX { + writer + .send(Op::AccountId(account_id)) + .failed("Failed to send account id"); + writer + .send(Op::Collection(collection)) + .failed("Failed to send collection"); + writer + .send(Op::DocumentId(document_id)) + .failed("Failed to send document id"); + writer + .send(Op::KeyValue((hash, vec![]))) + .failed("Failed to send key value"); + } else { + hashes.push(hash); + } + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + if !hashes.is_empty() { + writer + .send(Op::AccountId(u32::MAX)) + .failed("Failed to send account id"); + writer + .send(Op::DocumentId(u32::MAX)) + .failed("Failed to send document id"); + for hash in hashes { + if let Some(value) = blob_store + .get_blob(&hash, 0..usize::MAX) + .await + .failed("Failed to get blob") + { + writer + .send(Op::KeyValue((hash, value))) + .failed("Failed to send key value"); + } else { + eprintln!( + "Warning: blob hash {hash:?} does not exist in blob store. Skipping." + ); + } + } + } + }), + handle, + ) + } + + fn backup_config(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("config")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Config)) + .failed("Failed to send family"); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Config(vec![0]), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Config(vec![ + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + ]), + }, + ), + |key, value| { + writer + .send(Op::KeyValue(( + key.range(KEY_OFFSET..usize::MAX)?.to_vec(), + value.to_vec(), + ))) + .failed("Failed to send key value"); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + }), + handle, + ) + } + + fn backup_lookup(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("lookup")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::LookupValue)) + .failed("Failed to send family"); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Lookup(LookupClass::Key(vec![0])), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Lookup(LookupClass::Key(vec![ + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + ])), + }, + ), + |key, value| { + writer + .send(Op::KeyValue(( + key.range(KEY_OFFSET..usize::MAX)?.to_vec(), + value.to_vec(), + ))) + .failed("Failed to send key value"); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + writer + .send(Op::Family(Family::LookupCounter)) + .failed("Failed to send family"); + + let mut expired_counters = AHashSet::new(); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Lookup(LookupClass::CounterExpiry(vec![0])), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, 
+ document_id: u32::MAX, + class: ValueClass::Lookup(LookupClass::CounterExpiry(vec![ + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + ])), + }, + ) + .no_values(), + |key, _| { + expired_counters.insert(key.range(KEY_OFFSET..usize::MAX)?.to_vec()); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + let mut counters = Vec::new(); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Lookup(LookupClass::Counter(vec![0])), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Lookup(LookupClass::Counter(vec![ + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + u8::MAX, + ])), + }, + ) + .no_values(), + |key, _| { + let key = key.range(KEY_OFFSET..usize::MAX)?.to_vec(); + if !expired_counters.contains(&key) { + counters.push(key); + } + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + for key in counters { + let value = store + .get_counter(ValueKey::from(ValueClass::Lookup(LookupClass::Counter( + key.clone(), + )))) + .await + .failed("Failed to get counter"); + + if value != 0 { + writer + .send(Op::KeyValue((key, value.serialize()))) + .failed("Failed to send key value"); + } + } + }), + handle, + ) + } + + fn backup_directory(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("directory")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Directory)) + .failed("Failed to send family"); + + let mut principal_ids = Vec::new(); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Directory(DirectoryClass::NameToId(vec![0])), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Directory(DirectoryClass::Members { + principal_id: u32::MAX, + has_member: u32::MAX, + }), + }, + ), + |key, value| { + let mut key = key.to_vec(); + key[0] -= 20; + + if key[0] == 2 { + principal_ids.push(key.as_slice().range(1..usize::MAX)?.to_vec()); + } + + writer + .send(Op::KeyValue((key, value.to_vec()))) + .failed("Failed to send key value"); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + for principal_bytes in principal_ids { + let value = store + .get_counter(ValueKey::from(ValueClass::Directory( + DirectoryClass::UsedQuota( + principal_bytes + .as_slice() + .deserialize_leb128() + .failed("Failed to deserialize principal id"), + ), + ))) + .await + .failed("Failed to get counter"); + if value != 0 { + let mut key = Vec::with_capacity(U32_LEN + 1); + key.push(4u8); + key.extend_from_slice(&principal_bytes); + + writer + .send(Op::KeyValue((key, value.serialize()))) + .failed("Failed to send key value"); + } + } + }), + handle, + ) + } + + fn backup_queue(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("queue")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Queue)) + .failed("Failed to send family"); + + store + .iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Queue(QueueClass::Message(0)), + }, + ValueKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + class: ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { + due: u64::MAX, + 
queue_id: u64::MAX, + })), + }, + ), + |key, value| { + let mut key = key.to_vec(); + key[0] -= 50; + + writer + .send(Op::KeyValue((key, value.to_vec()))) + .failed("Failed to send key value"); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + }), + handle, + ) + } + + fn backup_index(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("index")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Index)) + .failed("Failed to send family"); + + let mut last_account_id = u32::MAX; + let mut last_collection = u8::MAX; + + store + .iterate( + IterateParams::new( + IndexKey { + account_id: 0, + collection: 0, + document_id: 0, + field: 0, + key: vec![0], + }, + IndexKey { + account_id: u32::MAX, + collection: u8::MAX, + document_id: u32::MAX, + field: u8::MAX, + key: vec![u8::MAX, u8::MAX, u8::MAX], + }, + ) + .no_values(), + |key, _| { + let account_id = key.deserialize_be_u32(0)?; + let collection = key.deserialize_u8(U32_LEN)?; + let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?; + + let key = key.range(U32_LEN + 1..key.len() - U32_LEN)?.to_vec(); + + if account_id != last_account_id { + writer + .send(Op::AccountId(account_id)) + .failed("Failed to send account id"); + last_account_id = account_id; + } + + if collection != last_collection { + writer + .send(Op::Collection(collection)) + .failed("Failed to send collection"); + last_collection = collection; + } + + writer + .send(Op::DocumentId(document_id)) + .failed("Failed to send document id"); + + writer + .send(Op::KeyValue((key, vec![]))) + .failed("Failed to send key value"); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + }), + handle, + ) + } + + fn backup_bitmaps(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let has_doc_id = store.id() != "rocksdb"; + + let (handle, writer) = spawn_writer(dest.join("bitmap")); + ( + tokio::spawn(async move { + const BM_DOCUMENT_IDS: u8 = 0; + const BM_TEXT: u8 = 1 << 7; + + const TAG_ID: u8 = 1 << 6; + const TAG_TEXT: u8 = 1 << 0 | 1 << 6; + const TAG_STATIC: u8 = 1 << 1 | 1 << 6; + + writer + .send(Op::Family(Family::Bitmap)) + .failed("Failed to send family"); + + let mut bitmaps: AHashMap<(u32, u8), AHashSet<BitmapClass>> = AHashMap::new(); + + store + .iterate( + IterateParams::new( + AnyKey { + subspace: SUBSPACE_BITMAPS, + key: vec![0u8], + }, + AnyKey { + subspace: SUBSPACE_BITMAPS, + key: vec![u8::MAX; 10], + }, + ) + .no_values(), + |key, _| { + let account_id = key.deserialize_be_u32(0)?; + let collection = key.deserialize_u8(U32_LEN)?; + + let entry = bitmaps.entry((account_id, collection)).or_default(); + + let key = if has_doc_id { + key.range(0..key.len() - U32_LEN)? + } else { + key + }; + + match key.deserialize_u8(U32_LEN + 1)? { + BM_DOCUMENT_IDS => { + entry.insert(BitmapClass::DocumentIds); + } + TAG_ID => { + entry.insert(BitmapClass::Tag { + field: key.deserialize_u8(U32_LEN + 2)?, + value: TagValue::Id( + key.range(U32_LEN + 3..usize::MAX)? 
+ .deserialize_leb128()?, + ), + }); + } + TAG_TEXT => { + entry.insert(BitmapClass::Tag { + field: key.deserialize_u8(U32_LEN + 2)?, + value: TagValue::Text( + key.range(U32_LEN + 3..usize::MAX)?.to_vec(), + ), + }); + } + TAG_STATIC => { + entry.insert(BitmapClass::Tag { + field: key.deserialize_u8(U32_LEN + 2)?, + value: TagValue::Static(key.deserialize_u8(U32_LEN + 3)?), + }); + } + text => { + entry.insert(BitmapClass::Text { + field: key.deserialize_u8(U32_LEN + 2)?, + token: BitmapHash { + hash: key + .range(U32_LEN + 3..U32_LEN + 11)? + .try_into() + .unwrap(), + len: text & !BM_TEXT, + }, + }); + } + } + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + + for ((account_id, collection), classes) in bitmaps { + writer + .send(Op::AccountId(account_id)) + .failed("Failed to send account id"); + writer + .send(Op::Collection(collection)) + .failed("Failed to send collection"); + + for class in classes { + if let Some(bitmap) = store + .get_bitmap(BitmapKey { + account_id, + collection, + class: class.clone(), + block_num: 0, + }) + .await + .failed("Failed to get bitmap") + { + let key = match class { + BitmapClass::DocumentIds => { + vec![0u8] + } + BitmapClass::Tag { field, value } => { + let mut key = Vec::with_capacity(3); + + match value { + TagValue::Id(id) => { + key.push(1u8); + key.push(field); + key.extend_from_slice(&id.serialize()); + } + TagValue::Text(text) => { + key.push(2u8); + key.push(field); + key.extend_from_slice(&text); + } + TagValue::Static(id) => { + key.push(3u8); + key.push(field); + key.push(id); + } + } + + key + } + BitmapClass::Text { field, token } => { + let mut key = vec![4u8, field]; + key.push(token.len); + key.extend_from_slice(&token.hash); + key + } + }; + + let mut bytes = Vec::with_capacity(bitmap.serialized_size()); + bitmap + .serialize_into(&mut bytes) + .failed("Failed to serialize bitmap"); + + writer + .send(Op::KeyValue((key, bytes))) + .failed("Failed to send key value"); + } + } + } + }), + handle, + ) + } + + fn backup_logs(&self, dest: &Path) -> TaskHandle { + let store = self.storage.data.clone(); + let (handle, writer) = spawn_writer(dest.join("log")); + ( + tokio::spawn(async move { + writer + .send(Op::Family(Family::Log)) + .failed("Failed to send family"); + + let mut last_account_id = u32::MAX; + let mut last_collection = u8::MAX; + + store + .iterate( + IterateParams::new( + LogKey { + account_id: 0, + collection: 0, + change_id: 0, + }, + LogKey { + account_id: u32::MAX, + collection: u8::MAX, + change_id: u64::MAX, + }, + ), + |key, value| { + let account_id = key.deserialize_be_u32(0)?; + let collection = key.deserialize_u8(U32_LEN)?; + let key = key.range(U32_LEN + 1..usize::MAX)?.to_vec(); + + if key.len() != U64_LEN { + failed(&format!("Found invalid log entry {key:?} {value:?}")); + } + + if account_id != last_account_id { + writer + .send(Op::AccountId(account_id)) + .failed("Failed to send account id"); + last_account_id = account_id; + } + + if collection != last_collection { + writer + .send(Op::Collection(collection)) + .failed("Failed to send collection"); + last_collection = collection; + } + + writer + .send(Op::KeyValue((key, value.to_vec()))) + .failed("Failed to send key value"); + + Ok(true) + }, + ) + .await + .failed("Failed to iterate over data store"); + }), + handle, + ) + } +} + +fn spawn_writer(path: PathBuf) -> (std::thread::JoinHandle<()>, SyncSender<Op>) { + let (tx, rx) = mpsc::sync_channel(10); + + let handle = std::thread::spawn(move || { + let mut file = + 
BufWriter::new(std::fs::File::create(path).failed("Failed to create backup file")); + file.write_all(&[MAGIC_MARKER, FILE_VERSION]) + .failed("Failed to write version"); + + while let Ok(op) = rx.recv() { + match op { + Op::Family(f) => { + file.write_all(&[0u8, f as u8]) + .failed("Failed to write family"); + } + Op::KeyValue((k, v)) => { + file.write_all(&[if !v.is_empty() { 1u8 } else { 2u8 }]) + .failed("Failed to write key"); + file.write_all(&(k.len() as u32).serialize()) + .failed("Failed to write key value"); + file.write_all(&k).failed("Failed to write key"); + if !v.is_empty() { + file.write_all(&(v.len() as u32).serialize()) + .failed("Failed to write key value"); + file.write_all(&v).failed("Failed to write key value"); + } + } + Op::AccountId(v) => { + file.write_all(&[3u8]).failed("Failed to write account id"); + file.write_all(&v.serialize()) + .failed("Failed to write account id"); + } + Op::Collection(v) => { + file.write_all(&[4u8, v]) + .failed("Failed to write collection"); + } + Op::DocumentId(v) => { + file.write_all(&[5u8]).failed("Failed to write document id"); + file.write_all(&v.serialize()) + .failed("Failed to write document id"); + } + } + } + + file.flush().failed("Failed to flush backup file"); + }); + + (handle, tx) +} + +pub(super) trait DeserializeBytes { + fn range(&self, range: Range<usize>) -> store::Result<&[u8]>; + fn deserialize_u8(&self, offset: usize) -> store::Result<u8>; + fn deserialize_leb128<U: Leb128_>(&self) -> store::Result<U>; +} + +impl DeserializeBytes for &[u8] { + fn range(&self, range: Range<usize>) -> store::Result<&[u8]> { + self.get(range.start..std::cmp::min(range.end, self.len())) + .ok_or_else(|| store::Error::InternalError("Failed to read range".to_string())) + } + + fn deserialize_u8(&self, offset: usize) -> store::Result<u8> { + self.get(offset) + .copied() + .ok_or_else(|| store::Error::InternalError("Failed to read u8".to_string())) + } + + fn deserialize_leb128<U: Leb128_>(&self) -> store::Result<U> { + self.read_leb128::<U>() + .map(|(v, _)| v) + .ok_or_else(|| store::Error::InternalError("Failed to read leb128".to_string())) + } +} + +struct RawBytes(Vec<u8>); + +impl Deserialize for RawBytes { + fn deserialize(bytes: &[u8]) -> store::Result<Self> { + Ok(Self(bytes.to_vec())) + } +} diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs index 6a5a4694..d57edf94 100644 --- a/crates/common/src/manager/boot.rs +++ b/crates/common/src/manager/boot.rs @@ -58,21 +58,28 @@ Usage: stalwart-mail [OPTIONS] Options: -c, --config <PATH> Start server with the specified configuration file - -b, --backup <PATH> Backup all data to a specific path - -r, --restore <PATH> Restore all data from a specific path - -i, --init <PATH> Initialize a new server at a specific path + -e, --export <PATH> Export all store data to a specific path + -i, --import <PATH> Import store data from a specific path + -I, --init <PATH> Initialize a new server at a specific path -h, --help Print help -V, --version Print version "#; +enum ImportExport { + Export(PathBuf), + Import(PathBuf), + None, +} + impl BootManager { pub async fn init() -> Self { let mut config_path = std::env::var("CONFIG_PATH").ok(); + let mut art_vandelay = ImportExport::None; if config_path.is_none() { let mut args = std::env::args().skip(1); - if let Some(arg) = args + while let Some(arg) = args .next() .and_then(|arg| arg.strip_prefix("--").map(|arg| arg.to_string())) { @@ -82,54 +89,34 @@ impl BootManager { (arg, args.next()) }; - match key.as_str() { - 
"help" | "h" => { + match (key.as_str(), value) { + ("help" | "h", _) => { println!("{HELP}"); std::process::exit(0); } - "version" | "V" => { + ("version" | "V", _) => { println!("{}", env!("CARGO_PKG_VERSION")); std::process::exit(0); } - _ => (), - } - - match key.as_str() { - "config" | "c" => { - config_path = Some(value.unwrap_or_else(|| { - failed(&format!( - "Missing value for argument '{key}', try '--help'." - )) - })); + ("config" | "c", Some(value)) => { + config_path = Some(value); } - "init" | "i" => { - quickstart(value.unwrap_or_else(|| { - failed(&format!( - "Missing value for argument '{key}', try '--help'." - )) - })); + ("init" | "I", Some(value)) => { + quickstart(value); std::process::exit(0); } - - "backup" | "b" => { - let path = value.unwrap_or_else(|| { - failed(&format!( - "Missing value for argument '{key}', try '--help'." - )) - }); - std::process::exit(0); + ("export" | "e", Some(value)) => { + art_vandelay = ImportExport::Export(value.into()); } - "restore" | "r" => { - let path = value.unwrap_or_else(|| { - failed(&format!( - "Missing value for argument '{key}', try '--help'." - )) - }); - std::process::exit(0); + ("import" | "i", Some(value)) => { + art_vandelay = ImportExport::Import(value.into()); } - _ => { + (_, None) => { failed(&format!("Unrecognized command '{key}', try '--help'.")); } + (_, Some(_)) => failed(&format!( + "Missing value for argument '{key}', try '--help'." + )), } } @@ -312,18 +299,30 @@ impl BootManager { stores.parse_lookups(&mut config).await; // Parse settings and build shared core - let core = Core::parse(&mut config, stores, manager) - .await - .into_shared(); - - // Parse TCP acceptors - servers.parse_tcp_acceptors(&mut config, core.clone()); - - BootManager { - core, - guards, - config, - servers, + let core = Core::parse(&mut config, stores, manager).await; + + match art_vandelay { + ImportExport::None => { + let core = core.into_shared(); + + // Parse TCP acceptors + servers.parse_tcp_acceptors(&mut config, core.clone()); + + BootManager { + core, + guards, + config, + servers, + } + } + ImportExport::Export(path) => { + core.backup(path).await; + std::process::exit(0); + } + ImportExport::Import(path) => { + core.restore(path).await; + std::process::exit(0); + } } } } diff --git a/crates/common/src/manager/mod.rs b/crates/common/src/manager/mod.rs index 6ca39137..66164f51 100644 --- a/crates/common/src/manager/mod.rs +++ b/crates/common/src/manager/mod.rs @@ -27,9 +27,11 @@ use crate::USER_AGENT; use self::config::ConfigManager; +pub mod backup; pub mod boot; pub mod config; pub mod reload; +pub mod restore; pub mod webadmin; const DEFAULT_SPAMFILTER_URL: &str = "https://get.stalw.art/resources/config/spamfilter.toml"; diff --git a/crates/common/src/manager/restore.rs b/crates/common/src/manager/restore.rs new file mode 100644 index 00000000..854b852a --- /dev/null +++ b/crates/common/src/manager/restore.rs @@ -0,0 +1,434 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{ + io::ErrorKind, + path::{Path, PathBuf}, +}; + +use crate::Core; +use jmap_proto::types::{collection::Collection, property::Property}; +use store::{ + roaring::RoaringBitmap, + write::{ + key::DeserializeBigEndian, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass, + LookupClass, Operation, TagValue, ValueClass, + }, + BlobStore, Store, U32_LEN, +}; +use store::{ + write::{QueueClass, QueueEvent}, + Deserialize, U64_LEN, +}; +use tokio::{ + fs::File, + io::{AsyncReadExt, BufReader}, +}; +use utils::{failed, BlobHash, UnwrapFailure}; + +use super::backup::{DeserializeBytes, Family, Op, FILE_VERSION, MAGIC_MARKER}; + +impl Core { + pub async fn restore(&self, src: PathBuf) { + // Backup the core + if src.is_dir() { + // Iterate directory and spawn a task for each file + let mut tasks = Vec::new(); + for entry in std::fs::read_dir(&src).failed("Failed to read directory") { + let entry = entry.failed("Failed to read entry"); + let path = entry.path(); + if path.is_file() { + let storage = self.storage.clone(); + let blob_store = self.storage.blob.clone(); + tasks.push(tokio::spawn(async move { + restore_file(storage.data, blob_store, &path).await; + })); + } + } + + for task in tasks { + task.await.failed("Failed to wait for task"); + } + } else { + restore_file(self.storage.data.clone(), self.storage.blob.clone(), &src).await; + } + } +} + +async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) { + let mut reader = OpReader::new(path).await; + let mut account_id = u32::MAX; + let mut document_id = u32::MAX; + let mut collection = u8::MAX; + let mut family = Family::None; + + let mut batch = BatchBuilder::new(); + + while let Some(op) = reader.next().await { + match op { + Op::Family(f) => family = f, + Op::AccountId(a) => { + account_id = a; + batch.with_account_id(account_id); + } + Op::Collection(c) => { + collection = c; + batch.with_collection(collection); + } + Op::DocumentId(d) => { + document_id = d; + batch.update_document(document_id); + } + Op::KeyValue((key, value)) => match family { + Family::Property => { + let field = key + .as_slice() + .deserialize_u8(0) + .expect("Failed to deserialize field"); + if collection == u8::from(Collection::Mailbox) + && u8::from(Property::EmailIds) == field + { + batch.add( + ValueClass::Property(field), + i64::deserialize(&value) + .expect("Failed to deserialize mailbox uidnext"), + ); + } else { + batch.set(ValueClass::Property(field), value); + } + } + Family::TermIndex => { + batch.set(ValueClass::TermIndex, key); + } + Family::Acl => { + batch.set( + ValueClass::Acl( + key.as_slice() + .deserialize_be_u32(0) + .expect("Failed to deserialize acl"), + ), + value, + ); + } + Family::Blob => { + let hash = BlobHash::try_from_hash_slice(&key).expect("Invalid blob hash"); + + if account_id != u32::MAX && document_id != u32::MAX { + batch.set(ValueClass::Blob(BlobOp::Link { hash }), vec![]); + } else { + blob_store + .put_blob(&key, &value) + .await + .expect("Failed to write blob"); + batch.set(ValueClass::Blob(BlobOp::Commit { hash }), vec![]); + } + } + 
Family::Config => { + batch.set(ValueClass::Config(key), value); + } + Family::LookupValue => { + batch.set(ValueClass::Lookup(LookupClass::Key(key)), value); + } + Family::LookupCounter => { + batch.add( + ValueClass::Lookup(LookupClass::Counter(key)), + i64::deserialize(&value).expect("Failed to deserialize counter"), + ); + } + Family::Directory => { + let key = key.as_slice(); + let class = match key.first().expect("Failed to read directory key type") { + 0 => DirectoryClass::NameToId( + key.get(1..) + .expect("Failed to read directory string") + .to_vec(), + ), + 1 => DirectoryClass::EmailToId( + key.get(1..) + .expect("Failed to read directory string") + .to_vec(), + ), + 2 => DirectoryClass::Principal( + key.get(1..) + .expect("Failed to read range for principal id") + .deserialize_leb128() + .expect("Failed to deserialize principal id"), + ), + 3 => DirectoryClass::Domain( + key.get(1..) + .expect("Failed to read directory string") + .to_vec(), + ), + 4 => { + batch.add( + ValueClass::Directory(DirectoryClass::UsedQuota( + key.get(1..) + .expect("Failed to read principal id") + .deserialize_leb128() + .expect("Failed to read principal id"), + )), + i64::deserialize(&value).expect("Failed to deserialize quota"), + ); + + continue; + } + 5 => DirectoryClass::MemberOf { + principal_id: key + .deserialize_be_u32(1) + .expect("Failed to read principal id"), + member_of: key + .deserialize_be_u32(1 + U32_LEN) + .expect("Failed to read principal id"), + }, + 6 => DirectoryClass::Members { + principal_id: key + .deserialize_be_u32(1) + .expect("Failed to read principal id"), + has_member: key + .deserialize_be_u32(1 + U32_LEN) + .expect("Failed to read principal id"), + }, + + _ => failed("Invalid directory key"), + }; + batch.set(ValueClass::Directory(class), value); + } + Family::Queue => { + let key = key.as_slice(); + + match key.first().expect("Failed to read queue key type") { + 0 => { + batch.set( + ValueClass::Queue(QueueClass::Message( + key.deserialize_be_u64(1) + .expect("Failed to deserialize queue message id"), + )), + value, + ); + } + 1 => { + batch.set( + ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { + due: key + .deserialize_be_u64(1) + .expect("Failed to deserialize queue message id"), + queue_id: key + .deserialize_be_u64(1 + U64_LEN) + .expect("Failed to deserialize queue message id"), + })), + value, + ); + } + _ => failed("Invalid queue key"), + } + } + Family::Index => batch.ops.push(Operation::Index { + field: key.first().copied().expect("Failed to read index field"), + key: key.get(1..).expect("Failed to read index key").to_vec(), + set: true, + }), + Family::Bitmap => { + let document_ids = RoaringBitmap::deserialize_from(&value[..]) + .expect("Failed to deserialize bitmap"); + let key = key.as_slice(); + let class = match key.first().expect("Failed to read bitmap class") { + 0 => BitmapClass::DocumentIds, + 1 => BitmapClass::Tag { + field: key.get(1).copied().expect("Failed to read field"), + value: TagValue::Id( + key.deserialize_be_u32(2).expect("Failed to read tag id"), + ), + }, + 2 => BitmapClass::Tag { + field: key.get(1).copied().expect("Failed to read field"), + value: TagValue::Text( + key.get(2..).expect("Failed to read tag text").to_vec(), + ), + }, + 3 => BitmapClass::Tag { + field: key.get(1).copied().expect("Failed to read field"), + value: TagValue::Static( + key.get(2).copied().expect("Failed to read tag static id"), + ), + }, + 4 => BitmapClass::Text { + field: key.get(1).copied().expect("Failed to read field"), + token: BitmapHash { + 
len: key.get(2).copied().expect("Failed to read tag static id"), + hash: key + .get(3..11) + .expect("Failed to read tag static id") + .try_into() + .unwrap(), + }, + }, + _ => failed("Invalid bitmap class"), + }; + + for document_id in document_ids { + batch.ops.push(Operation::DocumentId { document_id }); + batch.ops.push(Operation::Bitmap { + class: class.clone(), + set: true, + }); + + if batch.ops.len() >= 1000 { + store + .write(batch.build()) + .await + .failed("Failed to write batch"); + batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(collection); + } + } + } + Family::Log => { + batch.ops.push(Operation::Log { + change_id: key + .as_slice() + .deserialize_be_u64(0) + .expect("Failed to deserialize change id"), + collection, + set: value, + }); + } + Family::None => failed("No family specified in file"), + }, + } + + if batch.ops.len() >= 1000 { + store + .write(batch.build()) + .await + .failed("Failed to write batch"); + batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(collection) + .update_document(document_id); + } + } + + if !batch.is_empty() { + store + .write(batch.build()) + .await + .failed("Failed to write batch"); + } +} + +struct OpReader { + file: BufReader<File>, +} + +impl OpReader { + async fn new(path: &Path) -> Self { + let mut file = BufReader::new(File::open(&path).await.failed("Failed to open file")); + + if file + .read_u8() + .await + .failed(&format!("Failed to read magic marker from {path:?}")) + != MAGIC_MARKER + { + failed(&format!("Invalid magic marker in {path:?}")); + } + + if file + .read_u8() + .await + .failed(&format!("Failed to read version from {path:?}")) + != FILE_VERSION + { + failed(&format!("Invalid file version in {path:?}")); + } + + Self { file } + } + + async fn next(&mut self) -> Option<Op> { + match self.file.read_u8().await { + Ok(byte) => match byte { + 0 => Op::Family( + Family::try_from(self.expect_u8().await).failed("Failed to read family"), + ), + 1 => Op::KeyValue(( + self.expect_sized_bytes().await, + self.expect_sized_bytes().await, + )), + 2 => Op::KeyValue((self.expect_sized_bytes().await, vec![])), + 3 => Op::AccountId(self.expect_u32_be().await), + 4 => Op::Collection(self.expect_u8().await), + 5 => Op::DocumentId(self.expect_u32_be().await), + unknown => { + failed(&format!("Unknown op type {unknown}")); + } + } + .into(), + Err(err) if err.kind() == ErrorKind::UnexpectedEof => None, + Err(err) => failed(&format!("Failed to read file: {err:?}")), + } + } + + async fn expect_u8(&mut self) -> u8 { + self.file.read_u8().await.failed("Failed to read u8") + } + + async fn expect_u32_be(&mut self) -> u32 { + self.file.read_u32().await.failed("Failed to read u32") + } + + async fn expect_sized_bytes(&mut self) -> Vec<u8> { + let len = self.expect_u32_be().await as usize; + let mut bytes = vec![0; len]; + self.file + .read_exact(&mut bytes) + .await + .failed("Failed to read bytes"); + bytes + } +} + +impl TryFrom<u8> for Family { + type Error = String; + + fn try_from(value: u8) -> Result<Self, Self::Error> { + match value { + 0 => Ok(Self::Property), + 1 => Ok(Self::TermIndex), + 2 => Ok(Self::Acl), + 3 => Ok(Self::Blob), + 4 => Ok(Self::Config), + 5 => Ok(Self::LookupValue), + 6 => Ok(Self::LookupCounter), + 7 => Ok(Self::Directory), + 8 => Ok(Self::Queue), + 9 => Ok(Self::Index), + 10 => Ok(Self::Bitmap), + 11 => Ok(Self::Log), + other => Err(format!("Unknown family type {other}")), + } + } +} diff --git a/crates/directory/Cargo.toml 
b/crates/directory/Cargo.toml index b119039f..9538df7e 100644 --- a/crates/directory/Cargo.toml +++ b/crates/directory/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "directory" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/imap/Cargo.toml b/crates/imap/Cargo.toml index 7f9918bf..b67ef3fc 100644 --- a/crates/imap/Cargo.toml +++ b/crates/imap/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "imap" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/jmap/Cargo.toml b/crates/jmap/Cargo.toml index 7d7753b8..696a5304 100644 --- a/crates/jmap/Cargo.toml +++ b/crates/jmap/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jmap" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index 0fc6b8b4..b34847bf 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -7,7 +7,7 @@ homepage = "https://stalw.art" keywords = ["imap", "jmap", "smtp", "email", "mail", "server"] categories = ["email"] license = "AGPL-3.0-only" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/managesieve/Cargo.toml b/crates/managesieve/Cargo.toml index e0d4edba..89a66137 100644 --- a/crates/managesieve/Cargo.toml +++ b/crates/managesieve/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "managesieve" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/nlp/Cargo.toml b/crates/nlp/Cargo.toml index 963c9c9f..cf6b920a 100644 --- a/crates/nlp/Cargo.toml +++ b/crates/nlp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nlp" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml index 05847634..bcc5a565 100644 --- a/crates/smtp/Cargo.toml +++ b/crates/smtp/Cargo.toml @@ -7,7 +7,7 @@ homepage = "https://stalw.art/smtp" keywords = ["smtp", "email", "mail", "server"] categories = ["email"] license = "AGPL-3.0-only" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index f28b578f..d7965685 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "store" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/crates/store/src/dispatch/mod.rs b/crates/store/src/dispatch/mod.rs index dbc34c81..86a23a3d 100644 --- a/crates/store/src/dispatch/mod.rs +++ b/crates/store/src/dispatch/mod.rs @@ -21,7 +21,27 @@ * for more details. 
*/ +use crate::Store; + pub mod blob; pub mod fts; pub mod lookup; pub mod store; + +impl Store { + pub fn id(&self) -> &'static str { + match self { + #[cfg(feature = "sqlite")] + Self::SQLite(_) => "sqlite", + #[cfg(feature = "foundation")] + Self::FoundationDb(_) => "foundationdb", + #[cfg(feature = "postgres")] + Self::PostgreSQL(_) => "postgresql", + #[cfg(feature = "mysql")] + Self::MySQL(_) => "mysql", + #[cfg(feature = "rocks")] + Self::RocksDb(_) => "rocksdb", + Self::None => "none", + } + } +} diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 59b14be3..2bb353d4 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "utils" -version = "0.7.2" +version = "0.7.3" edition = "2021" resolver = "2" diff --git a/tests/src/store/import_export.rs b/tests/src/store/import_export.rs new file mode 100644 index 00000000..92ba7d1e --- /dev/null +++ b/tests/src/store/import_export.rs @@ -0,0 +1,375 @@ +/* + * Copyright (c) 2023, Stalwart Labs Ltd. + * + * This file is part of the Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +use ahash::AHashSet; +use common::Core; +use jmap_proto::types::{collection::Collection, property::Property}; +use store::{ + rand, + write::{ + AnyKey, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass, LookupClass, + Operation, QueueClass, QueueEvent, TagValue, ValueClass, + }, + IterateParams, Store, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, + SUBSPACE_LOGS, SUBSPACE_VALUES, +}; +use utils::BlobHash; + +use crate::store::TempDir; + +pub async fn test(db: Store) { + let mut core = Core::default(); + core.storage.data = db.clone(); + core.storage.blob = db.clone().into(); + core.storage.fts = db.clone().into(); + core.storage.lookup = db.clone().into(); + + // Make sure the store is empty + db.assert_is_empty(db.clone().into()).await; + + // Create blobs + println!("Creating blobs..."); + let mut batch = BatchBuilder::new(); + let mut blob_hashes = Vec::new(); + for blob_size in [16, 128, 1024, 2056, 102400] { + let data = random_bytes(blob_size); + let hash = BlobHash::from(data.as_slice()); + blob_hashes.push(hash.clone()); + core.storage + .blob + .put_blob(hash.as_ref(), &data) + .await + .unwrap(); + batch.set(ValueClass::Blob(BlobOp::Commit { hash }), vec![]); + } + db.write(batch.build()).await.unwrap(); + + // Create account data + println!("Creating account data..."); + for account_id in 0u32..10u32 { + let mut batch = BatchBuilder::new(); + batch.with_account_id(account_id); + + // Create properties of different sizes + for collection in [0, 1, 2, 3] { + batch.with_collection(collection); + + for document_id in [0, 10, 20, 30, 40] { + batch.create_document(document_id); + + if collection == u8::from(Collection::Mailbox) { + batch + .set( + ValueClass::Property(Property::Value.into()), + random_bytes(10), + ) + .add( + ValueClass::Property(Property::EmailIds.into()), + rand::random(), + ); + } + + for (idx, value_size) in [16, 128, 1024, 2056, 102400].into_iter().enumerate() { + batch.set(ValueClass::Property(idx as u8), random_bytes(value_size)); + } + + for value_size in [16, 128, 1024, 2056, 102400] { + batch.set(ValueClass::TermIndex, random_bytes(value_size)); + } + + for grant_account_id in 0u32..10u32 { + if account_id != grant_account_id { + batch.set( + ValueClass::Acl(grant_account_id), + vec![account_id as u8, grant_account_id as u8, document_id as u8], + ); + } + } + + for hash in &blob_hashes { + batch.set( + ValueClass::Blob(BlobOp::Link { hash: hash.clone() }), + vec![], + ); + } + + batch.ops.push(Operation::Log { + change_id: document_id as u64 + account_id as u64 + collection as u64, + collection, + set: vec![account_id as u8, collection, document_id as u8], + }); + + for field in 0..5 { + batch.ops.push(Operation::Bitmap { + class: BitmapClass::Tag { + field, + value: TagValue::Id(rand::random()), + }, + set: true, + }); + + batch.ops.push(Operation::Bitmap { + class: BitmapClass::Tag { + field, + value: TagValue::Static(rand::random()), + }, + set: true, + }); + + batch.ops.push(Operation::Bitmap { + class: BitmapClass::Tag { + field, + value: TagValue::Text(random_bytes(field as usize + 2)), + }, + set: true, + }); + + batch.ops.push(Operation::Bitmap { + class: BitmapClass::Text { + field, + token: BitmapHash::new(&random_bytes(field as usize + 2)), + }, + set: true, + }); + + batch.ops.push(Operation::Index { + field, + key: random_bytes(field as usize + 2), + set: true, + }); + } + } + } + + db.write(batch.build()).await.unwrap(); + } + + // Create queue, config and lookup data + println!("Creating queue, 
config and lookup data..."); + let mut batch = BatchBuilder::new(); + for idx in [1, 2, 3, 4, 5] { + batch.set( + ValueClass::Queue(QueueClass::Message(rand::random())), + random_bytes(idx), + ); + batch.set( + ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { + due: rand::random(), + queue_id: rand::random(), + })), + random_bytes(idx), + ); + batch.set( + ValueClass::Lookup(LookupClass::Key(random_bytes(idx))), + random_bytes(idx), + ); + batch.add( + ValueClass::Lookup(LookupClass::Counter(random_bytes(idx))), + rand::random(), + ); + batch.set( + ValueClass::Config(random_bytes(idx + 10)), + random_bytes(idx + 10), + ); + } + db.write(batch.build()).await.unwrap(); + + // Create directory data + println!("Creating directory data..."); + let mut batch = BatchBuilder::new(); + batch + .with_account_id(u32::MAX) + .with_collection(Collection::Principal); + + for account_id in [1, 2, 3, 4, 5] { + batch + .create_document(account_id) + .add( + ValueClass::Directory(DirectoryClass::UsedQuota(account_id)), + rand::random(), + ) + .set( + ValueClass::Directory(DirectoryClass::NameToId(random_bytes( + 2 + account_id as usize, + ))), + random_bytes(4), + ) + .set( + ValueClass::Directory(DirectoryClass::EmailToId(random_bytes( + 4 + account_id as usize, + ))), + random_bytes(4), + ) + .set( + ValueClass::Directory(DirectoryClass::Domain(random_bytes( + 4 + account_id as usize, + ))), + random_bytes(4), + ) + .set( + ValueClass::Directory(DirectoryClass::Principal(account_id)), + random_bytes(30), + ) + .set( + ValueClass::Directory(DirectoryClass::MemberOf { + principal_id: account_id, + member_of: rand::random(), + }), + random_bytes(15), + ) + .set( + ValueClass::Directory(DirectoryClass::Members { + principal_id: account_id, + has_member: rand::random(), + }), + random_bytes(15), + ); + } + db.write(batch.build()).await.unwrap(); + + // Obtain store hash + println!("Calculating store hash..."); + let snapshot = Snapshot::new(&db).await; + assert!(!snapshot.keys.is_empty(), "Store hash counts are empty",); + + // Export store + println!("Exporting store..."); + let temp_dir = TempDir::new("art_vandelay_tests", true); + core.backup(temp_dir.path.clone()).await; + + // Destroy store + println!("Destroying store..."); + db.destroy().await; + db.assert_is_empty(db.clone().into()).await; + + // Import store + println!("Importing store..."); + core.restore(temp_dir.path.clone()).await; + + // Verify hash + print!("Verifying store hash..."); + snapshot.assert_is_eq(&Snapshot::new(&db).await); + println!(" GREAT SUCCESS!"); + + // Destroy store + db.destroy().await; + temp_dir.delete(); +} + +#[derive(Debug, PartialEq, Eq)] +struct Snapshot { + keys: AHashSet<KeyValue>, +} + +#[derive(Debug, PartialEq, Eq, Hash)] +struct KeyValue { + subspace: u8, + key: Vec<u8>, + value: Vec<u8>, +} + +impl Snapshot { + async fn new(db: &Store) -> Self { + #[cfg(feature = "rocks")] + let is_rocks = matches!(db, Store::RocksDb(_)); + #[cfg(not(feature = "rocks"))] + let is_rocks = false; + #[cfg(feature = "foundationdb")] + let is_fdb = matches!(db, Store::FoundationDb(_)); + #[cfg(not(feature = "foundationdb"))] + let is_fdb = false; + let is_sql = matches!( + db, + Store::SQLite(_) | Store::PostgreSQL(_) | Store::MySQL(_) + ); + + let mut keys = AHashSet::new(); + + for (subspace, with_values) in [ + (SUBSPACE_VALUES, true), + (SUBSPACE_COUNTERS, !is_sql), + (SUBSPACE_BLOBS, true), + (SUBSPACE_LOGS, true), + (SUBSPACE_BITMAPS, is_rocks | is_fdb), + (SUBSPACE_INDEXES, false), + ] { + let from_key = AnyKey { + 
subspace, + key: vec![0u8], + }; + let to_key = AnyKey { + subspace, + key: vec![u8::MAX; 10], + }; + + db.iterate( + IterateParams::new(from_key, to_key).set_values(with_values), + |key, value| { + keys.insert(KeyValue { + subspace, + key: key.to_vec(), + value: value.to_vec(), + }); + + Ok(true) + }, + ) + .await + .unwrap(); + } + + Snapshot { keys } + } + + fn assert_is_eq(&self, other: &Self) { + let mut is_err = false; + for key in &self.keys { + if !other.keys.contains(key) { + println!( + "Subspace {}, Key {:?} not found in restored snapshot", + char::from(key.subspace), + key.key, + ); + is_err = true; + } + } + for key in &other.keys { + if !self.keys.contains(key) { + println!( + "Subspace {}, Key {:?} not found in original snapshot", + char::from(key.subspace), + key.key, + ); + is_err = true; + } + } + + if is_err { + panic!("Snapshot mismatch"); + } + } +} + +fn random_bytes(len: usize) -> Vec<u8> { + (0..len).map(|_| rand::random::<u8>()).collect() +} diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs index 8e399317..91ba18e4 100644 --- a/tests/src/store/mod.rs +++ b/tests/src/store/mod.rs @@ -23,6 +23,7 @@ pub mod assign_id; pub mod blob; +pub mod import_export; pub mod lookup; pub mod ops; pub mod query; @@ -106,6 +107,8 @@ pub async fn store_tests() { if insert { store.destroy().await; } + + import_export::test(store.clone()).await; ops::test(store.clone()).await; query::test(store.clone(), FtsStore::Store(store.clone()), insert).await; assign_id::test(store).await; |
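
Editor's note: for orientation, the export files written by `backup.rs` and consumed by `restore.rs` share a small framing format, visible in `spawn_writer` and `OpReader` above: a two-byte header (`MAGIC_MARKER` = 123, `FILE_VERSION` = 1) followed by a stream of tagged ops (0 = family, 1 = key + value, 2 = key only, 3 = account id, 4 = collection, 5 = document id), with keys and values length-prefixed by `u32`s that the restore path reads back big-endian via tokio's `read_u32`. The sketch below is a minimal standalone reader of that framing using only the standard library; it is derived from the diff, not part of the patch, and the `dump_ops` helper and example path are hypothetical.

```rust
// Minimal reader for the export file framing shown in backup.rs / restore.rs.
use std::fs::File;
use std::io::{self, BufReader, Read};

const MAGIC_MARKER: u8 = 123;
const FILE_VERSION: u8 = 1;

fn read_u8(r: &mut impl Read) -> io::Result<u8> {
    let mut b = [0u8; 1];
    r.read_exact(&mut b)?;
    Ok(b[0])
}

fn read_u32_be(r: &mut impl Read) -> io::Result<u32> {
    let mut b = [0u8; 4];
    r.read_exact(&mut b)?;
    Ok(u32::from_be_bytes(b))
}

fn read_sized(r: &mut impl Read) -> io::Result<Vec<u8>> {
    let len = read_u32_be(r)? as usize;
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}

fn dump_ops(path: &str) -> io::Result<()> {
    let mut r = BufReader::new(File::open(path)?);
    assert_eq!(read_u8(&mut r)?, MAGIC_MARKER, "not a Stalwart export file");
    assert_eq!(read_u8(&mut r)?, FILE_VERSION, "unsupported file version");

    loop {
        // Each op starts with a one-byte tag; end of file ends the stream.
        let tag = match read_u8(&mut r) {
            Ok(t) => t,
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        };
        match tag {
            0 => println!("family: {}", read_u8(&mut r)?),
            1 => {
                let key = read_sized(&mut r)?;
                let value = read_sized(&mut r)?;
                println!("key/value: {} + {} bytes", key.len(), value.len());
            }
            2 => println!("key only: {} bytes", read_sized(&mut r)?.len()),
            3 => println!("account id: {}", read_u32_be(&mut r)?),
            4 => println!("collection: {}", read_u8(&mut r)?),
            5 => println!("document id: {}", read_u32_be(&mut r)?),
            other => panic!("unknown op tag {other}"),
        }
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // e.g. one of the per-family files created by `stalwart-mail --export <dir>`
    dump_ops("export/property")
}
```
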