diff --git a/bun.lockb b/bun.lockb index e3abaae..8caf0d4 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index fda2a34..6a09ec3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sqlitecloud/drivers", - "version": "1.0.219", + "version": "1.0.220", "description": "SQLiteCloud drivers for Typescript/Javascript in edge, web and node clients", "main": "./lib/index.js", "types": "./lib/index.d.ts", @@ -43,6 +43,8 @@ }, "homepage": "https://github.com/sqlitecloud/sqlitecloud-js#readme", "dependencies": { + "@types/amqplib": "^0.10.5", + "amqplib": "^0.10.4", "eventemitter3": "^5.0.1", "jest-html-reporter": "^3.10.2", "lz4js": "^0.2.0", diff --git a/src/realtime/amqp-async.ts b/src/realtime/amqp-async.ts new file mode 100644 index 0000000..1a16427 --- /dev/null +++ b/src/realtime/amqp-async.ts @@ -0,0 +1,51 @@ +import amqplib from 'amqplib' +import crypto from 'crypto' // Import the crypto module + +const serverUrl = process.env.SQLITECLOUD_AMQP as string // 'amqp://user:xxx@rabbitmq.aws-eu1.sqlite.tech' + +// docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=xxx rabbitmq:3.13-management + +// https://rabbitmq.aws-eu1.sqlite.tech:15672/#/ + +const queue = 'async-queue' + +async function performExperiment() { + const conn = await amqplib.connect(serverUrl) + + const ch1 = await conn.createChannel() + await ch1.assertQueue(queue) + + // Listener + ch1.consume(queue, msg => { + if (msg !== null) { + try { + const jsonMsg = JSON.parse(msg.content.toString()) // Parse the message content to JSON + //const jsonMsg = msg.content.toJSON() + console.log(jsonMsg, msg.fields, msg.properties) + } catch (error) { + console.error(`Error while parsing ${msg.content.toString()}`, error) + } + + // console.log('Received:', msg.content.toString()) + ch1.ack(msg) + } else { + console.log('Consumer cancelled by server') + } + }) + + // Sender + const ch2 = await conn.createChannel() + + setInterval(() => { + const buffer = crypto.randomBytes(64 * 1024) // Generate 64KB of random bytes + const base64Buffer = buffer.toString('base64') // Encode the buffer as a base64 string + + const jsonMsg = { task: 'something to do', buffer: base64Buffer } // Create a JSON object + const sent = ch2.sendToQueue(queue, Buffer.from(JSON.stringify(jsonMsg)), { + contentType: 'application/json' + }) + //console.debug(`sent: ${sent}`) + }, 1000) +} + +void performExperiment() diff --git a/src/realtime/amqp-callbacks.ts b/src/realtime/amqp-callbacks.ts new file mode 100644 index 0000000..a7d0866 --- /dev/null +++ b/src/realtime/amqp-callbacks.ts @@ -0,0 +1,74 @@ +// +// +// + +import { error } from 'console' +import crypto from 'crypto' // Import the crypto module +// https://amqp-node.github.io/amqplib/ +import amqplib from 'amqplib/callback_api' + +const rabbit_url = process.env.SQLITECLOUD_AMQP as string +// eg. amqp://user:password@rabbitmq.aws-eu1.sqlite.tech + +// docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=xxx rabbitmq:3.13-management + +// https://rabbitmq.aws-eu1.sqlite.tech:15672/#/ + +const queue = 'jsontasks' + +amqplib.connect(rabbit_url, (err, conn) => { + if (err) { + console.debug(`amqplib.connect - error: ${error}`) + throw err + } + console.debug(`amqplib.connect - connected`, conn) + + // Listener + + conn.createChannel((err, ch2) => { + if (err) throw err + + ch2.assertQueue(queue) + ch2.consume(queue, msg => { + if (msg !== null) { + try { + const jsonMsg = JSON.parse(msg.content.toString()) // Parse the message content to JSON + //const jsonMsg = msg.content.toJSON() + console.debug(`content:`, jsonMsg) + console.debug(`headers:`, msg.properties.headers) + console.debug(`fields:`, msg.fields) + + // console.log(jsonMsg, msg.fields, msg.properties) + } catch (error) { + console.error(`Error while parsing ${msg.content.toString()}`, error) + } + ch2.ack(msg) + } else { + console.log('Consumer cancelled by server') + } + }) + }) + + // Sender + conn.createChannel((err, ch1) => { + if (err) throw err + + ch1.assertQueue(queue) + + setInterval(() => { + const buffer = crypto.randomBytes(64 * 1024) // Generate 64KB of random bytes + const base64Buffer = buffer.toString('base64') // Encode the buffer as a base64 string + + const jsonMsg = { task: 'something to do', data: base64Buffer } // Create a JSON object + const sent = ch1.sendToQueue(queue, Buffer.from(JSON.stringify(jsonMsg)), { + contentType: 'application/json', + headers: { + headerOne: 'value1', + headerTwo: 2 + } + // persistent: true + }) + console.debug(`sent: ${sent}`) + }, 100) + }) +})