/* * Copyright (c) 2023 Stalwart Labs Ltd. * * This file is part of the Stalwart Mail Server. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * in the LICENSE file at the top-level directory of this distribution. * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * You can be released from the requirements of the AGPLv3 license by * purchasing a commercial license. Please contact licensing@stalw.art * for more details. */ use std::time::{Duration, Instant}; use ahash::AHashMap; use deadpool_postgres::Object; use rand::Rng; use tokio_postgres::{error::SqlState, IsolationLevel}; use crate::{ write::{ Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME, }, BitmapKey, IndexKey, Key, LogKey, ValueKey, }; use super::PostgresStore; impl PostgresStore { pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> { let mut conn = self.conn_pool.get().await?; let start = Instant::now(); let mut retry_count = 0; loop { match self.write_trx(&mut conn, &batch).await { Ok(success) => { return if success { Ok(()) } else { Err(crate::Error::AssertValueFailed) }; } Err(err) => match err.code() { Some( &SqlState::T_R_SERIALIZATION_FAILURE | &SqlState::T_R_DEADLOCK_DETECTED, ) if retry_count < MAX_COMMIT_ATTEMPTS && start.elapsed() < MAX_COMMIT_TIME => { let backoff = rand::thread_rng().gen_range(50..=300); tokio::time::sleep(Duration::from_millis(backoff)).await; retry_count += 1; } Some(&SqlState::UNIQUE_VIOLATION) => { return Err(crate::Error::AssertValueFailed); } _ => return Err(err.into()), }, } } } async fn write_trx( &self, conn: &mut Object, batch: &Batch, ) -> Result { let mut account_id = u32::MAX; let mut collection = u8::MAX; let mut document_id = u32::MAX; let mut asserted_values = AHashMap::new(); let trx = conn .build_transaction() .isolation_level(IsolationLevel::ReadCommitted) .start() .await?; for op in &batch.ops { match op { Operation::AccountId { account_id: account_id_, } => { account_id = *account_id_; } Operation::Collection { collection: collection_, } => { collection = *collection_; } Operation::DocumentId { document_id: document_id_, } => { document_id = *document_id_; } Operation::Value { class, op: ValueOp::Add(by), } => { let key = ValueKey { account_id, collection, document_id, class, } .serialize(false); if *by >= 0 { let s = trx .prepare_cached(concat!( "INSERT INTO c (k, v) VALUES ($1, $2) ", "ON CONFLICT(k) DO UPDATE SET v = c.v + EXCLUDED.v" )) .await?; trx.execute(&s, &[&key, &by]).await?; } else { let s = trx .prepare_cached("UPDATE c SET v = v + $1 WHERE k = $2") .await?; trx.execute(&s, &[&by, &key]).await?; } } Operation::Value { class, op } => { let key = ValueKey { account_id, collection, document_id, class, }; let table = char::from(key.subspace()); let key = key.serialize(false); if let ValueOp::Set(value) = op { let s = if let Some(exists) = asserted_values.get(&key) { if *exists { trx.prepare_cached(&format!( "UPDATE {} SET v = $2 WHERE k = $1", table )) .await? } else { trx.prepare_cached(&format!( "INSERT INTO {} (k, v) VALUES ($1, $2)", table )) .await? } } else { trx .prepare_cached( &format!("INSERT INTO {} (k, v) VALUES ($1, $2) ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v", table), ) .await? }; if trx.execute(&s, &[&key, value]).await? == 0 { return Ok(false); } if matches!(class, ValueClass::ReservedId) { // Make sure the reserved id is not already in use let s = trx.prepare_cached("SELECT 1 FROM b WHERE k = $1").await?; let key = BitmapKey { account_id, collection, class: BitmapClass::DocumentIds, block_num: document_id, } .serialize(false); if trx.query_opt(&s, &[&key]).await?.is_some() { return Ok(false); } } } else { let s = trx .prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table)) .await?; trx.execute(&s, &[&key]).await?; } } Operation::Index { field, key, set } => { let key = IndexKey { account_id, collection, document_id, field: *field, key, } .serialize(false); let s = if *set { trx.prepare_cached( "INSERT INTO i (k) VALUES ($1) ON CONFLICT (k) DO NOTHING", ) .await? } else { trx.prepare_cached("DELETE FROM i WHERE k = $1").await? }; trx.execute(&s, &[&key]).await?; } Operation::Bitmap { class, set } => { let key = BitmapKey { account_id, collection, class, block_num: document_id, } .serialize(false); let s = if *set { if matches!(class, BitmapClass::DocumentIds) { trx.prepare_cached("INSERT INTO b (k) VALUES ($1)").await? } else { trx.prepare_cached( "INSERT INTO b (k) VALUES ($1) ON CONFLICT (k) DO NOTHING", ) .await? } } else { trx.prepare_cached("DELETE FROM b WHERE k = $1").await? }; trx.execute(&s, &[&key]).await?; } Operation::Log { collection, change_id, set, } => { let key = LogKey { account_id, collection: *collection, change_id: *change_id, } .serialize(false); let s = trx .prepare_cached("INSERT INTO l (k, v) VALUES ($1, $2) ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v") .await?; trx.execute(&s, &[&key, set]).await?; } Operation::AssertValue { class, assert_value, } => { let key = ValueKey { account_id, collection, document_id, class, }; let table = char::from(key.subspace()); let key = key.serialize(false); let s = trx .prepare_cached(&format!("SELECT v FROM {} WHERE k = $1 FOR UPDATE", table)) .await?; let (exists, matches) = trx .query_opt(&s, &[&key]) .await? .map(|row| { row.try_get::<_, &[u8]>(0) .map_or((true, false), |v| (true, assert_value.matches(v))) }) .unwrap_or_else(|| (false, assert_value.is_none())); if !matches { return Ok(false); } asserted_values.insert(key, exists); } } } trx.commit().await.map(|_| true) } pub(crate) async fn purge_bitmaps(&self) -> crate::Result<()> { // Not needed for PostgreSQL Ok(()) } pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> { let conn = self.conn_pool.get().await?; let s = conn .prepare_cached(&format!( "DELETE FROM {} WHERE k >= $1 AND k < $2", char::from(from.subspace()), )) .await?; conn.execute(&s, &[&from.serialize(false), &to.serialize(false)]) .await .map(|_| ()) .map_err(Into::into) } }