Merge branch 'feedback'

This commit is contained in:
daladim 2021-10-13 08:17:39 +02:00
commit 9bd45d5a07
2 changed files with 214 additions and 86 deletions

View file

@ -1,4 +1,6 @@
//! This modules abstracts data sources and merges them in a single virtual one
//!
//! It is also responsible for syncing them together
use std::error::Error;
use std::collections::HashSet;
@ -7,39 +9,12 @@ use std::sync::{Arc, Mutex};
use crate::traits::{BaseCalendar, CalDavSource, DavCalendar};
use crate::traits::CompleteCalendar;
use crate::item::SyncStatus;
use crate::item::{ItemId, SyncStatus};
use crate::calendar::CalendarId;
/// A counter of errors that happen during a sync
struct SyncResult {
n_errors: u32,
}
impl SyncResult {
pub fn new() -> Self {
Self { n_errors: 0 }
}
pub fn is_success(&self) -> bool {
self.n_errors == 0
}
pub fn error(&mut self, text: &str) {
log::error!("{}", text);
self.n_errors += 1;
}
pub fn warn(&mut self, text: &str) {
log::warn!("{}", text);
self.n_errors += 1;
}
pub fn info(&mut self, text: &str) {
log::info!("{}", text);
}
pub fn debug(&mut self, text: &str) {
log::debug!("{}", text);
}
pub fn trace(&mut self, text: &str) {
log::trace!("{}", text);
}
}
pub mod sync_progress;
use sync_progress::SyncProgress;
use sync_progress::{FeedbackSender, SyncEvent};
/// A data source that combines two `CalDavSource`s, which is able to sync both sources.
///
@ -90,23 +65,37 @@ where
/// To be sure `local` accurately mirrors the `remote` source, you can run [`Provider::sync`]
pub fn remote(&self) -> &R { &self.remote }
/// Performs a synchronisation between `local` and `remote`.
/// Performs a synchronisation between `local` and `remote`, and provide feeedback to the user about the progress.
///
/// This bidirectional sync applies additions/deletions made on a source to the other source.
/// In case of conflicts (the same item has been modified on both ends since the last sync, `remote` always wins)
///
/// It returns whether the sync was totally successful (details about errors are logged using the `log::*` macros).
/// In case errors happened, the sync might have been partially executed, and you can safely run this function again, since it has been designed to gracefully recover from errors.
pub async fn sync(&mut self) -> bool {
let mut result = SyncResult::new();
if let Err(err) = self.run_sync(&mut result).await {
result.error(&format!("Sync terminated because of an error: {}", err));
}
result.is_success()
pub async fn sync_with_feedback(&mut self, feedback_sender: FeedbackSender) -> bool {
let mut progress = SyncProgress::new_with_feedback_channel(feedback_sender);
self.run_sync(&mut progress).await
}
async fn run_sync(&mut self, result: &mut SyncResult) -> Result<(), Box<dyn Error>> {
result.info("Starting a sync");
/// Performs a synchronisation between `local` and `remote`, without giving any feedback.
///
/// See [sync_with_feedback]
pub async fn sync(&mut self) -> bool {
let mut progress = SyncProgress::new();
self.run_sync(&mut progress).await
}
async fn run_sync(&mut self, progress: &mut SyncProgress) -> bool {
if let Err(err) = self.run_sync_inner(progress).await {
progress.error(&format!("Sync terminated because of an error: {}", err));
}
progress.feedback(SyncEvent::Finished{ success: progress.is_success() });
progress.is_success()
}
async fn run_sync_inner(&mut self, progress: &mut SyncProgress) -> Result<(), Box<dyn Error>> {
progress.info("Starting a sync.");
progress.feedback(SyncEvent::Started);
let mut handled_calendars = HashSet::new();
@ -115,14 +104,14 @@ where
for (cal_id, cal_remote) in cals_remote {
let counterpart = match self.get_or_insert_local_counterpart_calendar(&cal_id, cal_remote.clone()).await {
Err(err) => {
result.warn(&format!("Unable to get or insert local counterpart calendar for {} ({}). Skipping this time", cal_id, err));
progress.warn(&format!("Unable to get or insert local counterpart calendar for {} ({}). Skipping this time", cal_id, err));
continue;
},
Ok(arc) => arc,
};
if let Err(err) = Self::sync_calendar_pair(counterpart, cal_remote, result).await {
result.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err));
if let Err(err) = Self::sync_calendar_pair(counterpart, cal_remote, progress).await {
progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err));
continue;
}
handled_calendars.insert(cal_id);
@ -137,19 +126,19 @@ where
let counterpart = match self.get_or_insert_remote_counterpart_calendar(&cal_id, cal_local.clone()).await {
Err(err) => {
result.warn(&format!("Unable to get or insert remote counterpart calendar for {} ({}). Skipping this time", cal_id, err));
progress.warn(&format!("Unable to get or insert remote counterpart calendar for {} ({}). Skipping this time", cal_id, err));
continue;
},
Ok(arc) => arc,
};
if let Err(err) = Self::sync_calendar_pair(cal_local, counterpart, result).await {
result.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err));
if let Err(err) = Self::sync_calendar_pair(cal_local, counterpart, progress).await {
progress.warn(&format!("Unable to sync calendar {}: {}, skipping this time.", cal_id, err));
continue;
}
}
result.info("Sync ended");
progress.info("Sync ended");
Ok(())
}
@ -163,12 +152,19 @@ where
}
async fn sync_calendar_pair(cal_local: Arc<Mutex<T>>, cal_remote: Arc<Mutex<U>>, result: &mut SyncResult) -> Result<(), Box<dyn Error>> {
async fn sync_calendar_pair(cal_local: Arc<Mutex<T>>, cal_remote: Arc<Mutex<U>>, progress: &mut SyncProgress) -> Result<(), Box<dyn Error>> {
let mut cal_remote = cal_remote.lock().unwrap();
let mut cal_local = cal_local.lock().unwrap();
let cal_name = cal_local.name().to_string();
progress.info(&format!("Syncing calendar {}", cal_name));
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: "started".to_string()
});
// Step 1 - find the differences
result.debug("Finding the differences to sync...");
progress.debug("Finding the differences to sync...");
let mut local_del = HashSet::new();
let mut remote_del = HashSet::new();
let mut local_changes = HashSet::new();
@ -177,51 +173,56 @@ where
let mut remote_additions = HashSet::new();
let remote_items = cal_remote.get_item_version_tags().await?;
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: format!("{} remote items", remote_items.len()),
});
let mut local_items_to_handle = cal_local.get_item_ids().await?;
for (id, remote_tag) in remote_items {
result.trace(&format!("***** Considering remote item {}...", id));
progress.trace(&format!("***** Considering remote item {}...", id));
match cal_local.get_item_by_id(&id).await {
None => {
// This was created on the remote
result.debug(&format!("* {} is a remote addition", id));
progress.debug(&format!("* {} is a remote addition", id));
remote_additions.insert(id);
},
Some(local_item) => {
if local_items_to_handle.remove(&id) == false {
result.error(&format!("Inconsistent state: missing task {} from the local tasks", id));
progress.error(&format!("Inconsistent state: missing task {} from the local tasks", id));
}
match local_item.sync_status() {
SyncStatus::NotSynced => {
result.error(&format!("ID reuse between remote and local sources ({}). Ignoring this item in the sync", id));
progress.error(&format!("ID reuse between remote and local sources ({}). Ignoring this item in the sync", id));
continue;
},
SyncStatus::Synced(local_tag) => {
if &remote_tag != local_tag {
// This has been modified on the remote
result.debug(&format!("* {} is a remote change", id));
progress.debug(&format!("* {} is a remote change", id));
remote_changes.insert(id);
}
},
SyncStatus::LocallyModified(local_tag) => {
if &remote_tag == local_tag {
// This has been changed locally
result.debug(&format!("* {} is a local change", id));
progress.debug(&format!("* {} is a local change", id));
local_changes.insert(id);
} else {
result.info(&format!("Conflict: task {} has been modified in both sources. Using the remote version.", id));
result.debug(&format!("* {} is considered a remote change", id));
progress.info(&format!("Conflict: task {} has been modified in both sources. Using the remote version.", id));
progress.debug(&format!("* {} is considered a remote change", id));
remote_changes.insert(id);
}
},
SyncStatus::LocallyDeleted(local_tag) => {
if &remote_tag == local_tag {
// This has been locally deleted
result.debug(&format!("* {} is a local deletion", id));
progress.debug(&format!("* {} is a local deletion", id));
local_del.insert(id);
} else {
result.info(&format!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", id));
result.debug(&format!("* {} is a considered a remote change", id));
progress.info(&format!("Conflict: task {} has been locally deleted and remotely modified. Reverting to the remote version.", id));
progress.debug(&format!("* {} is a considered a remote change", id));
remote_changes.insert(id);
}
},
@ -232,10 +233,10 @@ where
// Also iterate on the local tasks that are not on the remote
for id in local_items_to_handle {
result.trace(&format!("##### Considering local item {}...", id));
progress.trace(&format!("##### Considering local item {}...", id));
let local_item = match cal_local.get_item_by_id(&id).await {
None => {
result.error(&format!("Inconsistent state: missing task {} from the local tasks", id));
progress.error(&format!("Inconsistent state: missing task {} from the local tasks", id));
continue;
},
Some(item) => item,
@ -244,21 +245,21 @@ where
match local_item.sync_status() {
SyncStatus::Synced(_) => {
// This item has been removed from the remote
result.debug(&format!("# {} is a deletion from the server", id));
progress.debug(&format!("# {} is a deletion from the server", id));
remote_del.insert(id);
},
SyncStatus::NotSynced => {
// This item has just been locally created
result.debug(&format!("# {} has been locally created", id));
progress.debug(&format!("# {} has been locally created", id));
local_additions.insert(id);
},
SyncStatus::LocallyDeleted(_) => {
// This item has been deleted from both sources
result.debug(&format!("# {} has been deleted from both sources", id));
progress.debug(&format!("# {} has been deleted from both sources", id));
remote_del.insert(id);
},
SyncStatus::LocallyModified(_) => {
result.info(&format!("Conflict: item {} has been deleted from the server and locally modified. Deleting the local copy", id));
progress.info(&format!("Conflict: item {} has been deleted from the server and locally modified. Deleting the local copy", id));
remote_del.insert(id);
},
}
@ -266,44 +267,56 @@ where
// Step 2 - commit changes
result.trace("Committing changes...");
progress.trace("Committing changes...");
for id_del in local_del {
result.debug(&format!("> Pushing local deletion {} to the server", id_del));
progress.debug(&format!("> Pushing local deletion {} to the server", id_del));
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: Self::item_name(&cal_local, &id_del).await,
});
match cal_remote.delete_item(&id_del).await {
Err(err) => {
result.warn(&format!("Unable to delete remote item {}: {}", id_del, err));
progress.warn(&format!("Unable to delete remote item {}: {}", id_del, err));
},
Ok(()) => {
// Change the local copy from "marked to deletion" to "actually deleted"
if let Err(err) = cal_local.immediately_delete_item(&id_del).await {
result.error(&format!("Unable to permanently delete local item {}: {}", id_del, err));
progress.error(&format!("Unable to permanently delete local item {}: {}", id_del, err));
}
},
}
}
for id_del in remote_del {
result.debug(&format!("> Applying remote deletion {} locally", id_del));
progress.debug(&format!("> Applying remote deletion {} locally", id_del));
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: Self::item_name(&cal_local, &id_del).await,
});
if let Err(err) = cal_local.immediately_delete_item(&id_del).await {
result.warn(&format!("Unable to delete local item {}: {}", id_del, err));
progress.warn(&format!("Unable to delete local item {}: {}", id_del, err));
}
}
for id_add in remote_additions {
result.debug(&format!("> Applying remote addition {} locally", id_add));
progress.debug(&format!("> Applying remote addition {} locally", id_add));
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: Self::item_name(&cal_local, &id_add).await,
});
match cal_remote.get_item_by_id(&id_add).await {
Err(err) => {
result.warn(&format!("Unable to get remote item {}: {}. Skipping it.", id_add, err));
progress.warn(&format!("Unable to get remote item {}: {}. Skipping it.", id_add, err));
continue;
},
Ok(item) => match item {
None => {
result.error(&format!("Inconsistency: new item {} has vanished from the remote end", id_add));
progress.error(&format!("Inconsistency: new item {} has vanished from the remote end", id_add));
continue;
},
Some(new_item) => {
if let Err(err) = cal_local.add_item(new_item.clone()).await {
result.error(&format!("Not able to add item {} to local calendar: {}", id_add, err));
progress.error(&format!("Not able to add item {} to local calendar: {}", id_add, err));
}
},
},
@ -311,20 +324,24 @@ where
}
for id_change in remote_changes {
result.debug(&format!("> Applying remote change {} locally", id_change));
progress.debug(&format!("> Applying remote change {} locally", id_change));
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: Self::item_name(&cal_local, &id_change).await,
});
match cal_remote.get_item_by_id(&id_change).await {
Err(err) => {
result.warn(&format!("Unable to get remote item {}: {}. Skipping it", id_change, err));
progress.warn(&format!("Unable to get remote item {}: {}. Skipping it", id_change, err));
continue;
},
Ok(item) => match item {
None => {
result.error(&format!("Inconsistency: modified item {} has vanished from the remote end", id_change));
progress.error(&format!("Inconsistency: modified item {} has vanished from the remote end", id_change));
continue;
},
Some(item) => {
if let Err(err) = cal_local.update_item(item.clone()).await {
result.error(&format!("Unable to update item {} in local calendar: {}", id_change, err));
progress.error(&format!("Unable to update item {} in local calendar: {}", id_change, err));
}
},
}
@ -333,15 +350,19 @@ where
for id_add in local_additions {
result.debug(&format!("> Pushing local addition {} to the server", id_add));
progress.debug(&format!("> Pushing local addition {} to the server", id_add));
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: Self::item_name(&cal_local, &id_add).await,
});
match cal_local.get_item_by_id_mut(&id_add).await {
None => {
result.error(&format!("Inconsistency: created item {} has been marked for upload but is locally missing", id_add));
progress.error(&format!("Inconsistency: created item {} has been marked for upload but is locally missing", id_add));
continue;
},
Some(item) => {
match cal_remote.add_item(item.clone()).await {
Err(err) => result.error(&format!("Unable to add item {} to remote calendar: {}", id_add, err)),
Err(err) => progress.error(&format!("Unable to add item {} to remote calendar: {}", id_add, err)),
Ok(new_ss) => {
// Update local sync status
item.set_sync_status(new_ss);
@ -352,15 +373,19 @@ where
}
for id_change in local_changes {
result.debug(&format!("> Pushing local change {} to the server", id_change));
progress.debug(&format!("> Pushing local change {} to the server", id_change));
progress.feedback(SyncEvent::InProgress{
calendar: cal_name.clone(),
details: Self::item_name(&cal_local, &id_change).await,
});
match cal_local.get_item_by_id_mut(&id_change).await {
None => {
result.error(&format!("Inconsistency: modified item {} has been marked for upload but is locally missing", id_change));
progress.error(&format!("Inconsistency: modified item {} has been marked for upload but is locally missing", id_change));
continue;
},
Some(item) => {
match cal_remote.update_item(item.clone()).await {
Err(err) => result.error(&format!("Unable to update item {} in remote calendar: {}", id_change, err)),
Err(err) => progress.error(&format!("Unable to update item {} in remote calendar: {}", id_change, err)),
Ok(new_ss) => {
// Update local sync status
item.set_sync_status(new_ss);
@ -372,6 +397,12 @@ where
Ok(())
}
async fn item_name(cal: &T, id: &ItemId) -> String {
cal.get_item_by_id(id).await.map(|item| item.name()).unwrap_or_default().to_string()
}
}

View file

@ -0,0 +1,97 @@
//! Utilities to track the progression of a sync
use std::fmt::{Display, Error, Formatter};
/// An event that happens during a sync
pub enum SyncEvent {
/// Sync has not started
NotStarted,
/// Sync has just started but no calendar is handled yet
Started,
/// Sync is in progress.
InProgress{ calendar: String, details: String},
/// Sync is finished
Finished{ success: bool },
}
impl Display for SyncEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
SyncEvent::NotStarted => write!(f, "Not started"),
SyncEvent::Started => write!(f, "Sync has started"),
SyncEvent::InProgress{calendar, details} => write!(f, "[{}] {}", calendar, details),
SyncEvent::Finished{success} => match success {
true => write!(f, "Sync successfully finished"),
false => write!(f, "Sync finished with errors"),
}
}
}
}
impl Default for SyncEvent {
fn default() -> Self {
Self::NotStarted
}
}
pub type FeedbackSender = tokio::sync::watch::Sender<SyncEvent>;
pub type FeedbackReceiver = tokio::sync::watch::Receiver<SyncEvent>;
pub fn feedback_channel() -> (FeedbackSender, FeedbackReceiver) {
tokio::sync::watch::channel(SyncEvent::default())
}
/// A structure that tracks the progression and the errors that happen during a sync
pub struct SyncProgress {
n_errors: u32,
feedback_channel: Option<FeedbackSender>
}
impl SyncProgress {
pub fn new() -> Self {
Self { n_errors: 0, feedback_channel: None }
}
pub fn new_with_feedback_channel(channel: FeedbackSender) -> Self {
Self { n_errors: 0, feedback_channel: Some(channel) }
}
pub fn is_success(&self) -> bool {
self.n_errors == 0
}
/// Log an error
pub fn error(&mut self, text: &str) {
log::error!("{}", text);
self.n_errors += 1;
}
/// Log a warning
pub fn warn(&mut self, text: &str) {
log::warn!("{}", text);
self.n_errors += 1;
}
/// Log an info
pub fn info(&mut self, text: &str) {
log::info!("{}", text);
}
/// Log a debug message
pub fn debug(&mut self, text: &str) {
log::debug!("{}", text);
}
/// Log a trace message
pub fn trace(&mut self, text: &str) {
log::trace!("{}", text);
}
/// Send an event as a feedback to the listener (if any).
pub fn feedback(&mut self, event: SyncEvent) {
self.feedback_channel
.as_ref()
.map(|sender| {
sender.send(event)
});
}
}