-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathparser.py
More file actions
executable file
·676 lines (561 loc) · 24.9 KB
/
parser.py
File metadata and controls
executable file
·676 lines (561 loc) · 24.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
#!/usr/bin/env python
# encoding: utf-8
"""
parser.py
Parse an iterator with family info; this can be a file handle, a file stream,
a list of strings etc.
The family info can be in several formats, these are .ped , .fam,
.txt(extended ped format).
.ped and .fam always have 6 columns, these are
Family_ID - '.' or '0' for unknown
Individual_ID - '.' or '0' for unknown
Paternal_ID - '.' or '0' for unknown
Maternal_ID - '.' or '0' for unknown
Sex - '1'=male; '2'=female; ['other', '0', '.']=unknown
Phenotype - '1'=unaffected, '2'=affected, ['-9', '0', '.']= missing,
The other types must specify the columns in the header.
The header always starts with '#'.
These files always start with the ped columns described above.
The following column names will be treated with care, which means that they
will be used when outputting a madeline type of file or be made accessible as
variables in the parser:
'InheritanceModel' - a ';'-separated list of expected inheritance models.
Choices are:
['AR','AR_hom','AR_denovo','AR_hom_denovo','AR_hom_dn','AR_dn',
'AR_compound','AR_comp','AD','AD_dn','AD_denovo','X','X_dn',
'X_denovo','NA','Na','na','.']
'Proband' - 'Yes', 'No', 'Unknown' or '.'. A proband is the first affected
member of a pedigree coming to medical attention.
'Consultand' - 'Yes', 'No', 'Unknown' or '.'. A consultand is an individual
who has sought genetic counseling or testing.
'Alive' - 'Yes', 'No', 'Unknown' or '.'
Create a family object and its family members from different types of input files.
Created by Måns Magnusson on 2013-01-17.
Copyright (c) 2013 __MoonsoInc__. All rights reserved.
"""
from __future__ import print_function
import json
import logging
import click
from string import whitespace
from ped_parser import (Individual, Family)
from ped_parser.log import init_log
from ped_parser.exceptions import (WrongAffectionStatus, WrongPhenotype,
WrongGender, PedigreeError, WrongLineFormat)
############### Names of genetic models ###############
# These are stored as global variables and can be altered if the user
# prefers other model names or wants to add names.
# Each list holds the accepted spellings for one inheritance model.
# FamilyParser stores a reference to each list on the instance and
# FamilyParser.get_models() maps any listed spelling to one canonical name.

# Autosomal recessive (homozygous); canonical name is 'AR_hom'.
AR_HOM_NAMES = ['AR', 'AR_hom']
# Autosomal recessive de novo; canonical name is 'AR_hom_dn'.
AR_HOM_DN_NAMES = ['AR_denovo', 'AR_hom_denovo', 'AR_hom_dn', 'AR_dn']
# Autosomal recessive compound; canonical name is 'AR_comp'.
COMPOUND_NAMES = ['AR_compound', 'AR_comp']
# Autosomal dominant (including de novo); canonical name is 'AD_dn'.
AD_NAMES = ['AD', 'AD_dn', 'AD_denovo']
# X-linked (including de novo); canonical name is 'X'.
X_NAMES = ['X', 'X_dn', 'X_denovo']
# Placeholders meaning "no model given"; canonical name is 'NA'.
NA_NAMES = ['NA', 'Na', 'na', '.']
class FamilyParser(object):
    """
    Parse an iterator with family info and create family objects with
    individuals.

    After construction the parsed information is available through:
        families (dict): family id -> Family object
        individuals (dict): individual id -> Individual object
    """

    def __init__(self, family_info, family_type='ped', cmms_check=False):
        """
        Parse ``family_info`` and run ``family_check()`` on every family found.

        Arguments:
            family_info (iterator): Lines with family information
            family_type (str): Any of [ped, alt, cmms, fam, mip]. Any other
                               value leaves the parser empty.
            cmms_check (bool, optional): Perform CMMS validations?
        """
        super(FamilyParser, self).__init__()
        # When the module is executed as a script __name__ is "__main__",
        # so use the package-qualified logger name instead.
        if __name__ == "__main__":
            self.logger = logging.getLogger("ped_parser.FamilyParser")
        else:
            self.logger = logging.getLogger(__name__)
        self.logger.info("Initializing family parser")

        self.cmms_check = cmms_check
        self.family_type = family_type
        self.logger.info("Family type:{0}".format(family_type))

        self.families = {}
        self.individuals = {}

        # The legal names are references to the module level lists, so
        # changes made to those lists are seen by the parser as well.
        self.legal_ar_hom_names = AR_HOM_NAMES
        self.logger.debug("Legal AR hom names:{0}".format(AR_HOM_NAMES))
        self.legal_ar_hom_dn_names = AR_HOM_DN_NAMES
        self.logger.debug("Legal AR dn names:{0}".format(AR_HOM_DN_NAMES))
        self.legal_compound_names = COMPOUND_NAMES
        self.logger.debug("Legal AR compound names:{0}".format(COMPOUND_NAMES))
        self.legal_ad_names = AD_NAMES
        self.logger.debug("Legal AD names:{0}".format(AD_NAMES))
        self.legal_x_names = X_NAMES
        self.logger.debug("Legal X names:{0}".format(X_NAMES))
        self.legal_na_names = NA_NAMES
        self.logger.debug("Legal NA names:{0}".format(NA_NAMES))

        # Names of the six mandatory ped columns, in file order. They are
        # also the keyword names expected by get_individual().
        self.header = ['family_id', 'sample_id', 'father_id',
                       'mother_id', 'sex', 'phenotype']

        if self.family_type in ['ped', 'fam']:
            self.ped_parser(family_info)
        elif self.family_type in ['alt', 'cmms', 'mip']:
            # All extended formats share the header based parser.
            self.alternative_parser(family_info)

        for fam in self.families:
            self.families[fam].family_check()

    def get_individual(self, family_id, sample_id, father_id, mother_id, sex, phenotype,
                       genetic_models=None, proband='.', consultand='.', alive='.'):
        """
        Return an individual object based on the indata.

        Unknown sex or phenotype codes are normalised to '0', a '.' parent
        id is normalised to '0' and the Yes/No flags are converted to
        'Y'/'N' (anything else becomes '.').

        Arguments:
            family_id (str): The id for this family
            sample_id (str): The id for this sample
            father_id (str): The id for this samples father
            mother_id (str): The id for this samples mother
            sex (str): The id for the sex of this sample
            phenotype (str): The id for the phenotype of this sample
            genetic_models (str): A ';'-separated string with the expected
                                  models of inheritance for this sample
            proband (str): 'Yes', 'No' or '.'
            consultand (str): 'Yes', 'No' or '.' if the individual is sequenced
            alive (str): 'Yes', 'No' or '.'

        Returns:
            individual (Individual): An Individual object with the information
        """
        yes_no_codes = {'Yes': 'Y', 'No': 'N'}

        if sex not in ['1', '2']:
            sex = '0'
        if phenotype not in ['1', '2']:
            phenotype = '0'
        if mother_id == '.':
            mother_id = '0'
        if father_id == '.':
            father_id = '0'
        if genetic_models:
            genetic_models = genetic_models.split(';')

        proband = yes_no_codes.get(proband, '.')
        consultand = yes_no_codes.get(consultand, '.')
        alive = yes_no_codes.get(alive, '.')

        # NOTE: Individual takes the mother before the father, which is the
        # opposite of the column order in the ped file.
        individual = Individual(
            sample_id,
            family_id,
            mother_id,
            father_id,
            sex,
            phenotype,
            genetic_models,
            proband,
            consultand,
            alive
        )
        return individual

    def check_line_length(self, splitted_line, expected_length):
        """
        Check if the line is correctly formatted.

        Arguments:
            splitted_line (list): The columns of a ped line
            expected_length (int): The number of columns the line must have

        Raises:
            WrongLineFormat: If the number of columns differs from
                             expected_length
        """
        if len(splitted_line) != expected_length:
            raise WrongLineFormat(
                message='WRONG FORMATED PED LINE!',
                ped_line='\t'.join(splitted_line))
        return

    def ped_parser(self, family_info):
        """
        Parse .ped formatted family info.

        Add all family info to the parser object

        Arguments:
            family_info (iterator): An iterator with family info

        Raises:
            WrongLineFormat: If a line does not have exactly six columns
        """
        for line in family_info:
            # Skip commented lines and lines with whitespace only:
            if line.startswith('#') or not line.strip():
                continue

            splitted_line = line.rstrip().split('\t')
            if len(splitted_line) != 6:
                # Try to split the line on any whitespace instead:
                splitted_line = line.rstrip().split()
            try:
                self.check_line_length(splitted_line, 6)
            except WrongLineFormat as e:
                self.logger.error(e)
                self.logger.info("Ped line: {0}".format(e.ped_line))
                raise

            sample_dict = dict(zip(self.header, splitted_line))
            family_id = sample_dict['family_id']

            if family_id not in self.families:
                self.families[family_id] = Family(family_id, {})

            ind_object = self.get_individual(**sample_dict)
            self.individuals[ind_object.individual_id] = ind_object
            self.families[ind_object.family].add_individual(ind_object)

    def alternative_parser(self, family_file):
        """
        Parse alternative formatted family info

        This parses information with more than six columns.
        For alternative information a header line must exist and each row
        must have the same amount of columns as the header.
        First six columns must be the same as in the ped format.

        Arguments:
            family_file (iterator): An iterator with family info

        Raises:
            WrongLineFormat: If a data line is found before any header or
                             if a line has another number of columns than
                             the header
            WrongAffectionStatus, WrongPhenotype, WrongGender: If
                             cmms_check is set and a CMMS style id
                             disagrees with the ped columns
        """
        alternative_header = None

        for line in family_file:
            if line.startswith('#'):
                alternative_header = line[1:].rstrip().split('\t')
                self.logger.info("Alternative header found: {0}".format(line))
                continue
            if not line.strip():
                continue

            if not alternative_header:
                raise WrongLineFormat(message="Alternative ped files must have "
                                      "headers! Please add a header line.")

            splitted_line = line.rstrip().split('\t')
            if len(splitted_line) < 6:
                # Try to split the line on any whitespace instead:
                splitted_line = line.rstrip().split()

            try:
                self.check_line_length(splitted_line, len(alternative_header))
            except WrongLineFormat as e:
                # check_line_length raises WrongLineFormat, so that is what
                # has to be caught for the diagnostics below to be logged.
                self.logger.error('Number of entrys differ from header.')
                self.logger.error("Header:\n{0}".format('\t'.join(alternative_header)))
                self.logger.error("Ped Line:\n{0}".format('\t'.join(splitted_line)))
                self.logger.error(("Length of Header: {0}. Length of "
                                   "Ped line: {1}").format(
                                       len(alternative_header),
                                       len(splitted_line)))
                raise

            if len(line) <= 1:
                continue

            sample_dict = dict(zip(self.header, splitted_line[:6]))
            family_id = sample_dict['family_id']
            all_info = dict(zip(alternative_header, splitted_line))

            if family_id not in self.families:
                self.families[family_id] = Family(family_id, {})

            sample_dict['genetic_models'] = all_info.get('InheritanceModel', None)
            # Try other header naming:
            if not sample_dict['genetic_models']:
                sample_dict['genetic_models'] = all_info.get('Inheritance_model', None)

            sample_dict['proband'] = all_info.get('Proband', '.')
            sample_dict['consultand'] = all_info.get('Consultand', '.')
            sample_dict['alive'] = all_info.get('Alive', '.')

            ind_object = self.get_individual(**sample_dict)
            self.individuals[ind_object.individual_id] = ind_object
            self.families[ind_object.family].add_individual(ind_object)

            if sample_dict['genetic_models']:
                for model in self.get_models(sample_dict['genetic_models']):
                    self.families[ind_object.family].models_of_inheritance.add(model)

            # If requested, check ids that look like the CMMS format
            # (three '-'-separated parts):
            if self.cmms_check and len(ind_object.individual_id.split('-')) == 3:
                if self.check_cmms_id(ind_object.individual_id):
                    self._run_cmms_checks(ind_object)

            # Every column after the six ped columns is stored as extra info.
            # The header and the line have equal length at this point.
            for column_name, value in zip(alternative_header[6:], splitted_line[6:]):
                ind_object.extra_info[column_name] = value

    def _run_cmms_checks(self, ind_object):
        """
        Log and re-raise any disagreement between a CMMS id and the
        phenotype/sex of the individual.

        Arguments:
            ind_object : An Individual object with an id in the CMMS format

        Raises:
            WrongAffectionStatus, WrongPhenotype, WrongGender
        """
        self.logger.debug("Id follows CMMS convention: {0}".format(
            ind_object.individual_id
        ))
        self.logger.debug("Checking CMMS id affections status")
        try:
            self.check_cmms_affection_status(ind_object)
        except WrongAffectionStatus as e:
            self.logger.error(("Wrong affection status for"
                               " {0}. Affection status can be in"
                               " {1}").format(e.cmms_id, e.valid_statuses))
            raise
        except WrongPhenotype as e:
            # The affection status goes in the first parenthesis and the
            # phenotype in the second.
            self.logger.error(("Affection status for {0} "
                               "({1}) disagrees with phenotype ({2})").format(
                                   e.cmms_id, e.affection_status, e.phenotype
                               ))
            raise

        try:
            self.check_cmms_gender(ind_object)
        except WrongGender as e:
            self.logger.error(("Gender code for id {0}"
                               "({1}) disagrees with sex:{2}").format(
                                   e.cmms_id, e.sex_code, e.sex
                               ))
            raise

    def check_cmms_id(self, ind_id):
        """
        Take the ID and check if it is following the cmms standard.

        The standard is year:id-generation-indcode:affectionstatus.
        Year is two digits, id three digits, generation in roman letters
        indcode are digits and affection status are in ['A', 'U', 'X'].
        Example 11001-II-1A.

        Only the first part (must be an integer) and the last part (digits
        followed by one letter) are inspected here; whether the letter is a
        valid status is decided by check_cmms_affection_status().

        Arguments:
            ind_id (str): An individual id

        Returns:
            bool : True if the id looks like a CMMS id, False otherwise
        """
        id_parts = ind_id.split('-')

        try:
            int(id_parts[0])
        except ValueError:
            return False

        last_part = id_parts[-1]
        # Need at least one digit for the indcode and one status letter.
        if len(last_part) < 2:
            return False

        ind_code = last_part[:-1]
        # This is A (=affected), U (=unaffected) or X (=unknown)
        affection_status = last_part[-1]

        if not ind_code.isdigit():
            return False
        if not affection_status.isalpha():
            return False
        return True

    def check_cmms_affection_status(self, ind_object):
        """
        Check if the affection status in the id agrees with the phenotype.

        Arguments:
            ind_object : An Individual object

        Returns:
            bool : True if affection status is correct

        Raises:
            WrongAffectionStatus: If the last letter of the id is not one
                                  of 'A', 'U' or 'X'
            WrongPhenotype: If the letter is 'A' and the phenotype is not 2
                            or the letter is 'U' and the phenotype is not 1
        """
        valid_affection_statuses = ['A', 'U', 'X']
        ind_id = ind_object.individual_id.split('-')
        phenotype = ind_object.phenotype
        affection_status = ind_id[-1][-1]

        if affection_status not in valid_affection_statuses:
            raise WrongAffectionStatus(ind_object.individual_id,
                                       valid_affection_statuses)

        if ((affection_status == 'A' and phenotype != 2) or
                (affection_status == 'U' and phenotype != 1)):
            raise WrongPhenotype(ind_object.individual_id, phenotype,
                                 affection_status)

        return True

    def check_cmms_gender(self, ind_object):
        """
        Check if the individual code in the id agrees with the sex.

        Arguments:
            ind_object : An Individual object

        Returns:
            bool : True if the sex is correct

        Raises:
            WrongGender: If an even code is combined with a sex other than 2
                         or an odd code with a sex other than 1
            ValueError: If the individual code is not an integer
        """
        ind_id = ind_object.individual_id.split('-')
        sex = ind_object.sex
        # Males always have odd numbers and females even
        sex_code = int(ind_id[-1][:-1])
        if (sex_code % 2 == 0 and sex != 2) or (sex_code % 2 != 0 and sex != 1):
            raise WrongGender(ind_object.individual_id, sex, sex_code)

        return True

    def get_models(self, genetic_models):
        """
        Check what genetic models that are found and return them as a set.

        Every known spelling is translated to its canonical name. Names
        that are not known are logged with a warning and left out.

        Arguments:
            genetic_models (str): A ';'-separated string with genetic models

        Returns:
            correct_model_names (set): A set with the canonical model names
        """
        correct_model_names = set()

        for model in genetic_models.split(';'):
            # We need to allow alternative spellings
            if model in self.legal_ar_hom_names:
                model = 'AR_hom'
            elif model in self.legal_ar_hom_dn_names:
                model = 'AR_hom_dn'
            elif model in self.legal_ad_names:
                model = 'AD_dn'
            elif model in self.legal_compound_names:
                model = 'AR_comp'
            elif model in self.legal_x_names:
                model = 'X'
            elif model in self.legal_na_names:
                model = 'NA'
            else:
                self.logger.warning(("Incorrect model name: {0}."
                                     " Ignoring model.").format(model))
                # Actually ignore it, as the warning says.
                continue
            correct_model_names.add(model)

        return correct_model_names

    def to_dict(self):
        """
        Return the information from the pedigree file as a dictionary.
        family id is key and a list with dictionarys for each individual
        as value.

        Returns:
            families (dict): A dictionary with the families
        """
        self.logger.debug("Return the information as a dictionary")
        families = {}
        for family_id in self.families:
            family = []
            for individual_id in self.families[family_id].individuals:
                individual = self.families[family_id].individuals[individual_id]
                family.append(individual.to_json())
                self.logger.debug("Adding individual {0} to family {1}".format(
                    individual_id, family_id
                ))
            self.logger.debug("Adding family {0}".format(family_id))
            families[family_id] = family

        return families

    def to_json(self):
        """
        Yield the information from the pedigree file, one family at a time.

        Each yielded item is whatever Family.to_json() returns for that
        family. Collected in a list the result is intended to look like
        [
            [
                {
                    'family_id': family_id,
                    'id': individual_id,
                    'sex': gender_code,
                    'phenotype': phenotype_code,
                    'mother': mother_id,
                    'father': father_id
                },
                {
                    ...
                }
            ],
            [

            ]
        ]
        This object can easily be converted to a json object.

        Yields:
            the information for one family in json format
        """
        for family_id in self.families:
            yield self.families[family_id].to_json()

    def to_madeline(self):
        """
        Return a generator with the info in madeline format.

        Yields:
            The tab separated header line followed by one line for each
            individual, as returned by Individual.to_madeline()
        """
        madeline_header = [
            'FamilyID',
            'IndividualID',
            'Gender',
            'Father',
            'Mother',
            'Affected',
            'Proband',
            'Consultand',
            'Alive'
        ]

        yield '\t'.join(madeline_header)

        for family_id in self.families:
            for individual_id in self.families[family_id].individuals:
                individual = self.families[family_id].individuals[individual_id]
                yield individual.to_madeline()

    def to_ped(self):
        """
        Return a generator with the info in ped format.

        The header holds the six ped columns followed by those of
        'InheritanceModel', 'Proband', 'Consultand' and 'Alive' that are
        present in the extra info of at least one individual.

        Yields:
            The tab separated header line followed by one tab separated
            line for each individual
        """
        ped_header = [
            '#FamilyID',
            'IndividualID',
            'PaternalID',
            'MaternalID',
            'Sex',
            'Phenotype',
        ]

        extra_headers = [
            'InheritanceModel',
            'Proband',
            'Consultand',
            'Alive'
        ]

        # Add the known extra columns in the order they are first seen:
        for individual_id in self.individuals:
            individual = self.individuals[individual_id]
            for info in individual.extra_info:
                if info in extra_headers and info not in ped_header:
                    ped_header.append(info)

        self.logger.debug("Ped headers found: {0}".format(
            ', '.join(ped_header)
        ))

        yield '\t'.join(ped_header)

        for family_id in self.families:
            for individual_id in self.families[family_id].individuals:
                individual = self.families[family_id].individuals[individual_id].to_json()
                ped_info = [
                    individual['family_id'],
                    individual['id'],
                    individual['father'],
                    individual['mother'],
                    individual['sex'],
                    individual['phenotype'],
                ]
                # Missing extra values are written as '.':
                for header in ped_header[6:]:
                    ped_info.append(individual['extra_info'].get(header, '.'))

                yield '\t'.join(ped_info)
@click.command()
@click.argument('family_file',
                nargs=1,
                type=click.File(),
                metavar="<family_file> or '-'"
)
@click.option('-t', '--family_type',
              type=click.Choice(['ped', 'fam', 'alt', 'cmms', 'mip']),
              default='ped',
              help='If the analysis use one of the known setups, please specify which one. Default is ped'
)
@click.option('--to_json',
              is_flag=True,
              help='Print the ped file in json format'
)
@click.option('--to_madeline',
              is_flag=True,
              help='Print the ped file in madeline format'
)
@click.option('--to_ped',
              is_flag=True,
              help='Print the ped file in ped format with headers'
)
@click.option('--to_dict',
              is_flag=True,
              help='Print the ped file as a python dictionary'
)
@click.option('-o', '--outfile',
              type=click.File('a')
)
@click.option('-l', '--logfile',
              type=click.Path(exists=False),
              help="Path to log file. If none logging is "\
                   "printed to stderr."
)
@click.option('--loglevel',
              type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR',
                                 'CRITICAL']),
              default='INFO',
              help="Set the level of log output."
)
def cli(family_file, family_type, to_json, to_madeline, to_ped, to_dict,
        outfile, logfile, loglevel):
    """Cli for testing the ped parser.

    Parses FAMILY_FILE and writes it in the first requested output format
    (json, madeline, ped or dict). Output is appended to --outfile when
    given, otherwise it is printed to stdout.
    """
    # NOTE(review): logfile and loglevel are accepted but not used here;
    # logging is only configured by the __main__ guard of this module.
    from pprint import pprint as pp

    my_parser = FamilyParser(family_file, family_type)

    if to_json:
        # FamilyParser.to_json() is a generator, so it has to be collected
        # and serialised before it can be written or printed.
        # Assumes Family.to_json() returns json serialisable objects.
        json_output = json.dumps(list(my_parser.to_json()))
        if outfile:
            outfile.write(json_output + '\n')
        else:
            print(json_output)

    elif to_madeline:
        for line in my_parser.to_madeline():
            if outfile:
                outfile.write(line + '\n')
            else:
                print(line)

    elif to_ped:
        for line in my_parser.to_ped():
            if outfile:
                outfile.write(line + '\n')
            else:
                print(line)

    elif to_dict:
        if outfile:
            pp(my_parser.to_dict(), stream=outfile)
        else:
            pp(my_parser.to_dict())
if __name__ == '__main__':
    # Script entry point: set up logging at DEBUG level before handing
    # over to the click command, so that the parser's log calls are shown.
    from ped_parser import init_log
    from ped_parser import logger

    init_log(logger, loglevel='DEBUG')
    cli()