Skip to content

Commit acd49c2

Browse files
author
x
committed
Update Diffusion Examples to 6.6.0-preview.2
1 parent 29374a1 commit acd49c2

23 files changed

Lines changed: 524 additions & 247 deletions

c/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ recordv2-topics: $(OBJDIR)/recordv2-topics.o
144144
$(CC) $< $(LDFLAGS) -o $(BINDIR)/$@
145145

146146
binary-topics: $(OBJDIR)/binary-topics.o
147+
147148
$(CC) $< $(LDFLAGS) -o $(BINDIR)/$@
148149

149150
int64-topics: $(OBJDIR)/int64-topics.o

java/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
<groupId>com.pushtechnology.diffusion</groupId>
1313
<artifactId>diffusion-api-java-examples</artifactId>
14-
<version>6.6.0-preview.1</version>
14+
<version>6.6.0-preview.2</version>
1515
<packaging>jar</packaging>
1616
<description>Diffusion public API examples.</description>
1717

@@ -36,7 +36,7 @@
3636
<dependency>
3737
<groupId>com.pushtechnology.diffusion</groupId>
3838
<artifactId>diffusion-api</artifactId>
39-
<version>6.6.0-preview.1</version>
39+
<version>6.6.0-preview.2</version>
4040
</dependency>
4141

4242
<dependency>
@@ -54,7 +54,7 @@
5454
<dependency>
5555
<groupId>com.pushtechnology.diffusion</groupId>
5656
<artifactId>diffusion-client</artifactId>
57-
<version>6.6.0-preview.1</version>
57+
<version>6.6.0-preview.2</version>
5858
<scope>runtime</scope>
5959
</dependency>
6060

java/src/main/java/com/pushtechnology/diffusion/examples/ControlClientAddingAndRemovingTopics.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (C) 2014, 2019 Push Technology Ltd.
2+
* Copyright (C) 2014, 2020 Push Technology Ltd.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -133,7 +133,7 @@ public boolean addSessionTopic(String topicPath, TopicType topicType)
133133
* @return a CompletableFuture that completes when a response is received
134134
* from the server
135135
*/
136-
public CompletableFuture<?> removeTopic(String topicPath) {
136+
public CompletableFuture<Integer> removeTopic(String topicPath) {
137137
return removeTopics(">" + topicPath);
138138
}
139139

@@ -144,7 +144,7 @@ public CompletableFuture<?> removeTopic(String topicPath) {
144144
* @return a CompletableFuture that completes when a response is received
145145
* from the server
146146
*/
147-
public CompletableFuture<?> removeTopicBranch(String topicPath) {
147+
public CompletableFuture<Integer> removeTopicBranch(String topicPath) {
148148
return removeTopics("?" + topicPath + "//");
149149
}
150150

@@ -155,8 +155,9 @@ public CompletableFuture<?> removeTopicBranch(String topicPath) {
155155
* @return a CompletableFuture that completes when a response is received
156156
* from the server
157157
*/
158-
public CompletableFuture<?> removeTopics(String topicSelector) {
159-
return topicControl.removeTopics(topicSelector);
158+
public CompletableFuture<Integer> removeTopics(String topicSelector) {
159+
return topicControl.removeTopics(topicSelector)
160+
.thenApply(TopicControl.TopicRemovalResult::getRemovedCount);
160161
}
161162

162163
/**

js/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
{
22
"name": "diffusion-examples",
3-
"version": "6.6.0-preview.1",
3+
"version": "6.6.0-preview.2",
44
"description": "Examples for using the Diffusion JavaScript client",
55
"main": "index.js",
66
"directories": {
77
"example": "examples"
88
},
99
"license": "Apache-2.0",
1010
"dependencies": {
11-
"diffusion": "6.6.0-preview.1"
11+
"diffusion": "6.6.0-preview.2"
1212
}
1313
}

python/examples/messaging/register_request_handler.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,8 @@ async def main():
2626
url=server_url, principal=principal, credentials=credentials
2727
) as session:
2828

29-
# instantiating the messaging component
30-
messaging = diffusion.Messaging(session)
31-
3229
# registering the request handler
33-
await messaging.register_request_handler(
30+
await session.messaging.register_request_handler(
3431
path,
3532
callback=path_request_handler,
3633
request_type=diffusion.datatypes.STRING,

python/examples/messaging/send_request_to_filter.ipynb

Lines changed: 24 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,6 @@
4141
"principal = \"admin\"\n",
4242
"credentials = diffusion.Credentials(\"password\")\n",
4343
"\n",
44-
"\n",
45-
"# handler callback function\n",
46-
"def request_handler(request: str, context=None) -> str:\n",
47-
" return f\"Hello there, {request}!\"\n",
48-
"\n",
49-
"\n",
50-
"# filter response handler function\n",
51-
"def on_filter_response(response, context=None):\n",
52-
" print(f\"Received response from session '{context['session_id']}':\")\n",
53-
" print(f\" {response}\")\n",
54-
" print(\" - Request was sent to {filter} on path {path}\".format(**context))\n",
55-
" print(\" - Received {received} of {expected} response(s).\".format(**context))\n",
56-
"\n",
57-
"\n",
5844
"# request properties\n",
5945
"request = \"Pushme Pullyou\"\n",
6046
"path = \"path\"\n",
@@ -67,61 +53,24 @@
6753
"metadata": {},
6854
"outputs": [],
6955
"source": [
70-
"# creating the two sessions\n",
71-
"sender_session = diffusion.Session(\n",
72-
" url=server_url, principal=principal, credentials=credentials\n",
73-
")\n",
74-
"receiver_session = diffusion.Session(\n",
75-
" url=server_url, principal=principal, credentials=credentials\n",
76-
")\n",
77-
"\n",
78-
"async with sender_session, receiver_session:\n",
79-
" # instantiating the messaging components for both sessions\n",
80-
" receiver = diffusion.Messaging(receiver_session)\n",
81-
" sender = diffusion.Messaging(sender_session)\n",
82-
"\n",
83-
" # adding the handler for addressed requests\n",
84-
" receiver.add_request_handler(\n",
85-
" path,\n",
86-
" callback=request_handler,\n",
87-
" request_type=request_type,\n",
88-
" response_type=request_type,\n",
89-
" addressed=True,\n",
90-
" )\n",
91-
"\n",
92-
" # specifying the session filter\n",
93-
" # this is a very simple filter, addressing the receiver session directly\n",
94-
" session_filter = f\"$SessionId is '{receiver_session.session_id}'\"\n",
56+
"# request handler\n",
57+
"def callback(request: str, **kwargs) -> str:\n",
58+
" return f\"Hello there, {request}!\"\n",
9559
"\n",
96-
" # adding filter response handler\n",
97-
" sender.add_filter_response_handler(\n",
98-
" session_filter=session_filter, callback=on_filter_response\n",
99-
" )\n",
60+
"request_handler = diffusion.messaging.RequestHandler(\n",
61+
" callback, request_type=request_type, response_type=request_type\n",
62+
")\n",
10063
"\n",
101-
" # sending the request and receiving the number of expected responses\n",
102-
" print(f\"Sending request: '{request}' to session filter `{session_filter}`...\")\n",
103-
" try:\n",
104-
" response = await sender.send_request_to_filter(\n",
105-
" session_filter=session_filter, path=path, request=request_type(request),\n",
106-
" )\n",
107-
" except diffusion.DiffusionError as ex:\n",
108-
" print(f\"ERROR: {ex}\")\n",
109-
" else:\n",
110-
" print(f\"... expecting {response} response(s) ...\")\n",
64+
"# filter response handler\n",
65+
"def filter_response_callback(response, **kwargs):\n",
66+
" print(\"Received response from session '{session_id}':\".format(**kwargs))\n",
67+
" print(f\" {response}\")\n",
68+
" print(\" - Request was sent to {filter} on path {path}\".format(**kwargs))\n",
69+
" print(\" - Received {received} of {expected} response(s).\".format(**kwargs))\n",
11170
"\n",
112-
" # waiting a bit to receive all responses\n",
113-
" await asyncio.sleep(1)\n"
114-
]
115-
},
116-
{
117-
"cell_type": "code",
118-
"execution_count": null,
119-
"metadata": {},
120-
"outputs": [],
121-
"source": [
122-
"request = \"Pushme Pullyou\"\n",
123-
"path = \"path\"\n",
124-
"request_type = diffusion.datatypes.STRING # datatype of the request"
71+
"filter_response_handler = diffusion.handlers.EventStreamHandler(\n",
72+
" response=filter_response_callback\n",
73+
")"
12574
]
12675
},
12776
{
@@ -131,40 +80,31 @@
13180
"outputs": [],
13281
"source": [
13382
"# creating the two sessions\n",
134-
"sender_session = diffusion.Session(\n",
83+
"sender = diffusion.Session(\n",
13584
" url=server_url, principal=principal, credentials=credentials\n",
13685
")\n",
137-
"receiver_session = diffusion.Session(\n",
86+
"receiver = diffusion.Session(\n",
13887
" url=server_url, principal=principal, credentials=credentials\n",
13988
")\n",
14089
"\n",
141-
"async with sender_session, receiver_session:\n",
142-
" # instantiating the messaging components for both sessions\n",
143-
" receiver = diffusion.Messaging(receiver_session)\n",
144-
" sender = diffusion.Messaging(sender_session)\n",
90+
"async with sender, receiver:\n",
14591
"\n",
14692
" # adding the handler for addressed requests\n",
147-
" receiver.add_request_handler(\n",
148-
" path,\n",
149-
" callback=request_handler,\n",
150-
" request_type=request_type,\n",
151-
" response_type=request_type,\n",
152-
" addressed=True,\n",
153-
" )\n",
93+
" receiver.messaging.add_stream_handler(path, handler=request_handler, addressed=True)\n",
15494
"\n",
15595
" # specifying the session filter\n",
15696
" # this is a very simple filter, addressing the receiver session directly\n",
157-
" session_filter = f\"$SessionId is '{receiver_session.session_id}'\"\n",
97+
" session_filter = f\"$SessionId is '{receiver.session_id}'\"\n",
15898
"\n",
15999
" # adding filter response handler\n",
160-
" sender.add_filter_response_handler(\n",
161-
" session_filter=session_filter, callback=on_filter_response\n",
100+
" sender.messaging.add_filter_response_handler(\n",
101+
" session_filter=session_filter, handler=filter_response_handler\n",
162102
" )\n",
163103
"\n",
164104
" # sending the request and receiving the number of expected responses\n",
165105
" print(f\"Sending request: '{request}' to session filter `{session_filter}`...\")\n",
166106
" try:\n",
167-
" response = await sender.send_request_to_filter(\n",
107+
" response = await sender.messaging.send_request_to_filter(\n",
168108
" session_filter=session_filter, path=path, request=request_type(request),\n",
169109
" )\n",
170110
" except diffusion.DiffusionError as ex:\n",
@@ -193,7 +133,7 @@
193133
"name": "python",
194134
"nbconvert_exporter": "python",
195135
"pygments_lexer": "ipython3",
196-
"version": "3.8.5"
136+
"version": "3.8.6"
197137
}
198138
},
199139
"nbformat": 4,

python/examples/messaging/send_request_to_filter.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,17 @@
1010

1111

1212
# filter response handler function
13-
def on_filter_response(response, context=None):
14-
print(f"Received response from session '{context['session_id']}':")
13+
def on_filter_response(response, **kwargs):
14+
print("Received response from session '{session_id}':".format(**kwargs))
1515
print(f" {response}")
16-
print(" - Request was sent to {filter} on path {path}".format(**context))
17-
print(" - Received {received} of {expected} response(s).".format(**context))
16+
print(" - Request was sent to {filter} on path {path}".format(**kwargs))
17+
print(" - Received {received} of {expected} response(s).".format(**kwargs))
1818

1919

20+
filter_response_handler = diffusion.handlers.EventStreamHandler(
21+
response=on_filter_response
22+
)
23+
2024
# request properties
2125
request = "Pushme Pullyou"
2226
path = "path"
@@ -32,24 +36,23 @@ async def main():
3236
url=server_url, principal=principal, credentials=credentials
3337
) as session:
3438

35-
# instantiating the messaging component
36-
messaging = diffusion.Messaging(session)
37-
3839
# specifying the session filter
3940
# this is a very simple filter, addressing all the sessions
4041
# with the same principal as the current session
4142
session_filter = f"$Principal is '{principal}'"
4243

4344
# adding filter response handler
44-
messaging.add_filter_response_handler(
45+
session.messaging.add_filter_response_handler(
4546
session_filter=session_filter, callback=on_filter_response
4647
)
4748

4849
# sending the request and receiving the number of expected responses
4950
print(f"Sending request: '{request}' to session filter `{session_filter}`...")
5051
try:
51-
response = await messaging.send_request_to_filter(
52-
session_filter=session_filter, path=path, request=request_type(request),
52+
response = await session.messaging.send_request_to_filter(
53+
session_filter=session_filter,
54+
path=path,
55+
request=request_type(request),
5356
)
5457
except diffusion.DiffusionError as ex:
5558
print(f"ERROR: {ex}")

0 commit comments

Comments
 (0)