Re-wrote Provider::sync

This commit is contained in:
daladim 2021-03-22 23:42:41 +01:00
parent c46222c8c7
commit 5c3c5c8090
10 changed files with 256 additions and 69 deletions

View file

@ -6,6 +6,10 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
integration_tests = ["mock_version_tag"]
mock_version_tag = []
[dependencies] [dependencies]
env_logger = "0.8" env_logger = "0.8"
log = "0.4" log = "0.4"

View file

@ -70,16 +70,38 @@ impl PartialCalendar for CachedCalendar {
Ok(()) Ok(())
} }
#[cfg(not(feature = "mock_version_tag"))]
#[allow(unreachable_code)]
async fn get_item_version_tags(&self) -> Result<HashMap<ItemId, VersionTag>, Box<dyn Error>> { async fn get_item_version_tags(&self) -> Result<HashMap<ItemId, VersionTag>, Box<dyn Error>> {
Ok(self.items.iter() panic!("This function only makes sense in remote calendars and in mocked calendars");
.map(|(id, item)| (id.clone(), item.version_tag().clone())) Err("This function only makes sense in remote calendars and in mocked calendars".into())
.collect() }
) #[cfg(feature = "mock_version_tag")]
async fn get_item_version_tags(&self) -> Result<HashMap<ItemId, VersionTag>, Box<dyn Error>> {
use crate::item::SyncStatus;
let mut result = HashMap::new();
for (id, item) in &self.items {
let vt = match item.sync_status() {
SyncStatus::Synced(vt) => vt.clone(),
_ => {
panic!("Mock calendars must contain only SyncStatus::Synced. Got {:?}", item);
}
};
result.insert(id.clone(), vt);
}
Ok(result)
} }
async fn get_item_by_id_mut<'a>(&'a mut self, id: &ItemId) -> Option<&'a mut Item> { async fn get_item_by_id_mut<'a>(&'a mut self, id: &ItemId) -> Option<&'a mut Item> {
self.items.get_mut(id) self.items.get_mut(id)
} }
async fn get_item_by_id<'a>(&'a self, id: &ItemId) -> Option<&'a Item> {
self.items.get(id)
}
} }
#[async_trait] #[async_trait]

View file

@ -1,7 +1,6 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::error::Error; use std::error::Error;
use chrono::{DateTime, Utc};
use async_trait::async_trait; use async_trait::async_trait;
use crate::traits::PartialCalendar; use crate::traits::PartialCalendar;
@ -86,6 +85,12 @@ impl PartialCalendar for RemoteCalendar {
None None
} }
async fn get_item_by_id<'a>(&'a self, id: &ItemId) -> Option<&'a Item> {
log::error!("Not implemented");
None
}
/// Add an item into this calendar /// Add an item into this calendar
async fn add_item(&mut self, _item: Item) { async fn add_item(&mut self, _item: Item) {
log::error!("Not implemented"); log::error!("Not implemented");

View file

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use chrono::{Utc, DateTime}; use chrono::{Utc, DateTime};
use crate::item::ItemId; use crate::item::ItemId;
use crate::item::VersionTag; use crate::item::SyncStatus;
/// TODO: implement Event one day. /// TODO: implement Event one day.
/// This crate currently only supports tasks, not calendar events. /// This crate currently only supports tasks, not calendar events.
@ -13,7 +13,7 @@ pub struct Event {
id: ItemId, id: ItemId,
name: String, name: String,
last_modified: DateTime<Utc>, last_modified: DateTime<Utc>,
version_tag: VersionTag, sync_status: SyncStatus,
} }
impl Event { impl Event {
@ -29,7 +29,10 @@ impl Event {
self.last_modified self.last_modified
} }
pub fn version_tag(&self) -> &VersionTag { pub fn sync_status(&self) -> &SyncStatus {
&self.version_tag &self.sync_status
}
pub fn set_sync_status(&mut self, new_status: SyncStatus) {
self.sync_status = new_status;
} }
} }

View file

@ -37,10 +37,16 @@ impl Item {
} }
} }
pub fn version_tag(&self) -> &VersionTag { pub fn sync_status(&self) -> &SyncStatus {
match self { match self {
Item::Event(e) => e.version_tag(), Item::Event(e) => e.sync_status(),
Item::Task(t) => t.version_tag(), Item::Task(t) => t.sync_status(),
}
}
pub fn set_sync_status(&mut self, new_status: SyncStatus) {
match self {
Item::Event(e) => e.set_sync_status(new_status),
Item::Task(t) => t.set_sync_status(new_status),
} }
} }
@ -87,7 +93,7 @@ pub struct ItemId {
content: Url, content: Url,
} }
impl ItemId{ impl ItemId{
/// Generate a random ItemId. This is only useful in tests /// Generate a random ItemId. This should only be useful in tests
pub fn random() -> Self { pub fn random() -> Self {
let random = uuid::Uuid::new_v4().to_hyphenated().to_string(); let random = uuid::Uuid::new_v4().to_hyphenated().to_string();
let s = format!("https://server.com/{}", random); let s = format!("https://server.com/{}", random);
@ -133,9 +139,32 @@ impl From<String> for VersionTag {
} }
impl VersionTag { impl VersionTag {
/// Generate a random VesionTag. This is only useful in tests /// Generate a random VesionTag
#[cfg(feature = "mock_version_tag")]
pub fn random() -> Self { pub fn random() -> Self {
let random = uuid::Uuid::new_v4().to_hyphenated().to_string(); let random = uuid::Uuid::new_v4().to_hyphenated().to_string();
Self { tag: random } Self { tag: random }
} }
} }
/// Desribes whether this item has been synced already, or modified since the last time it was synced
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum SyncStatus {
/// This item has ben locally created, and never synced yet
NotSynced,
/// At the time this item has ben synced, it has a given version tag, and has not been locally modified since then
Synced(VersionTag),
/// This item has been synced when it had a given version tag, and has been locally modified since then.
LocallyModified(VersionTag),
/// This item has been synced when it had a given version tag, and has been locally deleted since then.
LocallyDeleted(VersionTag),
}
impl SyncStatus {
/// Generate a random SyncStatus::Synced
#[cfg(feature = "mock_version_tag")]
pub fn random_synced() -> Self {
Self::Synced(VersionTag::random())
}
}

View file

@ -16,6 +16,7 @@ mod item;
pub use item::Item; pub use item::Item;
pub use item::ItemId; pub use item::ItemId;
pub use item::VersionTag; pub use item::VersionTag;
pub use item::SyncStatus;
mod task; mod task;
pub use task::Task; pub use task::Task;
mod event; mod event;

View file

@ -4,18 +4,15 @@ use std::error::Error;
use std::collections::HashSet; use std::collections::HashSet;
use std::marker::PhantomData; use std::marker::PhantomData;
use chrono::{DateTime, Utc};
use crate::traits::{CalDavSource, CompleteCalendar}; use crate::traits::{CalDavSource, CompleteCalendar};
use crate::traits::PartialCalendar; use crate::traits::PartialCalendar;
use crate::Item; use crate::item::SyncStatus;
use crate::item::ItemId;
/// A data source that combines two `CalDavSources` (usually a server and a local cache), which is able to sync both sources. /// A data source that combines two `CalDavSources` (usually a server and a local cache), which is able to sync both sources.
pub struct Provider<L, T, R, U> pub struct Provider<L, T, R, U>
where where
L: CalDavSource<T>, L: CalDavSource<T>,
T: CompleteCalendar, T: CompleteCalendar + Sync + Send,
R: CalDavSource<U>, R: CalDavSource<U>,
U: PartialCalendar + Sync + Send, U: PartialCalendar + Sync + Send,
{ {
@ -31,7 +28,7 @@ where
impl<L, T, R, U> Provider<L, T, R, U> impl<L, T, R, U> Provider<L, T, R, U>
where where
L: CalDavSource<T>, L: CalDavSource<T>,
T: CompleteCalendar, T: CompleteCalendar + Sync + Send,
R: CalDavSource<U>, R: CalDavSource<U>,
U: PartialCalendar + Sync + Send, U: PartialCalendar + Sync + Send,
{ {
@ -45,15 +42,15 @@ where
} }
} }
/// Returns the data source described as the `server` /// Returns the data source described as the `remote`
pub fn remote(&self) -> &R { &self.remote } pub fn remote(&self) -> &R { &self.remote }
/// Returns the data source described as the `local` /// Returns the data source described as the `local`
pub fn local(&self) -> &L { &self.local } pub fn local(&self) -> &L { &self.local }
/// Performs a synchronisation between `local` and `server`. /// Performs a synchronisation between `local` and `remote`.
/// ///
/// This bidirectional sync applies additions/deletions made on a source to the other source. /// This bidirectional sync applies additions/deletions made on a source to the other source.
/// In case of conflicts (the same item has been modified on both ends since the last sync, `server` always wins) /// In case of conflicts (the same item has been modified on both ends since the last sync, `remote` always wins)
pub async fn sync(&mut self) -> Result<(), Box<dyn Error>> { pub async fn sync(&mut self) -> Result<(), Box<dyn Error>> {
log::info!("Starting a sync."); log::info!("Starting a sync.");
@ -71,33 +68,151 @@ where
let mut cal_local = cal_local.lock().unwrap(); let mut cal_local = cal_local.lock().unwrap();
// Step 1 - find the differences // Step 1 - find the differences
// let mut local_del = HashSet::new(); let mut local_del = HashSet::new();
// let mut remote_del = HashSet::new(); let mut remote_del = HashSet::new();
// let mut local_changes = HashSet::new(); let mut local_changes = HashSet::new();
// let mut remote_change = HashSet::new(); let mut remote_changes = HashSet::new();
// let mut local_additions = HashSet::new(); let mut local_additions = HashSet::new();
// let mut remote_additions = HashSet::new(); let mut remote_additions = HashSet::new();
let remote_items = cal_remote.get_item_version_tags().await?;
let mut local_items_to_handle = cal_local.get_item_ids().await?;
for (id, remote_tag) in remote_items {
match cal_local.get_item_by_id(&id).await {
None => {
// This was created on the remote
remote_additions.insert(id);
},
Some(local_item) => {
if local_items_to_handle.remove(&id) == true {
log::error!("Inconsistent state: missing task {} from the local tasks", id);
}
match local_item.sync_status() {
SyncStatus::NotSynced => {
log::error!("ID reuse between remote and local sources ({}). Ignoring this item in the sync", id);
continue;
},
SyncStatus::Synced(local_tag) => {
if &remote_tag != local_tag {
// This has been modified on the remote
remote_changes.insert(id);
}
},
SyncStatus::LocallyModified(local_tag) => {
if &remote_tag == local_tag {
// This has been changed locally
local_changes.insert(id);
} else {
log::info!("Conflict: task {} has been modified in both sources. Using the remote version.", id);
remote_changes.insert(id);
}
},
SyncStatus::LocallyDeleted(local_tag) => {
if &remote_tag == local_tag {
// This has been locally deleted
local_del.insert(id);
} else {
log::info!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", id);
remote_changes.insert(id);
}
},
}
}
}
}
// Also iterate on the local tasks that are not on the remote
for id in local_items_to_handle {
let local_item = match cal_local.get_item_by_id(&id).await {
None => {
log::error!("Inconsistent state: missing task {} from the local tasks", id);
continue;
},
Some(item) => item,
};
match local_item.sync_status() {
SyncStatus::Synced(_) => {
// This item has been removed from the remote
remote_del.insert(id);
},
SyncStatus::NotSynced => {
// This item has just been locally created
local_additions.insert(id);
},
variant @ _=> {
log::error!("Inconsistent state: unexpected variant {:?}", variant);
}
}
}
// Step 2 - commit changes
for id_del in local_del {
if let Err(err) = cal_remote.delete_item(&id_del).await {
log::warn!("Unable to delete remote item {}: {}", id_del, err);
}
}
for id_del in remote_del {
if let Err(err) = cal_local.delete_item(&id_del).await {
log::warn!("Unable to delete local item {}: {}", id_del, err);
}
}
for id_add in remote_additions {
match cal_remote.get_item_by_id(&id_add).await {
None => {
log::error!("Inconsistency: new item {} has vanished from the remote end", id_add);
continue;
},
Some(new_item) => cal_local.add_item(new_item.clone()).await,
}
}
for id_change in remote_changes {
match cal_remote.get_item_by_id(&id_change).await {
None => {
log::error!("Inconsistency: modified item {} has vanished from the remote end", id_change);
continue;
},
Some(item) => {
if let Err(err) = cal_local.delete_item(&id_change).await {
log::error!("Unable to delete item {} from local calendar: {}", id_change, err);
}
cal_local.add_item(item.clone());
},
}
}
for id_add in local_additions {
match cal_local.get_item_by_id(&id_add).await {
None => {
log::error!("Inconsistency: created item {} has been marked for upload but is locally missing", id_add);
continue;
},
Some(item) => cal_remote.add_item(item.clone()),
};
}
for id_change in local_changes {
match cal_local.get_item_by_id(&id_change).await {
None => {
log::error!("Inconsistency: modified item {} has been marked for upload but is locally missing", id_change);
continue;
},
Some(item) => {
if let Err(err) = cal_remote.delete_item(&id_change).await {
log::error!("Unable to delete item {} from remote calendar: {}", id_change, err);
}
cal_remote.add_item(item.clone());
}
};
}
} }
Ok(()) Ok(())
} }
} }
async fn move_to_calendar<C: PartialCalendar>(items: &mut Vec<Item>, calendar: &mut C) {
while items.len() > 0 {
let item = items.remove(0);
log::warn!("Moving {} to calendar", item.name());
calendar.add_item(item).await;
}
}
async fn remove_from_calendar<C: PartialCalendar>(ids: &Vec<ItemId>, calendar: &mut C) {
for id in ids {
log::info!(" Removing {:?} from calendar", id);
if let Err(err) = calendar.delete_item(id).await {
log::warn!("Unable to delete item {:?} from calendar: {}", id, err);
}
}
}

View file

@ -2,7 +2,7 @@ use chrono::{Utc, DateTime};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::item::ItemId; use crate::item::ItemId;
use crate::item::VersionTag; use crate::item::SyncStatus;
/// A to-do task /// A to-do task
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@ -12,8 +12,8 @@ pub struct Task {
/// The last modification date of this task /// The last modification date of this task
last_modified: DateTime<Utc>, last_modified: DateTime<Utc>,
/// The version tag of this item /// The sync status of this item
version_tag: VersionTag, sync_status: SyncStatus,
/// The display name of the task /// The display name of the task
name: String, name: String,
@ -23,12 +23,12 @@ pub struct Task {
impl Task { impl Task {
/// Create a new Task /// Create a new Task
pub fn new(name: String, id: ItemId, last_modified: DateTime<Utc>, version_tag: VersionTag) -> Self { pub fn new(name: String, id: ItemId, last_modified: DateTime<Utc>, sync_status: SyncStatus) -> Self {
Self { Self {
id, id,
name, name,
last_modified, last_modified,
version_tag, sync_status,
completed: false, completed: false,
} }
} }
@ -37,7 +37,10 @@ impl Task {
pub fn name(&self) -> &str { &self.name } pub fn name(&self) -> &str { &self.name }
pub fn completed(&self) -> bool { self.completed } pub fn completed(&self) -> bool { self.completed }
pub fn last_modified(&self) -> DateTime<Utc> { self.last_modified } pub fn last_modified(&self) -> DateTime<Utc> { self.last_modified }
pub fn version_tag(&self) -> &VersionTag { &self.version_tag } pub fn sync_status(&self) -> &SyncStatus { &self.sync_status }
pub fn set_sync_status(&mut self, new_status: SyncStatus) {
self.sync_status = new_status;
}
fn update_last_modified(&mut self) { fn update_last_modified(&mut self) {
self.last_modified = Utc::now(); self.last_modified = Utc::now();

View file

@ -39,6 +39,9 @@ pub trait PartialCalendar {
/// Returns a particular item /// Returns a particular item
async fn get_item_by_id_mut<'a>(&'a mut self, id: &ItemId) -> Option<&'a mut Item>; async fn get_item_by_id_mut<'a>(&'a mut self, id: &ItemId) -> Option<&'a mut Item>;
/// Returns a particular item
async fn get_item_by_id<'a>(&'a self, id: &ItemId) -> Option<&'a Item>;
/// Add an item into this calendar /// Add an item into this calendar
async fn add_item(&mut self, item: Item); async fn add_item(&mut self, item: Item);

View file

@ -1,3 +1,5 @@
#![cfg(feature = "integration_tests")]
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -9,7 +11,7 @@ use my_tasks::traits::PartialCalendar;
use my_tasks::cache::Cache; use my_tasks::cache::Cache;
use my_tasks::Item; use my_tasks::Item;
use my_tasks::ItemId; use my_tasks::ItemId;
use my_tasks::VersionTag; use my_tasks::SyncStatus;
use my_tasks::Task; use my_tasks::Task;
use my_tasks::calendar::cached_calendar::CachedCalendar; use my_tasks::calendar::cached_calendar::CachedCalendar;
use my_tasks::Provider; use my_tasks::Provider;
@ -54,19 +56,19 @@ async fn populate_test_provider() -> Provider<Cache, CachedCalendar, Cache, Cach
let cal_id = Url::parse("http://todo.list/cal").unwrap(); let cal_id = Url::parse("http://todo.list/cal").unwrap();
let task_a = Item::Task(Task::new("task A".into(), ItemId::random(), Utc.ymd(2000, 1, 1).and_hms(0, 0, 0), VersionTag::random())); let task_a = Item::Task(Task::new("task A".into(), ItemId::random(), Utc.ymd(2000, 1, 1).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_b = Item::Task(Task::new("task B".into(), ItemId::random(), Utc.ymd(2000, 1, 2).and_hms(0, 0, 0), VersionTag::random())); let task_b = Item::Task(Task::new("task B".into(), ItemId::random(), Utc.ymd(2000, 1, 2).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_c = Item::Task(Task::new("task C".into(), ItemId::random(), Utc.ymd(2000, 1, 3).and_hms(0, 0, 0), VersionTag::random())); let task_c = Item::Task(Task::new("task C".into(), ItemId::random(), Utc.ymd(2000, 1, 3).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_d = Item::Task(Task::new("task D".into(), ItemId::random(), Utc.ymd(2000, 1, 4).and_hms(0, 0, 0), VersionTag::random())); let task_d = Item::Task(Task::new("task D".into(), ItemId::random(), Utc.ymd(2000, 1, 4).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_e = Item::Task(Task::new("task E".into(), ItemId::random(), Utc.ymd(2000, 1, 5).and_hms(0, 0, 0), VersionTag::random())); let task_e = Item::Task(Task::new("task E".into(), ItemId::random(), Utc.ymd(2000, 1, 5).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_f = Item::Task(Task::new("task F".into(), ItemId::random(), Utc.ymd(2000, 1, 6).and_hms(0, 0, 0), VersionTag::random())); let task_f = Item::Task(Task::new("task F".into(), ItemId::random(), Utc.ymd(2000, 1, 6).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_g = Item::Task(Task::new("task G".into(), ItemId::random(), Utc.ymd(2000, 1, 7).and_hms(0, 0, 0), VersionTag::random())); let task_g = Item::Task(Task::new("task G".into(), ItemId::random(), Utc.ymd(2000, 1, 7).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_h = Item::Task(Task::new("task H".into(), ItemId::random(), Utc.ymd(2000, 1, 8).and_hms(0, 0, 0), VersionTag::random())); let task_h = Item::Task(Task::new("task H".into(), ItemId::random(), Utc.ymd(2000, 1, 8).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_i = Item::Task(Task::new("task I".into(), ItemId::random(), Utc.ymd(2000, 1, 9).and_hms(0, 0, 0), VersionTag::random())); let task_i = Item::Task(Task::new("task I".into(), ItemId::random(), Utc.ymd(2000, 1, 9).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_j = Item::Task(Task::new("task J".into(), ItemId::random(), Utc.ymd(2000, 1, 10).and_hms(0, 0, 0), VersionTag::random())); let task_j = Item::Task(Task::new("task J".into(), ItemId::random(), Utc.ymd(2000, 1, 10).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_k = Item::Task(Task::new("task K".into(), ItemId::random(), Utc.ymd(2000, 1, 11).and_hms(0, 0, 0), VersionTag::random())); let task_k = Item::Task(Task::new("task K".into(), ItemId::random(), Utc.ymd(2000, 1, 11).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_l = Item::Task(Task::new("task L".into(), ItemId::random(), Utc.ymd(2000, 1, 12).and_hms(0, 0, 0), VersionTag::random())); let task_l = Item::Task(Task::new("task L".into(), ItemId::random(), Utc.ymd(2000, 1, 12).and_hms(0, 0, 0), SyncStatus::random_synced()));
let task_m = Item::Task(Task::new("task M".into(), ItemId::random(), Utc.ymd(2000, 1, 12).and_hms(0, 0, 0), VersionTag::random())); let task_m = Item::Task(Task::new("task M".into(), ItemId::random(), Utc.ymd(2000, 1, 12).and_hms(0, 0, 0), SyncStatus::random_synced()));
// let last_sync = task_m.last_modified(); // let last_sync = task_m.last_modified();
// local.update_last_sync(Some(last_sync)); // local.update_last_sync(Some(last_sync));
@ -130,7 +132,7 @@ async fn populate_test_provider() -> Provider<Cache, CachedCalendar, Cache, Cach
cal_server.delete_item(&task_l_id).await.unwrap(); cal_server.delete_item(&task_l_id).await.unwrap();
let task_n = Item::Task(Task::new("task N (new from server)".into(), ItemId::random(), Utc::now(), VersionTag::random())); let task_n = Item::Task(Task::new("task N (new from server)".into(), ItemId::random(), Utc::now(), SyncStatus::random_synced()));
cal_server.add_item(task_n).await; cal_server.add_item(task_n).await;
@ -159,7 +161,7 @@ async fn populate_test_provider() -> Provider<Cache, CachedCalendar, Cache, Cach
cal_local.delete_item(&task_k_id).await.unwrap(); cal_local.delete_item(&task_k_id).await.unwrap();
cal_local.delete_item(&task_l_id).await.unwrap(); cal_local.delete_item(&task_l_id).await.unwrap();
let task_o = Item::Task(Task::new("task O (new from local)".into(), ItemId::random(), Utc::now(), VersionTag::random())); let task_o = Item::Task(Task::new("task O (new from local)".into(), ItemId::random(), Utc::now(), SyncStatus::NotSynced));
cal_local.add_item(task_o).await; cal_local.add_item(task_o).await;
Provider::new(server, local) Provider::new(server, local)