diff --git a/src/provider.rs b/src/provider/mod.rs similarity index 60% rename from src/provider.rs rename to src/provider/mod.rs index d52e411..2354fd2 100644 --- a/src/provider.rs +++ b/src/provider/mod.rs @@ -1,4 +1,6 @@ //! This modules abstracts data sources and merges them in a single virtual one +//! +//! It is also responsible for syncing them together use std::error::Error; use std::collections::HashSet; @@ -7,39 +9,12 @@ use std::sync::{Arc, Mutex}; use crate::traits::{BaseCalendar, CalDavSource, DavCalendar}; use crate::traits::CompleteCalendar; -use crate::item::SyncStatus; +use crate::item::{ItemId, SyncStatus}; use crate::calendar::CalendarId; -/// A counter of errors that happen during a sync -struct SyncResult { - n_errors: u32, -} -impl SyncResult { - pub fn new() -> Self { - Self { n_errors: 0 } - } - pub fn is_success(&self) -> bool { - self.n_errors == 0 - } - - pub fn error(&mut self, text: &str) { - log::error!("{}", text); - self.n_errors += 1; - } - pub fn warn(&mut self, text: &str) { - log::warn!("{}", text); - self.n_errors += 1; - } - pub fn info(&mut self, text: &str) { - log::info!("{}", text); - } - pub fn debug(&mut self, text: &str) { - log::debug!("{}", text); - } - pub fn trace(&mut self, text: &str) { - log::trace!("{}", text); - } -} +pub mod sync_progress; +use sync_progress::SyncProgress; +use sync_progress::{FeedbackSender, SyncEvent}; /// A data source that combines two `CalDavSource`s, which is able to sync both sources. /// @@ -90,23 +65,37 @@ where /// To be sure `local` accurately mirrors the `remote` source, you can run [`Provider::sync`] pub fn remote(&self) -> &R { &self.remote } - /// Performs a synchronisation between `local` and `remote`. + /// Performs a synchronisation between `local` and `remote`, and provide feeedback to the user about the progress. /// /// This bidirectional sync applies additions/deletions made on a source to the other source. /// In case of conflicts (the same item has been modified on both ends since the last sync, `remote` always wins) /// /// It returns whether the sync was totally successful (details about errors are logged using the `log::*` macros). /// In case errors happened, the sync might have been partially executed, and you can safely run this function again, since it has been designed to gracefully recover from errors. - pub async fn sync(&mut self) -> bool { - let mut result = SyncResult::new(); - if let Err(err) = self.run_sync(&mut result).await { - result.error(&format!("Sync terminated because of an error: {}", err)); - } - result.is_success() + pub async fn sync_with_feedback(&mut self, feedback_sender: FeedbackSender) -> bool { + let mut progress = SyncProgress::new_with_feedback_channel(feedback_sender); + self.run_sync(&mut progress).await } - async fn run_sync(&mut self, result: &mut SyncResult) -> Result<(), Box> { - result.info("Starting a sync"); + /// Performs a synchronisation between `local` and `remote`, without giving any feedback. + /// + /// See [sync_with_feedback] + pub async fn sync(&mut self) -> bool { + let mut progress = SyncProgress::new(); + self.run_sync(&mut progress).await + } + + async fn run_sync(&mut self, progress: &mut SyncProgress) -> bool { + if let Err(err) = self.run_sync_inner(progress).await { + progress.error(&format!("Sync terminated because of an error: {}", err)); + } + progress.feedback(SyncEvent::Finished{ success: progress.is_success() }); + progress.is_success() + } + + async fn run_sync_inner(&mut self, progress: &mut SyncProgress) -> Result<(), Box> { + progress.info("Starting a sync."); + progress.feedback(SyncEvent::Started); let mut handled_calendars = HashSet::new(); @@ -115,14 +104,14 @@ where for (cal_id, cal_remote) in cals_remote { let counterpart = match self.get_or_insert_local_counterpart_calendar(&cal_id, cal_remote.clone()).await { Err(err) => { - result.warn(&format!("Unable to get or insert local counterpart calendar for {} ({}). Skipping this time", cal_id, err)); + progress.warn(&format!("Unable to get or insert local counterpart calendar for {} ({}). Skipping this time", cal_id, err)); continue; }, Ok(arc) => arc, }; - if let Err(err) = Self::sync_calendar_pair(counterpart, cal_remote, result).await { - result.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err)); + if let Err(err) = Self::sync_calendar_pair(counterpart, cal_remote, progress).await { + progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err)); continue; } handled_calendars.insert(cal_id); @@ -137,19 +126,19 @@ where let counterpart = match self.get_or_insert_remote_counterpart_calendar(&cal_id, cal_local.clone()).await { Err(err) => { - result.warn(&format!("Unable to get or insert remote counterpart calendar for {} ({}). Skipping this time", cal_id, err)); + progress.warn(&format!("Unable to get or insert remote counterpart calendar for {} ({}). Skipping this time", cal_id, err)); continue; }, Ok(arc) => arc, }; - if let Err(err) = Self::sync_calendar_pair(cal_local, counterpart, result).await { - result.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err)); + if let Err(err) = Self::sync_calendar_pair(cal_local, counterpart, progress).await { + progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err)); continue; } } - result.info("Sync ended"); + progress.info("Sync ended"); Ok(()) } @@ -163,12 +152,19 @@ where } - async fn sync_calendar_pair(cal_local: Arc>, cal_remote: Arc>, result: &mut SyncResult) -> Result<(), Box> { + async fn sync_calendar_pair(cal_local: Arc>, cal_remote: Arc>, progress: &mut SyncProgress) -> Result<(), Box> { let mut cal_remote = cal_remote.lock().unwrap(); let mut cal_local = cal_local.lock().unwrap(); + let cal_name = cal_local.name().to_string(); + + progress.info(&format!("Syncing calendar {}", cal_name)); + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: "started".to_string() + }); // Step 1 - find the differences - result.debug("Finding the differences to sync..."); + progress.debug("Finding the differences to sync..."); let mut local_del = HashSet::new(); let mut remote_del = HashSet::new(); let mut local_changes = HashSet::new(); @@ -177,51 +173,56 @@ where let mut remote_additions = HashSet::new(); let remote_items = cal_remote.get_item_version_tags().await?; + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: format!("{} remote items", remote_items.len()), + }); + let mut local_items_to_handle = cal_local.get_item_ids().await?; for (id, remote_tag) in remote_items { - result.trace(&format!("***** Considering remote item {}...", id)); + progress.trace(&format!("***** Considering remote item {}...", id)); match cal_local.get_item_by_id(&id).await { None => { // This was created on the remote - result.debug(&format!("* {} is a remote addition", id)); + progress.debug(&format!("* {} is a remote addition", id)); remote_additions.insert(id); }, Some(local_item) => { if local_items_to_handle.remove(&id) == false { - result.error(&format!("Inconsistent state: missing task {} from the local tasks", id)); + progress.error(&format!("Inconsistent state: missing task {} from the local tasks", id)); } match local_item.sync_status() { SyncStatus::NotSynced => { - result.error(&format!("ID reuse between remote and local sources ({}). Ignoring this item in the sync", id)); + progress.error(&format!("ID reuse between remote and local sources ({}). Ignoring this item in the sync", id)); continue; }, SyncStatus::Synced(local_tag) => { if &remote_tag != local_tag { // This has been modified on the remote - result.debug(&format!("* {} is a remote change", id)); + progress.debug(&format!("* {} is a remote change", id)); remote_changes.insert(id); } }, SyncStatus::LocallyModified(local_tag) => { if &remote_tag == local_tag { // This has been changed locally - result.debug(&format!("* {} is a local change", id)); + progress.debug(&format!("* {} is a local change", id)); local_changes.insert(id); } else { - result.info(&format!("Conflict: task {} has been modified in both sources. Using the remote version.", id)); - result.debug(&format!("* {} is considered a remote change", id)); + progress.info(&format!("Conflict: task {} has been modified in both sources. Using the remote version.", id)); + progress.debug(&format!("* {} is considered a remote change", id)); remote_changes.insert(id); } }, SyncStatus::LocallyDeleted(local_tag) => { if &remote_tag == local_tag { // This has been locally deleted - result.debug(&format!("* {} is a local deletion", id)); + progress.debug(&format!("* {} is a local deletion", id)); local_del.insert(id); } else { - result.info(&format!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", id)); - result.debug(&format!("* {} is a considered a remote change", id)); + progress.info(&format!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", id)); + progress.debug(&format!("* {} is a considered a remote change", id)); remote_changes.insert(id); } }, @@ -232,10 +233,10 @@ where // Also iterate on the local tasks that are not on the remote for id in local_items_to_handle { - result.trace(&format!("##### Considering local item {}...", id)); + progress.trace(&format!("##### Considering local item {}...", id)); let local_item = match cal_local.get_item_by_id(&id).await { None => { - result.error(&format!("Inconsistent state: missing task {} from the local tasks", id)); + progress.error(&format!("Inconsistent state: missing task {} from the local tasks", id)); continue; }, Some(item) => item, @@ -244,21 +245,21 @@ where match local_item.sync_status() { SyncStatus::Synced(_) => { // This item has been removed from the remote - result.debug(&format!("# {} is a deletion from the server", id)); + progress.debug(&format!("# {} is a deletion from the server", id)); remote_del.insert(id); }, SyncStatus::NotSynced => { // This item has just been locally created - result.debug(&format!("# {} has been locally created", id)); + progress.debug(&format!("# {} has been locally created", id)); local_additions.insert(id); }, SyncStatus::LocallyDeleted(_) => { // This item has been deleted from both sources - result.debug(&format!("# {} has been deleted from both sources", id)); + progress.debug(&format!("# {} has been deleted from both sources", id)); remote_del.insert(id); }, SyncStatus::LocallyModified(_) => { - result.info(&format!("Conflict: item {} has been deleted from the server and locally modified. Deleting the local copy", id)); + progress.info(&format!("Conflict: item {} has been deleted from the server and locally modified. Deleting the local copy", id)); remote_del.insert(id); }, } @@ -266,44 +267,56 @@ where // Step 2 - commit changes - result.trace("Committing changes..."); + progress.trace("Committing changes..."); for id_del in local_del { - result.debug(&format!("> Pushing local deletion {} to the server", id_del)); + progress.debug(&format!("> Pushing local deletion {} to the server", id_del)); + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: Self::item_name(&cal_local, &id_del).await, + }); match cal_remote.delete_item(&id_del).await { Err(err) => { - result.warn(&format!("Unable to delete remote item {}: {}", id_del, err)); + progress.warn(&format!("Unable to delete remote item {}: {}", id_del, err)); }, Ok(()) => { // Change the local copy from "marked to deletion" to "actually deleted" if let Err(err) = cal_local.immediately_delete_item(&id_del).await { - result.error(&format!("Unable to permanently delete local item {}: {}", id_del, err)); + progress.error(&format!("Unable to permanently delete local item {}: {}", id_del, err)); } }, } } for id_del in remote_del { - result.debug(&format!("> Applying remote deletion {} locally", id_del)); + progress.debug(&format!("> Applying remote deletion {} locally", id_del)); + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: Self::item_name(&cal_local, &id_del).await, + }); if let Err(err) = cal_local.immediately_delete_item(&id_del).await { - result.warn(&format!("Unable to delete local item {}: {}", id_del, err)); + progress.warn(&format!("Unable to delete local item {}: {}", id_del, err)); } } for id_add in remote_additions { - result.debug(&format!("> Applying remote addition {} locally", id_add)); + progress.debug(&format!("> Applying remote addition {} locally", id_add)); + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: Self::item_name(&cal_local, &id_add).await, + }); match cal_remote.get_item_by_id(&id_add).await { Err(err) => { - result.warn(&format!("Unable to get remote item {}: {}. Skipping it.", id_add, err)); + progress.warn(&format!("Unable to get remote item {}: {}. Skipping it.", id_add, err)); continue; }, Ok(item) => match item { None => { - result.error(&format!("Inconsistency: new item {} has vanished from the remote end", id_add)); + progress.error(&format!("Inconsistency: new item {} has vanished from the remote end", id_add)); continue; }, Some(new_item) => { if let Err(err) = cal_local.add_item(new_item.clone()).await { - result.error(&format!("Not able to add item {} to local calendar: {}", id_add, err)); + progress.error(&format!("Not able to add item {} to local calendar: {}", id_add, err)); } }, }, @@ -311,20 +324,24 @@ where } for id_change in remote_changes { - result.debug(&format!("> Applying remote change {} locally", id_change)); + progress.debug(&format!("> Applying remote change {} locally", id_change)); + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: Self::item_name(&cal_local, &id_change).await, + }); match cal_remote.get_item_by_id(&id_change).await { Err(err) => { - result.warn(&format!("Unable to get remote item {}: {}. Skipping it", id_change, err)); + progress.warn(&format!("Unable to get remote item {}: {}. Skipping it", id_change, err)); continue; }, Ok(item) => match item { None => { - result.error(&format!("Inconsistency: modified item {} has vanished from the remote end", id_change)); + progress.error(&format!("Inconsistency: modified item {} has vanished from the remote end", id_change)); continue; }, Some(item) => { if let Err(err) = cal_local.update_item(item.clone()).await { - result.error(&format!("Unable to update item {} in local calendar: {}", id_change, err)); + progress.error(&format!("Unable to update item {} in local calendar: {}", id_change, err)); } }, } @@ -333,15 +350,19 @@ where for id_add in local_additions { - result.debug(&format!("> Pushing local addition {} to the server", id_add)); + progress.debug(&format!("> Pushing local addition {} to the server", id_add)); + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: Self::item_name(&cal_local, &id_add).await, + }); match cal_local.get_item_by_id_mut(&id_add).await { None => { - result.error(&format!("Inconsistency: created item {} has been marked for upload but is locally missing", id_add)); + progress.error(&format!("Inconsistency: created item {} has been marked for upload but is locally missing", id_add)); continue; }, Some(item) => { match cal_remote.add_item(item.clone()).await { - Err(err) => result.error(&format!("Unable to add item {} to remote calendar: {}", id_add, err)), + Err(err) => progress.error(&format!("Unable to add item {} to remote calendar: {}", id_add, err)), Ok(new_ss) => { // Update local sync status item.set_sync_status(new_ss); @@ -352,15 +373,19 @@ where } for id_change in local_changes { - result.debug(&format!("> Pushing local change {} to the server", id_change)); + progress.debug(&format!("> Pushing local change {} to the server", id_change)); + progress.feedback(SyncEvent::InProgress{ + calendar: cal_name.clone(), + details: Self::item_name(&cal_local, &id_change).await, + }); match cal_local.get_item_by_id_mut(&id_change).await { None => { - result.error(&format!("Inconsistency: modified item {} has been marked for upload but is locally missing", id_change)); + progress.error(&format!("Inconsistency: modified item {} has been marked for upload but is locally missing", id_change)); continue; }, Some(item) => { match cal_remote.update_item(item.clone()).await { - Err(err) => result.error(&format!("Unable to update item {} in remote calendar: {}", id_change, err)), + Err(err) => progress.error(&format!("Unable to update item {} in remote calendar: {}", id_change, err)), Ok(new_ss) => { // Update local sync status item.set_sync_status(new_ss); @@ -372,6 +397,12 @@ where Ok(()) } + + + async fn item_name(cal: &T, id: &ItemId) -> String { + cal.get_item_by_id(id).await.map(|item| item.name()).unwrap_or_default().to_string() + } + } diff --git a/src/provider/sync_progress.rs b/src/provider/sync_progress.rs new file mode 100644 index 0000000..d2a24bf --- /dev/null +++ b/src/provider/sync_progress.rs @@ -0,0 +1,97 @@ +//! Utilities to track the progression of a sync + +use std::fmt::{Display, Error, Formatter}; + +/// An event that happens during a sync +pub enum SyncEvent { + /// Sync has not started + NotStarted, + /// Sync has just started but no calendar is handled yet + Started, + /// Sync is in progress. + InProgress{ calendar: String, details: String}, + /// Sync is finished + Finished{ success: bool }, +} + +impl Display for SyncEvent { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + match self { + SyncEvent::NotStarted => write!(f, "Not started"), + SyncEvent::Started => write!(f, "Sync has started"), + SyncEvent::InProgress{calendar, details} => write!(f, "[{}] {}", calendar, details), + SyncEvent::Finished{success} => match success { + true => write!(f, "Sync successfully finished"), + false => write!(f, "Sync finished with errors"), + } + } + } +} + +impl Default for SyncEvent { + fn default() -> Self { + Self::NotStarted + } +} + + + +pub type FeedbackSender = tokio::sync::watch::Sender; +pub type FeedbackReceiver = tokio::sync::watch::Receiver; + +pub fn feedback_channel() -> (FeedbackSender, FeedbackReceiver) { + tokio::sync::watch::channel(SyncEvent::default()) +} + + + + +/// A structure that tracks the progression and the errors that happen during a sync +pub struct SyncProgress { + n_errors: u32, + feedback_channel: Option +} +impl SyncProgress { + pub fn new() -> Self { + Self { n_errors: 0, feedback_channel: None } + } + pub fn new_with_feedback_channel(channel: FeedbackSender) -> Self { + Self { n_errors: 0, feedback_channel: Some(channel) } + } + + + pub fn is_success(&self) -> bool { + self.n_errors == 0 + } + + /// Log an error + pub fn error(&mut self, text: &str) { + log::error!("{}", text); + self.n_errors += 1; + } + /// Log a warning + pub fn warn(&mut self, text: &str) { + log::warn!("{}", text); + self.n_errors += 1; + } + /// Log an info + pub fn info(&mut self, text: &str) { + log::info!("{}", text); + } + /// Log a debug message + pub fn debug(&mut self, text: &str) { + log::debug!("{}", text); + } + /// Log a trace message + pub fn trace(&mut self, text: &str) { + log::trace!("{}", text); + } + /// Send an event as a feedback to the listener (if any). + pub fn feedback(&mut self, event: SyncEvent) { + self.feedback_channel + .as_ref() + .map(|sender| { + sender.send(event) + }); + } +}