|
28 | 28 |
|
29 | 29 | # internal: |
30 | 30 | from IPython.utils.importstring import import_item |
| 31 | +from IPython.utils.py3compat import cast_bytes |
31 | 32 | from IPython.utils.traitlets import ( |
32 | 33 | HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName |
33 | 34 | ) |
@@ -441,7 +442,7 @@ def _validate_targets(self, targets): |
441 | 442 | for t in targets: |
442 | 443 | # map raw identities to ids |
443 | 444 | if isinstance(t, (str,unicode)): |
444 | | - t = self.by_ident.get(t, t) |
| 445 | + t = self.by_ident.get(cast_bytes(t), t) |
445 | 446 | _targets.append(t) |
446 | 447 | targets = _targets |
447 | 448 | bad_targets = [ t for t in targets if t not in self.ids ] |
@@ -719,8 +720,8 @@ def save_task_result(self, idents, msg): |
719 | 720 | self.unassigned.remove(msg_id) |
720 | 721 |
|
721 | 722 | header = msg['header'] |
722 | | - engine_uuid = header.get('engine', None) |
723 | | - eid = self.by_ident.get(engine_uuid, None) |
| 723 | + engine_uuid = header.get('engine', u'') |
| 724 | + eid = self.by_ident.get(cast_bytes(engine_uuid), None) |
724 | 725 |
|
725 | 726 | status = header.get('status', None) |
726 | 727 |
|
@@ -763,7 +764,7 @@ def save_task_destination(self, idents, msg): |
763 | 764 | # print (content) |
764 | 765 | msg_id = content['msg_id'] |
765 | 766 | engine_uuid = content['engine_id'] |
766 | | - eid = self.by_ident[util.asbytes(engine_uuid)] |
| 767 | + eid = self.by_ident[cast_bytes(engine_uuid)] |
767 | 768 |
|
768 | 769 | self.log.info("task::task %r arrived on %r", msg_id, eid) |
769 | 770 | if msg_id in self.unassigned: |
@@ -853,13 +854,13 @@ def register_engine(self, reg, msg): |
853 | 854 | """Register a new engine.""" |
854 | 855 | content = msg['content'] |
855 | 856 | try: |
856 | | - queue = util.asbytes(content['queue']) |
| 857 | + queue = cast_bytes(content['queue']) |
857 | 858 | except KeyError: |
858 | 859 | self.log.error("registration::queue not specified", exc_info=True) |
859 | 860 | return |
860 | 861 | heart = content.get('heartbeat', None) |
861 | 862 | if heart: |
862 | | - heart = util.asbytes(heart) |
| 863 | + heart = cast_bytes(heart) |
863 | 864 | """register a new engine, and create the socket(s) necessary""" |
864 | 865 | eid = self._next_id |
865 | 866 | # print (eid, queue, reg, heart) |
|
0 commit comments