fix: Re-establish connection with FreeSwitch server when it is lost (#36230)

This commit is contained in:
Pierre Lehnen 2025-06-27 11:40:45 -03:00 committed by GitHub
parent 15baefa234
commit 93fef88ee2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 402 additions and 200 deletions

View File

@ -0,0 +1,6 @@
---
'@rocket.chat/freeswitch': patch
'@rocket.chat/meteor': patch
---
Fixes FreeSwitch event parser to automatically reconnect when connection is lost

View File

@ -11,7 +11,14 @@ import type {
AtLeast,
} from '@rocket.chat/core-typings';
import { isKnownFreeSwitchEventType } from '@rocket.chat/core-typings';
import { getDomain, getUserPassword, getExtensionList, getExtensionDetails, listenToEvents } from '@rocket.chat/freeswitch';
import {
getDomain,
getUserPassword,
getExtensionList,
getExtensionDetails,
FreeSwitchEventClient,
type FreeSwitchOptions,
} from '@rocket.chat/freeswitch';
import type { InsertionModel } from '@rocket.chat/model-typings';
import { FreeSwitchCall, FreeSwitchEvent, Users } from '@rocket.chat/models';
import { objectMap, wrapExceptions } from '@rocket.chat/tools';
@ -25,47 +32,135 @@ export class VoipFreeSwitchService extends ServiceClassInternal implements IVoip
private serviceStarter: ServiceStarter;
private eventClient: FreeSwitchEventClient | null = null;
private wasEverConnected = false;
constructor() {
super();
this.serviceStarter = new ServiceStarter(() => this.startEvents());
this.serviceStarter = new ServiceStarter(
async () => {
// Delay start to ensure setting values are up-to-date in the cache
setImmediate(() => this.startEvents());
},
async () => this.stopEvents(),
);
this.onEvent('watch.settings', async ({ setting }): Promise<void> => {
if (setting._id === 'VoIP_TeamCollab_Enabled' && setting.value === true) {
void this.serviceStarter.start();
if (setting._id === 'VoIP_TeamCollab_Enabled') {
if (setting.value !== true) {
void this.serviceStarter.stop();
return;
}
if (setting.value === true) {
void this.serviceStarter.start();
return;
}
}
if (setting._id === 'VoIP_TeamCollab_FreeSwitch_Host') {
// Re-connect if the host changes
if (this.eventClient && this.eventClient.host !== setting.value) {
this.stopEvents();
}
if (setting.value) {
void this.serviceStarter.start();
}
}
// If any other freeswitch setting changes, only reconnect if it's not yet connected
if (setting._id.startsWith('VoIP_TeamCollab_FreeSwitch_')) {
if (!this.eventClient?.isReady()) {
this.stopEvents();
void this.serviceStarter.start();
}
}
});
}
private listening = false;
public async started(): Promise<void> {
void this.serviceStarter.start();
}
private async startEvents(): Promise<void> {
if (this.listening) {
if (this.eventClient) {
if (!this.eventClient.isDone()) {
return;
}
const client = this.eventClient;
this.eventClient = null;
client.endConnection();
}
const options = wrapExceptions(() => this.getConnectionSettings()).suppress();
if (!options) {
this.wasEverConnected = false;
return;
}
try {
// #ToDo: Reconnection
// #ToDo: Only connect from one rocket.chat instance
await listenToEvents(
async (...args) => wrapExceptions(() => this.onFreeSwitchEvent(...args)).suppress(),
this.getConnectionSettings(),
);
this.listening = true;
} catch (_e) {
this.listening = false;
}
this.initializeEventClient(options);
}
private getConnectionSettings(): { host: string; port: number; password: string; timeout: number } {
if (!settings.get('VoIP_TeamCollab_Enabled') && !process.env.FREESWITCHIP) {
private retryEventsLater(): void {
// Try to re-establish connection after some time
setTimeout(
() => {
void this.startEvents();
},
this.wasEverConnected ? 3000 : 20_000,
);
}
private initializeEventClient(options: FreeSwitchOptions): void {
const client = FreeSwitchEventClient.listenToEvents(options);
this.eventClient = client;
client.on('ready', () => {
if (this.eventClient !== client) {
return;
}
this.wasEverConnected = true;
});
client.on('end', () => {
if (this.eventClient && this.eventClient !== client) {
return;
}
this.eventClient = null;
this.retryEventsLater();
});
client.on('event', async ({ eventName, eventData }) => {
if (this.eventClient !== client) {
return;
}
await wrapExceptions(() =>
this.onFreeSwitchEvent(eventName as string, eventData as unknown as Record<string, string | undefined>),
).suppress();
});
}
private stopEvents(): void {
if (!this.eventClient) {
return;
}
this.eventClient.endConnection();
this.wasEverConnected = false;
this.eventClient = null;
}
private getConnectionSettings(): FreeSwitchOptions {
if (!settings.get('VoIP_TeamCollab_Enabled')) {
throw new Error('VoIP is disabled.');
}
const host = process.env.FREESWITCHIP || settings.get<string>('VoIP_TeamCollab_FreeSwitch_Host');
const host = settings.get<string>('VoIP_TeamCollab_FreeSwitch_Host');
if (!host) {
throw new Error('VoIP is not properly configured.');
}
@ -75,14 +170,16 @@ export class VoipFreeSwitchService extends ServiceClassInternal implements IVoip
const password = settings.get<string>('VoIP_TeamCollab_FreeSwitch_Password');
return {
host,
port,
socketOptions: {
host,
port,
},
password,
timeout,
};
}
private async onFreeSwitchEvent(eventName: string, data: Record<string, string | undefined>): Promise<void> {
public async onFreeSwitchEvent(eventName: string, data: Record<string, string | undefined>): Promise<void> {
const uniqueId = data['Unique-ID'];
if (!uniqueId) {
return;

View File

@ -1 +1 @@
export type FreeSwitchOptions = { host?: string; port?: number; password?: string; timeout?: number };
export type FreeSwitchOptions = { socketOptions: { host: string; port: number }; password: string; timeout?: number };

View File

@ -1,8 +1,8 @@
import type { StringMap } from 'esl';
import type { FreeSwitchOptions } from '../FreeSwitchOptions';
import { FreeSwitchApiClient } from '../esl';
import { logger } from '../logger';
import { runCommand } from '../runCommand';
export function getCommandGetDomain(): string {
return 'eval ${domain}';
@ -20,6 +20,6 @@ export function parseDomainResponse(response: StringMap): string {
}
export async function getDomain(options: FreeSwitchOptions): Promise<string> {
const response = await runCommand(options, getCommandGetDomain());
const response = await FreeSwitchApiClient.runSingleCommand(options, getCommandGetDomain());
return parseDomainResponse(response);
}

View File

@ -1,7 +1,7 @@
import type { FreeSwitchExtension } from '@rocket.chat/core-typings';
import type { FreeSwitchOptions } from '../FreeSwitchOptions';
import { runCommand } from '../runCommand';
import { FreeSwitchApiClient } from '../esl';
import { mapUserData } from '../utils/mapUserData';
import { parseUserList } from '../utils/parseUserList';
@ -14,7 +14,7 @@ export async function getExtensionDetails(
requestParams: { extension: string; group?: string },
): Promise<FreeSwitchExtension> {
const { extension, group } = requestParams;
const response = await runCommand(options, getCommandListFilteredUser(extension, group));
const response = await FreeSwitchApiClient.runSingleCommand(options, getCommandListFilteredUser(extension, group));
const users = parseUserList(response);

View File

@ -1,7 +1,7 @@
import type { FreeSwitchExtension } from '@rocket.chat/core-typings';
import type { FreeSwitchOptions } from '../FreeSwitchOptions';
import { runCommand } from '../runCommand';
import { FreeSwitchApiClient } from '../esl';
import { mapUserData } from '../utils/mapUserData';
import { parseUserList } from '../utils/parseUserList';
@ -10,7 +10,7 @@ export function getCommandListUsers(): string {
}
export async function getExtensionList(options: FreeSwitchOptions): Promise<FreeSwitchExtension[]> {
const response = await runCommand(options, getCommandListUsers());
const response = await FreeSwitchApiClient.runSingleCommand(options, getCommandListUsers());
const users = parseUserList(response);
return users.map((item) => mapUserData(item));

View File

@ -2,8 +2,8 @@ import type { StringMap } from 'esl';
import type { FreeSwitchOptions } from '../FreeSwitchOptions';
import { logger } from '../logger';
import { runCallback } from '../runCommand';
import { getCommandGetDomain, parseDomainResponse } from './getDomain';
import { FreeSwitchApiClient } from '../esl';
export function getCommandGetUserPassword(user: string, domain = 'rocket.chat'): string {
return `user_data ${user}@${domain} param password`;
@ -21,7 +21,7 @@ export function parsePasswordResponse(response: StringMap): string {
}
export async function getUserPassword(options: FreeSwitchOptions, user: string): Promise<string> {
return runCallback(options, async (runCommand) => {
return FreeSwitchApiClient.runCallback(options, async (runCommand) => {
const domainResponse = await runCommand(getCommandGetDomain());
const domain = parseDomainResponse(domainResponse);

View File

@ -1,87 +0,0 @@
import { Socket, type SocketConnectOpts } from 'node:net';
import { FreeSwitchResponse } from 'esl';
import { logger } from './logger';
const defaultPassword = 'ClueCon';
export type EventNames = Parameters<FreeSwitchResponse['event_json']>;
export async function connect(
options?: { host?: string; port?: number; password?: string },
customEventNames: EventNames = [],
): Promise<FreeSwitchResponse> {
const host = options?.host ?? '127.0.0.1';
const port = options?.port ?? 8021;
const password = options?.password ?? defaultPassword;
return new Promise((resolve, reject) => {
logger.debug({ msg: 'FreeSwitchClient::connect', options: { host, port } });
const socket = new Socket();
const currentCall = new FreeSwitchResponse(socket, logger);
let connecting = true;
socket.once('connect', () => {
void (async (): Promise<void> => {
connecting = false;
try {
// Normally when the client connects, FreeSwitch will first send us an authentication request. We use it to trigger the remainder of the stack.
await currentCall.onceAsync('freeswitch_auth_request', 20_000, 'FreeSwitchClient expected authentication request');
await currentCall.auth(password);
currentCall.auto_cleanup();
await currentCall.event_json('CHANNEL_EXECUTE_COMPLETE', 'BACKGROUND_JOB', ...customEventNames);
} catch (error) {
logger.error('FreeSwitchClient: connect error', error);
reject(error);
}
if (currentCall) {
resolve(currentCall);
}
})();
});
socket.once('error', (error) => {
if (!connecting) {
return;
}
logger.error({ msg: 'failed to connect to freeswitch server', error });
connecting = false;
reject(error);
});
socket.once('end', () => {
if (!connecting) {
return;
}
logger.debug('FreeSwitchClient::connect: client received `end` event (remote end sent a FIN packet)');
connecting = false;
reject(new Error('connection-ended'));
});
socket.on('warning', (data) => {
if (!connecting) {
return;
}
logger.warn({ msg: 'FreeSwitchClient: warning', data });
});
try {
logger.debug('FreeSwitchClient::connect: socket.connect', { options: { host, port } });
socket.connect({
host,
port,
password,
} as unknown as SocketConnectOpts);
} catch (error) {
logger.error('FreeSwitchClient::connect: socket.connect error', { error });
connecting = false;
reject(error);
}
});
}

View File

@ -0,0 +1,53 @@
import { FreeSwitchResponse, type FreeSwitchEventData, type StringMap } from 'esl';
import { logger } from '../logger';
import { FreeSwitchESLClient, type FreeSwitchESLClientOptions } from './client';
export class FreeSwitchApiClient extends FreeSwitchESLClient {
private getCommandResponse(response: FreeSwitchEventData, command?: string): StringMap {
if (!response?.body) {
logger.error('No response from FreeSwitch server', command, response);
throw new Error('No response from FreeSwitch server.');
}
return response.body;
}
protected async transitionToReady(): Promise<void> {
try {
this.response.event_json('BACKGROUND_JOB');
} catch (error) {
logger.error({ msg: 'Failed to request api responses', error });
throw new Error('failed-to-request-api-responses');
}
super.transitionToReady();
}
public async runCommand(command: string, timeout?: number): Promise<StringMap> {
await this.waitUntilUsable();
const result = await this.response.bgapi(command, timeout ?? FreeSwitchResponse.default_command_timeout);
return this.getCommandResponse(result, command);
}
public static async runCallback<T>(
options: FreeSwitchESLClientOptions,
cb: (runCommand: (command: string, timeout?: number) => Promise<StringMap>) => Promise<T>,
): Promise<T> {
const client = new FreeSwitchApiClient(options);
try {
await client.waitUntilUsable();
// Await result so it runs within the try..finally scope
const result = await cb(async (command: string, timeout?: number) => client.runCommand(command, timeout));
return result;
} finally {
client.endConnection();
}
}
public static async runSingleCommand(options: FreeSwitchESLClientOptions, command: string, timeout?: number): Promise<StringMap> {
return this.runCallback(options, async (runCommand) => runCommand(command, timeout));
}
}

View File

@ -0,0 +1,158 @@
import { Socket, type TcpSocketConnectOpts } from 'node:net';
import type { ValueOf } from '@rocket.chat/core-typings';
import { Emitter } from '@rocket.chat/emitter';
import { wrapExceptions } from '@rocket.chat/tools';
import { FreeSwitchResponse, type StringMap } from 'esl';
import { logger } from '../logger';
export type EventNames = Parameters<FreeSwitchResponse['event_json']>;
export type FreeSwitchESLClientOptions = {
socketOptions: TcpSocketConnectOpts;
password: string;
timeout?: number;
};
export type FreeSwitchESLClientEvents = {
ready: void;
end: void;
event: { eventName: ValueOf<EventNames>; eventData: StringMap };
};
export type FreeSwitchESLClientState = 'none' | 'connecting' | 'authenticating' | 'transitioning' | 'failed' | 'ready' | 'ended';
export class FreeSwitchESLClient extends Emitter<FreeSwitchESLClientEvents> {
private state: FreeSwitchESLClientState = 'none';
private socket: Socket;
protected response: FreeSwitchResponse;
private expectingEnd = false;
public host: string | undefined;
constructor(protected options: FreeSwitchESLClientOptions) {
super();
this.host = this.options.socketOptions.host;
logger.debug('Connecting new FreeSwitch socket');
this.socket = new Socket();
this.response = new FreeSwitchResponse(this.socket, logger);
this.socket.once('connect', () => {
logger.debug('FreeSwitch socket connected.');
this.authenticate();
});
this.socket.once('error', (error) => {
logger.error({ msg: 'error on connection with freeswitch server', state: this.state, error });
this.changeState('failed');
});
this.socket.once('end', () => {
if (!this.expectingEnd) {
logger.debug('FreeSwitchESLClient received `end` event (remote end sent a FIN packet)');
}
this.changeState('ended');
});
this.socket.on('warning', (data) => {
logger.warn({ msg: 'FreeSwitchClient: warning', data });
});
try {
this.socket.connect(this.options.socketOptions);
} catch (error) {
this.changeState('failed');
logger.error({ msg: 'failed to connect to freeswitch server', error });
}
}
private async authenticate(): Promise<void> {
logger.debug('FreeSwitch socket authenticating.');
this.changeState('authenticating');
try {
// Wait for FreeSwitch to send us an authentication request
await this.response.onceAsync(
'freeswitch_auth_request',
this.options.timeout ?? 20_000,
'FreeSwitchClient expected authentication request',
);
await this.response.auth(this.options.password);
this.changeState('transitioning');
this.response.auto_cleanup();
await this.transitionToReady();
} catch (error) {
logger.error('FreeSwitchClient: initialization error', error);
this.changeState('failed');
}
}
protected async transitionToReady(): Promise<void> {
this.changeState('ready');
}
protected changeState(newState: FreeSwitchESLClientState): void {
logger.debug({ msg: 'FreeSwitchESLClient changing state .', newState, state: this.state });
if (this.isDone()) {
return;
}
this.state = newState;
if (this.isReady()) {
this.emit('ready');
return;
}
if (this.isDone()) {
this.emit('end');
}
}
public isReady(): boolean {
return this.state === 'ready';
}
public isDone(): boolean {
return ['failed', 'ended'].includes(this.state);
}
public async waitUntilUsable(): Promise<void> {
if (this.isReady()) {
return;
}
if (this.isDone()) {
throw new Error('connection-ended');
}
return new Promise((resolve, reject) => {
let concluded = false;
this.once('ready', () => {
if (!concluded) {
concluded = true;
resolve();
}
});
this.once('end', () => {
if (!concluded) {
concluded = true;
reject(new Error('connection-ended'));
}
});
});
}
public endConnection(): void {
this.expectingEnd = true;
wrapExceptions(() => this.response.end()).suppress();
}
}

View File

@ -0,0 +1,49 @@
import { logger } from '../logger';
import { FreeSwitchESLClient, type EventNames, type FreeSwitchESLClientOptions } from './client';
const eventsToListen: EventNames = [
'CHANNEL_CALLSTATE',
'CHANNEL_STATE',
'CHANNEL_CREATE',
'CHANNEL_DESTROY',
'CHANNEL_ANSWER',
'CHANNEL_HANGUP',
'CHANNEL_HANGUP_COMPLETE',
'CHANNEL_BRIDGE',
'CHANNEL_UNBRIDGE',
'CHANNEL_OUTGOING',
'CHANNEL_PARK',
'CHANNEL_UNPARK',
'CHANNEL_HOLD',
'CHANNEL_UNHOLD',
'CHANNEL_ORIGINATE',
'CHANNEL_UUID',
];
export class FreeSwitchEventClient extends FreeSwitchESLClient {
constructor(
protected options: FreeSwitchESLClientOptions,
private eventsToListen: EventNames,
) {
super(options);
eventsToListen.forEach((eventName) => {
this.response.on(eventName, (eventData) => this.emit('event', { eventName, eventData: eventData.body }));
});
}
protected async transitionToReady(): Promise<void> {
try {
this.response.event_json(...this.eventsToListen);
} catch (error) {
logger.error({ msg: 'Failed to request events', error });
throw new Error('failed-to-request-events');
}
super.transitionToReady();
}
public static listenToEvents(options: FreeSwitchESLClientOptions): FreeSwitchEventClient {
return new FreeSwitchEventClient(options, eventsToListen);
}
}

View File

@ -0,0 +1,3 @@
export * from './apiClient';
export * from './client';
export * from './eventClient';

View File

@ -1,12 +0,0 @@
import type { FreeSwitchEventData, StringMap } from 'esl';
import { logger } from './logger';
export async function getCommandResponse(response: FreeSwitchEventData, command?: string): Promise<StringMap> {
if (!response?.body) {
logger.error('No response from FreeSwitch server', command, response);
throw new Error('No response from FreeSwitch server.');
}
return response.body;
}

View File

@ -1,2 +1,4 @@
export * from './commands';
export * from './listenToEvents';
export * from './esl';
export * from './logger';
export * from './FreeSwitchOptions';

View File

@ -1,37 +0,0 @@
import type { FreeSwitchResponse } from 'esl';
import { connect, type EventNames } from './connect';
export async function listenToEvents(
callback: (eventName: string, data: Record<string, string | undefined>) => Promise<void>,
options?: { host?: string; port?: number; password?: string },
): Promise<FreeSwitchResponse> {
const eventsToListen: EventNames = [
'CHANNEL_CALLSTATE',
'CHANNEL_STATE',
'CHANNEL_CREATE',
'CHANNEL_DESTROY',
'CHANNEL_ANSWER',
'CHANNEL_HANGUP',
'CHANNEL_HANGUP_COMPLETE',
'CHANNEL_BRIDGE',
'CHANNEL_UNBRIDGE',
'CHANNEL_OUTGOING',
'CHANNEL_PARK',
'CHANNEL_UNPARK',
'CHANNEL_HOLD',
'CHANNEL_UNHOLD',
'CHANNEL_ORIGINATE',
'CHANNEL_UUID',
];
const connection = await connect(options, eventsToListen);
eventsToListen.forEach((eventName) =>
connection.on(eventName, (event) => {
callback(eventName, event.body);
}),
);
return connection;
}

View File

@ -1,30 +0,0 @@
import { wrapExceptions } from '@rocket.chat/tools';
import { FreeSwitchResponse, type StringMap } from 'esl';
import type { FreeSwitchOptions } from './FreeSwitchOptions';
import { connect } from './connect';
import { getCommandResponse } from './getCommandResponse';
export async function runCallback<T>(
options: FreeSwitchOptions,
cb: (runCommand: (command: string) => Promise<StringMap>) => Promise<T>,
): Promise<T> {
const { host, port, password, timeout } = options;
const call = await connect({ host, port, password });
try {
// Await result so it runs within the try..finally scope
const result = await cb(async (command) => {
const response = await call.bgapi(command, timeout ?? FreeSwitchResponse.default_command_timeout);
return getCommandResponse(response, command);
});
return result;
} finally {
await wrapExceptions(async () => call.end()).suppress();
}
}
export async function runCommand(options: FreeSwitchOptions, command: string): Promise<StringMap> {
return runCallback(options, async (runCommand) => runCommand(command));
}