Skip to content

Commit d222f4c

Browse files
Support entering and leaving rooms through pubsub client managers
1 parent 3f78af2 commit d222f4c

4 files changed

Lines changed: 207 additions & 0 deletions

File tree

src/socketio/asyncio_pubsub_manager.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,25 @@ async def disconnect(self, sid, namespace, **kwargs):
9090
await self._handle_disconnect(message) # handle in this host
9191
await self._publish(message) # notify other hosts
9292

93+
async def enter_room(self, sid, namespace, room, eio_sid=None):
94+
if self.is_connected(sid, namespace):
95+
# client is in this server, so we can disconnect directly
96+
return await super().enter_room(sid, namespace, room,
97+
eio_sid=eio_sid)
98+
else:
99+
message = {'method': 'enter_room', 'sid': sid, 'room': room,
100+
'namespace': namespace or '/', 'host_id': self.host_id}
101+
await self._publish(message) # notify other hosts
102+
103+
async def leave_room(self, sid, namespace, room):
104+
if self.is_connected(sid, namespace):
105+
# client is in this server, so we can disconnect directly
106+
return await super().leave_room(sid, namespace, room)
107+
else:
108+
message = {'method': 'leave_room', 'sid': sid, 'room': room,
109+
'namespace': namespace or '/', 'host_id': self.host_id}
110+
await self._publish(message) # notify other hosts
111+
93112
async def close_room(self, room, namespace=None):
94113
message = {'method': 'close_room', 'room': room,
95114
'namespace': namespace or '/', 'host_id': self.host_id}
@@ -158,6 +177,18 @@ async def _handle_disconnect(self, message):
158177
namespace=message.get('namespace'),
159178
ignore_queue=True)
160179

180+
async def _handle_enter_room(self, message):
181+
sid = message.get('sid')
182+
namespace = message.get('namespace')
183+
if self.is_connected(sid, namespace):
184+
await super().enter_room(sid, namespace, message.get('room'))
185+
186+
async def _handle_leave_room(self, message):
187+
sid = message.get('sid')
188+
namespace = message.get('namespace')
189+
if self.is_connected(sid, namespace):
190+
await super().leave_room(sid, namespace, message.get('room'))
191+
161192
async def _handle_close_room(self, message):
162193
await super().close_room(room=message.get('room'),
163194
namespace=message.get('namespace'))
@@ -191,6 +222,10 @@ async def _thread(self):
191222
await self._handle_emit(data)
192223
elif data['method'] == 'disconnect':
193224
await self._handle_disconnect(data)
225+
elif data['method'] == 'enter_room':
226+
await self._handle_enter_room(data)
227+
elif data['method'] == 'leave_room':
228+
await self._handle_leave_room(data)
194229
elif data['method'] == 'close_room':
195230
await self._handle_close_room(data)
196231
except asyncio.CancelledError:

src/socketio/pubsub_manager.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,24 @@ def disconnect(self, sid, namespace=None, **kwargs):
8787
self._handle_disconnect(message) # handle in this host
8888
self._publish(message) # notify other hosts
8989

90+
def enter_room(self, sid, namespace, room, eio_sid=None):
91+
if self.is_connected(sid, namespace):
92+
# client is in this server, so we can add to the room directly
93+
return super().enter_room(sid, namespace, room, eio_sid=eio_sid)
94+
else:
95+
message = {'method': 'enter_room', 'sid': sid, 'room': room,
96+
'namespace': namespace or '/', 'host_id': self.host_id}
97+
self._publish(message) # notify other hosts
98+
99+
def leave_room(self, sid, namespace, room):
100+
if self.is_connected(sid, namespace):
101+
# client is in this server, so we can remove from the room directly
102+
return super().leave_room(sid, namespace, room)
103+
else:
104+
message = {'method': 'leave_room', 'sid': sid, 'room': room,
105+
'namespace': namespace or '/', 'host_id': self.host_id}
106+
self._publish(message) # notify other hosts
107+
90108
def close_room(self, room, namespace=None):
91109
message = {'method': 'close_room', 'room': room,
92110
'namespace': namespace or '/', 'host_id': self.host_id}
@@ -153,6 +171,18 @@ def _handle_disconnect(self, message):
153171
namespace=message.get('namespace'),
154172
ignore_queue=True)
155173

174+
def _handle_enter_room(self, message):
175+
sid = message.get('sid')
176+
namespace = message.get('namespace')
177+
if self.is_connected(sid, namespace):
178+
super().enter_room(sid, namespace, message.get('room'))
179+
180+
def _handle_leave_room(self, message):
181+
sid = message.get('sid')
182+
namespace = message.get('namespace')
183+
if self.is_connected(sid, namespace):
184+
super().leave_room(sid, namespace, message.get('room'))
185+
156186
def _handle_close_room(self, message):
157187
super().close_room(room=message.get('room'),
158188
namespace=message.get('namespace'))
@@ -184,6 +214,10 @@ def _thread(self):
184214
self._handle_emit(data)
185215
elif data['method'] == 'disconnect':
186216
self._handle_disconnect(data)
217+
elif data['method'] == 'enter_room':
218+
self._handle_enter_room(data)
219+
elif data['method'] == 'leave_room':
220+
self._handle_leave_room(data)
187221
elif data['method'] == 'close_room':
188222
self._handle_close_room(data)
189223
except:

tests/asyncio/test_asyncio_pubsub_manager.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,27 @@ def test_disconnect_ignore_queue(self):
181181
self.pm._publish.mock.assert_not_called()
182182
assert self.pm.is_connected(sid, '/') is False
183183

184+
def test_enter_room(self):
185+
sid = self.pm.connect('123', '/')
186+
_run(self.pm.enter_room(sid, '/', 'foo'))
187+
_run(self.pm.enter_room('456', '/', 'foo'))
188+
assert sid in self.pm.rooms['/']['foo']
189+
assert self.pm.rooms['/']['foo'][sid] == '123'
190+
self.pm._publish.mock.assert_called_once_with(
191+
{'method': 'enter_room', 'sid': '456', 'room': 'foo',
192+
'namespace': '/', 'host_id': '123456'}
193+
)
194+
195+
def test_leave_room(self):
196+
sid = self.pm.connect('123', '/')
197+
_run(self.pm.leave_room(sid, '/', 'foo'))
198+
_run(self.pm.leave_room('456', '/', 'foo'))
199+
assert 'foo' not in self.pm.rooms['/']
200+
self.pm._publish.mock.assert_called_once_with(
201+
{'method': 'leave_room', 'sid': '456', 'room': 'foo',
202+
'namespace': '/', 'host_id': '123456'}
203+
)
204+
184205
def test_close_room(self):
185206
_run(self.pm.close_room('foo'))
186207
self.pm._publish.mock.assert_called_once_with(
@@ -413,6 +434,48 @@ def test_handle_disconnect(self):
413434
sid='123', namespace='/foo', ignore_queue=True
414435
)
415436

437+
def test_handle_enter_room(self):
438+
sid = self.pm.connect('123', '/')
439+
with mock.patch.object(
440+
asyncio_manager.AsyncManager, 'enter_room', new=AsyncMock()
441+
) as super_enter_room:
442+
_run(
443+
self.pm._handle_enter_room(
444+
{'method': 'enter_room', 'sid': sid, 'namespace': '/',
445+
'room': 'foo'}
446+
)
447+
)
448+
_run(
449+
self.pm._handle_enter_room(
450+
{'method': 'enter_room', 'sid': '456', 'namespace': '/',
451+
'room': 'foo'}
452+
)
453+
)
454+
super_enter_room.mock.assert_called_once_with(
455+
self.pm, sid, '/', 'foo'
456+
)
457+
458+
def test_handle_leave_room(self):
459+
sid = self.pm.connect('123', '/')
460+
with mock.patch.object(
461+
asyncio_manager.AsyncManager, 'leave_room', new=AsyncMock()
462+
) as super_leave_room:
463+
_run(
464+
self.pm._handle_leave_room(
465+
{'method': 'leave_room', 'sid': sid, 'namespace': '/',
466+
'room': 'foo'}
467+
)
468+
)
469+
_run(
470+
self.pm._handle_leave_room(
471+
{'method': 'leave_room', 'sid': '456', 'namespace': '/',
472+
'room': 'foo'}
473+
)
474+
)
475+
super_leave_room.mock.assert_called_once_with(
476+
self.pm, sid, '/', 'foo'
477+
)
478+
416479
def test_handle_close_room(self):
417480
with mock.patch.object(
418481
asyncio_manager.AsyncManager, 'close_room', new=AsyncMock()
@@ -447,6 +510,8 @@ def test_background_thread(self):
447510
self.pm._handle_emit = AsyncMock()
448511
self.pm._handle_callback = AsyncMock()
449512
self.pm._handle_disconnect = AsyncMock()
513+
self.pm._handle_enter_room = AsyncMock()
514+
self.pm._handle_leave_room = AsyncMock()
450515
self.pm._handle_close_room = AsyncMock()
451516
host_id = self.pm.host_id
452517

@@ -461,6 +526,10 @@ async def messages():
461526
yield {'method': 'bogus', 'host_id': 'x'}
462527
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
463528
'host_id': 'x'})
529+
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
530+
'room': 'room', 'host_id': 'x'}
531+
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
532+
'room': 'room', 'host_id': 'x'}
464533
yield 'bad json'
465534
yield b'bad pickled'
466535

@@ -490,6 +559,14 @@ async def messages():
490559
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
491560
'host_id': 'x'}
492561
)
562+
self.pm._handle_enter_room.mock.assert_called_once_with(
563+
{'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
564+
'room': 'room', 'host_id': 'x'}
565+
)
566+
self.pm._handle_leave_room.mock.assert_called_once_with(
567+
{'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
568+
'room': 'room', 'host_id': 'x'}
569+
)
493570
self.pm._handle_close_room.mock.assert_called_once_with(
494571
{'method': 'close_room', 'value': 'baz', 'host_id': 'x'}
495572
)

tests/common/test_pubsub_manager.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,27 @@ def test_disconnect_ignore_queue(self):
187187
self.pm._publish.assert_not_called()
188188
assert not self.pm.is_connected(sid, '/')
189189

190+
def test_enter_room(self):
191+
sid = self.pm.connect('123', '/')
192+
self.pm.enter_room(sid, '/', 'foo')
193+
self.pm.enter_room('456', '/', 'foo')
194+
assert sid in self.pm.rooms['/']['foo']
195+
assert self.pm.rooms['/']['foo'][sid] == '123'
196+
self.pm._publish.assert_called_once_with(
197+
{'method': 'enter_room', 'sid': '456', 'room': 'foo',
198+
'namespace': '/', 'host_id': '123456'}
199+
)
200+
201+
def test_leave_room(self):
202+
sid = self.pm.connect('123', '/')
203+
self.pm.leave_room(sid, '/', 'foo')
204+
self.pm.leave_room('456', '/', 'foo')
205+
assert 'foo' not in self.pm.rooms['/']
206+
self.pm._publish.assert_called_once_with(
207+
{'method': 'leave_room', 'sid': '456', 'room': 'foo',
208+
'namespace': '/', 'host_id': '123456'}
209+
)
210+
190211
def test_close_room(self):
191212
self.pm.close_room('foo')
192213
self.pm._publish.assert_called_once_with(
@@ -373,6 +394,32 @@ def test_handle_disconnect(self):
373394
sid='123', namespace='/foo', ignore_queue=True
374395
)
375396

397+
def test_handle_enter_room(self):
398+
sid = self.pm.connect('123', '/')
399+
with mock.patch.object(
400+
base_manager.BaseManager, 'enter_room'
401+
) as super_enter_room:
402+
self.pm._handle_enter_room({
403+
'method': 'enter_room', 'sid': sid, 'namespace': '/',
404+
'room': 'foo'})
405+
self.pm._handle_enter_room({
406+
'method': 'enter_room', 'sid': '456', 'namespace': '/',
407+
'room': 'foo'})
408+
super_enter_room.assert_called_once_with(sid, '/', 'foo')
409+
410+
def test_handle_leave_room(self):
411+
sid = self.pm.connect('123', '/')
412+
with mock.patch.object(
413+
base_manager.BaseManager, 'leave_room'
414+
) as super_leave_room:
415+
self.pm._handle_leave_room({
416+
'method': 'leave_room', 'sid': sid, 'namespace': '/',
417+
'room': 'foo'})
418+
self.pm._handle_leave_room({
419+
'method': 'leave_room', 'sid': '456', 'namespace': '/',
420+
'room': 'foo'})
421+
super_leave_room.assert_called_once_with(sid, '/', 'foo')
422+
376423
def test_handle_close_room(self):
377424
with mock.patch.object(
378425
base_manager.BaseManager, 'close_room'
@@ -397,6 +444,8 @@ def test_background_thread(self):
397444
self.pm._handle_emit = mock.MagicMock()
398445
self.pm._handle_callback = mock.MagicMock()
399446
self.pm._handle_disconnect = mock.MagicMock()
447+
self.pm._handle_enter_room = mock.MagicMock()
448+
self.pm._handle_leave_room = mock.MagicMock()
400449
self.pm._handle_close_room = mock.MagicMock()
401450
host_id = self.pm.host_id
402451

@@ -411,6 +460,10 @@ def messages():
411460
yield {'method': 'bogus', 'host_id': 'x'}
412461
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
413462
'host_id': 'x'})
463+
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
464+
'room': 'room', 'host_id': 'x'}
465+
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
466+
'room': 'room', 'host_id': 'x'}
414467
yield 'bad json'
415468
yield b'bad pickled'
416469

@@ -442,6 +495,14 @@ def messages():
442495
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
443496
'host_id': 'x'}
444497
)
498+
self.pm._handle_enter_room.assert_called_once_with(
499+
{'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
500+
'room': 'room', 'host_id': 'x'}
501+
)
502+
self.pm._handle_leave_room.assert_called_once_with(
503+
{'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
504+
'room': 'room', 'host_id': 'x'}
505+
)
445506
self.pm._handle_close_room.assert_called_once_with(
446507
{'method': 'close_room', 'value': 'baz', 'host_id': 'x'}
447508
)

0 commit comments

Comments
 (0)