Source code for aac.context.language_context

"""The LanguageContext is a singleton that holds the current state of the AaC language, including all definitions and plugin runners."""
from typing import Any, Type
from os.path import join, dirname
from aac.context.language_error import LanguageError
from aac.execute.plugin_manager import get_plugin_manager
from aac.execute.plugin_runner import PluginRunner
from aac.context.definition import Definition
from aac.in_out.parser._parse_source import parse
from aac.context.definition_parser import DefinitionParser

AAC_LANG_FILE_NAME = "../aac.aac"
AAC_LANG_FILE_PATH = join(dirname(__file__), AAC_LANG_FILE_NAME)


[docs] class LanguageContext(object): """A singleton class that holds the current state of the AaC language."""
[docs] def __new__(cls): """Create a new instance of the LanguageContext singleton class.""" if not hasattr(cls, "context_instance"): cls.context_instance = super(LanguageContext, cls).__new__(cls) cls.context_instance.definitions = set() cls.context_instance.fully_qualified_name_to_definition: dict[ str, Definition ] = {} cls.context_instance.fully_qualified_name_to_class: dict[str, Any] = {} cls.context_instance.plugin_runners = {} # load and initialize the AaC language cls.context_instance.parse_and_load(AAC_LANG_FILE_PATH) # load plugins get_plugin_manager().hook.register_plugin() return cls.context_instance
[docs] def get_aac_core_file_path(self) -> str: """Function to return the AaC language file path.""" return AAC_LANG_FILE_PATH
[docs] def get_aac_core_as_yaml(self) -> str: """Function to return the AaC language as a yaml string.""" with open(AAC_LANG_FILE_PATH) as aac_file: return aac_file.read()
[docs] def get_aac_core_definitions(self) -> list[Definition]: """Function to return the definitions for the AaC language.""" return self.parse_and_load(AAC_LANG_FILE_PATH)
def _get_aac_generated_class(self, name: str) -> Type: definitions = self.get_definitions_by_name(name) if len(definitions) != 1: raise LanguageError( f"_get_aac_generated_class unable to identify unique definition for name '{name}'. Found: {[definition.name for definition in definitions]}", definitions[0].source.uri ) aac_class = self.context_instance.fully_qualified_name_to_class[ definitions[0].get_fully_qualified_name() ] if not aac_class: raise LanguageError( f"_get_aac_generated_class unable to identify generated class for name '{name}' with fully_qualified_name '{definitions[0].get_fully_qualified_name()}'", definitions[0].source.uri ) return aac_class
[docs] def is_aac_instance(self, obj: Any, name: str): """Function to determine if an object is an instance of an AaC class.""" return isinstance(obj, self._get_aac_generated_class(name))
[docs] def create_aac_object(self, aac_type_name: str, attributes: dict) -> Any: """Function to create a python instance of an AaC class and attributes.""" definitions = self.get_definitions_by_name(aac_type_name) if len(definitions) != 1: raise LanguageError( f"Unable to identify unique definition for '{aac_type_name}'. Found {[definition.name for definition in definitions]}", definitions[0].source.uri ) aac_class = self._get_aac_generated_class(aac_type_name) result = aac_class() field_names = [field.name for field in definitions[0].instance.fields] for attribute_name, attribute_value in attributes.items(): if attribute_name not in field_names: raise LanguageError(f"Found undefined field name '{attribute_name}'", definitions[0].source.uri) setattr(result, attribute_name, attribute_value) return result
[docs] def create_aac_enum(self, aac_enum_name: str, value: str) -> Any: """Function to create a python instance of an AaC enum class and value.""" definitions = self.get_definitions_by_name(aac_enum_name) if len(definitions) != 1: raise LanguageError( f"Unable to identify unique definition for '{aac_enum_name}'. Found {[definition.name for definition in definitions]}", definitions[0].source.uri ) aac_class = self._get_aac_generated_class(aac_enum_name) try: return getattr(aac_class, value) except ValueError: raise LanguageError( f"{value} is not a valid value for enum {aac_enum_name}", definitions[0].source.uri )
[docs] def parse_and_load(self, arg: str) -> list[Definition]: """Convenience function that parses a file or string and loads the definitions into the context.""" parsed_definitions = parse(arg) parser = DefinitionParser() return parser.load_definitions(self, parsed_definitions)
[docs] def remove_definitions(self, definitions: list[Definition]) -> None: """Remove the given definitions from the context.""" for definition in definitions: definition.source.is_loaded_in_context = False self.context_instance.definitions.remove(definition) del self.context_instance.fully_qualified_name_to_definition[ f"{definition.package}.{definition.name}" ]
[docs] def get_definitions(self) -> list[Definition]: """Get all the definitions.""" return list(self.context_instance.fully_qualified_name_to_definition.values())
[docs] def get_definitions_by_name(self, name: str) -> list[Definition]: """Get all the definitions with a given name.""" result = [] search_name = name if "." not in name: search_name = f".{name}" for definition in self.get_definitions(): if definition.get_fully_qualified_name().endswith(search_name.replace(" ", "")): result.append(definition) return result
[docs] def get_definitions_by_root(self, root_key: str) -> list[Definition]: """Get all the definitions with a given root key.""" result = [] for definition in self.get_definitions(): if definition.get_root_key() == root_key: result.append(definition) return result
[docs] def get_defining_schema_for_root(self, root_key: str) -> Definition: """Get the defining schema for a given root key.""" for definition in self.get_definitions(): if definition.get_root_key() == "schema": if definition.instance.root == root_key: return definition raise LanguageError(message=f"Could not find defining schema for root key: {root_key}", location=None)
[docs] def register_plugin_runner(self, runner: PluginRunner) -> None: """Register a plugin runner.""" if runner.get_plugin_name() not in self.context_instance.plugin_runners: self.context_instance.plugin_runners[runner.get_plugin_name()] = runner else: print(f"Plugin {runner.get_plugin_name()} already registered.")
[docs] def get_plugin_runners(self) -> list[PluginRunner]: """Get all the plugin runners.""" return list(self.context_instance.plugin_runners.values())
[docs] def get_primitives(self) -> list[Definition]: """Get all the primitive definitions.""" return self.get_definitions_by_root("primitive")
[docs] def get_python_type_from_primitive(self, primitive_name: str) -> str: """Get the python type from a primitive name.""" primitives = self.get_definitions_by_name(primitive_name) if len(primitives) != 1: raise LanguageError( message=f"Could not find unique primitive type: {primitive_name} - discovered {[primitive.name for primitive in primitives]}", location=None ) return primitives[0].instance.python_type
[docs] def is_extension_of(self, check_me: Definition, package: str, name: str) -> bool: """Check to see if a given definition extends from a given package and name.""" definitions_of_type = self.get_definitions_of_type(package, name) return check_me in definitions_of_type
def _recurse_type_inheritance(self, check_me: Definition) -> list[Definition]: result = [] if "extends" in list(vars(check_me.instance).keys()): for extension in check_me.instance.extends: ext_package = extension.package ext_name = extension.name parent_definition = ( self.context_instance.fully_qualified_name_to_definition[ f"{ext_package}.{ext_name}" ] ) result.append(parent_definition) result.extend(self._recurse_type_inheritance(parent_definition)) return result
[docs] def get_definitions_of_type(self, package: str, name: str) -> list[Definition]: """Search the language context to find definitions that match a given package and name.""" result = [] # first make sure we can find the definition that defines the type if ( f"{package}.{name}" not in self.context_instance.fully_qualified_name_to_definition ): raise LanguageError(f"Could not find definition for {package}.{name}", None) # now get all the definitions that extend from the definition definition = self.context_instance.fully_qualified_name_to_definition[ f"{package}.{name}" ] result.append(definition) # loop through all definitions and see if the type we care about is in it's inheritance tree for item in self.get_definitions(): inheritance_types = self._recurse_type_inheritance(item) if f"{package}.{name}" in [ f"{definition.package}.{definition.name}" for definition in inheritance_types ]: result.append(item) return result
def _traverse_field_chain( self, structure_list: list, field_chain: list[str] ) -> list: current_structures = structure_list search_depth = len(field_chain) current_depth = 0 values_found = [] for field_name in field_chain: next_structures = [] for structure in current_structures: if field_name in structure: if isinstance(structure[field_name], list): if current_depth == search_depth - 1: values_found.extend(structure[field_name]) else: next_structures.extend(structure[field_name]) else: if ( current_depth == search_depth - 1 ): # This just means we're at the end of the field chain and should pull the value values_found.append(structure[field_name]) else: next_structures.append(structure.get(field_name, [])) current_structures = next_structures current_depth += 1 return values_found
[docs] def get_values_by_field_chain(self, search_term: str) -> list: """Find values from the language context using a dot notation field chain.""" result: list = [] root_key = search_term.split(".")[0] candidate_values = self.get_definitions_by_root(root_key) result = self._traverse_field_chain( [definition.structure for definition in candidate_values], search_term.split("."), ) return result