refactor: standardize event handling in federation-matrix (#37535)

This commit is contained in:
Guilherme Gazzo 2025-11-17 19:28:34 +01:00 committed by GitHub
parent ed544f8946
commit c8d6c601f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 86 additions and 82 deletions

View File

@ -8,51 +8,51 @@ import { createOrUpdateFederatedUser, getUsernameServername } from '../Federatio
const logger = new Logger('federation-matrix:member');
async function membershipLeaveAction(data: HomeserverEventSignatures['homeserver.matrix.membership']) {
const room = await Rooms.findOne({ 'federation.mrid': data.room_id }, { projection: { _id: 1 } });
async function membershipLeaveAction(event: HomeserverEventSignatures['homeserver.matrix.membership']['event']) {
const room = await Rooms.findOne({ 'federation.mrid': event.room_id }, { projection: { _id: 1 } });
if (!room) {
logger.warn(`No bridged room found for Matrix room_id: ${data.room_id}`);
logger.warn(`No bridged room found for Matrix room_id: ${event.room_id}`);
return;
}
const serverName = federationSDK.getConfig('serverName');
const [affectedUsername] = getUsernameServername(data.state_key, serverName);
const [affectedUsername] = getUsernameServername(event.state_key, serverName);
// state_key is the user affected by the membership change
const affectedUser = await Users.findOneByUsername(affectedUsername);
if (!affectedUser) {
logger.error(`No Rocket.Chat user found for bridged user: ${data.state_key}`);
logger.error(`No Rocket.Chat user found for bridged user: ${event.state_key}`);
return;
}
// Check if this is a kick (sender != state_key) or voluntary leave (sender == state_key)
if (data.sender === data.state_key) {
if (event.sender === event.state_key) {
// Voluntary leave
await Room.removeUserFromRoom(room._id, affectedUser);
logger.info(`User ${affectedUser.username} left room ${room._id} via Matrix federation`);
} else {
// Kick - find who kicked
const [kickerUsername] = getUsernameServername(data.sender, serverName);
const [kickerUsername] = getUsernameServername(event.sender, serverName);
const kickerUser = await Users.findOneByUsername(kickerUsername);
await Room.removeUserFromRoom(room._id, affectedUser, {
byUser: kickerUser || { _id: 'matrix.federation', username: 'Matrix User' },
});
const reasonText = data.content.reason ? ` Reason: ${data.content.reason}` : '';
logger.info(`User ${affectedUser.username} was kicked from room ${room._id} by ${data.sender} via Matrix federation.${reasonText}`);
const reasonText = event.content.reason ? ` Reason: ${event.content.reason}` : '';
logger.info(`User ${affectedUser.username} was kicked from room ${room._id} by ${event.sender} via Matrix federation.${reasonText}`);
}
}
async function membershipJoinAction(data: HomeserverEventSignatures['homeserver.matrix.membership']) {
const room = await Rooms.findOne({ 'federation.mrid': data.room_id });
async function membershipJoinAction(event: HomeserverEventSignatures['homeserver.matrix.membership']['event']) {
const room = await Rooms.findOne({ 'federation.mrid': event.room_id });
if (!room) {
logger.warn(`No bridged room found for room_id: ${data.room_id}`);
logger.warn(`No bridged room found for room_id: ${event.room_id}`);
return;
}
const [username, serverName, isLocal] = getUsernameServername(data.sender, federationSDK.getConfig('serverName'));
const [username, serverName, isLocal] = getUsernameServername(event.sender, federationSDK.getConfig('serverName'));
// for local users we must to remove the @ and the server domain
const localUser = isLocal && (await Users.findOneByUsername(username));
@ -71,9 +71,9 @@ async function membershipJoinAction(data: HomeserverEventSignatures['homeserver.
}
const insertedId = await createOrUpdateFederatedUser({
username: data.event.state_key,
username: event.state_key,
origin: serverName,
name: data.content.displayname || (data.state_key as `@${string}:${string}`),
name: event.content.displayname || event.state_key,
});
const user = await Users.findOneById(insertedId);
@ -86,17 +86,17 @@ async function membershipJoinAction(data: HomeserverEventSignatures['homeserver.
}
export function member(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.membership', async (data) => {
emitter.on('homeserver.matrix.membership', async ({ event }) => {
try {
if (data.content.membership === 'leave') {
return membershipLeaveAction(data);
if (event.content.membership === 'leave') {
return membershipLeaveAction(event);
}
if (data.content.membership === 'join') {
return membershipJoinAction(data);
if (event.content.membership === 'join') {
return membershipJoinAction(event);
}
logger.debug(`Ignoring membership event with membership: ${data.content.membership}`);
logger.debug(`Ignoring membership event with membership: ${event.content.membership}`);
} catch (error) {
logger.error('Failed to process Matrix membership event:', error);
}

View File

@ -119,11 +119,10 @@ async function handleMediaMessage(
}
export function message(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.message', async (data) => {
emitter.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => {
try {
const { content } = data;
const { msgtype } = content;
const messageBody = content.body.toString();
const { msgtype, body } = event.content;
const messageBody = body.toString();
if (!messageBody && !msgtype) {
logger.debug('No message content found in event');
@ -131,19 +130,19 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
}
// at this point we know for sure the user already exists
const user = await Users.findOneByUsername(data.sender);
const user = await Users.findOneByUsername(event.sender);
if (!user) {
throw new Error(`User not found for sender: ${data.sender}`);
throw new Error(`User not found for sender: ${event.sender}`);
}
const room = await Rooms.findOne({ 'federation.mrid': data.room_id });
const room = await Rooms.findOne({ 'federation.mrid': event.room_id });
if (!room) {
throw new Error(`No mapped room found for room_id: ${data.room_id}`);
throw new Error(`No mapped room found for room_id: ${event.room_id}`);
}
const serverName = federationSDK.getConfig('serverName');
const relation = content['m.relates_to'];
const relation = event.content['m.relates_to'];
// SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such.
const hasRelation = relation && 'rel_type' in relation;
@ -161,7 +160,7 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
const thread = threadRootEventId ? await getThreadMessageId(threadRootEventId) : undefined;
const isEditedMessage = hasRelation && relation.rel_type === 'm.replace';
if (isEditedMessage && relation.event_id && data.content['m.new_content']) {
if (isEditedMessage && relation.event_id && event.content['m.new_content']) {
logger.debug('Received edited message from Matrix, updating existing message');
const originalMessage = await Messages.findOneByFederationId(relation.event_id);
if (!originalMessage) {
@ -171,7 +170,7 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
if (originalMessage.federation?.eventId !== relation.event_id) {
return;
}
if (originalMessage.msg === data.content['m.new_content']?.body) {
if (originalMessage.msg === event.content['m.new_content']?.body) {
logger.debug('No changes in message content, skipping update');
return;
}
@ -180,10 +179,10 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
const messageToReplyToUrl = await MeteorService.getMessageURLToReplyTo(room.t as string, room._id, originalMessage._id);
const formatted = await toInternalQuoteMessageFormat({
messageToReplyToUrl,
formattedMessage: data.content.formatted_body || '',
formattedMessage: event.content.formatted_body || '',
rawMessage: messageBody,
homeServerDomain: serverName,
senderExternalId: data.sender,
senderExternalId: event.sender,
});
await Message.updateMessage(
{
@ -197,11 +196,12 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
}
const formatted = toInternalMessageFormat({
rawMessage: data.content['m.new_content'].body,
formattedMessage: data.content.formatted_body || '',
rawMessage: event.content['m.new_content'].body,
formattedMessage: event.content.formatted_body || '',
homeServerDomain: serverName,
senderExternalId: data.sender,
senderExternalId: event.sender,
});
await Message.updateMessage(
{
...originalMessage,
@ -222,47 +222,47 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
const messageToReplyToUrl = await MeteorService.getMessageURLToReplyTo(room.t as string, room._id, originalMessage._id);
const formatted = await toInternalQuoteMessageFormat({
messageToReplyToUrl,
formattedMessage: data.content.formatted_body || '',
formattedMessage: event.content.formatted_body || '',
rawMessage: messageBody,
homeServerDomain: serverName,
senderExternalId: data.sender,
senderExternalId: event.sender,
});
await Message.saveMessageFromFederation({
fromId: user._id,
rid: room._id,
msg: formatted,
federation_event_id: data.event_id,
federation_event_id: eventId,
thread,
});
return;
}
const isMediaMessage = Object.values(fileTypes).includes(msgtype as FileMessageType);
if (isMediaMessage && content.url) {
if (isMediaMessage && 'url' in event.content) {
const result = await handleMediaMessage(
content.url,
content.info,
event.content.url,
event.content.info,
msgtype,
messageBody,
user,
room,
data.room_id,
data.event_id,
event.room_id,
eventId,
thread,
);
await Message.saveMessageFromFederation(result);
} else {
const formatted = toInternalMessageFormat({
rawMessage: messageBody,
formattedMessage: data.content.formatted_body || '',
formattedMessage: event.content.formatted_body || '',
homeServerDomain: serverName,
senderExternalId: data.sender,
senderExternalId: event.sender,
});
await Message.saveMessageFromFederation({
fromId: user._id,
rid: room._id,
msg: formatted,
federation_event_id: data.event_id,
federation_event_id: eventId,
thread,
});
}
@ -271,25 +271,25 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
}
});
emitter.on('homeserver.matrix.encrypted', async (data) => {
emitter.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => {
try {
if (!data.content.ciphertext) {
if (!event.content.ciphertext) {
logger.debug('No message content found in event');
return;
}
// at this point we know for sure the user already exists
const user = await Users.findOneByUsername(data.sender);
const user = await Users.findOneByUsername(event.sender);
if (!user) {
throw new Error(`User not found for sender: ${data.sender}`);
throw new Error(`User not found for sender: ${event.sender}`);
}
const room = await Rooms.findOne({ 'federation.mrid': data.room_id });
const room = await Rooms.findOne({ 'federation.mrid': event.room_id });
if (!room) {
throw new Error(`No mapped room found for room_id: ${data.room_id}`);
throw new Error(`No mapped room found for room_id: ${event.room_id}`);
}
const relation = data.content['m.relates_to'];
const relation = event.content['m.relates_to'];
// SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such.
const hasRelation = relation && 'rel_type' in relation;
@ -317,7 +317,7 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
if (originalMessage.federation?.eventId !== relation.event_id) {
return;
}
if (originalMessage.content?.ciphertext === data.content.ciphertext) {
if (originalMessage.content?.ciphertext === event.content.ciphertext) {
logger.debug('No changes in message content, skipping update');
return;
}
@ -327,8 +327,8 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
{
...originalMessage,
content: {
algorithm: data.content.algorithm,
ciphertext: data.content.ciphertext,
algorithm: event.content.algorithm,
ciphertext: event.content.ciphertext,
},
},
user,
@ -341,8 +341,8 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
{
...originalMessage,
content: {
algorithm: data.content.algorithm,
ciphertext: data.content.ciphertext,
algorithm: event.content.algorithm,
ciphertext: event.content.ciphertext,
},
},
user,
@ -361,10 +361,10 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
fromId: user._id,
rid: room._id,
e2e_content: {
algorithm: data.content.algorithm,
ciphertext: data.content.ciphertext,
algorithm: event.content.algorithm,
ciphertext: event.content.ciphertext,
},
federation_event_id: data.event_id,
federation_event_id: eventId,
thread,
});
return;
@ -374,10 +374,10 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
fromId: user._id,
rid: room._id,
e2e_content: {
algorithm: data.content.algorithm,
ciphertext: data.content.ciphertext,
algorithm: event.content.algorithm,
ciphertext: event.content.ciphertext,
},
federation_event_id: data.event_id,
federation_event_id: eventId,
thread,
});
} catch (error) {
@ -385,9 +385,9 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
}
});
emitter.on('homeserver.matrix.redaction', async (data) => {
emitter.on('homeserver.matrix.redaction', async ({ event }) => {
try {
const redactedEventId = data.redacts;
const redactedEventId = event.redacts;
if (!redactedEventId) {
logger.debug('No redacts field in redaction event');
return;
@ -399,12 +399,12 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
return;
}
const rcMessage = await Messages.findOneByFederationId(data.redacts);
const rcMessage = await Messages.findOneByFederationId(event.redacts);
if (!rcMessage) {
logger.debug(`No RC message found for event ${data.redacts}`);
logger.debug(`No RC message found for event ${event.redacts}`);
return;
}
const internalUsername = data.sender;
const internalUsername = event.sender;
const user = await Users.findOneByUsername(internalUsername);
if (!user) {
logger.debug(`User not found: ${internalUsername}`);

View File

@ -8,20 +8,20 @@ import emojione from 'emojione';
const logger = new Logger('federation-matrix:reaction');
export function reaction(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.reaction', async (data) => {
emitter.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => {
try {
const isSetReaction = data.content?.['m.relates_to'];
const isSetReaction = event.content?.['m.relates_to'];
const reactionTargetEventId = isSetReaction?.event_id;
const reactionKey = isSetReaction?.key;
const [userPart, domain] = data.sender.split(':');
const [userPart, domain] = event.sender.split(':');
if (!userPart || !domain) {
logger.error('Invalid Matrix sender ID format:', data.sender);
logger.error('Invalid Matrix sender ID format:', event.sender);
return;
}
const internalUsername = data.sender;
const internalUsername = event.sender;
const user = await Users.findOneByUsername(internalUsername);
if (!user) {
logger.error(`No RC user mapping found for Matrix event ${reactionTargetEventId} ${internalUsername}`);
@ -41,15 +41,15 @@ export function reaction(emitter: Emitter<HomeserverEventSignatures>) {
const reactionEmoji = emojione.toShort(reactionKey);
await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, true);
await Messages.setFederationReactionEventId(internalUsername, rcMessage._id, reactionEmoji, data.event_id);
await Messages.setFederationReactionEventId(internalUsername, rcMessage._id, reactionEmoji, eventId);
} catch (error) {
logger.error('Failed to process Matrix reaction:', error);
}
});
emitter.on('homeserver.matrix.redaction', async (data) => {
emitter.on('homeserver.matrix.redaction', async ({ event }) => {
try {
const redactedEventId = data.redacts;
const redactedEventId = event.redacts;
if (!redactedEventId) {
logger.debug('No redacts field in redaction event');
return;
@ -76,7 +76,7 @@ export function reaction(emitter: Emitter<HomeserverEventSignatures>) {
return;
}
const internalUsername = data.sender;
const internalUsername = event.sender;
const user = await Users.findOneByUsername(internalUsername);
if (!user) {
logger.debug(`User not found: ${internalUsername}`);

View File

@ -22,8 +22,12 @@ export function room(emitter: Emitter<HomeserverEventSignatures>) {
await Room.saveRoomName(localRoomId._id, localUserId._id, name);
});
emitter.on('homeserver.matrix.room.topic', async (data) => {
const { room_id: roomId, topic, user_id: userId } = data;
emitter.on('homeserver.matrix.room.topic', async ({ event }) => {
const {
room_id: roomId,
content: { topic },
sender: userId,
} = event;
const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } });
if (!localRoomId) {