From dedc99fee03062e8e6a20719627fefa7d4f507c3 Mon Sep 17 00:00:00 2001 From: kushalkolar Date: Mon, 4 Mar 2024 22:22:06 -0500 Subject: [PATCH] add zmq example --- .../notebooks/multiprocessing_zmq/README.md | 3 + .../multiprocessing_zmq_compute.ipynb | 73 +++++++++++ .../multiprocessing_zmq_plot.ipynb | 114 ++++++++++++++++++ 3 files changed, 190 insertions(+) create mode 100644 examples/notebooks/multiprocessing_zmq/README.md create mode 100644 examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_compute.ipynb create mode 100644 examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_plot.ipynb diff --git a/examples/notebooks/multiprocessing_zmq/README.md b/examples/notebooks/multiprocessing_zmq/README.md new file mode 100644 index 000000000..184453d0c --- /dev/null +++ b/examples/notebooks/multiprocessing_zmq/README.md @@ -0,0 +1,3 @@ +This example shows how to use a zmq publisher-subscriber pattern to perform a computation in one process and visualize results in another process. First, run all cells in `multiprocessing_zmq_plot.ipynb`, and then run cells in `multiprocessing_zmq_compute.ipynb`. The raw bytes for the numpy array are sent using zmq in the compute notebook and received in the plot notebook and displayed. + +For more information on zmq see: https://zeromq.org/languages/python/ diff --git a/examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_compute.ipynb b/examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_compute.ipynb new file mode 100644 index 000000000..7f24f6411 --- /dev/null +++ b/examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_compute.ipynb @@ -0,0 +1,73 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "ca2817a3-869c-4cc6-901b-c34509518175", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import zmq" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dd0b9780-b507-4ea2-af09-134abd76f45b", + "metadata": {}, + "outputs": [], + "source": [ + "context = zmq.Context()\n", + "\n", + "# create publisher\n", + "socket = context.socket(zmq.PUB)\n", + "socket.bind(\"tcp://127.0.0.1:5555\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4729bfe8-3474-4bc9-a489-a57d02e5a287", + "metadata": {}, + "outputs": [], + "source": [ + "for i in range(2_000):\n", + " # make some data, make note of the dtype\n", + " data = np.random.rand(512, 512).astype(np.float32)\n", + "\n", + " # sent bytes over the socket\n", + " socket.send(data.tobytes())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d47dab72-1061-439f-bf6e-a88b9ee8e5aa", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_plot.ipynb b/examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_plot.ipynb new file mode 100644 index 000000000..2c6e93d8f --- /dev/null +++ b/examples/notebooks/multiprocessing_zmq/multiprocessing_zmq_plot.ipynb @@ -0,0 +1,114 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "491e6050-64ae-4bfc-a480-5805cd684710", + "metadata": {}, + "outputs": [], + "source": [ + "import fastplotlib as fpl\n", + "import numpy as np\n", + "import zmq" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "97135f98-6810-49b6-a8de-d0e114720d6c", + "metadata": {}, + "outputs": [], + "source": [ + "context = zmq.Context()\n", + "\n", + "# create subscriber\n", + "sub = context.socket(zmq.SUB)\n", + "sub.setsockopt(zmq.SUBSCRIBE, b\"\")\n", + "\n", + "# keep only the most recent message\n", + "sub.setsockopt(zmq.CONFLATE, 1)\n", + "\n", + "# publisher address and port\n", + "sub.connect(\"tcp://127.0.0.1:5555\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2d4420f2-364a-445a-9658-63e9ffa586c3", + "metadata": {}, + "outputs": [], + "source": [ + "def get_bytes():\n", + " \"\"\"\n", + " Gets the bytes from the publisher\n", + " \"\"\"\n", + " try:\n", + " b = sub.recv(zmq.NOBLOCK)\n", + " except zmq.Again:\n", + " pass\n", + " else:\n", + " return b\n", + " \n", + " return None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "42d20d77-b884-4379-80e4-e08738506eeb", + "metadata": {}, + "outputs": [], + "source": [ + "plot = fpl.Plot()\n", + "\n", + "# initialize some data, must be of same dtype and shape as data sent by publisher\n", + "data = np.random.rand(512, 512).astype(np.float32)\n", + "plot.add_image(data, name=\"image\")\n", + "\n", + "def update_frame(p):\n", + " # recieve bytes\n", + " b = get_bytes()\n", + " \n", + " if b is not None:\n", + " # numpy array from bytes, MUST specify dtype and make sure it matches what you sent\n", + " a = np.frombuffer(b, dtype=np.float32).reshape(512, 512)\n", + " \n", + " # set graphic data\n", + " p[\"image\"].data = a\n", + "\n", + "plot.add_animations(update_frame)\n", + "plot.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f8ac188-9359-4d3c-b8f1-384be84d1585", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}