mirror of
https://github.com/RocketChat/Rocket.Chat.git
synced 2025-12-28 06:47:25 +00:00
chore: Schedule and defer some query recomputations (#36243)
This commit is contained in:
parent
568afaf1a6
commit
c992f7d78c
@ -1,10 +1,7 @@
|
||||
import type { IRole, IRoom, IUser } from '@rocket.chat/core-typings';
|
||||
import mem from 'mem';
|
||||
import type { Filter } from 'mongodb';
|
||||
|
||||
import { CachedChatSubscription } from './CachedChatSubscription';
|
||||
import { Users } from './Users';
|
||||
import { isTruthy } from '../../../../lib/isTruthy';
|
||||
|
||||
/** @deprecated new code refer to Minimongo collections like this one; prefer fetching data from the REST API, listening to changes via streamer events, and storing the state in a Tanstack Query */
|
||||
export const Subscriptions = Object.assign(CachedChatSubscription.collection, {
|
||||
@ -14,43 +11,10 @@ export const Subscriptions = Object.assign(CachedChatSubscription.collection, {
|
||||
return false;
|
||||
}
|
||||
|
||||
const query = {
|
||||
rid,
|
||||
};
|
||||
|
||||
const subscription = this.findOne(query, { fields: { roles: 1 } });
|
||||
const subscription = this.state.find((record) => record.rid === rid);
|
||||
|
||||
return subscription && Array.isArray(subscription.roles) && subscription.roles.includes(roleId);
|
||||
},
|
||||
{ maxAge: 1000, cacheKey: JSON.stringify },
|
||||
),
|
||||
|
||||
findUsersInRoles: mem(
|
||||
function (this: typeof CachedChatSubscription.collection, roles: IRole['_id'][] | IRole['_id'], scope?: string, options?: any) {
|
||||
roles = Array.isArray(roles) ? roles : [roles];
|
||||
|
||||
const query: Filter<any> = {
|
||||
roles: { $in: roles },
|
||||
};
|
||||
|
||||
if (scope) {
|
||||
query.rid = scope;
|
||||
}
|
||||
|
||||
const subscriptions = this.find(query).fetch();
|
||||
|
||||
const uids = subscriptions
|
||||
.map((subscription) => {
|
||||
if (typeof subscription.u !== 'undefined' && typeof subscription.u._id !== 'undefined') {
|
||||
return subscription.u._id;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
})
|
||||
.filter(isTruthy);
|
||||
|
||||
return Users.find({ _id: { $in: uids } }, options);
|
||||
},
|
||||
{ maxAge: 1000, cacheKey: JSON.stringify },
|
||||
),
|
||||
});
|
||||
|
||||
@ -3,7 +3,6 @@ import { useSession, useSessionDispatch, useUserPreference, useUserSubscriptions
|
||||
import { useEffect } from 'react';
|
||||
|
||||
import { useFireGlobalEvent } from './useFireGlobalEvent';
|
||||
import { Rooms } from '../../app/models/client';
|
||||
|
||||
const query = { open: { $ne: false }, hideUnreadStatus: { $ne: true }, archived: { $ne: true } };
|
||||
const options = { fields: { unread: 1, alert: 1, rid: 1, t: 1, name: 1, ls: 1, unreadAlert: 1, fname: 1, prid: 1 } };
|
||||
@ -23,11 +22,7 @@ export const useUnread = () => {
|
||||
let unreadAlert: false | '•' = false;
|
||||
|
||||
const unreadCount = subscriptions.reduce((ret, subscription) => {
|
||||
const room = Rooms.findOne({ _id: subscription.rid }, { fields: { usersCount: 1 } });
|
||||
fireEventUnreadChangedBySubscription({
|
||||
...subscription,
|
||||
usersCount: room?.usersCount,
|
||||
});
|
||||
fireEventUnreadChangedBySubscription(subscription);
|
||||
|
||||
if (subscription.alert || subscription.unread > 0) {
|
||||
// Increment the total unread count.
|
||||
|
||||
@ -2,31 +2,171 @@ import { create } from 'zustand';
|
||||
|
||||
export interface IDocumentMapStore<T extends { _id: string }> {
|
||||
readonly records: ReadonlyMap<T['_id'], T>;
|
||||
/**
|
||||
* Checks if a document with the given _id exists in the store.
|
||||
*
|
||||
* @param _id - The _id of the document to check.
|
||||
* @returns true if the document exists, false otherwise.
|
||||
*/
|
||||
has(_id: T['_id']): boolean;
|
||||
/**
|
||||
* Retrieves a document by its _id.
|
||||
*
|
||||
* @param _id - The _id of the document to retrieve.
|
||||
* @returns The document if found, or undefined if not found.
|
||||
*/
|
||||
get(_id: T['_id']): T | undefined;
|
||||
/**
|
||||
* Checks if any document in the store satisfies the given predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @returns true if at least one document matches the predicate, false otherwise.
|
||||
*/
|
||||
some(predicate: (record: T) => boolean): boolean;
|
||||
/**
|
||||
* Finds a document that satisfies the given predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @returns The first document that matches the predicate, or undefined if no document matches.
|
||||
*/
|
||||
find<U extends T>(predicate: (record: T) => record is U): U | undefined;
|
||||
/**
|
||||
* Finds a document that satisfies the given predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @returns The first document that matches the predicate, or undefined if no document matches.
|
||||
*/
|
||||
find(predicate: (record: T) => boolean): T | undefined;
|
||||
/**
|
||||
* Finds the first document that satisfies the given predicate, using a comparator to determine the best match.
|
||||
*
|
||||
* Usually the "best" document is the first of a ordered set, but it can be any criteria defined by the comparator.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @param comparator - A function that compares two documents and returns a negative number if the first is better, zero if they are equal, or a positive number if the second is better.
|
||||
* @returns The best matching document according to the predicate and comparator, or undefined if no document matches.
|
||||
*/
|
||||
findFirst<U extends T>(predicate: (record: T) => record is U, comparator: (a: T, b: T) => number): U | undefined;
|
||||
/**
|
||||
* Finds the first document that satisfies the given predicate, using a comparator to determine the best match.
|
||||
*
|
||||
* Usually the "best" document is the first of a ordered set, but it can be any criteria defined by the comparator.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @param comparator - A function that compares two documents and returns a negative number if the first is better, zero if they are equal, or a positive number if the second is better.
|
||||
* @returns The best matching document according to the predicate and comparator, or undefined if no document matches.
|
||||
*/
|
||||
findFirst(predicate: (record: T) => boolean, comparator: (a: T, b: T) => number): T | undefined;
|
||||
/**
|
||||
* Filters documents in the store based on a predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @returns An array of documents that match the predicate.
|
||||
*/
|
||||
filter<U extends T>(predicate: (record: T) => record is U): U[];
|
||||
/**
|
||||
* Filters documents in the store based on a predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @returns An array of documents that match the predicate.
|
||||
*/
|
||||
filter(predicate: (record: T) => boolean): T[];
|
||||
indexBy<TKey extends keyof T>(key: TKey): Map<T[TKey], T>;
|
||||
/**
|
||||
* Creates an index of documents by a specified key.
|
||||
*
|
||||
* @param key - The key to index the documents by.
|
||||
* @returns A Map where the keys are the values of the specified key in the documents, and the values are the documents themselves.
|
||||
*/
|
||||
indexBy<TKey extends keyof T>(key: TKey): ReadonlyMap<T[TKey], T>;
|
||||
/**
|
||||
* Replaces all documents in the store with the provided records.
|
||||
*
|
||||
* @param records - An array of documents to replace the current records in the store.
|
||||
*/
|
||||
replaceAll(records: T[]): void;
|
||||
/**
|
||||
* Stores a single document in the store.
|
||||
*
|
||||
* @param doc - The document to store.
|
||||
*/
|
||||
store(doc: T): void;
|
||||
/**
|
||||
* Stores multiple documents in the store.
|
||||
*
|
||||
* @param docs - An iterable of documents to store.
|
||||
*/
|
||||
storeMany(docs: Iterable<T>): void;
|
||||
/**
|
||||
* Deletes a document from the store by its _id.
|
||||
*
|
||||
* @param _id - The _id of the document to delete.
|
||||
*/
|
||||
delete(_id: T['_id']): void;
|
||||
/**
|
||||
* Updates documents in the store that match a predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @param modifier - A function that takes a document and returns the modified document.
|
||||
* @returns void
|
||||
*/
|
||||
update<U extends T>(predicate: (record: T) => record is U, modifier: (record: U) => U): void;
|
||||
/**
|
||||
* Updates documents in the store that match a predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @param modifier - A function that takes a document and returns the modified document.
|
||||
* @returns void
|
||||
*/
|
||||
update(predicate: (record: T) => boolean, modifier: (record: T) => T): void;
|
||||
/**
|
||||
* Asynchronously updates documents in the store that match a predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @param modifier - A function that takes a document and returns a Promise that resolves to the modified document.
|
||||
* @returns void
|
||||
*/
|
||||
updateAsync<U extends T>(predicate: (record: T) => record is U, modifier: (record: U) => Promise<U>): Promise<void>;
|
||||
/**
|
||||
* Asynchronously updates documents in the store that match a predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
* @param modifier - A function that takes a document and returns a Promise that resolves to the modified document.
|
||||
* @returns void
|
||||
*/
|
||||
updateAsync(predicate: (record: T) => boolean, modifier: (record: T) => Promise<T>): Promise<void>;
|
||||
/**
|
||||
* Removes documents from the store that match a predicate.
|
||||
*
|
||||
* @param predicate - A function that takes a document and returns true if it matches the condition.
|
||||
*/
|
||||
remove(predicate: (record: T) => boolean): void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory function to create a Zustand store that holds a map of documents.
|
||||
*
|
||||
* @param options - Optional callbacks to handle invalidation of documents.
|
||||
* @returns the Zustand store with methods to manage the document map.
|
||||
*/
|
||||
export const createDocumentMapStore = <T extends { _id: string }>({
|
||||
onInvalidate,
|
||||
onInvalidateAll,
|
||||
}: { onInvalidate?: (...docs: T[]) => void; onInvalidateAll?: () => void } = {}) =>
|
||||
}: {
|
||||
/**
|
||||
* Callback invoked when a document is stored, updated or deleted.
|
||||
*
|
||||
* This is useful to recompute Minimongo queries that depend on the changed documents.
|
||||
* @deprecated prefer subscribing to the store
|
||||
*/
|
||||
onInvalidate?: (...docs: T[]) => void;
|
||||
/**
|
||||
* Callback invoked when all documents are replaced in the store.
|
||||
*
|
||||
* This is useful to recompute Minimongo queries that depend on the changed documents.
|
||||
* @deprecated prefer subscribing to the store
|
||||
*/
|
||||
onInvalidateAll?: () => void;
|
||||
} = {}) =>
|
||||
create<IDocumentMapStore<T>>()((set, get) => ({
|
||||
records: new Map(),
|
||||
has: (id: T['_id']) => get().records.has(id),
|
||||
|
||||
@ -2,6 +2,7 @@ import { Mongo } from 'meteor/mongo';
|
||||
|
||||
import { createDocumentMapStore } from './DocumentMapStore';
|
||||
import { LocalCollection } from './LocalCollection';
|
||||
import type { Query } from './Query';
|
||||
|
||||
/**
|
||||
* Implements a minimal version of a MongoDB collection using Zustand for state management.
|
||||
@ -9,23 +10,53 @@ import { LocalCollection } from './LocalCollection';
|
||||
* It's a middle layer between the Mongo.Collection and Zustand aiming for complete migration to Zustand.
|
||||
*/
|
||||
export class MinimongoCollection<T extends { _id: string }> extends Mongo.Collection<T> {
|
||||
private pendingRecomputations = new Set<Query<T>>();
|
||||
|
||||
private recomputeAll() {
|
||||
this.pendingRecomputations.clear();
|
||||
|
||||
for (const query of this._collection.queries) {
|
||||
this._collection.recomputeQuery(query);
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleRecomputationsFor(docs: T[]) {
|
||||
for (const query of this._collection.queries) {
|
||||
if (this.pendingRecomputations.has(query)) continue;
|
||||
|
||||
if (docs.some((doc) => query.predicate(doc))) {
|
||||
this.scheduleRecomputation(query);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleRecomputation(query: Query<T>) {
|
||||
this.pendingRecomputations.add(query);
|
||||
|
||||
queueMicrotask(() => {
|
||||
if (this.pendingRecomputations.size === 0) return;
|
||||
|
||||
this.pendingRecomputations.forEach((query) => {
|
||||
this._collection.recomputeQuery(query);
|
||||
});
|
||||
this.pendingRecomputations.clear();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* A Zustand store that holds the records of the collection.
|
||||
*
|
||||
* It should be used as a hook in React components to access the collection's records and methods.
|
||||
*
|
||||
* Beware mutating the store will **asynchronously** trigger recomputations of all Minimongo
|
||||
* queries that depend on the changed documents.
|
||||
*/
|
||||
readonly use = createDocumentMapStore<T>({
|
||||
onInvalidate: (...docs) => {
|
||||
for (const query of this._collection.queries) {
|
||||
if (docs.some((doc) => query.predicate(doc))) {
|
||||
this._collection.recomputeQuery(query);
|
||||
}
|
||||
}
|
||||
},
|
||||
onInvalidateAll: () => {
|
||||
for (const query of this._collection.queries) {
|
||||
this._collection.recomputeQuery(query);
|
||||
}
|
||||
this.recomputeAll();
|
||||
},
|
||||
onInvalidate: (...docs) => {
|
||||
this.scheduleRecomputationsFor(docs);
|
||||
},
|
||||
});
|
||||
|
||||
@ -44,6 +75,9 @@ export class MinimongoCollection<T extends { _id: string }> extends Mongo.Collec
|
||||
* Returns the Zustand store state that holds the records of the collection.
|
||||
*
|
||||
* It's a convenience method to access the Zustand store directly i.e. outside of React components.
|
||||
*
|
||||
* Beware mutating the store will **asynchronously** trigger recomputations of all Minimongo
|
||||
* queries that depend on the changed documents.
|
||||
*/
|
||||
get state() {
|
||||
return this.use.getState();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user