summaryrefslogtreecommitdiff
path: root/crates/store/src/backend/postgres/main.rs
blob: 3757f4c4d97b513c79cf9469d6fb9f6ac851d1e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
 * Copyright (c) 2023 Stalwart Labs Ltd.
 *
 * This file is part of the Stalwart Mail Server.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 * in the LICENSE file at the top-level directory of this distribution.
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * You can be released from the requirements of the AGPLv3 license by
 * purchasing a commercial license. Please contact licensing@stalw.art
 * for more details.
*/

use crate::{
    backend::postgres::tls::MakeRustlsConnect, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS,
    SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES,
};

use super::PostgresStore;

use deadpool_postgres::{
    Config, CreatePoolError, ManagerConfig, PoolConfig, RecyclingMethod, Runtime,
};
use tokio_postgres::NoTls;
use utils::{config::utils::AsKey, rustls_client_config};

impl PostgresStore {
    pub async fn open(config: &utils::config::Config, prefix: impl AsKey) -> crate::Result<Self> {
        let prefix = prefix.as_key();
        let mut cfg = Config::new();
        cfg.dbname = config
            .value_require((&prefix, "database"))?
            .to_string()
            .into();
        cfg.host = config.value((&prefix, "host")).map(|s| s.to_string());
        cfg.user = config.value((&prefix, "user")).map(|s| s.to_string());
        cfg.password = config.value((&prefix, "password")).map(|s| s.to_string());
        cfg.port = config.property((&prefix, "port"))?;
        cfg.connect_timeout = config.property((&prefix, "timeout.connect"))?;
        cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });
        if let Some(max_conn) = config.property::<usize>((&prefix, "pool.max-connections"))? {
            cfg.pool = PoolConfig::new(max_conn).into();
        }
        let db = Self {
            conn_pool: if config.property_or_static::<bool>((&prefix, "tls.enable"), "false")? {
                cfg.create_pool(
                    Some(Runtime::Tokio1),
                    MakeRustlsConnect::new(rustls_client_config(
                        config.property_or_static((&prefix, "tls.allow-invalid-certs"), "false")?,
                    )),
                )?
            } else {
                cfg.create_pool(Some(Runtime::Tokio1), NoTls)?
            },
        };

        db.create_tables().await?;

        Ok(db)
    }

    pub(super) async fn create_tables(&self) -> crate::Result<()> {
        let conn = self.conn_pool.get().await?;

        for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] {
            let table = char::from(table);
            conn.execute(
                &format!(
                    "CREATE TABLE IF NOT EXISTS {table} (
                        k BYTEA PRIMARY KEY,
                        v BYTEA NOT NULL
                    )"
                ),
                &[],
            )
            .await?;
        }

        for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] {
            let table = char::from(table);
            conn.execute(
                &format!(
                    "CREATE TABLE IF NOT EXISTS {table} (
                        k BYTEA PRIMARY KEY
                    )"
                ),
                &[],
            )
            .await?;
        }

        conn.execute(
            &format!(
                "CREATE TABLE IF NOT EXISTS {} (
                    k BYTEA PRIMARY KEY,
                    v BIGINT NOT NULL DEFAULT 0
                )",
                char::from(SUBSPACE_COUNTERS)
            ),
            &[],
        )
        .await?;

        Ok(())
    }
}

impl From<CreatePoolError> for crate::Error {
    fn from(err: CreatePoolError) -> Self {
        crate::Error::InternalError(format!("Failed to create connection pool: {}", err))
    }
}