fix(federation): previous states on initial state and remove emitter (#37677)

This commit is contained in:
Guilherme Gazzo 2025-12-10 04:00:10 +01:00 committed by GitHub
parent 176d5eae3f
commit 4793aca879
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 144 additions and 62 deletions

View File

@ -0,0 +1,6 @@
---
"@rocket.chat/meteor": patch
"@rocket.chat/federation-matrix": patch
---
Fixes an issue where membership updates were not reflected when the user was the first member on their own server.

View File

@ -454,6 +454,30 @@ export const acceptRoomInvite = (roomId: IRoom['_id'], config?: IRequestConfig)
});
};
/**
* Retrieves the subscriptions for the authenticated user.
*
* Fetches the complete list of subscriptions for the authenticated user, which is essential
* for verifying federation subscription synchronization and member synchronization.
*
* @param config - Optional request configuration for custom domains
* @returns Promise resolving to the subscriptions response
*/
export const getSubscriptions = (config?: IRequestConfig) => {
const requestInstance = config?.request || request;
const credentialsInstance = config?.credentials || credentials;
return new Promise<ReturnType<Endpoints['/v1/subscriptions.get']['GET']>>((resolve) => {
void requestInstance
.get(api('subscriptions.get'))
.set(credentialsInstance)
.end((_err: any, req: any) => {
resolve(req.body);
});
});
};
/**
* Rejects a room invite for the authenticated user.
*

View File

@ -1,14 +1,13 @@
import { api } from '@rocket.chat/core-services';
import { UserStatus } from '@rocket.chat/core-typings';
import type { Emitter } from '@rocket.chat/emitter';
import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk';
import { federationSDK } from '@rocket.chat/federation-sdk';
import { Logger } from '@rocket.chat/logger';
import { Rooms, Users } from '@rocket.chat/models';
const logger = new Logger('federation-matrix:edu');
export const edus = async (emitter: Emitter<HomeserverEventSignatures>) => {
emitter.on('homeserver.matrix.typing', async (data) => {
export const edus = async () => {
federationSDK.eventEmitterService.on('homeserver.matrix.typing', async (data) => {
const config = federationSDK.getConfig('edu');
if (!config.processTyping) {
return;
@ -31,7 +30,7 @@ export const edus = async (emitter: Emitter<HomeserverEventSignatures>) => {
}
});
emitter.on('homeserver.matrix.presence', async (data) => {
federationSDK.eventEmitterService.on('homeserver.matrix.presence', async (data) => {
const config = federationSDK.getConfig('edu');
if (!config.processPresence) {
return;

View File

@ -1,6 +1,3 @@
import type { Emitter } from '@rocket.chat/emitter';
import { type HomeserverEventSignatures } from '@rocket.chat/federation-sdk';
import { edus } from './edu';
import { member } from './member';
import { message } from './message';
@ -8,11 +5,11 @@ import { ping } from './ping';
import { reaction } from './reaction';
import { room } from './room';
export function registerEvents(emitter: Emitter<HomeserverEventSignatures>) {
ping(emitter);
message(emitter);
reaction(emitter);
member(emitter);
edus(emitter);
room(emitter);
export function registerEvents() {
ping();
message();
reaction();
member();
edus();
room();
}

View File

@ -1,8 +1,6 @@
import { Room } from '@rocket.chat/core-services';
import type { IRoomNativeFederated, IRoom, IUser, RoomType } from '@rocket.chat/core-typings';
import type { Emitter } from '@rocket.chat/emitter';
import type { HomeserverEventSignatures, PduForType } from '@rocket.chat/federation-sdk';
import { federationSDK } from '@rocket.chat/federation-sdk';
import { federationSDK, type HomeserverEventSignatures, type PduForType } from '@rocket.chat/federation-sdk';
import { Logger } from '@rocket.chat/logger';
import { Rooms, Subscriptions, Users } from '@rocket.chat/models';
@ -237,8 +235,8 @@ async function handleLeave({
// TODO check if there are no pending invites to the room, and if so, delete the room
}
export function member(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.membership', async ({ event }) => {
export function member() {
federationSDK.eventEmitterService.on('homeserver.matrix.membership', async ({ event }) => {
try {
switch (event.content.membership) {
case 'invite':

View File

@ -1,14 +1,6 @@
import { FederationMatrix, Message, MeteorService } from '@rocket.chat/core-services';
import type { IUser, IRoom, FileAttachmentProps } from '@rocket.chat/core-typings';
import type { Emitter } from '@rocket.chat/emitter';
import {
type FileMessageType,
type MessageType,
type FileMessageContent,
type HomeserverEventSignatures,
type EventID,
federationSDK,
} from '@rocket.chat/federation-sdk';
import { type FileMessageType, type MessageType, type FileMessageContent, type EventID, federationSDK } from '@rocket.chat/federation-sdk';
import { Logger } from '@rocket.chat/logger';
import { Users, Rooms, Messages } from '@rocket.chat/models';
@ -118,8 +110,8 @@ async function handleMediaMessage(
};
}
export function message(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => {
export function message() {
federationSDK.eventEmitterService.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => {
try {
const { msgtype, body } = event.content;
const messageBody = body.toString();
@ -258,6 +250,7 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
homeServerDomain: serverName,
senderExternalId: event.sender,
});
await Message.saveMessageFromFederation({
fromId: user._id,
rid: room._id,
@ -271,7 +264,7 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
}
});
emitter.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => {
federationSDK.eventEmitterService.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => {
try {
if (!event.content.ciphertext) {
logger.debug('No message content found in event');
@ -385,7 +378,7 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
}
});
emitter.on('homeserver.matrix.redaction', async ({ event }) => {
federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => {
try {
const redactedEventId = event.redacts;
if (!redactedEventId) {

View File

@ -1,8 +1,7 @@
import type { Emitter } from '@rocket.chat/emitter';
import type { HomeserverEventSignatures } from '@rocket.chat/federation-sdk';
import { federationSDK } from '@rocket.chat/federation-sdk';
export const ping = async (emitter: Emitter<HomeserverEventSignatures>) => {
emitter.on('homeserver.ping', async (data) => {
export const ping = async () => {
federationSDK.eventEmitterService.on('homeserver.ping', async (data) => {
console.log('Message received from homeserver', data);
});
};

View File

@ -1,14 +1,13 @@
import { Message, FederationMatrix } from '@rocket.chat/core-services';
import type { Emitter } from '@rocket.chat/emitter';
import type { HomeserverEventSignatures } from '@rocket.chat/federation-sdk';
import { federationSDK } from '@rocket.chat/federation-sdk';
import { Logger } from '@rocket.chat/logger';
import { Users, Messages } from '@rocket.chat/models'; // Rooms
import emojione from 'emojione';
const logger = new Logger('federation-matrix:reaction');
export function reaction(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => {
export function reaction() {
federationSDK.eventEmitterService.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => {
try {
const isSetReaction = event.content?.['m.relates_to'];
@ -47,7 +46,7 @@ export function reaction(emitter: Emitter<HomeserverEventSignatures>) {
}
});
emitter.on('homeserver.matrix.redaction', async ({ event }) => {
federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => {
try {
const redactedEventId = event.redacts;
if (!redactedEventId) {

View File

@ -1,12 +1,11 @@
import { Room } from '@rocket.chat/core-services';
import type { Emitter } from '@rocket.chat/emitter';
import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk';
import { federationSDK } from '@rocket.chat/federation-sdk';
import { Rooms, Users } from '@rocket.chat/models';
import { getUsernameServername } from '../FederationMatrix';
export function room(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.room.name', async ({ event }) => {
export function room() {
federationSDK.eventEmitterService.on('homeserver.matrix.room.name', async ({ event }) => {
const {
room_id: roomId,
content: { name },
@ -15,18 +14,18 @@ export function room(emitter: Emitter<HomeserverEventSignatures>) {
const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } });
if (!localRoomId) {
throw new Error('mapped room not found');
throw new Error(`mapped room not found: ${roomId}`);
}
const localUserId = await Users.findOneByUsername(userId, { projection: { _id: 1 } });
if (!localUserId) {
throw new Error('mapped user not found');
throw new Error(`mapped user not found: ${userId}`);
}
await Room.saveRoomName(localRoomId._id, localUserId._id, name);
});
emitter.on('homeserver.matrix.room.topic', async ({ event }) => {
federationSDK.eventEmitterService.on('homeserver.matrix.room.topic', async ({ event }) => {
const {
room_id: roomId,
content: { topic },
@ -51,7 +50,7 @@ export function room(emitter: Emitter<HomeserverEventSignatures>) {
});
});
emitter.on('homeserver.matrix.room.role', async (data) => {
federationSDK.eventEmitterService.on('homeserver.matrix.room.role', async (data) => {
const { room_id: roomId, user_id: userId, sender_id: senderId, role } = data;
const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } });

View File

@ -1,5 +1,3 @@
import { Emitter } from '@rocket.chat/emitter';
import type { HomeserverEventSignatures } from '@rocket.chat/federation-sdk';
import { federationSDK, init } from '@rocket.chat/federation-sdk';
import { Logger } from '@rocket.chat/logger';
@ -101,15 +99,12 @@ export function configureFederationMatrixSettings(settings: {
}
export async function setupFederationMatrix() {
const eventHandler = new Emitter<HomeserverEventSignatures>();
await init({
emitter: eventHandler,
dbConfig: {
uri: process.env.MONGO_URL || 'mongodb://localhost:3001/meteor',
poolSize: Number.parseInt(process.env.DATABASE_POOL_SIZE || '10', 10),
},
});
registerEvents(eventHandler);
registerEvents();
}

View File

@ -9,6 +9,8 @@ import {
addUserToRoomSlashCommand,
acceptRoomInvite,
rejectRoomInvite,
getRoomMembers,
getSubscriptions,
} from '../../../../../apps/meteor/tests/data/rooms.helper';
import { type IRequestConfig, getRequestConfig, createUser, deleteUser } from '../../../../../apps/meteor/tests/data/users.helper';
import { IS_EE } from '../../../../../apps/meteor/tests/e2e/config/constants';
@ -1563,14 +1565,58 @@ import { SynapseClient } from '../helper/synapse-client';
// RC view: Admin tries to accept rc1User1's invitation
const response = await acceptRoomInvite(federatedChannel._id, rc1AdminRequestConfig);
expect(response.success).toBe(false);
expect(response.error).toBe('Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite');
expect(response.error).toBe(
'Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite',
);
});
it('It should not allow admin to reject invitation on behalf of another user', async () => {
// RC view: Admin tries to reject rc1User1's invitation
const response = await rejectRoomInvite(federatedChannel._id, rc1AdminRequestConfig);
expect(response.success).toBe(false);
expect(response.error).toBe('Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite');
expect(response.error).toBe(
'Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite',
);
});
});
});
describe('Inviting a RC user from Synapse', () => {
describe('Room that already contains previous events', () => {
let matrixRoomId: string;
let channelName: string;
let rid: string;
beforeAll(async () => {
channelName = `federated-channel-from-synapse-${Date.now()}`;
matrixRoomId = await hs1AdminApp.createRoom(channelName);
await hs1AdminApp.matrixClient.sendTextMessage(matrixRoomId, 'Message from admin');
await hs1AdminApp.matrixClient.invite(matrixRoomId, federationConfig.hs1.additionalUser1.matrixUserId);
await hs1User1App.matrixClient.joinRoom(matrixRoomId);
await hs1User1App.matrixClient.sendTextMessage(matrixRoomId, 'Message from user1');
await hs1AdminApp.matrixClient.invite(matrixRoomId, federationConfig.rc1.adminMatrixUserId);
const subscriptions = await getSubscriptions(rc1AdminRequestConfig);
const pendingInvitation = subscriptions.update.find((subscription) => subscription.status === 'INVITED');
expect(pendingInvitation).not.toBeUndefined();
rid = pendingInvitation?.rid!;
await acceptRoomInvite(rid, rc1AdminRequestConfig);
}, 15000);
describe('It should reflect all the members and messagens on the rocket.chat side', () => {
it('It should show all the three users in the members list', async () => {
const members = await getRoomMembers(rid, rc1AdminRequestConfig);
expect(members.members.length).toBe(3);
expect(members.members.find((member: IUser) => member.username === federationConfig.rc1.adminUser)).not.toBeNull();
expect(
members.members.find((member: IUser) => member.username === federationConfig.rc1.additionalUser1.username),
).not.toBeNull();
expect(members.members.find((member: IUser) => member.username === federationConfig.hs1.adminMatrixUserId)).not.toBeNull();
});
});
});
});

View File

@ -7,7 +7,7 @@
import * as fs from 'fs';
import * as path from 'path';
import { createClient, type MatrixClient, KnownMembership, type Room, type RoomMember } from 'matrix-js-sdk';
import { createClient, type MatrixClient, KnownMembership, type Room, type RoomMember, Visibility } from 'matrix-js-sdk';
/**
* Creates a promise that resolves after the specified delay.
@ -30,7 +30,7 @@ export function wait(ms: number): Promise<void> {
* invitation handling with built-in retry logic for eventual consistency.
*/
export class SynapseClient {
private matrixClient: MatrixClient | null = null;
private _matrixClient: MatrixClient | null = null;
private url: string;
@ -51,6 +51,13 @@ export class SynapseClient {
this.password = password;
}
get matrixClient(): MatrixClient {
if (!this._matrixClient) {
throw new Error('Matrix client is not initialized');
}
return this._matrixClient;
}
/**
* Initializes the Matrix client connection.
*
@ -63,7 +70,7 @@ export class SynapseClient {
async initialize(): Promise<void> {
const client = await this.createClient(this.username, this.password, this.url);
await client.startClient();
this.matrixClient = client;
this._matrixClient = client;
}
/**
@ -132,6 +139,26 @@ export class SynapseClient {
throw new Error(`No room found with name ${roomName}`);
}
async createRoom(roomName: string, visibility: Visibility = Visibility.Private): Promise<string> {
if (!this.matrixClient) {
throw new Error('Matrix client is not initialized');
}
const room = await this.matrixClient.createRoom({
name: roomName,
visibility,
});
return room.room_id;
}
async inviteUserToRoom(roomId: string, userId: string): Promise<void> {
if (!this.matrixClient) {
throw new Error('Matrix client is not initialized');
}
await this.matrixClient.invite(userId, roomId);
}
/**
* Finds a room by name and membership status.
*
@ -664,7 +691,7 @@ export class SynapseClient {
await this.matrixClient.clearStores?.();
this.matrixClient.removeAllListeners();
await this.matrixClient.logout(true);
this.matrixClient = null;
this._matrixClient = null;
}
}
}