|
41 | 41 | "principal = \"admin\"\n", |
42 | 42 | "credentials = diffusion.Credentials(\"password\")\n", |
43 | 43 | "\n", |
44 | | - "\n", |
45 | | - "# handler callback function\n", |
46 | | - "def request_handler(request: str, context=None) -> str:\n", |
47 | | - " return f\"Hello there, {request}!\"\n", |
48 | | - "\n", |
49 | | - "\n", |
50 | | - "# filter response handler function\n", |
51 | | - "def on_filter_response(response, context=None):\n", |
52 | | - " print(f\"Received response from session '{context['session_id']}':\")\n", |
53 | | - " print(f\" {response}\")\n", |
54 | | - " print(\" - Request was sent to {filter} on path {path}\".format(**context))\n", |
55 | | - " print(\" - Received {received} of {expected} response(s).\".format(**context))\n", |
56 | | - "\n", |
57 | | - "\n", |
58 | 44 | "# request properties\n", |
59 | 45 | "request = \"Pushme Pullyou\"\n", |
60 | 46 | "path = \"path\"\n", |
|
67 | 53 | "metadata": {}, |
68 | 54 | "outputs": [], |
69 | 55 | "source": [ |
70 | | - "# creating the two sessions\n", |
71 | | - "sender_session = diffusion.Session(\n", |
72 | | - " url=server_url, principal=principal, credentials=credentials\n", |
73 | | - ")\n", |
74 | | - "receiver_session = diffusion.Session(\n", |
75 | | - " url=server_url, principal=principal, credentials=credentials\n", |
76 | | - ")\n", |
77 | | - "\n", |
78 | | - "async with sender_session, receiver_session:\n", |
79 | | - " # instantiating the messaging components for both sessions\n", |
80 | | - " receiver = diffusion.Messaging(receiver_session)\n", |
81 | | - " sender = diffusion.Messaging(sender_session)\n", |
82 | | - "\n", |
83 | | - " # adding the handler for addressed requests\n", |
84 | | - " receiver.add_request_handler(\n", |
85 | | - " path,\n", |
86 | | - " callback=request_handler,\n", |
87 | | - " request_type=request_type,\n", |
88 | | - " response_type=request_type,\n", |
89 | | - " addressed=True,\n", |
90 | | - " )\n", |
91 | | - "\n", |
92 | | - " # specifying the session filter\n", |
93 | | - " # this is a very simple filter, addressing the receiver session directly\n", |
94 | | - " session_filter = f\"$SessionId is '{receiver_session.session_id}'\"\n", |
| 56 | + "# request handler\n", |
| 57 | + "def callback(request: str, **kwargs) -> str:\n", |
| 58 | + " return f\"Hello there, {request}!\"\n", |
95 | 59 | "\n", |
96 | | - " # adding filter response handler\n", |
97 | | - " sender.add_filter_response_handler(\n", |
98 | | - " session_filter=session_filter, callback=on_filter_response\n", |
99 | | - " )\n", |
| 60 | + "request_handler = diffusion.messaging.RequestHandler(\n", |
| 61 | + " callback, request_type=request_type, response_type=request_type\n", |
| 62 | + ")\n", |
100 | 63 | "\n", |
101 | | - " # sending the request and receiving the number of expected responses\n", |
102 | | - " print(f\"Sending request: '{request}' to session filter `{session_filter}`...\")\n", |
103 | | - " try:\n", |
104 | | - " response = await sender.send_request_to_filter(\n", |
105 | | - " session_filter=session_filter, path=path, request=request_type(request),\n", |
106 | | - " )\n", |
107 | | - " except diffusion.DiffusionError as ex:\n", |
108 | | - " print(f\"ERROR: {ex}\")\n", |
109 | | - " else:\n", |
110 | | - " print(f\"... expecting {response} response(s) ...\")\n", |
| 64 | + "# filter response handler\n", |
| 65 | + "def filter_response_callback(response, **kwargs):\n", |
| 66 | + " print(\"Received response from session '{session_id}':\".format(**kwargs))\n", |
| 67 | + " print(f\" {response}\")\n", |
| 68 | + " print(\" - Request was sent to {filter} on path {path}\".format(**kwargs))\n", |
| 69 | + " print(\" - Received {received} of {expected} response(s).\".format(**kwargs))\n", |
111 | 70 | "\n", |
112 | | - " # waiting a bit to receive all responses\n", |
113 | | - " await asyncio.sleep(1)\n" |
114 | | - ] |
115 | | - }, |
116 | | - { |
117 | | - "cell_type": "code", |
118 | | - "execution_count": null, |
119 | | - "metadata": {}, |
120 | | - "outputs": [], |
121 | | - "source": [ |
122 | | - "request = \"Pushme Pullyou\"\n", |
123 | | - "path = \"path\"\n", |
124 | | - "request_type = diffusion.datatypes.STRING # datatype of the request" |
| 71 | + "filter_response_handler = diffusion.handlers.EventStreamHandler(\n", |
| 72 | + " response=filter_response_callback\n", |
| 73 | + ")" |
125 | 74 | ] |
126 | 75 | }, |
127 | 76 | { |
|
131 | 80 | "outputs": [], |
132 | 81 | "source": [ |
133 | 82 | "# creating the two sessions\n", |
134 | | - "sender_session = diffusion.Session(\n", |
| 83 | + "sender = diffusion.Session(\n", |
135 | 84 | " url=server_url, principal=principal, credentials=credentials\n", |
136 | 85 | ")\n", |
137 | | - "receiver_session = diffusion.Session(\n", |
| 86 | + "receiver = diffusion.Session(\n", |
138 | 87 | " url=server_url, principal=principal, credentials=credentials\n", |
139 | 88 | ")\n", |
140 | 89 | "\n", |
141 | | - "async with sender_session, receiver_session:\n", |
142 | | - " # instantiating the messaging components for both sessions\n", |
143 | | - " receiver = diffusion.Messaging(receiver_session)\n", |
144 | | - " sender = diffusion.Messaging(sender_session)\n", |
| 90 | + "async with sender, receiver:\n", |
145 | 91 | "\n", |
146 | 92 | " # adding the handler for addressed requests\n", |
147 | | - " receiver.add_request_handler(\n", |
148 | | - " path,\n", |
149 | | - " callback=request_handler,\n", |
150 | | - " request_type=request_type,\n", |
151 | | - " response_type=request_type,\n", |
152 | | - " addressed=True,\n", |
153 | | - " )\n", |
| 93 | + " receiver.messaging.add_stream_handler(path, handler=request_handler, addressed=True)\n", |
154 | 94 | "\n", |
155 | 95 | " # specifying the session filter\n", |
156 | 96 | " # this is a very simple filter, addressing the receiver session directly\n", |
157 | | - " session_filter = f\"$SessionId is '{receiver_session.session_id}'\"\n", |
| 97 | + " session_filter = f\"$SessionId is '{receiver.session_id}'\"\n", |
158 | 98 | "\n", |
159 | 99 | " # adding filter response handler\n", |
160 | | - " sender.add_filter_response_handler(\n", |
161 | | - " session_filter=session_filter, callback=on_filter_response\n", |
| 100 | + " sender.messaging.add_filter_response_handler(\n", |
| 101 | + " session_filter=session_filter, handler=filter_response_handler\n", |
162 | 102 | " )\n", |
163 | 103 | "\n", |
164 | 104 | " # sending the request and receiving the number of expected responses\n", |
165 | 105 | " print(f\"Sending request: '{request}' to session filter `{session_filter}`...\")\n", |
166 | 106 | " try:\n", |
167 | | - " response = await sender.send_request_to_filter(\n", |
| 107 | + " response = await sender.messaging.send_request_to_filter(\n", |
168 | 108 | " session_filter=session_filter, path=path, request=request_type(request),\n", |
169 | 109 | " )\n", |
170 | 110 | " except diffusion.DiffusionError as ex:\n", |
|
193 | 133 | "name": "python", |
194 | 134 | "nbconvert_exporter": "python", |
195 | 135 | "pygments_lexer": "ipython3", |
196 | | - "version": "3.8.5" |
| 136 | + "version": "3.8.6" |
197 | 137 | } |
198 | 138 | }, |
199 | 139 | "nbformat": 4, |
|
0 commit comments