try to implement items/sync

* we need to figure out a way to test this API
Peter Cai 2020-02-21 14:26:14 +08:00
parent 15809cd545
commit 4be4de241f
7 changed files with 268 additions and 8 deletions
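The commit message notes that a way to test this API still needs to be figured out. One possibility is a smoke test with Rocket 0.4's local test client, sketched below. This is only a sketch: `build_rocket()` is a hypothetical helper standing in for however the crate assembles its mounted instance, and the Authorization header format is an assumption; neither is defined in this commit.

use rocket::http::{ContentType, Header, Status};
use rocket::local::Client;

#[test]
fn items_sync_smoke_test() {
    // `build_rocket()` is a hypothetical helper that builds the mounted
    // Rocket instance with the DbConn fairing and the routes from api.rs.
    let client = Client::new(build_rocket()).expect("valid rocket instance");
    // A real test would first call the /auth routes to obtain a token
    // accepted by the `user::User` request guard; the header format here
    // is assumed, not taken from this commit.
    let token = "...";
    let resp = client
        .post("/items/sync")
        .header(ContentType::JSON)
        .header(Header::new("Authorization", format!("Bearer {}", token)))
        .body(r#"{"items": [], "sync_token": null, "cursor_token": null, "limit": null}"#)
        .dispatch();
    assert_eq!(resp.status(), Status::Ok);
}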

Cargo.lock (generated)

@@ -96,6 +96,18 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "chrono"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31850b4a4d6bae316f7a09e691c944c28299298837edc0a03f755618c23cbc01"
dependencies = [
"num-integer",
"num-traits",
"serde",
"time",
]
[[package]]
name = "cloudabi"
version = "0.0.3"
@@ -166,6 +178,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d7cc03b910de9935007861dce440881f69102aaaedfd4bc5a6f40340ca5840c"
dependencies = [
"byteorder",
"chrono",
"diesel_derives",
"libsqlite3-sys",
"r2d2",
@@ -584,6 +597,25 @@ dependencies = [
"winapi 0.3.8",
]
[[package]]
name = "num-integer"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.12.0"
@@ -1001,6 +1033,7 @@ dependencies = [
name = "sfrs"
version = "0.1.0"
dependencies = [
"chrono",
"diesel",
"diesel_migrations",
"dotenv",

Cargo.toml

@@ -8,10 +8,11 @@ edition = "2018"
rocket = "0.4.2"
rocket_contrib = { version = "0.4.2", features = ["diesel_sqlite_pool"] }
jwt = "0.4.0"
-diesel = { version = "1.4.3", features = ["sqlite"] }
+diesel = { version = "1.4.3", features = ["sqlite", "chrono"] }
diesel_migrations = "1.4.0"
dotenv = "0.9.0"
lazy_static = "1.4.0"
serde = { version = "1.0.104", features = ["derive"] }
scrypt = "0.2.0"
-rust-crypto = "0.2.36"
+rust-crypto = "0.2.36"
+chrono = { version = "0.4", features = ["serde"] }

items table migration (up.sql)

@@ -6,8 +6,8 @@ CREATE TABLE items (
content_type VARCHAR NOT NULL,
enc_item_key VARCHAR,
deleted BOOLEAN NOT NULL,
-created_at DATE NOT NULL,
-updated_at DATE NOT NULL,
+created_at DATETIME NOT NULL,
+updated_at DATETIME,
FOREIGN KEY (owner)
REFERENCES users (id)
)

src/api.rs

@@ -1,5 +1,6 @@
use crate::DbConn;
use crate::user;
use crate::item;
use rocket::http::Status;
use rocket::response::status::Custom;
use rocket_contrib::json::Json;
@@ -12,7 +13,8 @@ pub fn routes() -> impl Into<Vec<rocket::Route>> {
auth_change_pw,
auth_sign_in,
auth_params,
-auth_ping
+auth_ping,
+items_sync
]
}
@@ -126,4 +128,88 @@ fn auth_change_pw(db: DbConn, params: Json<ChangePwParams>) -> Custom<JsonResp<(
#[get("/auth/ping")]
fn auth_ping(_db: DbConn, u: user::User) -> Custom<JsonResp<String>> {
Custom(Status::Ok, Json(Response::Success(u.email)))
}
#[derive(Deserialize)]
struct SyncParams {
items: Vec<item::SyncItem>,
sync_token: Option<String>,
cursor_token: Option<String>,
limit: Option<i64>
}
#[derive(Serialize)]
struct SyncResp {
retrieved_items: Vec<item::SyncItem>,
saved_items: Vec<item::SyncItem>,
unsaved: Vec<item::SyncItem>,
sync_token: Option<String>, // for convenience, we will actually always return this
cursor_token: Option<String>
}
#[post("/items/sync", format = "json", data = "<params>")]
fn items_sync(db: DbConn, u: user::User, params: Json<SyncParams>) -> Custom<JsonResp<SyncResp>> {
let mut resp = SyncResp {
retrieved_items: vec![],
saved_items: vec![],
unsaved: vec![],
sync_token: None,
cursor_token: None
};
let inner_params = params.into_inner();
// First, update all items sent by client
for it in inner_params.items.into_iter() {
if let Err(item::ItemOpError(_)) = item::SyncItem::items_insert(&db, &u, &it) {
// Don't fail the whole sync just because one item failed;
// the client will at least see it in `unsaved`
// (though it may mistake the error for a conflict)
resp.unsaved.push(it);
} else {
resp.saved_items.push(it);
}
}
let mut from_id: Option<i64> = None;
let mut max_id: Option<i64> = None;
if let Some(cursor_token) = inner_params.cursor_token {
// If the client provides a cursor_token,
// return all records from it up to
// sync_token (the head of the last sync)
from_id = cursor_token.parse().ok();
max_id = inner_params.sync_token.clone()
.and_then(|i| i.parse().ok());
} else if let Some(sync_token) = inner_params.sync_token {
// If there is no cursor_token, then we are doing
// a normal sync, so just return all records from sync_token
from_id = sync_token.parse().ok();
}
// Then, retrieve what the client needs
let result = item::SyncItem::items_of_user(&db, &u,
from_id, max_id, inner_params.limit);
match result {
Err(item::ItemOpError(e)) => {
error_resp(Status::InternalServerError, vec![e])
},
Ok(items) => {
if !items.is_empty() {
// max_id = the last sync token
// if we still haven't reached the last sync token yet,
// return a new cursor token and keep the sync token
if let Some(max_id) = max_id {
resp.cursor_token = Some(items[0].id.to_string());
resp.sync_token = Some(max_id.to_string());
} else {
// Else, use the current max id as the sync_token
resp.sync_token = Some(items[0].id.to_string());
}
}
resp.retrieved_items = items.into_iter().map(|x| x.into()).collect();
Custom(Status::Ok, Json(Response::Success(resp)))
}
}
}
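For reference, the token handshake described by the inline comments in items_sync above can be illustrated with a first sync. This is a sketch only: serde_json is assumed for the illustration (it is not shown among this commit's dependencies), the token value is invented, and the response shown is the bare SyncResp body, before it is wrapped in the crate's Response type.

use serde_json::json;

fn main() {
    // Initial sync: the client holds no tokens yet and pushes no items.
    let request = json!({
        "items": [],
        "sync_token": null,
        "cursor_token": null,
        "limit": null
    });

    // The server answers with every item owned by the user (highest id first)
    // and a sync_token equal to the highest item id, rendered as a string.
    // On the next call the client echoes that sync_token back and only rows
    // with a greater id are returned.
    let response = json!({
        "retrieved_items": [], // the user's SyncItem objects, elided here
        "saved_items": [],
        "unsaved": [],
        "sync_token": "42",
        "cursor_token": null
    });

    println!("{}\n{}", request, response);
}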

src/item.rs (new file)

@@ -0,0 +1,139 @@
use crate::schema::items;
use crate::schema::items::dsl::*;
use crate::{lock_db_write, lock_db_read};
use crate::user;
use chrono::naive::NaiveDateTime;
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use serde::{Serialize, Deserialize};
use std::vec::Vec;
#[derive(Debug)]
pub struct ItemOpError(pub String);
impl ItemOpError {
fn new(s: impl Into<String>) -> ItemOpError {
ItemOpError(s.into())
}
}
impl Into<ItemOpError> for &str {
fn into(self) -> ItemOpError {
ItemOpError::new(self)
}
}
#[derive(Queryable)]
pub struct Item {
pub id: i64,
pub owner: i32,
pub uuid: String,
pub content: Option<String>,
pub content_type: String,
pub enc_item_key: Option<String>,
pub deleted: bool,
pub created_at: NaiveDateTime,
pub updated_at: Option<NaiveDateTime>
}
#[derive(Insertable)]
#[table_name = "items"]
struct InsertItem {
owner: i32,
uuid: String,
content: Option<String>,
content_type: String,
enc_item_key: Option<String>,
deleted: bool,
created_at: NaiveDateTime,
updated_at: Option<NaiveDateTime>
}
#[derive(Serialize, Deserialize)]
pub struct SyncItem {
pub uuid: String,
pub content: Option<String>,
pub content_type: String,
pub enc_item_key: Option<String>,
pub deleted: bool,
pub created_at: NaiveDateTime,
pub updated_at: Option<NaiveDateTime>
}
impl Into<SyncItem> for Item {
fn into(self) -> SyncItem {
SyncItem {
uuid: self.uuid,
content: self.content,
content_type: self.content_type,
enc_item_key: self.enc_item_key,
deleted: self.deleted,
created_at: self.created_at,
updated_at: self.updated_at
}
}
}
impl SyncItem {
pub fn items_of_user(
db: &SqliteConnection, u: &user::User,
since_id: Option<i64>, max_id: Option<i64>,
limit: Option<i64>
) -> Result<Vec<Item>, ItemOpError> {
lock_db_read!()
.and_then(|_| {
let mut stmt = items.filter(owner.eq(u.id)).into_boxed();
if let Some(limit) = limit {
stmt = stmt.limit(limit);
}
if let Some(since_id) = since_id {
stmt = stmt.filter(id.gt(since_id));
}
if let Some(max_id) = max_id {
stmt = stmt.filter(id.le(max_id));
}
stmt.order(id.desc())
.load::<Item>(db)
.map_err(|_| "Database error".into())
})
}
pub fn items_insert(db: &SqliteConnection, u: &user::User, it: &SyncItem) -> Result<(), ItemOpError> {
// First, try to find the original item, if any, delete it, and insert a new one with the same UUID
// This way, the ID is updated each time an item is updated
// This method acts both as insertion and update
let orig = lock_db_read!()
.and_then(|_| {
items.filter(uuid.eq(&it.uuid).and(owner.eq(u.id)))
.load::<Item>(db)
.map_err(|_| "Database error".into())
})?;
// TODO: Detect sync conflict? similar to the Go version.
let _lock = lock_db_write!()?;
if !orig.is_empty() {
diesel::delete(items.filter(uuid.eq(&it.uuid).and(owner.eq(u.id))))
.execute(db)
.map(|_| ())
.map_err(|_| "Database error".into())?;
}
diesel::insert_into(items::table)
.values(InsertItem {
owner: u.id,
uuid: it.uuid.clone(),
content: if it.deleted { None } else { it.content.clone() },
content_type: it.content_type.clone(),
enc_item_key: if it.deleted { None } else { it.enc_item_key.clone() },
deleted: it.deleted,
created_at: it.created_at,
updated_at: it.updated_at
})
.execute(db)
.map(|_| ())
.map_err(|_| "Database error".into())
}
}

src/main.rs

@@ -19,6 +19,7 @@ extern crate lazy_static;
mod schema;
mod api;
mod user;
mod item;
#[cfg(test)]
mod tests;

src/schema.rs

@@ -1,14 +1,14 @@
table! {
items (id) {
-id -> Integer,
+id -> BigInt, // Forced; diesel does not support interpreting Integer as i64
owner -> Integer,
uuid -> Text,
content -> Nullable<Text>,
content_type -> Text,
enc_item_key -> Nullable<Text>,
deleted -> Bool,
-created_at -> Date,
-updated_at -> Date,
+created_at -> Timestamp,
+updated_at -> Nullable<Timestamp>,
}
}