items/sync: implement conflicts properly

This commit is contained in:
Peter Cai 2020-02-21 20:50:14 +08:00
parent ecf3998b5a
commit 8cc0ec0266
No known key found for this signature in database
GPG Key ID: 71F5FB4E4F3FD54F
2 changed files with 116 additions and 26 deletions

View File

@ -150,11 +150,31 @@ struct SyncParams {
limit: Option<i64>
}
#[derive(Serialize)]
struct SyncConflict {
#[serde(rename(serialize = "type"))]
conf_type: String,
server_item: Option<item::SyncItem>,
unsaved_item: Option<item::SyncItem>
}
impl SyncConflict {
fn uuid<'a>(&self) -> String {
if let Some(ref item) = self.server_item {
item.uuid.clone()
} else if let Some(ref item) = self.unsaved_item {
item.uuid.clone()
} else {
panic!("SyncConflict should have either server_item or unsaved_item");
}
}
}
#[derive(Serialize)]
struct SyncResp {
retrieved_items: Vec<item::SyncItem>,
saved_items: Vec<item::SyncItem>,
unsaved: Vec<item::SyncItem>,
conflicts: Vec<SyncConflict>,
sync_token: Option<String>, // for convenience, we will actually always return this
cursor_token: Option<String>
}
@ -164,27 +184,13 @@ fn items_sync(db: DbConn, u: user::User, params: Json<SyncParams>) -> Custom<Jso
let mut resp = SyncResp {
retrieved_items: vec![],
saved_items: vec![],
unsaved: vec![],
conflicts: vec![],
sync_token: None,
cursor_token: None
};
let inner_params = params.into_inner();
// First, update all items sent by client
for mut it in inner_params.items.into_iter() {
// Always update updated_at for all items on server
it.updated_at =
Some(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
if let Err(item::ItemOpError(e)) = item::SyncItem::items_insert(&db, &u, &it) {
// We can just return an error here
return error_resp(Status::InternalServerError, vec![e]);
} else {
resp.saved_items.push(it);
}
}
let mut from_id: Option<i64> = None;
let mut had_cursor = false;
@ -200,13 +206,13 @@ fn items_sync(db: DbConn, u: user::User, params: Json<SyncParams>) -> Custom<Jso
from_id = sync_token.parse().ok();
}
// Then, retrieve what the client needs
// First, retrieve what the client needs
let result = item::SyncItem::items_of_user(&db, &u,
from_id, None, inner_params.limit);
match result {
Err(item::ItemOpError(e)) => {
error_resp(Status::InternalServerError, vec![e])
return error_resp(Status::InternalServerError, vec![e])
},
Ok(items) => {
if !items.is_empty() {
@ -226,21 +232,93 @@ fn items_sync(db: DbConn, u: user::User, params: Json<SyncParams>) -> Custom<Jso
}
}
// Always set sync_token to equal cursor_token in this case
resp.sync_token = Some(next_from.to_string());
// Always keep sync_token unchanged in this case
// (this may change later when we save items)
resp.sync_token = inner_params.sync_token;
} else {
if had_cursor {
// If we already have no item to give, but the client still holds a cursor
// Revoke that cursor, and make it the sync_token
// (this may change later when we save items)
resp.sync_token = resp.cursor_token.clone();
resp.cursor_token = None;
} else {
// Pass the same sync_token back
resp.sync_token = inner_params.sync_token.clone();
// (this may change later when we save items)
resp.sync_token = inner_params.sync_token;
}
}
resp.retrieved_items = items.into_iter().map(|x| x.into()).collect();
Custom(Status::Ok, Json(Response::Success(resp)))
}
}
// Then, update all items sent by client
let mut last_id: i64 = -1;
for mut it in inner_params.items.into_iter() {
// Handle conflicts
let mut conflicted = false;
for y in resp.retrieved_items.iter() {
if it.uuid == y.uuid {
conflicted = true;
// We assume enc_item_key identifies an "item"
if it.enc_item_key == y.enc_item_key {
// A sync conflict
resp.conflicts.push(SyncConflict {
conf_type: "sync_conflict".to_string(),
server_item: Some(y.clone()),
unsaved_item: None
});
} else {
// A UUID conflict (unlikely)
resp.conflicts.push(SyncConflict {
conf_type: "uuid_conflict".to_string(),
server_item: None,
unsaved_item: Some(it.clone())
})
}
}
}
// do not save conflicted items
if conflicted {
continue;
}
// Always update updated_at for all items on server
it.updated_at =
Some(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true));
match item::SyncItem::items_insert(&db, &u, &it) {
Err(item::ItemOpError(e)) => {
return error_resp(Status::InternalServerError, vec![e]);
},
Ok(id) => {
last_id = id;
resp.saved_items.push(it);
}
}
}
if last_id > -1 {
// Update sync_token to the latest one of our saved items
resp.sync_token = Some(last_id.to_string());
}
// Remove conflicted items from retrieved items
let mut new_retrieved = vec![];
for x in resp.retrieved_items.into_iter() {
let mut is_conflict = false;
for y in resp.conflicts.iter() {
if x.uuid == y.uuid() {
is_conflict = true;
}
}
if !is_conflict {
new_retrieved.push(x);
}
}
resp.retrieved_items = new_retrieved;
Custom(Status::Ok, Json(Response::Success(resp)))
}

View File

@ -48,7 +48,7 @@ struct InsertItem {
updated_at: Option<String>
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct SyncItem {
pub uuid: String,
pub content: Option<String>,
@ -101,7 +101,16 @@ impl SyncItem {
})
}
pub fn items_insert(db: &SqliteConnection, u: &user::User, it: &SyncItem) -> Result<(), ItemOpError> {
pub fn find_item_by_uuid(db: &SqliteConnection, u: &user::User, i: &str) -> Result<Item, ItemOpError> {
lock_db_read!()
.and_then(|_| {
items.filter(owner.eq(u.id).and(uuid.eq(i)))
.first::<Item>(db)
.map_err(|_| "Database error".into())
})
}
pub fn items_insert(db: &SqliteConnection, u: &user::User, it: &SyncItem) -> Result<i64, ItemOpError> {
// First, try to find the original item, if any, delete it, and insert a new one with the same UUID
// This way, the ID is updated each time an item is updated
// This method acts both as insertion and update
@ -133,7 +142,10 @@ impl SyncItem {
updated_at: it.updated_at.clone()
})
.execute(db)
.map(|_| ())
.map_err(|_| "Database error".into())
.map_err(|_| "Database error".into())?;
std::mem::drop(_lock);
Self::find_item_by_uuid(db, u, &it.uuid)
.map(|i| i.id)
}
}