44import logging
55import logging .config
66
7- from collections import namedtuple
87
8+ class Message (object ):
9+ def __init__ (self , ** kwargs ):
10+ for k in self .attr_list :
11+ if k not in kwargs :
12+ raise ValueError ("missing {}" .format (k ))
13+ setattr (self , k , kwargs [k ])
914
10- def _writeline (s ):
11- sys .stdout .write (s + '\n ' )
15+ def asdict (self ):
16+ res = {k : getattr (self , k ) for k in self .attr_list }
17+ res ['type' ] = self ._type
18+ return res
19+
20+ def __eq__ (self , other ):
21+ return self .asdict () == other .asdict ()
22+
23+ def __repr__ (self ):
24+ attrstr = ", " .join (
25+ "{}={}" .format (k , getattr (self , k )) for k in self .attr_list )
26+ return "{}({})" .format (self .__class__ .__name__ , attrstr )
27+
28+ def tojson (self ):
29+ return json .dumps (self .asdict ())
30+
31+
32+ class RecordMessage (Message ):
33+ _type = 'RECORD'
34+ attr_list = ['stream' , 'record' ]
35+
36+
37+ class SchemaMessage (Message ):
38+ _type = 'SCHEMA'
39+ attr_list = ['stream' , 'schema' , 'key_properties' ]
40+
41+
42+ class StateMessage (Message ):
43+ _type = 'STATE'
44+ attr_list = ['value' ]
45+
46+
47+ def to_json (message ):
48+ m = vars (message )
49+ if isinstance (message , RecordMessage ):
50+ m ['type' ] = 'RECORD'
51+ elif isinstance (message , SchemaMessage ):
52+ m ['type' ] = 'SCHEMA'
53+ elif isinstance (message , StateMessage ):
54+ m ['type' ] = 'STATE'
55+ else :
56+ raise Exception ('Unrecognized message {}' .format (message ))
57+ return json .dumps (m )
58+
59+
60+ def _write_message (message ):
61+ sys .stdout .write (to_json (message ) + '\n ' )
1262 sys .stdout .flush ()
1363
1464
@@ -17,9 +67,7 @@ def write_record(stream_name, record):
1767
1868 >>> write_record("users", {"id": 2, "email": "mike@stitchdata.com"})
1969 """
20- _writeline (json .dumps ({'type' : 'RECORD' ,
21- 'stream' : stream_name ,
22- 'record' : record }))
70+ _write_message (RecordMessage (stream = stream_name , record = record ))
2371
2472
2573def write_records (stream_name , records ):
@@ -45,19 +93,44 @@ def write_schema(stream_name, schema, key_properties):
4593 key_properties = [key_properties ]
4694 if not isinstance (key_properties , list ):
4795 raise Exception ("key_properties must be a string or list of strings" )
48- _writeline (json .dumps ({'type' : 'SCHEMA' ,
49- 'stream' : stream_name ,
50- 'key_properties' : key_properties ,
51- 'schema' : schema }))
96+ _write_message (
97+ SchemaMessage (
98+ stream = stream_name ,
99+ schema = schema ,
100+ key_properties = key_properties ))
52101
53102
54103def write_state (value ):
55104 """Write a state message.
56105
57106 >>> write_state({'last_updated_at': '2017-02-14T09:21:00'})
58107 """
59- _writeline (json .dumps ({'type' : 'STATE' ,
60- 'value' : value }))
108+ _write_message (StateMessage (value ))
109+
110+
111+ def _required_key (msg , k ):
112+ if k not in msg :
113+ raise Exception ("Message is missing required key '{}': {}" .format (
114+ k , msg ))
115+ return msg [k ]
116+
117+
118+ def parse_message (s ):
119+ """Parse a message string into a Message object."""
120+ o = json .loads (s )
121+ t = _required_key (o , 'type' )
122+
123+ if t == 'RECORD' :
124+ return RecordMessage (stream = _required_key (o , 'stream' ),
125+ record = _required_key (o , 'record' ))
126+
127+ elif t == 'SCHEMA' :
128+ return SchemaMessage (stream = _required_key (o , 'stream' ),
129+ schema = _required_key (o , 'schema' ),
130+ key_properties = _required_key (o , 'key_properties' ))
131+
132+ elif t == 'STATE' :
133+ return StateMessage (value = _required_key (o , 'value' ))
61134
62135
63136def get_logger ():
0 commit comments