From 5c3c5c809023f3ea235163e9d5e46cd2c19a6106 Mon Sep 17 00:00:00 2001 From: daladim Date: Mon, 22 Mar 2021 23:42:41 +0100 Subject: [PATCH] Re-wrote Provider::sync --- Cargo.toml | 4 + src/calendar/cached_calendar.rs | 30 +++++- src/calendar/remote_calendar.rs | 7 +- src/event.rs | 11 +- src/item.rs | 39 ++++++- src/lib.rs | 1 + src/provider.rs | 181 ++++++++++++++++++++++++++------ src/task.rs | 15 +-- src/traits.rs | 3 + tests/sync.rs | 34 +++--- 10 files changed, 256 insertions(+), 69 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9a277f9..b6a8cf5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,10 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +integration_tests = ["mock_version_tag"] +mock_version_tag = [] + [dependencies] env_logger = "0.8" log = "0.4" diff --git a/src/calendar/cached_calendar.rs b/src/calendar/cached_calendar.rs index 4376846..8bbc433 100644 --- a/src/calendar/cached_calendar.rs +++ b/src/calendar/cached_calendar.rs @@ -70,16 +70,38 @@ impl PartialCalendar for CachedCalendar { Ok(()) } + #[cfg(not(feature = "mock_version_tag"))] + #[allow(unreachable_code)] async fn get_item_version_tags(&self) -> Result, Box> { - Ok(self.items.iter() - .map(|(id, item)| (id.clone(), item.version_tag().clone())) - .collect() - ) + panic!("This function only makes sense in remote calendars and in mocked calendars"); + Err("This function only makes sense in remote calendars and in mocked calendars".into()) + } + #[cfg(feature = "mock_version_tag")] + async fn get_item_version_tags(&self) -> Result, Box> { + use crate::item::SyncStatus; + + let mut result = HashMap::new(); + + for (id, item) in &self.items { + let vt = match item.sync_status() { + SyncStatus::Synced(vt) => vt.clone(), + _ => { + panic!("Mock calendars must contain only SyncStatus::Synced. Got {:?}", item); + } + }; + result.insert(id.clone(), vt); + } + + Ok(result) } async fn get_item_by_id_mut<'a>(&'a mut self, id: &ItemId) -> Option<&'a mut Item> { self.items.get_mut(id) } + + async fn get_item_by_id<'a>(&'a self, id: &ItemId) -> Option<&'a Item> { + self.items.get(id) + } } #[async_trait] diff --git a/src/calendar/remote_calendar.rs b/src/calendar/remote_calendar.rs index e985d60..7618943 100644 --- a/src/calendar/remote_calendar.rs +++ b/src/calendar/remote_calendar.rs @@ -1,7 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; -use chrono::{DateTime, Utc}; use async_trait::async_trait; use crate::traits::PartialCalendar; @@ -86,6 +85,12 @@ impl PartialCalendar for RemoteCalendar { None } + async fn get_item_by_id<'a>(&'a self, id: &ItemId) -> Option<&'a Item> { + log::error!("Not implemented"); + None + } + + /// Add an item into this calendar async fn add_item(&mut self, _item: Item) { log::error!("Not implemented"); diff --git a/src/event.rs b/src/event.rs index 829b0ac..c60e2b8 100644 --- a/src/event.rs +++ b/src/event.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use chrono::{Utc, DateTime}; use crate::item::ItemId; -use crate::item::VersionTag; +use crate::item::SyncStatus; /// TODO: implement Event one day. /// This crate currently only supports tasks, not calendar events. @@ -13,7 +13,7 @@ pub struct Event { id: ItemId, name: String, last_modified: DateTime, - version_tag: VersionTag, + sync_status: SyncStatus, } impl Event { @@ -29,7 +29,10 @@ impl Event { self.last_modified } - pub fn version_tag(&self) -> &VersionTag { - &self.version_tag + pub fn sync_status(&self) -> &SyncStatus { + &self.sync_status + } + pub fn set_sync_status(&mut self, new_status: SyncStatus) { + self.sync_status = new_status; } } diff --git a/src/item.rs b/src/item.rs index 3f7facb..a9b2ca6 100644 --- a/src/item.rs +++ b/src/item.rs @@ -37,10 +37,16 @@ impl Item { } } - pub fn version_tag(&self) -> &VersionTag { + pub fn sync_status(&self) -> &SyncStatus { match self { - Item::Event(e) => e.version_tag(), - Item::Task(t) => t.version_tag(), + Item::Event(e) => e.sync_status(), + Item::Task(t) => t.sync_status(), + } + } + pub fn set_sync_status(&mut self, new_status: SyncStatus) { + match self { + Item::Event(e) => e.set_sync_status(new_status), + Item::Task(t) => t.set_sync_status(new_status), } } @@ -87,7 +93,7 @@ pub struct ItemId { content: Url, } impl ItemId{ - /// Generate a random ItemId. This is only useful in tests + /// Generate a random ItemId. This should only be useful in tests pub fn random() -> Self { let random = uuid::Uuid::new_v4().to_hyphenated().to_string(); let s = format!("https://server.com/{}", random); @@ -133,9 +139,32 @@ impl From for VersionTag { } impl VersionTag { - /// Generate a random VesionTag. This is only useful in tests + /// Generate a random VesionTag + #[cfg(feature = "mock_version_tag")] pub fn random() -> Self { let random = uuid::Uuid::new_v4().to_hyphenated().to_string(); Self { tag: random } } } + + + +/// Desribes whether this item has been synced already, or modified since the last time it was synced +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum SyncStatus { + /// This item has ben locally created, and never synced yet + NotSynced, + /// At the time this item has ben synced, it has a given version tag, and has not been locally modified since then + Synced(VersionTag), + /// This item has been synced when it had a given version tag, and has been locally modified since then. + LocallyModified(VersionTag), + /// This item has been synced when it had a given version tag, and has been locally deleted since then. + LocallyDeleted(VersionTag), +} +impl SyncStatus { + /// Generate a random SyncStatus::Synced + #[cfg(feature = "mock_version_tag")] + pub fn random_synced() -> Self { + Self::Synced(VersionTag::random()) + } +} diff --git a/src/lib.rs b/src/lib.rs index d08286f..c671e53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ mod item; pub use item::Item; pub use item::ItemId; pub use item::VersionTag; +pub use item::SyncStatus; mod task; pub use task::Task; mod event; diff --git a/src/provider.rs b/src/provider.rs index e424f00..3b99f67 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -4,18 +4,15 @@ use std::error::Error; use std::collections::HashSet; use std::marker::PhantomData; -use chrono::{DateTime, Utc}; - use crate::traits::{CalDavSource, CompleteCalendar}; use crate::traits::PartialCalendar; -use crate::Item; -use crate::item::ItemId; +use crate::item::SyncStatus; /// A data source that combines two `CalDavSources` (usually a server and a local cache), which is able to sync both sources. pub struct Provider where L: CalDavSource, - T: CompleteCalendar, + T: CompleteCalendar + Sync + Send, R: CalDavSource, U: PartialCalendar + Sync + Send, { @@ -31,7 +28,7 @@ where impl Provider where L: CalDavSource, - T: CompleteCalendar, + T: CompleteCalendar + Sync + Send, R: CalDavSource, U: PartialCalendar + Sync + Send, { @@ -45,15 +42,15 @@ where } } - /// Returns the data source described as the `server` + /// Returns the data source described as the `remote` pub fn remote(&self) -> &R { &self.remote } /// Returns the data source described as the `local` pub fn local(&self) -> &L { &self.local } - /// Performs a synchronisation between `local` and `server`. + /// Performs a synchronisation between `local` and `remote`. /// /// This bidirectional sync applies additions/deletions made on a source to the other source. - /// In case of conflicts (the same item has been modified on both ends since the last sync, `server` always wins) + /// In case of conflicts (the same item has been modified on both ends since the last sync, `remote` always wins) pub async fn sync(&mut self) -> Result<(), Box> { log::info!("Starting a sync."); @@ -71,33 +68,151 @@ where let mut cal_local = cal_local.lock().unwrap(); // Step 1 - find the differences - // let mut local_del = HashSet::new(); - // let mut remote_del = HashSet::new(); - // let mut local_changes = HashSet::new(); - // let mut remote_change = HashSet::new(); - // let mut local_additions = HashSet::new(); - // let mut remote_additions = HashSet::new(); + let mut local_del = HashSet::new(); + let mut remote_del = HashSet::new(); + let mut local_changes = HashSet::new(); + let mut remote_changes = HashSet::new(); + let mut local_additions = HashSet::new(); + let mut remote_additions = HashSet::new(); + + let remote_items = cal_remote.get_item_version_tags().await?; + let mut local_items_to_handle = cal_local.get_item_ids().await?; + for (id, remote_tag) in remote_items { + match cal_local.get_item_by_id(&id).await { + None => { + // This was created on the remote + remote_additions.insert(id); + }, + Some(local_item) => { + if local_items_to_handle.remove(&id) == true { + log::error!("Inconsistent state: missing task {} from the local tasks", id); + } + + match local_item.sync_status() { + SyncStatus::NotSynced => { + log::error!("ID reuse between remote and local sources ({}). Ignoring this item in the sync", id); + continue; + }, + SyncStatus::Synced(local_tag) => { + if &remote_tag != local_tag { + // This has been modified on the remote + remote_changes.insert(id); + } + }, + SyncStatus::LocallyModified(local_tag) => { + if &remote_tag == local_tag { + // This has been changed locally + local_changes.insert(id); + } else { + log::info!("Conflict: task {} has been modified in both sources. Using the remote version.", id); + remote_changes.insert(id); + } + }, + SyncStatus::LocallyDeleted(local_tag) => { + if &remote_tag == local_tag { + // This has been locally deleted + local_del.insert(id); + } else { + log::info!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", id); + remote_changes.insert(id); + } + }, + } + } + } + } + + // Also iterate on the local tasks that are not on the remote + for id in local_items_to_handle { + let local_item = match cal_local.get_item_by_id(&id).await { + None => { + log::error!("Inconsistent state: missing task {} from the local tasks", id); + continue; + }, + Some(item) => item, + }; + match local_item.sync_status() { + SyncStatus::Synced(_) => { + // This item has been removed from the remote + remote_del.insert(id); + }, + SyncStatus::NotSynced => { + // This item has just been locally created + local_additions.insert(id); + }, + variant @ _=> { + log::error!("Inconsistent state: unexpected variant {:?}", variant); + } + } + } + + + // Step 2 - commit changes + for id_del in local_del { + if let Err(err) = cal_remote.delete_item(&id_del).await { + log::warn!("Unable to delete remote item {}: {}", id_del, err); + } + } + + for id_del in remote_del { + if let Err(err) = cal_local.delete_item(&id_del).await { + log::warn!("Unable to delete local item {}: {}", id_del, err); + } + } + + for id_add in remote_additions { + match cal_remote.get_item_by_id(&id_add).await { + None => { + log::error!("Inconsistency: new item {} has vanished from the remote end", id_add); + continue; + }, + Some(new_item) => cal_local.add_item(new_item.clone()).await, + } + } + + for id_change in remote_changes { + match cal_remote.get_item_by_id(&id_change).await { + None => { + log::error!("Inconsistency: modified item {} has vanished from the remote end", id_change); + continue; + }, + Some(item) => { + if let Err(err) = cal_local.delete_item(&id_change).await { + log::error!("Unable to delete item {} from local calendar: {}", id_change, err); + } + cal_local.add_item(item.clone()); + }, + } + } + + + for id_add in local_additions { + match cal_local.get_item_by_id(&id_add).await { + None => { + log::error!("Inconsistency: created item {} has been marked for upload but is locally missing", id_add); + continue; + }, + Some(item) => cal_remote.add_item(item.clone()), + }; + } + + for id_change in local_changes { + match cal_local.get_item_by_id(&id_change).await { + None => { + log::error!("Inconsistency: modified item {} has been marked for upload but is locally missing", id_change); + continue; + }, + Some(item) => { + if let Err(err) = cal_remote.delete_item(&id_change).await { + log::error!("Unable to delete item {} from remote calendar: {}", id_change, err); + } + cal_remote.add_item(item.clone()); + } + }; + } } Ok(()) } } - - -async fn move_to_calendar(items: &mut Vec, calendar: &mut C) { - while items.len() > 0 { - let item = items.remove(0); - log::warn!("Moving {} to calendar", item.name()); - calendar.add_item(item).await; - } -} - -async fn remove_from_calendar(ids: &Vec, calendar: &mut C) { - for id in ids { - log::info!(" Removing {:?} from calendar", id); - if let Err(err) = calendar.delete_item(id).await { - log::warn!("Unable to delete item {:?} from calendar: {}", id, err); - } - } -} diff --git a/src/task.rs b/src/task.rs index f19e9e0..137c6ae 100644 --- a/src/task.rs +++ b/src/task.rs @@ -2,7 +2,7 @@ use chrono::{Utc, DateTime}; use serde::{Deserialize, Serialize}; use crate::item::ItemId; -use crate::item::VersionTag; +use crate::item::SyncStatus; /// A to-do task #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -12,8 +12,8 @@ pub struct Task { /// The last modification date of this task last_modified: DateTime, - /// The version tag of this item - version_tag: VersionTag, + /// The sync status of this item + sync_status: SyncStatus, /// The display name of the task name: String, @@ -23,12 +23,12 @@ pub struct Task { impl Task { /// Create a new Task - pub fn new(name: String, id: ItemId, last_modified: DateTime, version_tag: VersionTag) -> Self { + pub fn new(name: String, id: ItemId, last_modified: DateTime, sync_status: SyncStatus) -> Self { Self { id, name, last_modified, - version_tag, + sync_status, completed: false, } } @@ -37,7 +37,10 @@ impl Task { pub fn name(&self) -> &str { &self.name } pub fn completed(&self) -> bool { self.completed } pub fn last_modified(&self) -> DateTime { self.last_modified } - pub fn version_tag(&self) -> &VersionTag { &self.version_tag } + pub fn sync_status(&self) -> &SyncStatus { &self.sync_status } + pub fn set_sync_status(&mut self, new_status: SyncStatus) { + self.sync_status = new_status; + } fn update_last_modified(&mut self) { self.last_modified = Utc::now(); diff --git a/src/traits.rs b/src/traits.rs index ef2216e..dbff2da 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -39,6 +39,9 @@ pub trait PartialCalendar { /// Returns a particular item async fn get_item_by_id_mut<'a>(&'a mut self, id: &ItemId) -> Option<&'a mut Item>; + /// Returns a particular item + async fn get_item_by_id<'a>(&'a self, id: &ItemId) -> Option<&'a Item>; + /// Add an item into this calendar async fn add_item(&mut self, item: Item); diff --git a/tests/sync.rs b/tests/sync.rs index b731ed6..1ac53f3 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "integration_tests")] + use std::path::PathBuf; use std::sync::{Arc, Mutex}; @@ -9,7 +11,7 @@ use my_tasks::traits::PartialCalendar; use my_tasks::cache::Cache; use my_tasks::Item; use my_tasks::ItemId; -use my_tasks::VersionTag; +use my_tasks::SyncStatus; use my_tasks::Task; use my_tasks::calendar::cached_calendar::CachedCalendar; use my_tasks::Provider; @@ -54,19 +56,19 @@ async fn populate_test_provider() -> Provider Provider Provider