# # STEP Part 21 Parser # # Copyright (c) 2011, Thomas Paviot (tpaviot@gmail.com) # Copyright (c) 2014, Christopher HORLER (cshorler@googlemail.com) # # All rights reserved. # # This file is part of the StepClassLibrary (SCL). # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # Neither the name of the nor the names of its contributors may # be used to endorse or promote products derived from this software without # specific prior written permission. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging import ply.lex as lex import ply.yacc as yacc from ply.lex import LexError logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) #################################################################################################### # Common Code for Lexer / Parser #################################################################################################### base_tokens = ['INTEGER', 'REAL', 'USER_DEFINED_KEYWORD', 'STANDARD_KEYWORD', 'STRING', 'BINARY', 'ENTITY_INSTANCE_NAME', 'ENUMERATION', 'PART21_END', 'PART21_START', 'HEADER_SEC', 'ENDSEC', 'DATA'] #################################################################################################### # Lexer #################################################################################################### class Lexer: tokens = list(base_tokens) states = (('slurp', 'exclusive'),) def __init__(self, debug=False, optimize=False, compatibility_mode=False, header_limit=4096): self.base_tokens = list(base_tokens) self.schema_dict = {} self.active_schema = {} self.input_length = 0 self.compatibility_mode = compatibility_mode self.header_limit = header_limit self.lexer = lex.lex(module=self, debug=debug, optimize=optimize, lextab='l21tab', debuglog=logger, errorlog=logger) self.reset() def __getattr__(self, name): if name == 'lineno': return self.lexer.lineno elif name == 'lexpos': return self.lexer.lexpos else: raise AttributeError def input(self, s): self.lexer.input(s) self.input_length += len(s) def reset(self): self.lexer.lineno = 1 self.lexer.begin('slurp') def token(self): return self.lexer.token() def activate_schema(self, schema_name): if schema_name in self.schema_dict: self.active_schema = self.schema_dict[schema_name] else: raise ValueError('schema not registered') def register_schema(self, schema_name, entities): if schema_name in self.schema_dict: raise ValueError('schema already registered') for k in entities: if k in self.base_tokens: raise ValueError('schema cannot override base_tokens') if isinstance(entities, list): entities = dict((k, k) for k in entities) self.schema_dict[schema_name] = entities def t_slurp_PART21_START(self, t): r'ISO-10303-21;' t.lexer.begin('INITIAL') return t def t_slurp_error(self, t): offset = t.value.find('\nISO-10303-21;', 0, self.header_limit) if offset == -1 and self.header_limit < len(t.value): # not found within header_limit raise LexError("Scanning error. try increasing lexer header_limit parameter", "{0}...".format(t.value[0:20])) elif offset == -1: # not found before EOF t.lexer.lexpos = self.input_length else: # found ISO-10303-21; offset += 1 # also skip the \n t.lexer.lineno += t.value[0:offset].count('\n') t.lexer.skip(offset) def t_error(self, t): raise LexError("Scanning error, invalid input", "{0}...".format(t.value[0:20])) # Comment (ignored) def t_COMMENT(self, t): r'/\*(.|\n)*?\*/' t.lexer.lineno += t.value.count('\n') def t_PART21_END(self, t): r'END-ISO-10303-21;' t.lexer.begin('slurp') return t def t_HEADER_SEC(self, t): r'HEADER;' return t def t_ENDSEC(self, t): r'ENDSEC;' return t # Keywords def t_STANDARD_KEYWORD(self, t): r'(?:!|)[A-Za-z_][0-9A-Za-z_]*' if self.compatibility_mode: t.value = t.value.upper() elif not t.value.isupper(): raise LexError('Scanning error. Mixed/lower case keyword detected, please use compatibility_mode=True', t.value) if t.value in self.base_tokens: t.type = t.value elif t.value in self.active_schema: t.type = self.active_schema[t.value] elif t.value.startswith('!'): t.type = 'USER_DEFINED_KEYWORD' return t def t_newline(self, t): r'\n+' t.lexer.lineno += len(t.value) # Simple Data Types def t_REAL(self, t): r'[+-]*[0-9][0-9]*\.[0-9]*(?:E[+-]*[0-9][0-9]*)?' t.value = float(t.value) return t def t_INTEGER(self, t): r'[+-]*[0-9][0-9]*' t.value = int(t.value) return t def t_STRING(self, t): r"'(?:[][!\"*$%&.#+,\-()?/:;<=>@{}|^`~0-9a-zA-Z_\\ ]|'')*'" t.value = t.value[1:-1] return t def t_BINARY(self, t): r'"[0-3][0-9A-F]*"' try: t.value = int(t.value[2:-1], base=16) except ValueError: t.value = None return t t_ENTITY_INSTANCE_NAME = r'\#[0-9]+' t_ENUMERATION = r'\.[A-Z_][A-Z0-9_]*\.' # Punctuation literals = '()=;,*$' t_ANY_ignore = ' \t\r' #################################################################################################### # Simple Model #################################################################################################### class P21File: def __init__(self, header, *sections): self.header = header self.sections = list(*sections) class P21Header: def __init__(self, file_description, file_name, file_schema): self.file_description = file_description self.file_name = file_name self.file_schema = file_schema self.extra_headers = [] class HeaderEntity: def __init__(self, type_name, params): self.type_name = type_name self.params = params class Section: def __init__(self, entities): self.entities = entities class SimpleEntity: def __init__(self, ref, type_name, params): self.ref = ref self.type_name = type_name self.params = params class ComplexEntity: def __init__(self, ref, params): self.ref = ref self.params = params class TypedParameter: def __init__(self, type_name, *params): self.type_name = type_name self.params = list(params) if params else None #################################################################################################### # Parser #################################################################################################### class Parser: tokens = list(base_tokens) def __init__(self, lexer=None, debug=False, tabmodule=None, start=None, optimize=False): # defaults start_tabs = {'exchange_file': 'p21tab', 'extract_header': 'p21hdrtab'} if start and tabmodule: start_tabs[start] = tabmodule if not start: start = 'exchange_file' if start not in start_tabs: raise ValueError('please pass (dedicated) tabmodule') # lexer may provide a more specialised set of tokens for use in (subclassed) parser try: self.tokens = lexer.tokens except AttributeError: pass self.lexer = lexer if lexer else Lexer() self.parser = yacc.yacc(debug=debug, module=self, tabmodule=start_tabs[start], start=start, optimize=optimize, debuglog=logger, errorlog=logger) self.reset() def parse(self, p21_data, **kwargs): #TODO: will probably need to change this function if the lexer is ever to support t_eof self.lexer.reset() self.lexer.input(p21_data) if 'debug' in kwargs: result = self.parser.parse(lexer=self.lexer, debug=logger, ** dict((k, v) for k, v in kwargs.items() if k != 'debug')) else: result = self.parser.parse(lexer=self.lexer, **kwargs) return result def reset(self): self.refs = {} self.is_in_exchange_structure = False def p_exchange_file(self, p): """exchange_file : check_p21_start_token header_section data_section_list check_p21_end_token""" p[0] = P21File(p[2], p[3]) def p_extract_header(self, p): """extract_header : check_p21_start_token header_section DATA""" p[0] = P21File(p[2], []) # clear input to avoid trailing context errors p.lexer.input('') def p_check_start_token(self, p): """check_p21_start_token : PART21_START""" self.is_in_exchange_structure = True p[0] = p[1] def p_check_end_token(self, p): """check_p21_end_token : PART21_END""" self.is_in_exchange_structure = False p[0] = p[1] # TODO: Specialise the first 3 header entities def p_header_section(self, p): """header_section : HEADER_SEC header_entity header_entity header_entity ENDSEC""" p[0] = P21Header(p[2], p[3], p[4]) def p_header_section_with_entity_list(self, p): """header_section : HEADER_SEC header_entity header_entity header_entity header_entity_list ENDSEC""" p[0] = P21Header(p[2], p[3], p[4]) p[0].extra_headers.extend(p[5]) def p_header_entity(self, p): """header_entity : keyword '(' parameter_list ')' ';'""" p[0] = HeaderEntity(p[1], p[3]) def p_check_entity_instance_name(self, p): """check_entity_instance_name : ENTITY_INSTANCE_NAME""" if p[1] in self.refs: logger.error('Line: {0}, SyntaxError - Duplicate Entity Instance Name: {1}'.format(p.lineno(1), p[1])) raise SyntaxError else: self.refs[p[1]] = None p[0] = p[1] def p_simple_entity_instance(self, p): """simple_entity_instance : check_entity_instance_name '=' simple_record ';'""" p[0] = SimpleEntity(p[1], *p[3]) def p_entity_instance_error(self, p): """entity_instance : check_entity_instance_name '=' error ';'""" logger.error('resyncing parser, check input between lineno %d and %d', p.lineno(2), p.lineno(4)) def p_complex_entity_instance(self, p): """complex_entity_instance : check_entity_instance_name '=' subsuper_record ';'""" p[0] = ComplexEntity(p[1], p[3]) def p_subsuper_record(self, p): """subsuper_record : '(' simple_record_list ')'""" p[0] = [SimpleEntity(None, *x) for x in p[2]] def p_data_section_list_init(self, p): """data_section_list : data_section""" p[0] = [p[1],] def p_data_section_list(self, p): """data_section_list : data_section_list data_section""" p[0] = p[1] p[0].append(p[2]) def p_header_entity_list_init(self, p): """header_entity_list : header_entity""" p[0] = [p[1],] def p_header_entity_list(self, p): """header_entity_list : header_entity_list header_entity""" p[0] = p[1] p[0].append(p[2]) def p_parameter_list_init(self, p): """parameter_list : parameter""" p[0] = [p[1],] def p_parameter_list(self, p): """parameter_list : parameter_list ',' parameter""" p[0] = p[1] p[0].append(p[3]) def p_keyword(self, p): """keyword : USER_DEFINED_KEYWORD | STANDARD_KEYWORD""" p[0] = p[1] def p_parameter_simple(self, p): """parameter : STRING | INTEGER | REAL | ENTITY_INSTANCE_NAME | ENUMERATION | BINARY | '*' | '$' | typed_parameter | list_parameter""" p[0] = p[1] def p_list_parameter(self, p): """list_parameter : '(' parameter_list ')'""" p[0] = p[2] def p_typed_parameter(self, p): """typed_parameter : keyword '(' parameter ')'""" p[0] = TypedParameter(p[1], p[3]) def p_parameter_empty_list(self, p): """parameter : '(' ')'""" p[0] = [] def p_data_start(self, p): """data_start : DATA '(' parameter_list ')' ';'""" pass # TODO: do something with the parameters def p_data_start_empty(self, p): """data_start : DATA '(' ')' ';' | DATA ';'""" pass def p_data_section(self, p): """data_section : data_start entity_instance_list ENDSEC""" p[0] = Section(p[2]) def p_entity_instance_list_init(self, p): """entity_instance_list : entity_instance""" p[0] = [p[1],] def p_entity_instance_list(self, p): """entity_instance_list : entity_instance_list entity_instance""" p[0] = p[1] p[0].append(p[2]) def p_entity_instance_list_empty(self, p): """entity_instance_list : empty""" p[0] = [] def p_entity_instance(self, p): """entity_instance : simple_entity_instance | complex_entity_instance""" p[0] = p[1] def p_simple_record_empty(self, p): """simple_record : keyword '(' ')'""" p[0] = (p[1], []) def p_simple_record_with_params(self, p): """simple_record : keyword '(' parameter_list ')'""" p[0] = (p[1], p[3]) def p_simple_record_list_init(self, p): """simple_record_list : simple_record""" p[0] = [p[1],] def p_simple_record_list(self, p): """simple_record_list : simple_record_list simple_record""" p[0] = p[1] p[0].append(p[2]) def p_empty(self, p): """empty :""" pass def debug_lexer(): import codecs from os.path import normpath, expanduser logging.basicConfig() logger.setLevel(logging.DEBUG) lexer = Lexer(debug=True) p = normpath(expanduser('~/projects/src/stepcode/data/ap209/ATS7-out.stp')) with codecs.open(p, 'r', encoding='iso-8859-1') as f: s = f.read() lexer.input(s) while True: tok = lexer.token() if not tok: break print(tok) def debug_parser(): import codecs from os.path import normpath, expanduser logging.basicConfig() logger.setLevel(logging.DEBUG) parser = Parser() parser.reset() logger.info("***** parser debug *****") p = normpath(expanduser('~/projects/src/stepcode/data/ap214e3/s1-c5-214/s1-c5-214.stp')) with codecs.open(p, 'r', encoding='iso-8859-1') as f: s = f.read() parser.parse(s, debug=1) logger.info("***** finished *****") def test(): import os, codecs from os.path import normpath, expanduser logging.basicConfig() logger.setLevel(logging.INFO) lexer = Lexer(optimize=True) parser = Parser(lexer=lexer, optimize=True) compat_list = [] def parse_check(p): logger.info("processing {0}".format(p)) parser.reset() with codecs.open(p, 'r', encoding='iso-8859-1') as f: s = f.read() parser.parse(s) logger.info("***** standard test *****") stepcode_dir = normpath(os.path.expanduser('~/projects/src/stepcode')) for d, _, files in os.walk(stepcode_dir): for f in filter(lambda x: x.endswith('.stp'), files): p = os.path.join(d, f) try: parse_check(p) except LexError: logger.exception('Lexer issue, adding {0} to compatibility test list'.format(os.path.basename(p))) compat_list.append(p) lexer = Lexer(optimize=True, compatibility_mode=True) parser = Parser(lexer=lexer, optimize=True) logger.info("***** compatibility test *****") for p in compat_list: parse_check(p) logger.info("***** finished *****") def test_header_only(): import os, codecs from os.path import normpath, expanduser logging.basicConfig() logger.setLevel(logging.INFO) lexer = Lexer(optimize=True) parser = Parser(start='extract_header', optimize=True) compat_list = [] def parse_check(p): logger.info("processing {0}".format(p)) parser.reset() with codecs.open(p, 'r', encoding='iso-8859-1') as f: s = f.read() parser.parse(s) logger.info("***** standard test *****") stepcode_dir = normpath(os.path.expanduser('~/projects/src/stepcode')) for d, _, files in os.walk(stepcode_dir): for f in filter(lambda x: x.endswith('.stp'), files): p = os.path.join(d, f) try: parse_check(p) except LexError: logger.exception('Lexer issue, adding {0} to compatibility test list'.format(os.path.basename(p))) compat_list.append(p) lexer = Lexer(optimize=True, compatibility_mode=True) parser = Parser(lexer=lexer, start='extract_header', optimize=True) logger.info("***** compatibility test *****") for p in compat_list: parse_check(p) logger.info("***** finished *****") if __name__ == '__main__': #debug_lexer() #debug_parser() test() #test_header_only()