enable busy_timeout for SQLite

although we have a global RwLock for database access, it still can fail
due to disk sync delays. Though unlikely, it has happened once or twice
during testing. Let's just enable busy_timeout to avoid this issue.
Since we have RwLock anyway, a busy_timeout should not be much of a
problem.

Unfortunately this has to be enabled via implementing our own wrapper
trait.
This commit is contained in:
Peter Cai 2020-02-23 12:51:26 +08:00
parent d4c4be4d63
commit 2eaf7d24e0
No known key found for this signature in database
GPG Key ID: 71F5FB4E4F3FD54F
6 changed files with 134 additions and 54 deletions

View File

@ -88,8 +88,8 @@ fn auth_sign_in(db: DbConn, params: Json<SignInParams>) -> Custom<JsonResp<AuthR
// Shared logic for all interfaces that needs to do an automatic sign-in
fn _sign_in(db: DbConn, mail: &str, passwd: &str) -> Custom<JsonResp<AuthResult>> {
// Try to find the user first
let res = user::User::find_user_by_email(&db, mail)
.and_then(|u| u.create_token(&db, passwd)
let res = user::User::find_user_by_email(&db.0, mail)
.and_then(|u| u.create_token(&db.0, passwd)
.map(|x| (u.uuid, u.email, x)));
match res {
Ok((uuid, email, token)) => success_resp(AuthResult {
@ -123,7 +123,7 @@ impl Into<AuthParams> for user::User {
#[get("/auth/params?<email>")]
fn auth_params(db: DbConn, email: String) -> Custom<JsonResp<AuthParams>> {
match user::User::find_user_by_email(&db, &email) {
match user::User::find_user_by_email(&db.0, &email) {
Ok(u) => success_resp(u.into()),
Err(user::UserOpError(e)) =>
error_resp(Status::InternalServerError, vec![e])
@ -139,9 +139,9 @@ struct ChangePwParams {
#[post("/auth/change_pw", format = "json", data = "<params>")]
fn auth_change_pw(db: DbConn, params: Json<ChangePwParams>) -> Custom<JsonResp<()>> {
let res = user::User::find_user_by_email(&db, &params.email)
let res = user::User::find_user_by_email(&db.0, &params.email)
.and_then(|u|
u.change_pw(&db, &params.current_password, &params.password));
u.change_pw(&db.0, &params.current_password, &params.password));
match res {
Ok(_) => Custom(Status::NoContent, Json(Response::Success(()))),
Err(user::UserOpError(e)) =>
@ -216,7 +216,7 @@ fn items_sync(
// Remember that we have a mutex at the beginning of this function,
// so all that can change the current_max_id for the current user
// is operations later in this function.
let new_sync_token = match item::SyncItem::get_current_max_id(&db, &u) {
let new_sync_token = match item::SyncItem::get_current_max_id(&db.0, &u) {
Ok(Some(id)) => Some(id.to_string()),
Ok(None) => None,
Err(item::ItemOpError(e)) =>
@ -247,7 +247,7 @@ fn items_sync(
};
// First, retrieve what the client needs
let result = item::SyncItem::items_of_user(&db, &u,
let result = item::SyncItem::items_of_user(&db.0, &u,
from_id, None, inner_params.limit);
match result {
@ -307,7 +307,7 @@ fn items_sync(
it.updated_at =
Some(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
match item::SyncItem::items_insert(&db, &u, &it) {
match item::SyncItem::items_insert(&db.0, &u, &it) {
Err(item::ItemOpError(e)) => {
return error_resp(Status::InternalServerError, vec![e]);
},

105
src/db.rs Normal file
View File

@ -0,0 +1,105 @@
use diesel::connection::{SimpleConnection, Connection};
use diesel::deserialize::{Queryable, QueryableByName};
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
use diesel::result::{ConnectionResult, QueryResult};
use diesel::sqlite::{Sqlite, SqliteConnection};
use diesel::sql_types::*;
use rocket_contrib::databases::{r2d2, DatabaseConfig, Poolable};
use std::sync::RwLock;
// We need a global RwLock for SQLite
// This is unfortunate when we still use SQLite
// but should be mostly fine for our purpose
// (however, due to disk sync delays, the RwLock alone
// may still produce some SQLITE_BUSY errors randomly.
// We implemented a wrapper later in this module to enable busy_timeout
// to avoid this.)
lazy_static! {
pub static ref DB_LOCK: RwLock<()> = RwLock::new(());
}
#[macro_export]
macro_rules! lock_db_write {
() => {
crate::DB_LOCK.write()
.map_err(|_| "Cannot lock database for writing".into())
};
}
#[macro_export]
macro_rules! lock_db_read {
() => {
crate::DB_LOCK.read()
.map_err(|_| "Cannot lock database for reading".into())
};
}
pub trait SqliteLike = Connection<Backend = Sqlite>;
pub struct BusyWaitSqliteConnection(SqliteConnection);
impl Poolable for BusyWaitSqliteConnection {
type Manager = diesel::r2d2::ConnectionManager<BusyWaitSqliteConnection>;
type Error = r2d2::Error;
fn pool(config: DatabaseConfig) -> Result<r2d2::Pool<Self::Manager>, Self::Error> {
let manager = diesel::r2d2::ConnectionManager::new(config.url);
r2d2::Pool::builder().max_size(config.pool_size).build(manager)
}
}
// Enable busy_timeout for SQLite connections by re-implementing the Connection trait
// (Note: busy_timeout is never the best solution, so the global RwLock is still needed,
// and this busy_timeout is just to make sure that we won't fail due to disk sync lagging behind
// when we acquire the RwLock because it may take some time for the SQLite lock state to be written to disk)
// <https://stackoverflow.com/questions/57123453/how-to-use-diesel-with-sqlite-connections-and-avoid-database-is-locked-type-of>
impl SimpleConnection for BusyWaitSqliteConnection {
fn batch_execute(&self, query: &str) -> QueryResult<()> {
self.0.batch_execute(query)
}
}
impl Connection for BusyWaitSqliteConnection {
type Backend = <SqliteConnection as Connection>::Backend;
type TransactionManager = <SqliteConnection as Connection>::TransactionManager;
fn establish(database_url: &str) -> ConnectionResult<Self> {
let c = SqliteConnection::establish(database_url)?;
c.batch_execute("PRAGMA foreign_keys = ON; PRAGMA busy_timeout = 60000;")
.unwrap();
Ok(Self(c))
}
fn execute(&self, query: &str) -> QueryResult<usize> {
self.0.execute(query)
}
fn query_by_index<T, U>(&self, source: T) -> QueryResult<Vec<U>>
where
T: AsQuery,
T::Query: QueryFragment<Self::Backend> + QueryId,
Self::Backend: HasSqlType<T::SqlType>,
U: Queryable<T::SqlType, Self::Backend>,
{
self.0.query_by_index(source)
}
fn query_by_name<T, U>(&self, source: &T) -> QueryResult<Vec<U>>
where
T: QueryFragment<Self::Backend> + QueryId,
U: QueryableByName<Self::Backend>,
{
self.0.query_by_name(source)
}
fn execute_returning_count<T>(&self, source: &T) -> QueryResult<usize>
where
T: QueryFragment<Self::Backend> + QueryId,
{
self.0.execute_returning_count(source)
}
fn transaction_manager(&self) -> &Self::TransactionManager {
self.0.transaction_manager()
}
}

View File

@ -1,10 +1,9 @@
use crate::schema::items;
use crate::schema::items::dsl::*;
use crate::{lock_db_write, lock_db_read};
use crate::{SqliteLike, lock_db_write, lock_db_read};
use crate::user;
use diesel::dsl::max;
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use serde::{Serialize, Deserialize};
use std::vec::Vec;
@ -85,7 +84,7 @@ impl Into<SyncItem> for Item {
impl SyncItem {
pub fn items_of_user(
db: &SqliteConnection, u: &user::User,
db: &impl SqliteLike, u: &user::User,
since_id: Option<i64>, max_id: Option<i64>,
limit: Option<i64>
) -> Result<Vec<Item>, ItemOpError> {
@ -110,7 +109,7 @@ impl SyncItem {
})
}
pub fn find_item_by_uuid(db: &SqliteConnection, u: &user::User, i: &str) -> Result<Item, ItemOpError> {
pub fn find_item_by_uuid(db: &impl SqliteLike, u: &user::User, i: &str) -> Result<Item, ItemOpError> {
lock_db_read!()
.and_then(|_| {
items.filter(owner.eq(u.id).and(uuid.eq(i)))
@ -123,7 +122,7 @@ impl SyncItem {
// Remember that IDs do not identify item; instead, they are incremented to the largest value
// every time an item is updated (see Self::items_insert).
// The ID returned by this function is more like a "timestamp" of the latest "state"
pub fn get_current_max_id(db: &SqliteConnection, u: &user::User) -> Result<Option<i64>, ItemOpError> {
pub fn get_current_max_id(db: &impl SqliteLike, u: &user::User) -> Result<Option<i64>, ItemOpError> {
lock_db_read!()
.and_then(|_| {
items.filter(owner.eq(u.id))
@ -133,7 +132,7 @@ impl SyncItem {
})
}
pub fn items_insert(db: &SqliteConnection, u: &user::User, it: &SyncItem) -> Result<i64, ItemOpError> {
pub fn items_insert(db: &impl SqliteLike, u: &user::User, it: &SyncItem) -> Result<i64, ItemOpError> {
// First, try to find the original item, if any, delete it, and insert a new one with the same UUID
// This way, the ID is updated each time an item is updated
// This method acts both as insertion and update

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene, decl_macro)]
#![feature(proc_macro_hygiene, decl_macro, trait_alias)]
#[macro_use]
extern crate rocket;
@ -13,6 +13,7 @@ extern crate serde;
#[macro_use]
extern crate lazy_static;
mod db;
mod schema;
mod api;
mod tokens;
@ -23,42 +24,19 @@ mod lock;
#[cfg(test)]
mod tests;
pub use db::*;
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use dotenv::dotenv;
use rocket::Rocket;
use rocket::config::{Config, Environment, Value};
use std::collections::HashMap;
use std::env;
use std::sync::RwLock;
embed_migrations!();
// We need a global RwLock for SQLite
// This is unfortunate when we still use SQLite
// but should be mostly fine for our purpose
lazy_static! {
pub static ref DB_LOCK: RwLock<()> = RwLock::new(());
}
#[macro_export]
macro_rules! lock_db_write {
() => {
crate::DB_LOCK.write()
.map_err(|_| "Cannot lock database for writing".into())
};
}
#[macro_export]
macro_rules! lock_db_read {
() => {
crate::DB_LOCK.read()
.map_err(|_| "Cannot lock database for reading".into())
};
}
#[database("db")]
pub struct DbConn(SqliteConnection);
pub struct DbConn(BusyWaitSqliteConnection);
#[get("/")]
fn index() -> &'static str {

View File

@ -1,8 +1,7 @@
use crate::schema::tokens;
use crate::schema::tokens::dsl::*;
use crate::{lock_db_write, lock_db_read};
use crate::{SqliteLike, lock_db_write, lock_db_read};
use chrono::NaiveDateTime;
use diesel::sqlite::SqliteConnection;
use diesel::prelude::*;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use uuid::Uuid;
@ -17,7 +16,7 @@ pub struct Token {
impl Token {
// Return user id if any
pub fn find_token_by_id(db: &SqliteConnection, tid: &str) -> Option<i32> {
pub fn find_token_by_id(db: &impl SqliteLike, tid: &str) -> Option<i32> {
(lock_db_read!() as Result<RwLockReadGuard<()>, String>).ok()
.and_then(|_| {
tokens.filter(id.eq(tid))
@ -34,7 +33,7 @@ impl Token {
}
// Create a new token for a user
pub fn create_token(db: &SqliteConnection, user: i32) -> Option<String> {
pub fn create_token(db: &impl SqliteLike, user: i32) -> Option<String> {
let tid = Uuid::new_v4().to_hyphenated().to_string();
(lock_db_write!() as Result<RwLockWriteGuard<()>, String>).ok()
.and_then(|_| {

View File

@ -1,9 +1,8 @@
use crate::schema::users;
use crate::schema::users::dsl::*;
use crate::{lock_db_write, lock_db_read};
use crate::{SqliteLike, lock_db_write, lock_db_read};
use ::uuid::Uuid;
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use rocket::request;
use rocket::http::Status;
use serde::Deserialize;
@ -114,7 +113,7 @@ struct NewUserInsert {
}
impl User {
pub fn create(db: &SqliteConnection, new_user: &NewUser) -> Result<String, UserOpError> {
pub fn create(db: &impl SqliteLike, new_user: &NewUser) -> Result<String, UserOpError> {
let uid = Uuid::new_v4().to_hyphenated().to_string();
let user_hashed = NewUserInsert {
uuid: uid.clone(),
@ -136,7 +135,7 @@ impl User {
}
}
pub fn find_user_by_email(db: &SqliteConnection, user_email: &str) -> Result<User, UserOpError> {
pub fn find_user_by_email(db: &impl SqliteLike, user_email: &str) -> Result<User, UserOpError> {
let mut results = lock_db_read!()
.and_then(|_| users.filter(email.eq(user_email))
.limit(1)
@ -149,7 +148,7 @@ impl User {
}
}
pub fn find_user_by_id(db: &SqliteConnection, user_id: i32) -> Result<User, UserOpError> {
pub fn find_user_by_id(db: &impl SqliteLike, user_id: i32) -> Result<User, UserOpError> {
let mut results = lock_db_read!()
.and_then(|_| users.filter(id.eq(user_id))
.limit(1)
@ -162,14 +161,14 @@ impl User {
}
}
pub fn find_user_by_token(db: &SqliteConnection, token: &str) -> Result<User, UserOpError> {
pub fn find_user_by_token(db: &impl SqliteLike, token: &str) -> Result<User, UserOpError> {
crate::tokens::Token::find_token_by_id(db, token)
.ok_or("Invalid token".into())
.and_then(|uid| Self::find_user_by_id(db, uid))
}
// Create a JWT token for the current user if password matches
pub fn create_token(&self, db: &SqliteConnection, passwd: &str) -> Result<String, UserOpError> {
pub fn create_token(&self, db: &impl SqliteLike, passwd: &str) -> Result<String, UserOpError> {
if self.password != passwd {
Err(UserOpError::new("Password mismatch"))
} else {
@ -180,7 +179,7 @@ impl User {
// Change the password in database, if old password is provided
// The current instance of User model will not be mutated
pub fn change_pw(&self, db: &SqliteConnection, passwd: &str, new_passwd: &str) -> Result<(), UserOpError> {
pub fn change_pw(&self, db: &impl SqliteLike, passwd: &str, new_passwd: &str) -> Result<(), UserOpError> {
if self.password != passwd {
Err(UserOpError::new("Password mismatch"))
} else {
@ -212,7 +211,7 @@ impl<'a, 'r> request::FromRequest<'a, 'r> for User {
}
let result = Self::find_user_by_token(
&request.guard::<crate::DbConn>().unwrap(), &token[7..]);
&request.guard::<crate::DbConn>().unwrap().0, &token[7..]);
match result {
Ok(u) => request::Outcome::Success(u),
Err(err) => request::Outcome::Failure((Status::Unauthorized, err))