Skip to content

Commit e269424

Browse files
Examples of sending/receiving queue messages
1 parent 1f4a3a5 commit e269424

4 files changed

Lines changed: 129 additions & 1 deletion

File tree

bun.lockb

4.52 KB
Binary file not shown.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@sqlitecloud/drivers",
3-
"version": "1.0.219",
3+
"version": "1.0.220",
44
"description": "SQLiteCloud drivers for Typescript/Javascript in edge, web and node clients",
55
"main": "./lib/index.js",
66
"types": "./lib/index.d.ts",
@@ -43,6 +43,8 @@
4343
},
4444
"homepage": "https://github.com/sqlitecloud/sqlitecloud-js#readme",
4545
"dependencies": {
46+
"@types/amqplib": "^0.10.5",
47+
"amqplib": "^0.10.4",
4648
"eventemitter3": "^5.0.1",
4749
"jest-html-reporter": "^3.10.2",
4850
"lz4js": "^0.2.0",

src/realtime/amqp-async.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import amqplib from 'amqplib'
2+
import crypto from 'crypto' // Import the crypto module
3+
4+
const serverUrl = process.env.SQLITECLOUD_AMQP as string // 'amqp://user:xxx@rabbitmq.aws-eu1.sqlite.tech'
5+
6+
// docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=xxx rabbitmq:3.13-management
7+
8+
// https://rabbitmq.aws-eu1.sqlite.tech:15672/#/
9+
10+
const queue = 'async-queue'
11+
12+
async function performExperiment() {
13+
const conn = await amqplib.connect(serverUrl)
14+
15+
const ch1 = await conn.createChannel()
16+
await ch1.assertQueue(queue)
17+
18+
// Listener
19+
ch1.consume(queue, msg => {
20+
if (msg !== null) {
21+
try {
22+
const jsonMsg = JSON.parse(msg.content.toString()) // Parse the message content to JSON
23+
//const jsonMsg = msg.content.toJSON()
24+
console.log(jsonMsg, msg.fields, msg.properties)
25+
} catch (error) {
26+
console.error(`Error while parsing ${msg.content.toString()}`, error)
27+
}
28+
29+
// console.log('Received:', msg.content.toString())
30+
ch1.ack(msg)
31+
} else {
32+
console.log('Consumer cancelled by server')
33+
}
34+
})
35+
36+
// Sender
37+
const ch2 = await conn.createChannel()
38+
39+
setInterval(() => {
40+
const buffer = crypto.randomBytes(64 * 1024) // Generate 64KB of random bytes
41+
const base64Buffer = buffer.toString('base64') // Encode the buffer as a base64 string
42+
43+
const jsonMsg = { task: 'something to do', buffer: base64Buffer } // Create a JSON object
44+
const sent = ch2.sendToQueue(queue, Buffer.from(JSON.stringify(jsonMsg)), {
45+
contentType: 'application/json'
46+
})
47+
//console.debug(`sent: ${sent}`)
48+
}, 1000)
49+
}
50+
51+
void performExperiment()

src/realtime/amqp-callbacks.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//
2+
//
3+
//
4+
5+
import { error } from 'console'
6+
import crypto from 'crypto' // Import the crypto module
7+
// https://amqp-node.github.io/amqplib/
8+
import amqplib from 'amqplib/callback_api'
9+
10+
const rabbit_url = process.env.SQLITECLOUD_AMQP as string // 'amqp://user:cKZWUkrGMOrdcsqZ@rabbitmq.aws-eu1.sqlite.tech'
11+
12+
// docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=xxx rabbitmq:3.13-management
13+
14+
// https://rabbitmq.aws-eu1.sqlite.tech:15672/#/
15+
16+
const topic = 'messaggini'
17+
18+
const queue = 'jsontasks'
19+
20+
amqplib.connect(rabbit_url, (err, conn) => {
21+
if (err) {
22+
console.debug(`amqplib.connect - error: ${error}`)
23+
throw err
24+
}
25+
console.debug(`amqplib.connect - connected`, conn)
26+
27+
// Listener
28+
29+
conn.createChannel((err, ch2) => {
30+
if (err) throw err
31+
32+
ch2.assertQueue(queue)
33+
ch2.consume(queue, msg => {
34+
if (msg !== null) {
35+
try {
36+
const jsonMsg = JSON.parse(msg.content.toString()) // Parse the message content to JSON
37+
//const jsonMsg = msg.content.toJSON()
38+
console.debug(`content:`, jsonMsg)
39+
console.debug(`headers:`, msg.properties.headers)
40+
console.debug(`fields:`, msg.fields)
41+
42+
// console.log(jsonMsg, msg.fields, msg.properties)
43+
} catch (error) {
44+
console.error(`Error while parsing ${msg.content.toString()}`, error)
45+
}
46+
ch2.ack(msg)
47+
} else {
48+
console.log('Consumer cancelled by server')
49+
}
50+
})
51+
})
52+
53+
// Sender
54+
conn.createChannel((err, ch1) => {
55+
if (err) throw err
56+
57+
ch1.assertQueue(queue)
58+
59+
setInterval(() => {
60+
const buffer = crypto.randomBytes(64 * 1024) // Generate 64KB of random bytes
61+
const base64Buffer = buffer.toString('base64') // Encode the buffer as a base64 string
62+
63+
const jsonMsg = { task: 'something to do', data: base64Buffer } // Create a JSON object
64+
const sent = ch1.sendToQueue(queue, Buffer.from(JSON.stringify(jsonMsg)), {
65+
contentType: 'application/json',
66+
headers: {
67+
headerOne: 'value1',
68+
headerTwo: 2
69+
}
70+
// persistent: true
71+
})
72+
console.debug(`sent: ${sent}`)
73+
}, 100)
74+
})
75+
})

0 commit comments

Comments
 (0)