From c8d6c601f12d609581b7cde4e8230bca34c44069 Mon Sep 17 00:00:00 2001 From: Guilherme Gazzo Date: Mon, 17 Nov 2025 19:28:34 +0100 Subject: [PATCH] refactor: standardize event handling in federation-matrix (#37535) --- .../federation-matrix/src/events/member.ts | 42 ++++---- .../federation-matrix/src/events/message.ts | 100 +++++++++--------- .../federation-matrix/src/events/reaction.ts | 18 ++-- .../federation-matrix/src/events/room.ts | 8 +- 4 files changed, 86 insertions(+), 82 deletions(-) diff --git a/ee/packages/federation-matrix/src/events/member.ts b/ee/packages/federation-matrix/src/events/member.ts index 704b1b22e0b..d40b150cc4c 100644 --- a/ee/packages/federation-matrix/src/events/member.ts +++ b/ee/packages/federation-matrix/src/events/member.ts @@ -8,51 +8,51 @@ import { createOrUpdateFederatedUser, getUsernameServername } from '../Federatio const logger = new Logger('federation-matrix:member'); -async function membershipLeaveAction(data: HomeserverEventSignatures['homeserver.matrix.membership']) { - const room = await Rooms.findOne({ 'federation.mrid': data.room_id }, { projection: { _id: 1 } }); +async function membershipLeaveAction(event: HomeserverEventSignatures['homeserver.matrix.membership']['event']) { + const room = await Rooms.findOne({ 'federation.mrid': event.room_id }, { projection: { _id: 1 } }); if (!room) { - logger.warn(`No bridged room found for Matrix room_id: ${data.room_id}`); + logger.warn(`No bridged room found for Matrix room_id: ${event.room_id}`); return; } const serverName = federationSDK.getConfig('serverName'); - const [affectedUsername] = getUsernameServername(data.state_key, serverName); + const [affectedUsername] = getUsernameServername(event.state_key, serverName); // state_key is the user affected by the membership change const affectedUser = await Users.findOneByUsername(affectedUsername); if (!affectedUser) { - logger.error(`No Rocket.Chat user found for bridged user: ${data.state_key}`); + logger.error(`No Rocket.Chat user found for bridged user: ${event.state_key}`); return; } // Check if this is a kick (sender != state_key) or voluntary leave (sender == state_key) - if (data.sender === data.state_key) { + if (event.sender === event.state_key) { // Voluntary leave await Room.removeUserFromRoom(room._id, affectedUser); logger.info(`User ${affectedUser.username} left room ${room._id} via Matrix federation`); } else { // Kick - find who kicked - const [kickerUsername] = getUsernameServername(data.sender, serverName); + const [kickerUsername] = getUsernameServername(event.sender, serverName); const kickerUser = await Users.findOneByUsername(kickerUsername); await Room.removeUserFromRoom(room._id, affectedUser, { byUser: kickerUser || { _id: 'matrix.federation', username: 'Matrix User' }, }); - const reasonText = data.content.reason ? ` Reason: ${data.content.reason}` : ''; - logger.info(`User ${affectedUser.username} was kicked from room ${room._id} by ${data.sender} via Matrix federation.${reasonText}`); + const reasonText = event.content.reason ? ` Reason: ${event.content.reason}` : ''; + logger.info(`User ${affectedUser.username} was kicked from room ${room._id} by ${event.sender} via Matrix federation.${reasonText}`); } } -async function membershipJoinAction(data: HomeserverEventSignatures['homeserver.matrix.membership']) { - const room = await Rooms.findOne({ 'federation.mrid': data.room_id }); +async function membershipJoinAction(event: HomeserverEventSignatures['homeserver.matrix.membership']['event']) { + const room = await Rooms.findOne({ 'federation.mrid': event.room_id }); if (!room) { - logger.warn(`No bridged room found for room_id: ${data.room_id}`); + logger.warn(`No bridged room found for room_id: ${event.room_id}`); return; } - const [username, serverName, isLocal] = getUsernameServername(data.sender, federationSDK.getConfig('serverName')); + const [username, serverName, isLocal] = getUsernameServername(event.sender, federationSDK.getConfig('serverName')); // for local users we must to remove the @ and the server domain const localUser = isLocal && (await Users.findOneByUsername(username)); @@ -71,9 +71,9 @@ async function membershipJoinAction(data: HomeserverEventSignatures['homeserver. } const insertedId = await createOrUpdateFederatedUser({ - username: data.event.state_key, + username: event.state_key, origin: serverName, - name: data.content.displayname || (data.state_key as `@${string}:${string}`), + name: event.content.displayname || event.state_key, }); const user = await Users.findOneById(insertedId); @@ -86,17 +86,17 @@ async function membershipJoinAction(data: HomeserverEventSignatures['homeserver. } export function member(emitter: Emitter) { - emitter.on('homeserver.matrix.membership', async (data) => { + emitter.on('homeserver.matrix.membership', async ({ event }) => { try { - if (data.content.membership === 'leave') { - return membershipLeaveAction(data); + if (event.content.membership === 'leave') { + return membershipLeaveAction(event); } - if (data.content.membership === 'join') { - return membershipJoinAction(data); + if (event.content.membership === 'join') { + return membershipJoinAction(event); } - logger.debug(`Ignoring membership event with membership: ${data.content.membership}`); + logger.debug(`Ignoring membership event with membership: ${event.content.membership}`); } catch (error) { logger.error('Failed to process Matrix membership event:', error); } diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index 591ca6043de..f3e8ba752c8 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -119,11 +119,10 @@ async function handleMediaMessage( } export function message(emitter: Emitter) { - emitter.on('homeserver.matrix.message', async (data) => { + emitter.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => { try { - const { content } = data; - const { msgtype } = content; - const messageBody = content.body.toString(); + const { msgtype, body } = event.content; + const messageBody = body.toString(); if (!messageBody && !msgtype) { logger.debug('No message content found in event'); @@ -131,19 +130,19 @@ export function message(emitter: Emitter) { } // at this point we know for sure the user already exists - const user = await Users.findOneByUsername(data.sender); + const user = await Users.findOneByUsername(event.sender); if (!user) { - throw new Error(`User not found for sender: ${data.sender}`); + throw new Error(`User not found for sender: ${event.sender}`); } - const room = await Rooms.findOne({ 'federation.mrid': data.room_id }); + const room = await Rooms.findOne({ 'federation.mrid': event.room_id }); if (!room) { - throw new Error(`No mapped room found for room_id: ${data.room_id}`); + throw new Error(`No mapped room found for room_id: ${event.room_id}`); } const serverName = federationSDK.getConfig('serverName'); - const relation = content['m.relates_to']; + const relation = event.content['m.relates_to']; // SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such. const hasRelation = relation && 'rel_type' in relation; @@ -161,7 +160,7 @@ export function message(emitter: Emitter) { const thread = threadRootEventId ? await getThreadMessageId(threadRootEventId) : undefined; const isEditedMessage = hasRelation && relation.rel_type === 'm.replace'; - if (isEditedMessage && relation.event_id && data.content['m.new_content']) { + if (isEditedMessage && relation.event_id && event.content['m.new_content']) { logger.debug('Received edited message from Matrix, updating existing message'); const originalMessage = await Messages.findOneByFederationId(relation.event_id); if (!originalMessage) { @@ -171,7 +170,7 @@ export function message(emitter: Emitter) { if (originalMessage.federation?.eventId !== relation.event_id) { return; } - if (originalMessage.msg === data.content['m.new_content']?.body) { + if (originalMessage.msg === event.content['m.new_content']?.body) { logger.debug('No changes in message content, skipping update'); return; } @@ -180,10 +179,10 @@ export function message(emitter: Emitter) { const messageToReplyToUrl = await MeteorService.getMessageURLToReplyTo(room.t as string, room._id, originalMessage._id); const formatted = await toInternalQuoteMessageFormat({ messageToReplyToUrl, - formattedMessage: data.content.formatted_body || '', + formattedMessage: event.content.formatted_body || '', rawMessage: messageBody, homeServerDomain: serverName, - senderExternalId: data.sender, + senderExternalId: event.sender, }); await Message.updateMessage( { @@ -197,11 +196,12 @@ export function message(emitter: Emitter) { } const formatted = toInternalMessageFormat({ - rawMessage: data.content['m.new_content'].body, - formattedMessage: data.content.formatted_body || '', + rawMessage: event.content['m.new_content'].body, + formattedMessage: event.content.formatted_body || '', homeServerDomain: serverName, - senderExternalId: data.sender, + senderExternalId: event.sender, }); + await Message.updateMessage( { ...originalMessage, @@ -222,47 +222,47 @@ export function message(emitter: Emitter) { const messageToReplyToUrl = await MeteorService.getMessageURLToReplyTo(room.t as string, room._id, originalMessage._id); const formatted = await toInternalQuoteMessageFormat({ messageToReplyToUrl, - formattedMessage: data.content.formatted_body || '', + formattedMessage: event.content.formatted_body || '', rawMessage: messageBody, homeServerDomain: serverName, - senderExternalId: data.sender, + senderExternalId: event.sender, }); await Message.saveMessageFromFederation({ fromId: user._id, rid: room._id, msg: formatted, - federation_event_id: data.event_id, + federation_event_id: eventId, thread, }); return; } const isMediaMessage = Object.values(fileTypes).includes(msgtype as FileMessageType); - if (isMediaMessage && content.url) { + if (isMediaMessage && 'url' in event.content) { const result = await handleMediaMessage( - content.url, - content.info, + event.content.url, + event.content.info, msgtype, messageBody, user, room, - data.room_id, - data.event_id, + event.room_id, + eventId, thread, ); await Message.saveMessageFromFederation(result); } else { const formatted = toInternalMessageFormat({ rawMessage: messageBody, - formattedMessage: data.content.formatted_body || '', + formattedMessage: event.content.formatted_body || '', homeServerDomain: serverName, - senderExternalId: data.sender, + senderExternalId: event.sender, }); await Message.saveMessageFromFederation({ fromId: user._id, rid: room._id, msg: formatted, - federation_event_id: data.event_id, + federation_event_id: eventId, thread, }); } @@ -271,25 +271,25 @@ export function message(emitter: Emitter) { } }); - emitter.on('homeserver.matrix.encrypted', async (data) => { + emitter.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => { try { - if (!data.content.ciphertext) { + if (!event.content.ciphertext) { logger.debug('No message content found in event'); return; } // at this point we know for sure the user already exists - const user = await Users.findOneByUsername(data.sender); + const user = await Users.findOneByUsername(event.sender); if (!user) { - throw new Error(`User not found for sender: ${data.sender}`); + throw new Error(`User not found for sender: ${event.sender}`); } - const room = await Rooms.findOne({ 'federation.mrid': data.room_id }); + const room = await Rooms.findOne({ 'federation.mrid': event.room_id }); if (!room) { - throw new Error(`No mapped room found for room_id: ${data.room_id}`); + throw new Error(`No mapped room found for room_id: ${event.room_id}`); } - const relation = data.content['m.relates_to']; + const relation = event.content['m.relates_to']; // SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such. const hasRelation = relation && 'rel_type' in relation; @@ -317,7 +317,7 @@ export function message(emitter: Emitter) { if (originalMessage.federation?.eventId !== relation.event_id) { return; } - if (originalMessage.content?.ciphertext === data.content.ciphertext) { + if (originalMessage.content?.ciphertext === event.content.ciphertext) { logger.debug('No changes in message content, skipping update'); return; } @@ -327,8 +327,8 @@ export function message(emitter: Emitter) { { ...originalMessage, content: { - algorithm: data.content.algorithm, - ciphertext: data.content.ciphertext, + algorithm: event.content.algorithm, + ciphertext: event.content.ciphertext, }, }, user, @@ -341,8 +341,8 @@ export function message(emitter: Emitter) { { ...originalMessage, content: { - algorithm: data.content.algorithm, - ciphertext: data.content.ciphertext, + algorithm: event.content.algorithm, + ciphertext: event.content.ciphertext, }, }, user, @@ -361,10 +361,10 @@ export function message(emitter: Emitter) { fromId: user._id, rid: room._id, e2e_content: { - algorithm: data.content.algorithm, - ciphertext: data.content.ciphertext, + algorithm: event.content.algorithm, + ciphertext: event.content.ciphertext, }, - federation_event_id: data.event_id, + federation_event_id: eventId, thread, }); return; @@ -374,10 +374,10 @@ export function message(emitter: Emitter) { fromId: user._id, rid: room._id, e2e_content: { - algorithm: data.content.algorithm, - ciphertext: data.content.ciphertext, + algorithm: event.content.algorithm, + ciphertext: event.content.ciphertext, }, - federation_event_id: data.event_id, + federation_event_id: eventId, thread, }); } catch (error) { @@ -385,9 +385,9 @@ export function message(emitter: Emitter) { } }); - emitter.on('homeserver.matrix.redaction', async (data) => { + emitter.on('homeserver.matrix.redaction', async ({ event }) => { try { - const redactedEventId = data.redacts; + const redactedEventId = event.redacts; if (!redactedEventId) { logger.debug('No redacts field in redaction event'); return; @@ -399,12 +399,12 @@ export function message(emitter: Emitter) { return; } - const rcMessage = await Messages.findOneByFederationId(data.redacts); + const rcMessage = await Messages.findOneByFederationId(event.redacts); if (!rcMessage) { - logger.debug(`No RC message found for event ${data.redacts}`); + logger.debug(`No RC message found for event ${event.redacts}`); return; } - const internalUsername = data.sender; + const internalUsername = event.sender; const user = await Users.findOneByUsername(internalUsername); if (!user) { logger.debug(`User not found: ${internalUsername}`); diff --git a/ee/packages/federation-matrix/src/events/reaction.ts b/ee/packages/federation-matrix/src/events/reaction.ts index cff8f6b4669..3d72def9660 100644 --- a/ee/packages/federation-matrix/src/events/reaction.ts +++ b/ee/packages/federation-matrix/src/events/reaction.ts @@ -8,20 +8,20 @@ import emojione from 'emojione'; const logger = new Logger('federation-matrix:reaction'); export function reaction(emitter: Emitter) { - emitter.on('homeserver.matrix.reaction', async (data) => { + emitter.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => { try { - const isSetReaction = data.content?.['m.relates_to']; + const isSetReaction = event.content?.['m.relates_to']; const reactionTargetEventId = isSetReaction?.event_id; const reactionKey = isSetReaction?.key; - const [userPart, domain] = data.sender.split(':'); + const [userPart, domain] = event.sender.split(':'); if (!userPart || !domain) { - logger.error('Invalid Matrix sender ID format:', data.sender); + logger.error('Invalid Matrix sender ID format:', event.sender); return; } - const internalUsername = data.sender; + const internalUsername = event.sender; const user = await Users.findOneByUsername(internalUsername); if (!user) { logger.error(`No RC user mapping found for Matrix event ${reactionTargetEventId} ${internalUsername}`); @@ -41,15 +41,15 @@ export function reaction(emitter: Emitter) { const reactionEmoji = emojione.toShort(reactionKey); await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, true); - await Messages.setFederationReactionEventId(internalUsername, rcMessage._id, reactionEmoji, data.event_id); + await Messages.setFederationReactionEventId(internalUsername, rcMessage._id, reactionEmoji, eventId); } catch (error) { logger.error('Failed to process Matrix reaction:', error); } }); - emitter.on('homeserver.matrix.redaction', async (data) => { + emitter.on('homeserver.matrix.redaction', async ({ event }) => { try { - const redactedEventId = data.redacts; + const redactedEventId = event.redacts; if (!redactedEventId) { logger.debug('No redacts field in redaction event'); return; @@ -76,7 +76,7 @@ export function reaction(emitter: Emitter) { return; } - const internalUsername = data.sender; + const internalUsername = event.sender; const user = await Users.findOneByUsername(internalUsername); if (!user) { logger.debug(`User not found: ${internalUsername}`); diff --git a/ee/packages/federation-matrix/src/events/room.ts b/ee/packages/federation-matrix/src/events/room.ts index b43874d42cf..68a12b30192 100644 --- a/ee/packages/federation-matrix/src/events/room.ts +++ b/ee/packages/federation-matrix/src/events/room.ts @@ -22,8 +22,12 @@ export function room(emitter: Emitter) { await Room.saveRoomName(localRoomId._id, localUserId._id, name); }); - emitter.on('homeserver.matrix.room.topic', async (data) => { - const { room_id: roomId, topic, user_id: userId } = data; + emitter.on('homeserver.matrix.room.topic', async ({ event }) => { + const { + room_id: roomId, + content: { topic }, + sender: userId, + } = event; const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); if (!localRoomId) {