From 4793aca8796d8a3b4c645a2ed685028067119d8d Mon Sep 17 00:00:00 2001 From: Guilherme Gazzo Date: Wed, 10 Dec 2025 04:00:10 +0100 Subject: [PATCH] fix(federation): previous states on initial state and remove emitter (#37677) --- .changeset/strong-bags-train.md | 6 +++ apps/meteor/tests/data/rooms.helper.ts | 24 +++++++++ .../federation-matrix/src/events/edu.ts | 9 ++-- .../federation-matrix/src/events/index.ts | 17 +++---- .../federation-matrix/src/events/member.ts | 8 ++- .../federation-matrix/src/events/message.ts | 19 +++---- .../federation-matrix/src/events/ping.ts | 7 ++- .../federation-matrix/src/events/reaction.ts | 9 ++-- .../federation-matrix/src/events/room.ts | 15 +++--- ee/packages/federation-matrix/src/setup.ts | 7 +-- .../tests/end-to-end/room.spec.ts | 50 ++++++++++++++++++- .../tests/helper/synapse-client.ts | 35 +++++++++++-- 12 files changed, 144 insertions(+), 62 deletions(-) create mode 100644 .changeset/strong-bags-train.md diff --git a/.changeset/strong-bags-train.md b/.changeset/strong-bags-train.md new file mode 100644 index 00000000000..3f53beead49 --- /dev/null +++ b/.changeset/strong-bags-train.md @@ -0,0 +1,6 @@ +--- +"@rocket.chat/meteor": patch +"@rocket.chat/federation-matrix": patch +--- + +Fixes an issue where membership updates were not reflected when the user was the first member on their own server. diff --git a/apps/meteor/tests/data/rooms.helper.ts b/apps/meteor/tests/data/rooms.helper.ts index 32c033050d8..0f46f6ad692 100644 --- a/apps/meteor/tests/data/rooms.helper.ts +++ b/apps/meteor/tests/data/rooms.helper.ts @@ -454,6 +454,30 @@ export const acceptRoomInvite = (roomId: IRoom['_id'], config?: IRequestConfig) }); }; +/** + * Retrieves the subscriptions for the authenticated user. + * + * Fetches the complete list of subscriptions for the authenticated user, which is essential + * for verifying federation subscription synchronization and member synchronization. + * + * @param config - Optional request configuration for custom domains + * @returns Promise resolving to the subscriptions response + */ + +export const getSubscriptions = (config?: IRequestConfig) => { + const requestInstance = config?.request || request; + const credentialsInstance = config?.credentials || credentials; + + return new Promise>((resolve) => { + void requestInstance + .get(api('subscriptions.get')) + .set(credentialsInstance) + .end((_err: any, req: any) => { + resolve(req.body); + }); + }); +}; + /** * Rejects a room invite for the authenticated user. * diff --git a/ee/packages/federation-matrix/src/events/edu.ts b/ee/packages/federation-matrix/src/events/edu.ts index 70af1023995..f119c0bead5 100644 --- a/ee/packages/federation-matrix/src/events/edu.ts +++ b/ee/packages/federation-matrix/src/events/edu.ts @@ -1,14 +1,13 @@ import { api } from '@rocket.chat/core-services'; import { UserStatus } from '@rocket.chat/core-typings'; -import type { Emitter } from '@rocket.chat/emitter'; -import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { federationSDK } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Rooms, Users } from '@rocket.chat/models'; const logger = new Logger('federation-matrix:edu'); -export const edus = async (emitter: Emitter) => { - emitter.on('homeserver.matrix.typing', async (data) => { +export const edus = async () => { + federationSDK.eventEmitterService.on('homeserver.matrix.typing', async (data) => { const config = federationSDK.getConfig('edu'); if (!config.processTyping) { return; @@ -31,7 +30,7 @@ export const edus = async (emitter: Emitter) => { } }); - emitter.on('homeserver.matrix.presence', async (data) => { + federationSDK.eventEmitterService.on('homeserver.matrix.presence', async (data) => { const config = federationSDK.getConfig('edu'); if (!config.processPresence) { return; diff --git a/ee/packages/federation-matrix/src/events/index.ts b/ee/packages/federation-matrix/src/events/index.ts index 2487c388cb1..3122e21c38c 100644 --- a/ee/packages/federation-matrix/src/events/index.ts +++ b/ee/packages/federation-matrix/src/events/index.ts @@ -1,6 +1,3 @@ -import type { Emitter } from '@rocket.chat/emitter'; -import { type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; - import { edus } from './edu'; import { member } from './member'; import { message } from './message'; @@ -8,11 +5,11 @@ import { ping } from './ping'; import { reaction } from './reaction'; import { room } from './room'; -export function registerEvents(emitter: Emitter) { - ping(emitter); - message(emitter); - reaction(emitter); - member(emitter); - edus(emitter); - room(emitter); +export function registerEvents() { + ping(); + message(); + reaction(); + member(); + edus(); + room(); } diff --git a/ee/packages/federation-matrix/src/events/member.ts b/ee/packages/federation-matrix/src/events/member.ts index 56b5030b5e4..3ba2d107d8a 100644 --- a/ee/packages/federation-matrix/src/events/member.ts +++ b/ee/packages/federation-matrix/src/events/member.ts @@ -1,8 +1,6 @@ import { Room } from '@rocket.chat/core-services'; import type { IRoomNativeFederated, IRoom, IUser, RoomType } from '@rocket.chat/core-typings'; -import type { Emitter } from '@rocket.chat/emitter'; -import type { HomeserverEventSignatures, PduForType } from '@rocket.chat/federation-sdk'; -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures, type PduForType } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Rooms, Subscriptions, Users } from '@rocket.chat/models'; @@ -237,8 +235,8 @@ async function handleLeave({ // TODO check if there are no pending invites to the room, and if so, delete the room } -export function member(emitter: Emitter) { - emitter.on('homeserver.matrix.membership', async ({ event }) => { +export function member() { + federationSDK.eventEmitterService.on('homeserver.matrix.membership', async ({ event }) => { try { switch (event.content.membership) { case 'invite': diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index 599bf6ceca8..609d33459f5 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -1,14 +1,6 @@ import { FederationMatrix, Message, MeteorService } from '@rocket.chat/core-services'; import type { IUser, IRoom, FileAttachmentProps } from '@rocket.chat/core-typings'; -import type { Emitter } from '@rocket.chat/emitter'; -import { - type FileMessageType, - type MessageType, - type FileMessageContent, - type HomeserverEventSignatures, - type EventID, - federationSDK, -} from '@rocket.chat/federation-sdk'; +import { type FileMessageType, type MessageType, type FileMessageContent, type EventID, federationSDK } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Users, Rooms, Messages } from '@rocket.chat/models'; @@ -118,8 +110,8 @@ async function handleMediaMessage( }; } -export function message(emitter: Emitter) { - emitter.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => { +export function message() { + federationSDK.eventEmitterService.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => { try { const { msgtype, body } = event.content; const messageBody = body.toString(); @@ -258,6 +250,7 @@ export function message(emitter: Emitter) { homeServerDomain: serverName, senderExternalId: event.sender, }); + await Message.saveMessageFromFederation({ fromId: user._id, rid: room._id, @@ -271,7 +264,7 @@ export function message(emitter: Emitter) { } }); - emitter.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => { + federationSDK.eventEmitterService.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => { try { if (!event.content.ciphertext) { logger.debug('No message content found in event'); @@ -385,7 +378,7 @@ export function message(emitter: Emitter) { } }); - emitter.on('homeserver.matrix.redaction', async ({ event }) => { + federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => { try { const redactedEventId = event.redacts; if (!redactedEventId) { diff --git a/ee/packages/federation-matrix/src/events/ping.ts b/ee/packages/federation-matrix/src/events/ping.ts index 3bcd05d0424..204a49fdf89 100644 --- a/ee/packages/federation-matrix/src/events/ping.ts +++ b/ee/packages/federation-matrix/src/events/ping.ts @@ -1,8 +1,7 @@ -import type { Emitter } from '@rocket.chat/emitter'; -import type { HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { federationSDK } from '@rocket.chat/federation-sdk'; -export const ping = async (emitter: Emitter) => { - emitter.on('homeserver.ping', async (data) => { +export const ping = async () => { + federationSDK.eventEmitterService.on('homeserver.ping', async (data) => { console.log('Message received from homeserver', data); }); }; diff --git a/ee/packages/federation-matrix/src/events/reaction.ts b/ee/packages/federation-matrix/src/events/reaction.ts index 932a0993e30..5060bb498f8 100644 --- a/ee/packages/federation-matrix/src/events/reaction.ts +++ b/ee/packages/federation-matrix/src/events/reaction.ts @@ -1,14 +1,13 @@ import { Message, FederationMatrix } from '@rocket.chat/core-services'; -import type { Emitter } from '@rocket.chat/emitter'; -import type { HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { federationSDK } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Users, Messages } from '@rocket.chat/models'; // Rooms import emojione from 'emojione'; const logger = new Logger('federation-matrix:reaction'); -export function reaction(emitter: Emitter) { - emitter.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => { +export function reaction() { + federationSDK.eventEmitterService.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => { try { const isSetReaction = event.content?.['m.relates_to']; @@ -47,7 +46,7 @@ export function reaction(emitter: Emitter) { } }); - emitter.on('homeserver.matrix.redaction', async ({ event }) => { + federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => { try { const redactedEventId = event.redacts; if (!redactedEventId) { diff --git a/ee/packages/federation-matrix/src/events/room.ts b/ee/packages/federation-matrix/src/events/room.ts index 45668831e52..80b344ae2e9 100644 --- a/ee/packages/federation-matrix/src/events/room.ts +++ b/ee/packages/federation-matrix/src/events/room.ts @@ -1,12 +1,11 @@ import { Room } from '@rocket.chat/core-services'; -import type { Emitter } from '@rocket.chat/emitter'; -import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { federationSDK } from '@rocket.chat/federation-sdk'; import { Rooms, Users } from '@rocket.chat/models'; import { getUsernameServername } from '../FederationMatrix'; -export function room(emitter: Emitter) { - emitter.on('homeserver.matrix.room.name', async ({ event }) => { +export function room() { + federationSDK.eventEmitterService.on('homeserver.matrix.room.name', async ({ event }) => { const { room_id: roomId, content: { name }, @@ -15,18 +14,18 @@ export function room(emitter: Emitter) { const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); if (!localRoomId) { - throw new Error('mapped room not found'); + throw new Error(`mapped room not found: ${roomId}`); } const localUserId = await Users.findOneByUsername(userId, { projection: { _id: 1 } }); if (!localUserId) { - throw new Error('mapped user not found'); + throw new Error(`mapped user not found: ${userId}`); } await Room.saveRoomName(localRoomId._id, localUserId._id, name); }); - emitter.on('homeserver.matrix.room.topic', async ({ event }) => { + federationSDK.eventEmitterService.on('homeserver.matrix.room.topic', async ({ event }) => { const { room_id: roomId, content: { topic }, @@ -51,7 +50,7 @@ export function room(emitter: Emitter) { }); }); - emitter.on('homeserver.matrix.room.role', async (data) => { + federationSDK.eventEmitterService.on('homeserver.matrix.room.role', async (data) => { const { room_id: roomId, user_id: userId, sender_id: senderId, role } = data; const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); diff --git a/ee/packages/federation-matrix/src/setup.ts b/ee/packages/federation-matrix/src/setup.ts index 1630741dda5..f307d9c1e53 100644 --- a/ee/packages/federation-matrix/src/setup.ts +++ b/ee/packages/federation-matrix/src/setup.ts @@ -1,5 +1,3 @@ -import { Emitter } from '@rocket.chat/emitter'; -import type { HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; import { federationSDK, init } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; @@ -101,15 +99,12 @@ export function configureFederationMatrixSettings(settings: { } export async function setupFederationMatrix() { - const eventHandler = new Emitter(); - await init({ - emitter: eventHandler, dbConfig: { uri: process.env.MONGO_URL || 'mongodb://localhost:3001/meteor', poolSize: Number.parseInt(process.env.DATABASE_POOL_SIZE || '10', 10), }, }); - registerEvents(eventHandler); + registerEvents(); } diff --git a/ee/packages/federation-matrix/tests/end-to-end/room.spec.ts b/ee/packages/federation-matrix/tests/end-to-end/room.spec.ts index f29586ebd56..f85285c8150 100644 --- a/ee/packages/federation-matrix/tests/end-to-end/room.spec.ts +++ b/ee/packages/federation-matrix/tests/end-to-end/room.spec.ts @@ -9,6 +9,8 @@ import { addUserToRoomSlashCommand, acceptRoomInvite, rejectRoomInvite, + getRoomMembers, + getSubscriptions, } from '../../../../../apps/meteor/tests/data/rooms.helper'; import { type IRequestConfig, getRequestConfig, createUser, deleteUser } from '../../../../../apps/meteor/tests/data/users.helper'; import { IS_EE } from '../../../../../apps/meteor/tests/e2e/config/constants'; @@ -1563,14 +1565,58 @@ import { SynapseClient } from '../helper/synapse-client'; // RC view: Admin tries to accept rc1User1's invitation const response = await acceptRoomInvite(federatedChannel._id, rc1AdminRequestConfig); expect(response.success).toBe(false); - expect(response.error).toBe('Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite'); + expect(response.error).toBe( + 'Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite', + ); }); it('It should not allow admin to reject invitation on behalf of another user', async () => { // RC view: Admin tries to reject rc1User1's invitation const response = await rejectRoomInvite(federatedChannel._id, rc1AdminRequestConfig); expect(response.success).toBe(false); - expect(response.error).toBe('Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite'); + expect(response.error).toBe( + 'Failed to handle invite: No subscription found or user does not have permission to accept or reject this invite', + ); + }); + }); + }); + + describe('Inviting a RC user from Synapse', () => { + describe('Room that already contains previous events', () => { + let matrixRoomId: string; + let channelName: string; + let rid: string; + beforeAll(async () => { + channelName = `federated-channel-from-synapse-${Date.now()}`; + matrixRoomId = await hs1AdminApp.createRoom(channelName); + + await hs1AdminApp.matrixClient.sendTextMessage(matrixRoomId, 'Message from admin'); + await hs1AdminApp.matrixClient.invite(matrixRoomId, federationConfig.hs1.additionalUser1.matrixUserId); + await hs1User1App.matrixClient.joinRoom(matrixRoomId); + await hs1User1App.matrixClient.sendTextMessage(matrixRoomId, 'Message from user1'); + await hs1AdminApp.matrixClient.invite(matrixRoomId, federationConfig.rc1.adminMatrixUserId); + + const subscriptions = await getSubscriptions(rc1AdminRequestConfig); + + const pendingInvitation = subscriptions.update.find((subscription) => subscription.status === 'INVITED'); + + expect(pendingInvitation).not.toBeUndefined(); + + rid = pendingInvitation?.rid!; + + await acceptRoomInvite(rid, rc1AdminRequestConfig); + }, 15000); + + describe('It should reflect all the members and messagens on the rocket.chat side', () => { + it('It should show all the three users in the members list', async () => { + const members = await getRoomMembers(rid, rc1AdminRequestConfig); + expect(members.members.length).toBe(3); + expect(members.members.find((member: IUser) => member.username === federationConfig.rc1.adminUser)).not.toBeNull(); + expect( + members.members.find((member: IUser) => member.username === federationConfig.rc1.additionalUser1.username), + ).not.toBeNull(); + expect(members.members.find((member: IUser) => member.username === federationConfig.hs1.adminMatrixUserId)).not.toBeNull(); + }); }); }); }); diff --git a/ee/packages/federation-matrix/tests/helper/synapse-client.ts b/ee/packages/federation-matrix/tests/helper/synapse-client.ts index dcb53dde899..732eb7b503b 100644 --- a/ee/packages/federation-matrix/tests/helper/synapse-client.ts +++ b/ee/packages/federation-matrix/tests/helper/synapse-client.ts @@ -7,7 +7,7 @@ import * as fs from 'fs'; import * as path from 'path'; -import { createClient, type MatrixClient, KnownMembership, type Room, type RoomMember } from 'matrix-js-sdk'; +import { createClient, type MatrixClient, KnownMembership, type Room, type RoomMember, Visibility } from 'matrix-js-sdk'; /** * Creates a promise that resolves after the specified delay. @@ -30,7 +30,7 @@ export function wait(ms: number): Promise { * invitation handling with built-in retry logic for eventual consistency. */ export class SynapseClient { - private matrixClient: MatrixClient | null = null; + private _matrixClient: MatrixClient | null = null; private url: string; @@ -51,6 +51,13 @@ export class SynapseClient { this.password = password; } + get matrixClient(): MatrixClient { + if (!this._matrixClient) { + throw new Error('Matrix client is not initialized'); + } + return this._matrixClient; + } + /** * Initializes the Matrix client connection. * @@ -63,7 +70,7 @@ export class SynapseClient { async initialize(): Promise { const client = await this.createClient(this.username, this.password, this.url); await client.startClient(); - this.matrixClient = client; + this._matrixClient = client; } /** @@ -132,6 +139,26 @@ export class SynapseClient { throw new Error(`No room found with name ${roomName}`); } + async createRoom(roomName: string, visibility: Visibility = Visibility.Private): Promise { + if (!this.matrixClient) { + throw new Error('Matrix client is not initialized'); + } + + const room = await this.matrixClient.createRoom({ + name: roomName, + visibility, + }); + + return room.room_id; + } + + async inviteUserToRoom(roomId: string, userId: string): Promise { + if (!this.matrixClient) { + throw new Error('Matrix client is not initialized'); + } + await this.matrixClient.invite(userId, roomId); + } + /** * Finds a room by name and membership status. * @@ -664,7 +691,7 @@ export class SynapseClient { await this.matrixClient.clearStores?.(); this.matrixClient.removeAllListeners(); await this.matrixClient.logout(true); - this.matrixClient = null; + this._matrixClient = null; } } }