|
| 1 | +// |
| 2 | +// |
| 3 | +// |
| 4 | + |
| 5 | +import { error } from 'console' |
| 6 | +import crypto from 'crypto' // Import the crypto module |
| 7 | +// https://amqp-node.github.io/amqplib/ |
| 8 | +import amqplib from 'amqplib/callback_api' |
| 9 | + |
| 10 | +const rabbit_url = process.env.SQLITECLOUD_AMQP as string // 'amqp://user:cKZWUkrGMOrdcsqZ@rabbitmq.aws-eu1.sqlite.tech' |
| 11 | + |
| 12 | +// docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=xxx rabbitmq:3.13-management |
| 13 | + |
| 14 | +// https://rabbitmq.aws-eu1.sqlite.tech:15672/#/ |
| 15 | + |
| 16 | +const topic = 'messaggini' |
| 17 | + |
| 18 | +const queue = 'jsontasks' |
| 19 | + |
| 20 | +amqplib.connect(rabbit_url, (err, conn) => { |
| 21 | + if (err) { |
| 22 | + console.debug(`amqplib.connect - error: ${error}`) |
| 23 | + throw err |
| 24 | + } |
| 25 | + console.debug(`amqplib.connect - connected`, conn) |
| 26 | + |
| 27 | + // Listener |
| 28 | + |
| 29 | + conn.createChannel((err, ch2) => { |
| 30 | + if (err) throw err |
| 31 | + |
| 32 | + ch2.assertQueue(queue) |
| 33 | + ch2.consume(queue, msg => { |
| 34 | + if (msg !== null) { |
| 35 | + try { |
| 36 | + const jsonMsg = JSON.parse(msg.content.toString()) // Parse the message content to JSON |
| 37 | + //const jsonMsg = msg.content.toJSON() |
| 38 | + console.debug(`content:`, jsonMsg) |
| 39 | + console.debug(`headers:`, msg.properties.headers) |
| 40 | + console.debug(`fields:`, msg.fields) |
| 41 | + |
| 42 | + // console.log(jsonMsg, msg.fields, msg.properties) |
| 43 | + } catch (error) { |
| 44 | + console.error(`Error while parsing ${msg.content.toString()}`, error) |
| 45 | + } |
| 46 | + ch2.ack(msg) |
| 47 | + } else { |
| 48 | + console.log('Consumer cancelled by server') |
| 49 | + } |
| 50 | + }) |
| 51 | + }) |
| 52 | + |
| 53 | + // Sender |
| 54 | + conn.createChannel((err, ch1) => { |
| 55 | + if (err) throw err |
| 56 | + |
| 57 | + ch1.assertQueue(queue) |
| 58 | + |
| 59 | + setInterval(() => { |
| 60 | + const buffer = crypto.randomBytes(64 * 1024) // Generate 64KB of random bytes |
| 61 | + const base64Buffer = buffer.toString('base64') // Encode the buffer as a base64 string |
| 62 | + |
| 63 | + const jsonMsg = { task: 'something to do', data: base64Buffer } // Create a JSON object |
| 64 | + const sent = ch1.sendToQueue(queue, Buffer.from(JSON.stringify(jsonMsg)), { |
| 65 | + contentType: 'application/json', |
| 66 | + headers: { |
| 67 | + headerOne: 'value1', |
| 68 | + headerTwo: 2 |
| 69 | + } |
| 70 | + // persistent: true |
| 71 | + }) |
| 72 | + console.debug(`sent: ${sent}`) |
| 73 | + }, 100) |
| 74 | + }) |
| 75 | +}) |
0 commit comments