forked from hologram-io/python-messaging
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubmit.py
More file actions
333 lines (267 loc) · 10.3 KB
/
Copy pathsubmit.py
File metadata and controls
333 lines (267 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# See LICENSE
"""Classes for sending SMS"""
from datetime import datetime, timedelta
import re
import logging
from messaging.sms import consts
from messaging.utils import (encode_str, clean_number,
pack_8bits_to_ucs2, pack_8bits_to_7bits,
pack_8bits_to_8bit,
timedelta_to_relative_validity,
datetime_to_absolute_validity)
from messaging.sms.base import SmsBase
from messaging.sms.gsm0338 import is_valid_gsm
from messaging.sms.pdu import Pdu
VALID_NUMBER = re.compile(r"^\+?\d{3,20}$")
class SmsSubmit(SmsBase):
"""I am a SMS ready to be sent"""
def __init__(self, number, text):
super(SmsSubmit, self).__init__()
self._number = None
self._csca = None
self._klass = None
self._validity = None
self.request_status = False
self.ref = None
self.rand_id = None
self.id_list = range(0, 255)
self.msgvp = 0xaa
self.pid = 0x00
self.number = number
self.text = text
self.text_gsm = None
def _set_number(self, number):
if number and not VALID_NUMBER.match(number):
raise ValueError("Invalid number format: %s" % number)
self._number = number
number = property(lambda self: self._number, _set_number)
def _set_csca(self, csca):
if csca and not VALID_NUMBER.match(csca):
raise ValueError("Invalid csca format: %s" % csca)
self._csca = csca
csca = property(lambda self: self._csca, _set_csca)
def _set_validity(self, validity):
if validity is None or isinstance(validity, (timedelta, datetime)):
# valid values are None, timedelta and datetime
self._validity = validity
else:
raise TypeError("Don't know what to do with %s" % validity)
validity = property(lambda self: self._validity, _set_validity)
def _set_klass(self, klass):
if not isinstance(klass, int):
raise TypeError("_set_klass only accepts int objects")
if klass not in [0, 1, 2, 3]:
raise ValueError("class must be between 0 and 3")
self._klass = klass
klass = property(lambda self: self._klass, _set_klass)
def to_pdu(self):
"""Returns a list of :class:`~messaging.pdu.Pdu` objects"""
smsc_pdu = self._get_smsc_pdu()
sms_submit_pdu = self._get_sms_submit_pdu()
tpmessref_pdu = self._get_tpmessref_pdu()
sms_phone_pdu = self._get_phone_pdu()
tppid_pdu = self._get_tppid_pdu()
sms_msg_pdu = self._get_msg_pdu()
if len(sms_msg_pdu) == 1:
pdu = smsc_pdu
len_smsc = len(smsc_pdu) / 2
pdu += sms_submit_pdu
pdu += tpmessref_pdu
pdu += sms_phone_pdu
pdu += tppid_pdu
pdu += sms_msg_pdu[0]
logging.debug("smsc_pdu: %s" % smsc_pdu)
logging.debug("sms_submit_pdu: %s" % sms_submit_pdu)
logging.debug("tpmessref_pdu: %s" % tpmessref_pdu)
logging.debug("sms_phone_pdu: %s" % sms_phone_pdu)
logging.debug("tppid_pdu: %s" % tppid_pdu)
logging.debug("sms_msg_pdu: %s" % sms_msg_pdu)
logging.debug("-" * 20)
logging.debug("full_pdu: %s" % pdu)
logging.debug("full_text: %s" % self.text)
logging.debug("-" * 20)
return [Pdu(pdu, len_smsc)]
# multipart SMS
sms_submit_pdu = self._get_sms_submit_pdu(udh=True)
pdu_list = []
cnt = len(sms_msg_pdu)
for i, sms_msg_pdu_item in enumerate(sms_msg_pdu):
pdu = smsc_pdu
len_smsc = len(smsc_pdu) / 2
pdu += sms_submit_pdu
pdu += tpmessref_pdu
pdu += sms_phone_pdu
pdu += tppid_pdu
pdu += sms_msg_pdu_item
logging.debug("smsc_pdu: %s" % smsc_pdu)
logging.debug("sms_submit_pdu: %s" % sms_submit_pdu)
logging.debug("tpmessref_pdu: %s" % tpmessref_pdu)
logging.debug("sms_phone_pdu: %s" % sms_phone_pdu)
logging.debug("tppid_pdu: %s" % tppid_pdu)
logging.debug("sms_msg_pdu: %s" % sms_msg_pdu_item)
logging.debug("-" * 20)
logging.debug("full_pdu: %s" % pdu)
logging.debug("full_text: %s" % self.text)
logging.debug("-" * 20)
pdu_list.append(Pdu(pdu, len_smsc, cnt=cnt, seq=i + 1))
return pdu_list
def _get_smsc_pdu(self):
if not self.csca or not self.csca.strip():
return "00"
number = clean_number(self.csca)
ptype = 0x81 # set to unknown number by default
if number[0] == '+':
number = number[1:]
ptype = 0x91
if len(number) % 2:
number += 'F'
ps = chr(ptype)
for n in range(0, len(number), 2):
num = number[n + 1] + number[n]
ps += chr(int(num, 16))
pl = len(ps)
ps = chr(pl) + ps
return encode_str(ps)
def _get_tpmessref_pdu(self):
if self.ref is None:
self.ref = self._get_rand_id()
self.ref &= 0xFF
return encode_str(chr(self.ref))
def _get_phone_pdu(self):
number = clean_number(self.number)
ptype = 0x81
if number[0] == '+':
number = number[1:]
ptype = 0x91
pl = len(number)
if len(number) % 2:
number += 'F'
ps = chr(ptype)
for n in range(0, len(number), 2):
num = number[n + 1] + number[n]
ps += chr(int(num, 16))
ps = chr(pl) + ps
return encode_str(ps)
def _get_tppid_pdu(self):
return encode_str(chr(self.pid))
def _get_sms_submit_pdu(self, udh=False):
sms_submit = 0x1
if self.validity is None:
# handle no validity
pass
elif isinstance(self.validity, datetime):
# handle absolute validity
sms_submit |= 0x18
elif isinstance(self.validity, timedelta):
# handle relative validity
sms_submit |= 0x10
if self.request_status:
sms_submit |= 0x20
if udh:
sms_submit |= 0x40
return encode_str(chr(sms_submit))
def _get_msg_pdu(self):
# Data coding scheme
if self.fmt is None:
if is_valid_gsm(self.text):
self.fmt = 0x00
else:
self.fmt = 0x08
self.dcs = self.fmt
if self.klass is not None:
if self.klass == 0:
self.dcs |= 0x10
elif self.klass == 1:
self.dcs |= 0x11
elif self.klass == 2:
self.dcs |= 0x12
elif self.klass == 3:
self.dcs |= 0x13
dcs_pdu = encode_str(chr(self.dcs))
# Validity period
msgvp_pdu = ""
if self.validity is None:
# handle no validity
pass
elif isinstance(self.validity, timedelta):
# handle relative
msgvp = timedelta_to_relative_validity(self.validity)
msgvp_pdu = encode_str(chr(msgvp))
elif isinstance(self.validity, datetime):
# handle absolute
msgvp = datetime_to_absolute_validity(self.validity)
msgvp_pdu = ''.join(map(encode_str, map(chr, msgvp)))
# UDL + UD
message_pdu = ""
if self.fmt == 0x00:
self.text_gsm = self.text.encode("gsm0338")
if len(self.text_gsm) <= consts.SEVENBIT_SIZE:
message_pdu = [pack_8bits_to_7bits(self.text_gsm)]
else:
message_pdu = self._split_sms_message(self.text_gsm)
elif self.fmt == 0x04:
if len(self.text) <= consts.EIGHTBIT_SIZE:
message_pdu = [pack_8bits_to_8bit(self.text)]
else:
message_pdu = self._split_sms_message(self.text)
elif self.fmt == 0x08:
if len(self.text) <= consts.UCS2_SIZE:
message_pdu = [pack_8bits_to_ucs2(self.text)]
else:
message_pdu = self._split_sms_message(self.text)
else:
raise ValueError("Unknown data coding scheme: %d" % self.fmt)
ret = []
for msg in message_pdu:
ret.append(dcs_pdu + msgvp_pdu + msg)
return ret
def _split_sms_message(self, text):
if self.fmt == 0x00:
len_without_udh = consts.SEVENBIT_MP_SIZE
limit = consts.SEVENBIT_SIZE
packing_func = pack_8bits_to_7bits
total_len = len(self.text_gsm)
elif self.fmt == 0x04:
len_without_udh = consts.EIGHTBIT_MP_SIZE
limit = consts.EIGHTBIT_SIZE
packing_func = pack_8bits_to_8bit
total_len = len(self.text)
elif self.fmt == 0x08:
len_without_udh = consts.UCS2_MP_SIZE
limit = consts.UCS2_SIZE
packing_func = pack_8bits_to_ucs2
total_len = len(self.text)
msgs = []
pi, pe = 0, len_without_udh
while pi < total_len:
if text[pi:pe][-1] == '\x1b':
pe -= 1
msgs.append(text[pi:pe])
pi = pe
pe += len_without_udh
pdu_msgs = []
udh_len = 0x05
mid = 0x00
data_len = 0x03
sms_ref = self._get_rand_id() if self.rand_id is None else self.rand_id
sms_ref &= 0xFF
for i, msg in enumerate(msgs):
if isinstance(msg, bytes):
msg = msg.decode()
i += 1
total_parts = len(msgs)
if limit == consts.SEVENBIT_SIZE:
udh = (chr(udh_len) + chr(mid) + chr(data_len) +
chr(sms_ref) + chr(total_parts) + chr(i))
padding = " "
else:
udh = (chr(int("%04x" % ((udh_len << 8) | mid), 16)) +
chr(int("%04x" % ((data_len << 8) | sms_ref), 16)) +
chr(int("%04x" % ((total_parts << 8) | i), 16)))
padding = ""
pdu_msgs.append(packing_func(padding + msg, udh))
return pdu_msgs
def _get_rand_id(self):
if not self.id_list:
self.id_list = range(0, 255)
return list(self.id_list).pop(0)