# -*- coding: utf-8 -*-
"""Utility belt for working with ``pyang`` and ``pyangext``."""
import io
import logging
from os.path import isfile
from warnings import warn
from six import StringIO
from pyang import Context, FileRepository
from pyang.error import err_level, err_to_str, error_codes
from pyang.translators import yang
from pyang.yang_parser import YangParser
from .definitions import PREFIX_SEPARATOR
__all__ = [
'create_context',
'compare_prefixed',
'qualify_str',
'select',
'find',
'dump',
'check',
'parse',
'walk',
]
logging.basicConfig(level=logging.INFO)
logging.captureWarnings(True)
LOGGER = logging.getLogger(__name__)
DEFAULT_OPTIONS = {
'path': [],
'deviations': [],
'features': [],
'format': 'yang',
'keep_comments': True,
'no_path_recurse': False,
'trim_yin': False,
'yang_canonical': False,
'yang_remove_unused_imports': False,
# -- errors
'ignore_error_tags': [],
'ignore_errors': [],
'list_errors': True,
'print_error_code': False,
'errors': [],
'warnings': [code for code, desc in error_codes.items() if desc[0] > 4],
'verbose': True,
}
"""Default options for pyang command line"""
_COPY_OPTIONS = [
'canonical',
'max_line_len',
'max_identifier_len',
'trim_yin',
'lax_xpath_checks',
'strict',
]
"""copy options to pyang context options"""
class objectify(object): # pylint: disable=invalid-name
"""Utility for providing object access syntax (.attr) to dicts"""
def __init__(self, *args, **kwargs):
for entry in args:
self.__dict__.update(entry)
self.__dict__.update(kwargs)
def __getattr__(self, _):
return None
def __setattr__(self, attr, value):
self.__dict__[attr] = value
def _parse_features_string(feature_str):
if feature_str.find(':') == -1:
return (feature_str, [])
[module_name, rest] = feature_str.split(':', 1)
if rest == '':
return (module_name, [])
features = rest.split(',')
return (module_name, features)
[docs]def create_context(path='.', *options, **kwargs):
"""Generates a pyang context.
The dict options and keyword arguments are similar to the command
line options for ``pyang``. For ``plugindir`` use env var
``PYANG_PLUGINPATH``. For ``path`` option use the argument with the
same name, or ``PYANG_MODPATH`` env var.
Arguments:
path (str): location of YANG modules.
(Join string with ``os.pathsep`` for multiple locations).
Default is the current working dir.
*options: list of dicts, with options to be passed to context.
See bellow.
**kwargs: similar to ``options`` but have a higher precedence.
See bellow.
Keyword Arguments:
print_error_code (bool): On errors, print the error code instead
of the error message. Default ``False``.
warnings (list): If contains ``error``, treat all warnings
as errors, except any other error code in the list.
If contains ``none``, do not report any warning.
errors (list): Treat each error code container as an error.
ignore_error_tags (list): Ignore error code.
(For a list of error codes see ``pyang --list-errors``).
ignore_errors (bool): Ignore all errors. Default ``False``.
canonical (bool): Validate the module(s) according to the
canonical YANG order. Default ``False``.
yang_canonical (bool): Print YANG statements according to the
canonical order. Default ``False``.
yang_remove_unused_imports (bool): Remove unused import statements
when printing YANG. Default ``False``.
trim_yin (bool): In YIN input modules, trim whitespace
in textual arguments. Default ``False``.
lax_xpath_checks (bool): Lax check of XPath expressions.
Default ``False``.
strict (bool): Force strict YANG compliance. Default ``False``.
max_line_len (int): Maximum line length allowed. Disabled by default.
max_identifier_len (int): Maximum identifier length allowed.
Disabled by default.
features (list): Features to support, default all.
Format ``<modname>:[<feature>,]*``.
keep_comments (bool): Do not discard comments. Default ``True``.
no_path_recurse (bool): Do not recurse into directories
in the yang path. Default ``False``.
Returns:
pyang.Context: Context object for ``pyang`` usage
"""
# deviations (list): Deviation module (NOT CURRENTLY WORKING).
opts = objectify(DEFAULT_OPTIONS, *options, **kwargs)
repo = FileRepository(path, no_path_recurse=opts.no_path_recurse)
ctx = Context(repo)
ctx.opts = opts
for attr in _COPY_OPTIONS:
setattr(ctx, attr, getattr(opts, attr))
# make a map of features to support, per module (taken from pyang bin)
for feature_name in opts.features:
(module_name, features) = _parse_features_string(feature_name)
ctx.features[module_name] = features
# apply deviations (taken from pyang bin)
for file_name in opts.deviations:
with io.open(file_name, "r", encoding="utf-8") as fd:
module = ctx.add_module(file_name, fd.read())
if module is not None:
ctx.deviation_modules.append(module)
return ctx
[docs]def qualify_str(arg, prefix_sep=PREFIX_SEPARATOR):
"""Transform prefixed strings in tuple ``(prefix, string)``"""
response = arg if isinstance(arg, tuple) else tuple(arg.split(prefix_sep))
if len(response) == 2:
return response
return ('', response[0])
[docs]def compare_prefixed(arg1, arg2,
prefix_sep=PREFIX_SEPARATOR, ignore_prefix=False):
"""Compare 2 arguments : prefixed strings or tuple ``(prefix, string)``
Arguments:
arg1 (str or tuple): first argument
arg2 (str or tuple): first argument
prefix_sep (str): prefix string separator (default: ``':'``)
Returns:
bool
"""
cmp1 = qualify_str(arg1, prefix_sep=prefix_sep)
cmp2 = qualify_str(arg2, prefix_sep=prefix_sep)
if ignore_prefix:
return cmp1[-1:] == cmp2[-1:]
return cmp1 == cmp2
[docs]def select(statements, keyword=None, arg=None, ignore_prefix=False):
"""Given a list of statements filter by keyword, or argument or both.
Arguments:
statements (list of pyang.statements.Statement):
list of statements to be filtered.
keyword (str): if specified the statements should have this keyword
arg (str): if specified the statements should have this argument
``keyword`` and ``arg`` can be also used as keyword arguments.
Returns:
list: nodes that matches the conditions
"""
response = []
for item in statements:
if (keyword and keyword != item.keyword and
not compare_prefixed(
keyword, item.raw_keyword, ignore_prefix=ignore_prefix)):
continue
if (arg and arg != item.arg and
not compare_prefixed(
arg, item.arg, ignore_prefix=ignore_prefix)):
continue
response.append(item)
return response
[docs]def find(parent, keyword=None, arg=None, ignore_prefix=False):
"""Select all sub-statements by keyword, or argument or both.
See Also:
function :func:`select`
"""
return select(parent.substmts, keyword, arg, ignore_prefix)
[docs]def walk(parent, select=lambda x: x, apply=lambda x: x, key='substmts'):
# pylint: disable=redefined-builtin,redefined-outer-name
"""Recursivelly find nodes and/or apply a function to them.
Arguments:
parent (pyang.statements.Statement): root of the subtree were
the search will take place.
select: optional callable that receives a node and returns a bool
(True if the node matches the criteria)
apply: optional callable that are going to be applied to the node
if it matches the criteria
key (str): property where the children nodes are stored,
default is ``substmts``
Returns:
list: results collected from the apply function
"""
results = []
if select(parent):
results.append(apply(parent))
if hasattr(parent, key):
children = getattr(parent, key)
for child in children:
results.extend(walk(child, select, apply, key))
return results
[docs]def dump(node, file_obj=None, prev_indent='', indent_string=' ', ctx=None):
"""Generate a string representation of an abstract syntax tree.
Arguments:
node (pyang.statements.Statement): object to be represented
file_obj (file): *file-like* object where the representation
will be dumped. If nothing is passed, the method returns
a string
Keyword Arguments:
prev_indent (str): string to be added to the produced indentation
indent_string (str): string to be used as indentation
ctx (pyang.Context): context object used to generate string
representation. If no context is passed, a dummy object
is used with default configuration
Returns:
str: text content if ``file_obj`` is not specified
"""
# create a buffer to allow string return if no file_obj given
_file_obj = file_obj or StringIO()
# process AST
yang.emit_stmt(
ctx or create_context(), node, _file_obj, 1, None,
prev_indent, indent_string)
# one-liners <3: if no file_obj get buffer content and close it!
return file_obj or (_file_obj.getvalue(), _file_obj.close())[0]
[docs]def check(ctx, rescue=False):
"""Check existence of errors or warnings in context.
Code mostly borrowed from ``pyang`` script.
Arguments:
ctx (pyang.Context): pyang context to be checked.
Keyword Arguments:
rescue (bool): if ``True``, no exception/warning will be raised.
Raises:
SyntaxError: if errors detected
Warns:
SyntaxWarning: if warnings detected
Returns:
tuple: (list of errors, list of warnings), if ``rescue`` is ``True``
"""
errors = []
warnings = []
opts = ctx.opts
if opts.ignore_errors:
return (errors, warnings)
for (epos, etag, eargs) in ctx.errors:
if (hasattr(opts, 'ignore_error_tags') and
etag in opts.ignore_error_tags):
continue
if not ctx.implicit_errors and hasattr(epos.top, 'i_modulename'):
# this module was added implicitly (by import); skip this error
# the code includes submodules
continue
elevel = err_level(etag) # elevel 4 -> warning
explain = err_to_str(etag, eargs)
reason = etag if opts.print_error_code else explain
if 'unexpected keyword "description"' in reason:
# TODO: WTF pyang bug??
elevel = 4
message = '({}) {}'.format(str(epos), reason)
if (elevel >= 4 or etag in opts.warnings) and etag not in opts.errors:
if 'error' in opts.warnings and etag not in opts.warnings:
pass
elif 'none' in opts.warnings:
continue
else:
warnings.append(message)
continue
errors.append(message)
if rescue:
return (errors, warnings)
if warnings:
for message in warnings:
warn(message, SyntaxWarning)
if errors:
raise SyntaxError('\n'.join(errors))
return (errors, warnings)
[docs]def parse(text, ctx=None):
"""Parse a YANG statement into an Abstract Syntax subtree.
Arguments:
text (str): file name for a YANG module or text
ctx (optional pyang.Context): context used to validate text
Returns:
pyang.statements.Statement: Abstract syntax subtree
Note:
The ``parse`` function can be used to parse small amounts of text.
If yout plan to parse an entire YANG (sub)module, please use instead::
ast = ctx.add_module(module_name, text_contet)
It is also well known that ``parse`` function cannot solve
YANG deviations yet.
"""
parser = YangParser()
filename = 'parser-input'
ctx_ = ctx or create_context()
if isfile(text):
filename = text
with open(filename, 'r') as fp:
text = fp.read()
# ensure reported errors are just from parsing
old_errors = ctx_.errors
ctx_.errors = []
ast = parser.parse(ctx_, filename, text)
# look for errors and warnings
check(ctx_)
# restore other errors
ctx_.errors = old_errors
return ast