Merge pull request #7814 from AppFlowy-IO/user_awareness_init

chore: init user awareness
This commit is contained in:
Nathan.fooo 2025-04-23 22:08:16 +08:00 committed by GitHub
commit 288e034ec0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 201 additions and 144 deletions

View File

@ -0,0 +1,65 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:appflowy/core/notification/user_notification.dart';
import 'package:appflowy_backend/protobuf/flowy-error/errors.pb.dart';
import 'package:appflowy_backend/protobuf/flowy-notification/subject.pb.dart';
import 'package:appflowy_backend/protobuf/flowy-user/notification.pb.dart';
import 'package:appflowy_backend/protobuf/flowy-user/reminder.pb.dart';
import 'package:appflowy_backend/rust_stream.dart';
import 'package:appflowy_result/appflowy_result.dart';
class UserAwarenessListener {
UserAwarenessListener({
required this.workspaceId,
});
final String workspaceId;
UserNotificationParser? _userParser;
StreamSubscription<SubscribeObject>? _subscription;
void Function()? onLoadedUserAwareness;
void Function(ReminderPB)? onDidUpdateReminder;
/// [onLoadedUserAwareness] is called when the user awareness is loaded. After this, can
/// call fetch reminders releated events
///
void start({
void Function()? onLoadedUserAwareness,
void Function(ReminderPB)? onDidUpdateReminder,
}) {
this.onLoadedUserAwareness = onLoadedUserAwareness;
this.onDidUpdateReminder = onDidUpdateReminder;
_userParser = UserNotificationParser(
id: workspaceId,
callback: _userNotificationCallback,
);
_subscription = RustStreamReceiver.listen((observable) {
_userParser?.parse(observable);
});
}
void stop() {
_userParser = null;
_subscription?.cancel();
_subscription = null;
}
void _userNotificationCallback(
UserNotification ty,
FlowyResult<Uint8List, FlowyError> result,
) {
switch (ty) {
case UserNotification.DidLoadUserAwareness:
onLoadedUserAwareness?.call();
break;
case UserNotification.DidUpdateReminder:
result.map((r) => onDidUpdateReminder?.call(ReminderPB.fromBuffer(r)));
break;
default:
break;
}
}
}

View File

@ -15,7 +15,7 @@ use serde_json::Value;
use std::str::FromStr;
use std::sync::Weak;
use std::{convert::TryInto, sync::Arc};
use tracing::{event, trace};
use tracing::event;
use uuid::Uuid;
fn upgrade_manager(manager: AFPluginState<Weak<UserManager>>) -> FlowyResult<Arc<UserManager>> {
@ -538,11 +538,11 @@ pub async fn get_all_reminder_event_handler(
let reminders = manager
.get_all_reminders()
.await
.unwrap_or_default()
.into_iter()
.map(ReminderPB::from)
.collect::<Vec<_>>();
trace!("number of reminders: {}", reminders.len());
data_result_ok(reminders.into())
}

View File

@ -15,6 +15,9 @@ pub(crate) enum UserNotification {
DidUpdateCloudConfig = 4,
DidUpdateUserWorkspace = 5,
DidUpdateWorkspaceSetting = 6,
DidLoadUserAwareness = 7,
// TODO: implement reminder observer
DidUpdateReminder = 8,
}
impl std::convert::From<UserNotification> for i32 {

View File

@ -2,8 +2,8 @@ use client_api::entity::GotrueTokenResponse;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use flowy_error::FlowyResult;
use std::str::FromStr;
use arc_swap::ArcSwapOption;
use collab::lock::RwLock;
use collab_user::core::UserAwareness;
use dashmap::DashMap;
@ -46,7 +46,7 @@ use flowy_user_pub::sql::*;
pub struct UserManager {
pub(crate) cloud_service: Weak<dyn UserCloudServiceProvider>,
pub(crate) store_preferences: Arc<KVStorePreferences>,
pub(crate) user_awareness: Arc<ArcSwapOption<RwLock<UserAwareness>>>,
pub(crate) user_awareness_by_workspace: DashMap<Uuid, Arc<RwLock<UserAwareness>>>,
pub(crate) user_status_callback: RwLock<Arc<dyn UserStatusCallback>>,
pub(crate) collab_builder: Weak<AppFlowyCollabBuilder>,
pub(crate) collab_interact: RwLock<Arc<dyn UserReminder>>,
@ -77,7 +77,7 @@ impl UserManager {
let user_manager = Arc::new(Self {
cloud_service: cloud_services,
store_preferences,
user_awareness: Default::default(),
user_awareness_by_workspace: Default::default(),
user_status_callback,
collab_builder,
collab_interact: RwLock::new(Arc::new(DefaultCollabInteract)),
@ -273,7 +273,14 @@ impl UserManager {
self.set_first_time_installed_version();
let cloud_config = get_cloud_config(session.user_id, &self.store_preferences);
// Init the user awareness. here we ignore the error
let _ = self.initial_user_awareness(&session, &auth_type).await;
let _ = self
.initial_user_awareness(
session.user_id,
&session.user_uuid,
&workspace_uuid,
&auth_type,
)
.await;
user_status_callback
.on_launch_if_authenticated(
@ -360,7 +367,12 @@ impl UserManager {
self.save_auth_data(&response, auth_type, &session).await?;
let _ = self
.initial_user_awareness(&session, &user_profile.workspace_auth_type)
.initial_user_awareness(
session.user_id,
&session.user_uuid,
&workspace_id,
&user_profile.workspace_auth_type,
)
.await;
self
.user_status_callback
@ -416,11 +428,19 @@ impl UserManager {
auth_type: &AuthType,
) -> FlowyResult<()> {
let new_session = Session::from(&response);
let workspace_id = Uuid::parse_str(&new_session.workspace_id)?;
self.prepare_user(&new_session).await;
self
.save_auth_data(&response, *auth_type, &new_session)
.await?;
let _ = self.initial_user_awareness(&new_session, auth_type).await;
let _ = self
.initial_user_awareness(
new_session.user_id,
&new_session.user_uuid,
&workspace_id,
auth_type,
)
.await;
let workspace_id = Uuid::parse_str(&new_session.workspace_id)?;
self
.user_status_callback
@ -643,9 +663,14 @@ impl UserManager {
Ok(self.get_session()?.user_id)
}
pub fn workspace_id(&self) -> Result<String, FlowyError> {
pub fn user_uuid(&self) -> Result<Uuid, FlowyError> {
Ok(self.get_session()?.user_uuid)
}
pub fn workspace_id(&self) -> Result<Uuid, FlowyError> {
let session = self.get_session()?;
Ok(session.workspace_id.clone())
let uuid = Uuid::from_str(&session.workspace_id)?;
Ok(uuid)
}
pub fn token(&self) -> Result<Option<String>, FlowyError> {

View File

@ -17,8 +17,8 @@ use tracing::{error, info, instrument, trace};
use uuid::Uuid;
use crate::entities::ReminderPB;
use crate::notification::{send_notification, UserNotification};
use crate::user_manager::UserManager;
use flowy_user_pub::session::Session;
impl UserManager {
/// Adds a new reminder based on the given payload.
@ -36,11 +36,10 @@ impl UserManager {
///
pub async fn add_reminder(&self, reminder_pb: ReminderPB) -> FlowyResult<()> {
let reminder = Reminder::from(reminder_pb);
self
.mut_awareness(|user_awareness| {
user_awareness.add_reminder(reminder.clone());
})
.await?;
let workspace_id = self.workspace_id()?;
let awareness = self.get_awareness(&workspace_id).await?;
awareness.write().await.add_reminder(reminder.clone());
self
.collab_interact
.read()
@ -53,11 +52,14 @@ impl UserManager {
/// Removes a specific reminder for the user by its id
///
pub async fn remove_reminder(&self, reminder_id: &str) -> FlowyResult<()> {
let workspace_id = self.workspace_id()?;
self
.mut_awareness(|user_awareness| {
user_awareness.remove_reminder(reminder_id);
})
.await?;
.get_awareness(&workspace_id)
.await?
.write()
.await
.remove_reminder(reminder_id);
self
.collab_interact
.read()
@ -70,22 +72,25 @@ impl UserManager {
/// Updates an existing reminder
///
pub async fn update_reminder(&self, reminder_pb: ReminderPB) -> FlowyResult<()> {
let workspace_id = self.workspace_id()?;
let reminder = Reminder::from(reminder_pb);
self
.mut_awareness(|user_awareness| {
user_awareness.update_reminder(&reminder.id, |update| {
update
.set_object_id(&reminder.object_id)
.set_title(&reminder.title)
.set_message(&reminder.message)
.set_is_ack(reminder.is_ack)
.set_is_read(reminder.is_read)
.set_scheduled_at(reminder.scheduled_at)
.set_type(reminder.ty)
.set_meta(reminder.meta.clone().into_inner());
});
})
.await?;
.get_awareness(&workspace_id)
.await?
.write()
.await
.update_reminder(&reminder.id, |update| {
update
.set_object_id(&reminder.object_id)
.set_title(&reminder.title)
.set_message(&reminder.message)
.set_is_ack(reminder.is_ack)
.set_is_read(reminder.is_read)
.set_scheduled_at(reminder.scheduled_at)
.set_type(reminder.ty)
.set_meta(reminder.meta.clone().into_inner());
});
self
.collab_interact
.read()
@ -105,23 +110,30 @@ impl UserManager {
/// # Returns
/// - Returns a vector of `Reminder` objects containing all reminders for the user.
///
pub async fn get_all_reminders(&self) -> Vec<Reminder> {
let reminders = self
.mut_awareness(|user_awareness| user_awareness.get_all_reminders())
.await;
reminders.unwrap_or_default()
pub async fn get_all_reminders(&self) -> FlowyResult<Vec<Reminder>> {
let workspace_id = self.workspace_id()?;
Ok(
self
.get_awareness(&workspace_id)
.await?
.read()
.await
.get_all_reminders(),
)
}
/// Init UserAwareness for user
/// 1. check if user awareness exists on disk. If yes init awareness from disk
/// 2. If not, init awareness from server.
#[instrument(level = "info", skip(self, session), err)]
#[instrument(level = "info", skip(self), err)]
pub(crate) async fn initial_user_awareness(
&self,
session: &Session,
auth_type: &AuthType,
uid: i64,
user_uuid: &Uuid,
workspace_id: &Uuid,
workspace_auth_type: &AuthType,
) -> FlowyResult<()> {
let object_id = user_awareness_object_id(&session.user_uuid, &session.workspace_id);
let object_id = user_awareness_object_id(user_uuid, &workspace_id.to_string());
// Try to acquire mutable access to `is_loading_awareness`.
// Thread-safety is ensured by DashMap
@ -147,29 +159,22 @@ impl UserManager {
};
if should_init {
if let Some(old_user_awareness) = self.user_awareness.swap(None) {
info!("Closing previous user awareness");
old_user_awareness.read().await.close(); // Ensure that old awareness is closed
}
let is_exist_on_disk = self
.authenticate_user
.is_collab_on_disk(session.user_id, &object_id.to_string())?;
if auth_type.is_local() || is_exist_on_disk {
.is_collab_on_disk(uid, &object_id.to_string())?;
if workspace_auth_type.is_local() || is_exist_on_disk {
trace!(
"Initializing new user awareness from disk:{}, {:?}",
object_id,
auth_type
workspace_auth_type
);
let collab_db = self.get_collab_db(session.user_id)?;
let workspace_id = Uuid::parse_str(&session.workspace_id)?;
let collab_db = self.get_collab_db(uid)?;
let doc_state =
CollabPersistenceImpl::new(collab_db.clone(), session.user_id, workspace_id)
.into_data_source();
CollabPersistenceImpl::new(collab_db.clone(), uid, *workspace_id).into_data_source();
let awareness = Self::collab_for_user_awareness(
&self.collab_builder.clone(),
&workspace_id,
session.user_id,
workspace_id,
uid,
&object_id,
collab_db,
doc_state,
@ -177,16 +182,18 @@ impl UserManager {
)
.await?;
info!("User awareness initialized successfully");
self.user_awareness.store(Some(awareness));
self
.user_awareness_by_workspace
.insert(*workspace_id, awareness);
if let Some(mut is_loading) = self.is_loading_awareness.get_mut(&object_id) {
*is_loading = false;
}
} else {
info!(
"Initializing new user awareness from server:{}, {:?}",
object_id, auth_type
object_id, workspace_auth_type
);
self.load_awareness_from_server(session, object_id, *auth_type)?;
self.load_awareness_from_server(uid, workspace_id, object_id, *workspace_auth_type)?;
}
} else {
return Err(FlowyError::new(
@ -206,18 +213,18 @@ impl UserManager {
/// designed to be thread safe.
fn load_awareness_from_server(
&self,
session: &Session,
uid: i64,
workspace_id: &Uuid,
object_id: Uuid,
authenticator: AuthType,
workspace_auth_type: AuthType,
) -> FlowyResult<()> {
// Clone necessary data
let session = session.clone();
let collab_db = self.get_collab_db(session.user_id)?;
let collab_db = self.get_collab_db(uid)?;
let weak_builder = self.collab_builder.clone();
let user_awareness = Arc::downgrade(&self.user_awareness);
let user_awareness = self.user_awareness_by_workspace.clone();
let cloud_services = self.cloud_service()?;
let authenticate_user = self.authenticate_user.clone();
let is_loading_awareness = self.is_loading_awareness.clone();
let workspace_id = *workspace_id;
// Spawn an async task to fetch or create user awareness
tokio::spawn(async move {
@ -227,15 +234,13 @@ impl UserManager {
}
};
let workspace_id = Uuid::parse_str(&session.workspace_id)?;
let create_awareness = if authenticator.is_local() {
let create_awareness = if workspace_auth_type.is_local() {
let doc_state =
CollabPersistenceImpl::new(collab_db.clone(), session.user_id, workspace_id)
.into_data_source();
CollabPersistenceImpl::new(collab_db.clone(), uid, workspace_id).into_data_source();
Self::collab_for_user_awareness(
&weak_builder,
&workspace_id,
session.user_id,
uid,
&object_id,
collab_db,
doc_state,
@ -245,7 +250,7 @@ impl UserManager {
} else {
let result = cloud_services
.get_user_service()?
.get_user_awareness_doc_state(session.user_id, &workspace_id, &object_id)
.get_user_awareness_doc_state(uid, &workspace_id, &object_id)
.await;
match result {
@ -254,7 +259,7 @@ impl UserManager {
Self::collab_for_user_awareness(
&weak_builder,
&workspace_id,
session.user_id,
uid,
&object_id,
collab_db,
DataSource::DocStateV1(data),
@ -266,12 +271,11 @@ impl UserManager {
if err.is_record_not_found() {
info!("User awareness not found, creating new");
let doc_state =
CollabPersistenceImpl::new(collab_db.clone(), session.user_id, workspace_id)
.into_data_source();
CollabPersistenceImpl::new(collab_db.clone(), uid, workspace_id).into_data_source();
Self::collab_for_user_awareness(
&weak_builder,
&workspace_id,
session.user_id,
uid,
&object_id,
collab_db,
doc_state,
@ -287,19 +291,12 @@ impl UserManager {
match create_awareness {
Ok(new_user_awareness) => {
// Validate session before storing the awareness
if let Ok(current_session) = authenticate_user.get_session() {
if current_session.workspace_id == session.workspace_id {
if let Some(user_awareness) = user_awareness.upgrade() {
info!("User awareness initialized successfully");
user_awareness.store(Some(new_user_awareness));
} else {
error!("Failed to upgrade user awareness");
}
} else {
info!("User awareness is outdated, ignoring");
}
}
user_awareness.insert(workspace_id, new_user_awareness);
send_notification(
&workspace_id.to_string(),
UserNotification::DidLoadUserAwareness,
);
set_is_loading_false();
Ok(())
},
@ -346,51 +343,11 @@ impl UserManager {
Ok(collab)
}
/// Executes a function with user awareness.
///
/// This function takes an asynchronous closure `f` that accepts a reference to a `UserAwareness`
/// and returns an `Output`. If the current user awareness is set (i.e., is `Some`), it invokes
/// the closure `f` with the user awareness. If the user awareness is not set (i.e., is `None`),
/// it attempts to initialize the user awareness via a remote session. If the session fetch
/// or user awareness initialization fails, it returns the provided `default_value`.
///
/// # Parameters
/// - `default_value`: A default value to return if the user awareness is `None` and cannot be initialized.
/// - `f`: The asynchronous closure to execute with the user awareness.
async fn mut_awareness<F, Output>(&self, f: F) -> FlowyResult<Output>
where
F: FnOnce(&mut UserAwareness) -> Output,
{
match self.user_awareness.load_full() {
None => {
info!("User awareness is not loaded when trying to access it");
let session = self.get_session()?;
let object_id = user_awareness_object_id(&session.user_uuid, &session.workspace_id);
let is_loading = self
.is_loading_awareness
.get(&object_id)
.map(|r| *r.value())
.unwrap_or(false);
if !is_loading {
let user_profile = self
.get_user_profile_from_disk(session.user_id, &session.workspace_id)
.await?;
self
.initial_user_awareness(&session, &user_profile.workspace_auth_type)
.await?;
}
Err(FlowyError::new(
ErrorCode::InProgress,
"User awareness is loading",
))
},
Some(lock) => {
let mut user_awareness = lock.write().await;
Ok(f(&mut user_awareness))
},
}
async fn get_awareness(&self, workspace_id: &Uuid) -> FlowyResult<Arc<RwLock<UserAwareness>>> {
let awareness = self
.user_awareness_by_workspace
.get(workspace_id)
.map(|v| v.value().clone());
awareness.ok_or_else(|| FlowyError::internal().with_context("User awareness is not loaded"))
}
}

View File

@ -156,12 +156,19 @@ impl UserManager {
}
#[instrument(skip(self), err)]
pub async fn open_workspace(&self, workspace_id: &Uuid, auth_type: AuthType) -> FlowyResult<()> {
info!("open workspace: {}, auth type:{}", workspace_id, auth_type);
pub async fn open_workspace(
&self,
workspace_id: &Uuid,
workspace_auth_type: AuthType,
) -> FlowyResult<()> {
info!(
"open workspace: {}, auth type:{}",
workspace_id, workspace_auth_type
);
let workspace_id_str = workspace_id.to_string();
let token = self.token_from_auth_type(&auth_type)?;
let token = self.token_from_auth_type(&workspace_auth_type)?;
let cloud_service = self.cloud_service()?;
cloud_service.set_server_auth_type(&auth_type, token)?;
cloud_service.set_server_auth_type(&workspace_auth_type, token)?;
let uid = self.user_id()?;
let profile = self
@ -179,7 +186,7 @@ impl UserManager {
workspace_id,
cloud_service.get_user_service()?,
uid,
auth_type,
workspace_auth_type,
self.db_pool(uid)?,
)
.await?
@ -193,7 +200,7 @@ impl UserManager {
let user_service = cloud_service.get_user_service()?;
let pool = self.db_pool(uid)?;
tokio::spawn(async move {
let _ = sync_workspace(&workspace_id, user_service, uid, auth_type, pool).await;
let _ = sync_workspace(&workspace_id, user_service, uid, workspace_auth_type, pool).await;
});
user_workspace
},
@ -203,19 +210,19 @@ impl UserManager {
.authenticate_user
.set_user_workspace(user_workspace.clone())?;
let uid = self.user_id()?;
let user_uuid = self.user_uuid()?;
if let Err(err) = self
.user_status_callback
.read()
.await
.on_workspace_opened(uid, workspace_id, &user_workspace, &auth_type)
.on_workspace_opened(uid, workspace_id, &user_workspace, &workspace_auth_type)
.await
{
error!("Open workspace failed: {:?}", err);
}
if let Err(err) = self
.initial_user_awareness(self.get_session()?.as_ref(), &auth_type)
.initial_user_awareness(uid, &user_uuid, workspace_id, &workspace_auth_type)
.await
{
error!(