-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathcountgaps.py
More file actions
94 lines (84 loc) · 3.84 KB
/
countgaps.py
File metadata and controls
94 lines (84 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from udapi.core.block import Block
from collections import defaultdict, Counter
class CountGaps(Block):
    """Block corefud.CountGaps reports gaps in coreference annotation.

    A sentence (tree) is "empty" when none of its nodes has any
    ``coref_mentions``.  Two histograms are collected:

    * ``seq`` -- lengths of maximal runs of consecutive empty sentences.
      A run is closed by a sentence with a mention or by the start of a new
      document; paragraph boundaries do not interrupt it.
    * ``par`` -- lengths (in sentences) of paragraphs in which every
      sentence is empty.

    Each histogram is printed as ``length: count`` rows followed by the
    total number of sentences covered.
    """

    def __init__(self, report_per_newdoc=False, report_per_file=True, report_total=True, **kwargs):
        """Create the block.

        Args:
            report_per_newdoc: print the histograms for every ``newdoc``
                section inside a processed document.
            report_per_file: print the histograms once per processed document.
            report_total: print the histograms accumulated over all processed
                documents in ``process_end``.
            **kwargs: forwarded unchanged to ``Block.__init__``.
        """
        super().__init__(**kwargs)
        self.report_per_newdoc = report_per_newdoc
        self.report_per_file = report_per_file
        self.report_total = report_total
        # Histograms accumulated across all documents, keyed by "seq" / "par".
        self._total_counter = defaultdict(Counter)

    def _report_stats(self, counter, header_id=None):
        """Print a length histogram to standard output.

        Args:
            counter: mapping from gap length (int) to number of occurrences.
            header_id: optional title printed above the histogram.
        """
        if header_id:
            print(f"============ {header_id} ============")
        for key in sorted(counter):
            print(f"{key:2d}: {counter[key]}")
        print("-------")
        # length * occurrences, summed = number of sentences inside the gaps
        print(f"SUM: {sum(length * count for length, count in counter.items())}")

    def _count_empty_seqs(self, empty_seqs):
        """Return a Counter mapping sequence length to number of sequences."""
        return Counter(len(seq) for seq in empty_seqs)

    def _close_newdoc(self, file_counters, empty_seqs, empty_pars, curr_seq, curr_par, is_empty_par):
        """Finish the current newdoc section and add its histograms to the file.

        The still-open run ``curr_seq`` is closed if it is non-empty, and the
        still-open paragraph ``curr_par`` is recorded only if it contains
        sentences and none of them had a mention.  ``empty_seqs`` and
        ``empty_pars`` are extended in place.

        Returns:
            A ``(seq_counter, par_counter)`` pair for the finished section.
        """
        if curr_seq:
            empty_seqs.append(curr_seq)
        if is_empty_par and curr_par:
            empty_pars.append(curr_par)
        seq_counter = self._count_empty_seqs(empty_seqs)
        par_counter = self._count_empty_seqs(empty_pars)
        file_counters["seq"].update(seq_counter)
        file_counters["par"].update(par_counter)
        return seq_counter, par_counter

    def process_document(self, doc):
        """Collect and optionally print gap statistics for one document.

        Args:
            doc: document whose ``trees`` are scanned in order; each tree is
                read through ``newdoc``, ``newpar``, ``sent_id`` and
                ``descendants`` (whose nodes provide ``coref_mentions``).
        """
        file_counters = defaultdict(Counter)
        empty_seqs, empty_pars = [], []
        curr_seq, curr_par = [], []
        is_empty_par = True
        newdoc = None

        for i, tree in enumerate(doc.trees):
            if tree.newdoc:
                # Nothing precedes the first tree, so there is nothing to close.
                if i:
                    seq_counter, par_counter = self._close_newdoc(
                        file_counters, empty_seqs, empty_pars, curr_seq, curr_par, is_empty_par)
                    if self.report_per_newdoc:
                        self._report_stats(seq_counter, header_id=f"SEQ STATS in {newdoc}")
                        self._report_stats(par_counter, header_id=f"PAR STATS in {newdoc}")
                newdoc = tree.newdoc
                empty_seqs, empty_pars = [], []
                curr_seq, curr_par = [], []
                is_empty_par = True

            if tree.newpar:
                # When newdoc is set as well, the previous paragraph has
                # already been handled above.  The curr_par check keeps a
                # newpar on the very first tree from recording a paragraph of
                # length zero.
                if not tree.newdoc and is_empty_par and curr_par:
                    empty_pars.append(curr_par)
                curr_par = []
                is_empty_par = True

            if any(node.coref_mentions for node in tree.descendants):
                # A mention ends the running empty sequence and marks the
                # whole current paragraph as non-empty.
                if curr_seq:
                    empty_seqs.append(curr_seq)
                    curr_seq = []
                is_empty_par = False
            else:
                curr_seq.append(tree.sent_id)
                curr_par.append(tree.sent_id)

        # Close the last newdoc section with the same rules as inner sections.
        seq_counter, par_counter = self._close_newdoc(
            file_counters, empty_seqs, empty_pars, curr_seq, curr_par, is_empty_par)
        if self.report_per_newdoc:
            self._report_stats(seq_counter, header_id=f"SEQ STATS, {newdoc}")
            self._report_stats(par_counter, header_id=f"PAR STATS, {newdoc}")

        if self.report_per_file:
            self._report_stats(file_counters["seq"], header_id="SEQ STATS, FILE")
            self._report_stats(file_counters["par"], header_id="PAR STATS, FILE")

        self._total_counter["seq"].update(file_counters["seq"])
        self._total_counter["par"].update(file_counters["par"])

    def process_end(self):
        """Print the histograms accumulated over all processed documents."""
        if self.report_total:
            self._report_stats(self._total_counter["seq"], header_id="SEQ STATS, TOTAL")
            self._report_stats(self._total_counter["par"], header_id="PAR STATS, TOTAL")