Skip to content

Commit e05c489

Browse files
committed
feat: Added rabbitmq to send events
1 parent 24c8803 commit e05c489

18 files changed

Lines changed: 414 additions & 7 deletions

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"@hapi/boom": "^10.0.1",
4848
"@sentry/node": "^7.59.2",
4949
"@whiskeysockets/baileys": "github:EvolutionAPI/Baileys",
50+
"amqplib": "^0.10.3",
5051
"axios": "^1.3.5",
5152
"class-validator": "^0.13.2",
5253
"compression": "^1.7.4",

src/libs/amqp.server.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import * as amqp from 'amqplib/callback_api';
2+
3+
import { Logger } from '../config/logger.config';
4+
5+
const logger = new Logger('AMQP');
6+
7+
let amqpChannel: amqp.Channel | null = null;
8+
9+
export const initAMQP = () => {
10+
return new Promise<void>((resolve, reject) => {
11+
amqp.connect('amqp://admin:admin@localhost:5672', (error, connection) => {
12+
if (error) {
13+
reject(error);
14+
return;
15+
}
16+
17+
connection.createChannel((channelError, channel) => {
18+
if (channelError) {
19+
reject(channelError);
20+
return;
21+
}
22+
23+
const exchangeName = 'evolution_exchange';
24+
25+
channel.assertExchange(exchangeName, 'topic', { durable: false });
26+
amqpChannel = channel;
27+
28+
logger.log('Serviço do RabbitMQ inicializado com sucesso.');
29+
resolve();
30+
});
31+
});
32+
});
33+
};
34+
35+
export const getAMQP = (): amqp.Channel | null => {
36+
return amqpChannel;
37+
};

src/main.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import { configService, Cors, HttpServer } from './config/env.config';
99
import { onUnexpectedError } from './config/error.config';
1010
import { Logger } from './config/logger.config';
1111
import { ROOT_DIR } from './config/path.config';
12-
import { initIO } from './libs/socket';
12+
import { initAMQP } from './libs/amqp.server';
13+
import { initIO } from './libs/socket.server';
1314
import { ServerUP } from './utils/server-up';
1415
import { HttpStatus, router } from './whatsapp/routers/index.router';
1516
import { waMonitor } from './whatsapp/whatsapp.module';
@@ -86,6 +87,8 @@ function bootstrap() {
8687

8788
initIO(server);
8889

90+
initAMQP();
91+
8992
onUnexpectedError();
9093
}
9194

src/validate/validate.schema.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,3 +934,43 @@ export const websocketSchema: JSONSchema7 = {
934934
required: ['enabled'],
935935
...isNotEmpty('enabled'),
936936
};
937+
938+
export const rabbitmqSchema: JSONSchema7 = {
939+
$id: v4(),
940+
type: 'object',
941+
properties: {
942+
enabled: { type: 'boolean', enum: [true, false] },
943+
events: {
944+
type: 'array',
945+
minItems: 0,
946+
items: {
947+
type: 'string',
948+
enum: [
949+
'APPLICATION_STARTUP',
950+
'QRCODE_UPDATED',
951+
'MESSAGES_SET',
952+
'MESSAGES_UPSERT',
953+
'MESSAGES_UPDATE',
954+
'MESSAGES_DELETE',
955+
'SEND_MESSAGE',
956+
'CONTACTS_SET',
957+
'CONTACTS_UPSERT',
958+
'CONTACTS_UPDATE',
959+
'PRESENCE_UPDATE',
960+
'CHATS_SET',
961+
'CHATS_UPSERT',
962+
'CHATS_UPDATE',
963+
'CHATS_DELETE',
964+
'GROUPS_UPSERT',
965+
'GROUP_UPDATE',
966+
'GROUP_PARTICIPANTS_UPDATE',
967+
'CONNECTION_UPDATE',
968+
'CALL',
969+
'NEW_JWT_TOKEN',
970+
],
971+
},
972+
},
973+
},
974+
required: ['enabled'],
975+
...isNotEmpty('enabled'),
976+
};

src/whatsapp/controllers/instance.controller.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { RepositoryBroker } from '../repository/repository.manager';
1111
import { AuthService, OldToken } from '../services/auth.service';
1212
import { ChatwootService } from '../services/chatwoot.service';
1313
import { WAMonitoringService } from '../services/monitor.service';
14+
import { RabbitmqService } from '../services/rabbitmq.service';
1415
import { SettingsService } from '../services/settings.service';
1516
import { WebhookService } from '../services/webhook.service';
1617
import { WebsocketService } from '../services/websocket.service';
@@ -28,6 +29,7 @@ export class InstanceController {
2829
private readonly chatwootService: ChatwootService,
2930
private readonly settingsService: SettingsService,
3031
private readonly websocketService: WebsocketService,
32+
private readonly rebbitmqService: RabbitmqService,
3133
private readonly cache: RedisCache,
3234
) {}
3335

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { Logger } from '../../config/logger.config';
2+
import { InstanceDto } from '../dto/instance.dto';
3+
import { RabbitmqDto } from '../dto/rabbitmq.dto';
4+
import { RabbitmqService } from '../services/rabbitmq.service';
5+
6+
const logger = new Logger('RabbitmqController');
7+
8+
export class RabbitmqController {
9+
constructor(private readonly rabbitmqService: RabbitmqService) {}
10+
11+
public async createRabbitmq(instance: InstanceDto, data: RabbitmqDto) {
12+
logger.verbose('requested createRabbitmq from ' + instance.instanceName + ' instance');
13+
14+
if (!data.enabled) {
15+
logger.verbose('rabbitmq disabled');
16+
data.events = [];
17+
}
18+
19+
if (data.events.length === 0) {
20+
logger.verbose('rabbitmq events empty');
21+
data.events = [
22+
'APPLICATION_STARTUP',
23+
'QRCODE_UPDATED',
24+
'MESSAGES_SET',
25+
'MESSAGES_UPSERT',
26+
'MESSAGES_UPDATE',
27+
'MESSAGES_DELETE',
28+
'SEND_MESSAGE',
29+
'CONTACTS_SET',
30+
'CONTACTS_UPSERT',
31+
'CONTACTS_UPDATE',
32+
'PRESENCE_UPDATE',
33+
'CHATS_SET',
34+
'CHATS_UPSERT',
35+
'CHATS_UPDATE',
36+
'CHATS_DELETE',
37+
'GROUPS_UPSERT',
38+
'GROUP_UPDATE',
39+
'GROUP_PARTICIPANTS_UPDATE',
40+
'CONNECTION_UPDATE',
41+
'CALL',
42+
'NEW_JWT_TOKEN',
43+
];
44+
}
45+
46+
return this.rabbitmqService.create(instance, data);
47+
}
48+
49+
public async findRabbitmq(instance: InstanceDto) {
50+
logger.verbose('requested findRabbitmq from ' + instance.instanceName + ' instance');
51+
return this.rabbitmqService.find(instance);
52+
}
53+
}

src/whatsapp/dto/rabbitmq.dto.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export class RabbitmqDto {
2+
enabled: boolean;
3+
events?: string[];
4+
}

src/whatsapp/models/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export * from './chat.model';
33
export * from './chatwoot.model';
44
export * from './contact.model';
55
export * from './message.model';
6+
export * from './rabbitmq.model';
67
export * from './settings.model';
78
export * from './webhook.model';
89
export * from './websocket.model';
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { Schema } from 'mongoose';
2+
3+
import { dbserver } from '../../libs/db.connect';
4+
5+
export class RabbitmqRaw {
6+
_id?: string;
7+
enabled?: boolean;
8+
events?: string[];
9+
}
10+
11+
const rabbitmqSchema = new Schema<RabbitmqRaw>({
12+
_id: { type: String, _id: true },
13+
enabled: { type: Boolean, required: true },
14+
events: { type: [String], required: true },
15+
});
16+
17+
export const RabbitmqModel = dbserver?.model(RabbitmqRaw.name, rabbitmqSchema, 'rabbitmq');
18+
export type IRabbitmqModel = typeof RabbitmqModel;

0 commit comments

Comments
 (0)