"""The LanguageContext is a singleton that holds the current state of the AaC language, including all definitions and plugin runners."""
from typing import Any, Type
from os.path import join, dirname
from aac.context.language_error import LanguageError
from aac.execute.plugin_manager import get_plugin_manager
from aac.execute.plugin_runner import PluginRunner
from aac.context.definition import Definition
from aac.in_out.parser._parse_source import parse
from aac.context.definition_parser import DefinitionParser
# Name of the core AaC language definition file, expressed relative to this module's directory.
AAC_LANG_FILE_NAME = "../aac.aac"
# Path to the core AaC language definition file: this module's directory joined with the
# relative name above. The ".." segment is kept as-is; the path is not normalized here.
AAC_LANG_FILE_PATH = join(dirname(__file__), AAC_LANG_FILE_NAME)
[docs]
class LanguageContext(object):
"""A singleton class that holds the current state of the AaC language."""
[docs]
def __new__(cls):
    """
    Return the shared LanguageContext, building it the first time it is requested.

    On first use the new instance is stored on the class as ``context_instance``,
    its lookup containers are created empty, the core AaC language file is parsed
    and loaded, and the plugin manager's ``register_plugin`` hook is invoked.

    Returns:
        The LanguageContext instance stored on the class.
    """
    if hasattr(cls, "context_instance"):
        return cls.context_instance

    # The instance is published on the class before it is populated, exactly as the
    # loading steps below expect to find it.
    context = super(LanguageContext, cls).__new__(cls)
    cls.context_instance = context

    context.definitions = set()
    context.fully_qualified_name_to_definition: dict[str, Definition] = {}
    context.fully_qualified_name_to_class: dict[str, Any] = {}
    context.plugin_runners = {}

    # Load and initialize the AaC language.
    context.parse_and_load(AAC_LANG_FILE_PATH)

    # Load plugins.
    get_plugin_manager().hook.register_plugin()

    return cls.context_instance
[docs]
def get_aac_core_file_path(self) -> str:
    """
    Provide the location of the core AaC language definition file.

    Returns:
        The module-level ``AAC_LANG_FILE_PATH`` string.
    """
    core_file_path = AAC_LANG_FILE_PATH
    return core_file_path
[docs]
def get_aac_core_as_yaml(self) -> str:
    """
    Read the core AaC language file and return its text.

    The file is decoded explicitly as UTF-8 so the result does not depend on the
    locale encoding of the machine running the code.

    Returns:
        The unparsed contents of the core AaC definitions file (YAML text).
    """
    with open(AAC_LANG_FILE_PATH, encoding="utf-8") as aac_file:
        return aac_file.read()
[docs]
def get_aac_core_definitions(self) -> list[Definition]:
    """
    Parse and load the core AaC language file and hand back the result.

    Returns:
        The list of definitions produced by ``parse_and_load`` for the core AaC definitions file.
    """
    core_definitions = self.parse_and_load(AAC_LANG_FILE_PATH)
    return core_definitions
def _get_aac_generated_class(self, name: str) -> Type:
    """
    Look up the generated Python class for a uniquely named AaC definition.

    Args:
        name (str): The definition name passed to ``get_definitions_by_name``.

    Returns:
        The class registered under the definition's fully qualified name.

    Raises:
        LanguageError: When the name does not resolve to exactly one definition, or when no class is registered for the resolved definition.
    """
    definitions = self.get_definitions_by_name(name)
    if len(definitions) != 1:
        # With zero matches there is no definition whose source file could be reported.
        location = definitions[0].source.uri if definitions else "No file to reference"
        raise LanguageError(
            f"_get_aac_generated_class unable to identify unique definition for name '{name}'. Found: {[definition.name for definition in definitions]}",
            location
        )

    definition = definitions[0]
    fully_qualified_name = definition.get_fully_qualified_name()
    # .get() instead of indexing: a missing entry must reach the LanguageError below
    # rather than escaping as a bare KeyError.
    aac_class = self.context_instance.fully_qualified_name_to_class.get(fully_qualified_name)
    if not aac_class:
        raise LanguageError(
            f"_get_aac_generated_class unable to identify generated class for name '{name}' with fully_qualified_name '{fully_qualified_name}'",
            definition.source.uri
        )
    return aac_class
[docs]
def is_aac_instance(self, obj: Any, name: str) -> bool:
    """
    Report whether an object is an instance of the generated class for an AaC name.

    Args:
        obj (Any): The object being tested.
        name (str): The AaC class name whose generated Python class is used for the check.

    Returns:
        True when ``obj`` is an instance of the generated class, otherwise False.
    """
    generated_class = self._get_aac_generated_class(name)
    return isinstance(obj, generated_class)
[docs]
def create_aac_object(self, aac_type_name: str, attributes: dict) -> Any:
    """
    Create a Python object for an AaC class and populate it with the given attributes.

    Args:
        aac_type_name (str): The name of the AaC class.
        attributes (dict): Attribute names mapped to the values to set on the new object.

    Returns:
        A new instance of the generated class with every supplied attribute set.

    Raises:
        LanguageError: When the type name does not resolve to exactly one definition, or when an attribute name is not one of the definition's fields.
    """
    definitions = self.get_definitions_by_name(aac_type_name)
    if len(definitions) != 1:
        # With zero matches there is no definition whose source file could be reported.
        location = definitions[0].source.uri if definitions else "No file to reference"
        raise LanguageError(
            f"Unable to identify unique definition for '{aac_type_name}'. Found {[definition.name for definition in definitions]}",
            location
        )

    definition = definitions[0]
    aac_class = self._get_aac_generated_class(aac_type_name)
    result = aac_class()

    # Only names declared as fields on the definition may be set on the object.
    field_names = {field.name for field in definition.instance.fields}
    for attribute_name, attribute_value in attributes.items():
        if attribute_name not in field_names:
            raise LanguageError(
                f"Found undefined field name '{attribute_name}'",
                definition.source.uri
            )
        setattr(result, attribute_name, attribute_value)
    return result
[docs]
def create_aac_enum(self, aac_enum_name: str, value: str) -> Any:
    """
    Look up a member of the generated Python enum for an AaC enum.

    Args:
        aac_enum_name (str): The name of the AaC Enum class.
        value (str): The name of the enum member to return.

    Returns:
        The attribute named ``value`` on the generated enum class.

    Raises:
        LanguageError: When the enum name does not resolve to exactly one definition, or when ``value`` is not an attribute of the generated enum class.
    """
    definitions = self.get_definitions_by_name(aac_enum_name)
    if len(definitions) > 1:
        raise LanguageError(
            f"Unable to identify unique definition for '{aac_enum_name}'. Found {[definition.name for definition in definitions]}",
            definitions[0].source.uri
        )
    if len(definitions) < 1:
        raise LanguageError(
            f"Unable to identify definition for '{aac_enum_name}'",
            "No file to reference"
        )

    aac_class = self._get_aac_generated_class(aac_enum_name)
    try:
        return getattr(aac_class, value)
    except (AttributeError, ValueError) as error:
        # getattr signals a missing member with AttributeError; ValueError is still
        # accepted so anything the previous handler caught keeps being translated.
        raise LanguageError(
            f"{value} is not a valid value for enum {aac_enum_name}",
            definitions[0].source.uri
        ) from error
[docs]
def parse_and_load(self, arg: str) -> list[Definition]:
    """
    Parse a file or string and load the resulting definitions into this context.

    Args:
        arg (str): An AaC definition source: either YAML text or a path to a file.

    Returns:
        The list returned by ``DefinitionParser.load_definitions`` for the parsed content.
    """
    # Parsing happens before the DefinitionParser is constructed.
    freshly_parsed = parse(arg)
    return DefinitionParser().load_definitions(self, freshly_parsed)
[docs]
def remove_definitions(self, definitions: list[Definition]) -> None:
    """
    Drop the given definitions from the context.

    Each definition's source is flagged as no longer loaded, the definition is
    removed from the context's definition set, and its "package.name" entry is
    deleted from the fully-qualified-name lookup.

    Args:
        definitions (list[Definition]): The definitions to remove from the Language Context.
    """
    context = self.context_instance
    for doomed in definitions:
        doomed.source.is_loaded_in_context = False
        context.definitions.remove(doomed)
        lookup_key = f"{doomed.package}.{doomed.name}"
        del context.fully_qualified_name_to_definition[lookup_key]
[docs]
def get_definitions(self) -> list[Definition]:
    """
    Collect every definition held in the fully-qualified-name lookup.

    Returns:
        A new list containing all Definitions currently stored in the Language Context.
    """
    definitions_by_name = self.context_instance.fully_qualified_name_to_definition
    return [*definitions_by_name.values()]
[docs]
def get_definitions_by_name(self, name: str) -> list[Definition]:
    """
    Find the definitions whose fully qualified name ends with the given name.

    A name without a dot is prefixed with one, so it must match the whole final
    segment of a fully qualified name; a name that already contains a dot is used
    as a plain suffix. Spaces are removed from the search text before comparing.

    Args:
        name (str): The name of the definition(s).

    Returns:
        A list of the matching definitions, in the order ``get_definitions`` yields them.
    """
    suffix = name if "." in name else f".{name}"
    suffix = suffix.replace(" ", "")
    return [
        candidate
        for candidate in self.get_definitions()
        if candidate.get_fully_qualified_name().endswith(suffix)
    ]
[docs]
def get_definitions_by_root(self, root_key: str) -> list[Definition]:
    """
    Select the definitions whose root key equals the one requested.

    Args:
        root_key (str): The root key to look for.

    Returns:
        A list of the definitions whose ``get_root_key()`` equals ``root_key``.
    """
    return [
        candidate
        for candidate in self.get_definitions()
        if candidate.get_root_key() == root_key
    ]
[docs]
def get_defining_schema_for_root(self, root_key: str) -> Definition:
    """
    Find the schema definition that declares the given root key.

    Args:
        root_key (str): The root key whose defining schema is wanted.

    Returns:
        The first definition whose own root key is "schema" and whose instance ``root`` equals ``root_key``.

    Raises:
        LanguageError: When no schema definition declares the given root key.
    """
    last_examined = None
    for definition in self.get_definitions():
        last_examined = definition
        if definition.get_root_key() == "schema" and definition.instance.root == root_key:
            return definition

    # The reported location stays the last definition examined, as before; when the
    # context holds no definitions at all there is no file to point at.
    location = (
        last_examined.source.uri if last_examined is not None else "No file to reference"
    )
    raise LanguageError(
        message=f"Could not find defining schema for root key: {root_key}",
        location=location
    )
[docs]
def register_plugin_runner(self, runner: PluginRunner) -> None:
    """
    Add a plugin runner to the context, keyed by its plugin name.

    A runner whose plugin name is already present is not stored; a notice is
    printed instead and the existing entry is left in place.

    Args:
        runner (PluginRunner): The plugin runner to register.
    """
    registry = self.context_instance.plugin_runners
    if runner.get_plugin_name() in registry:
        print(f"Plugin {runner.get_plugin_name()} already registered.")
        return
    registry[runner.get_plugin_name()] = runner
[docs]
def get_plugin_runners(self) -> list[PluginRunner]:
    """
    Collect every plugin runner registered with the context.

    Returns:
        A new list containing all registered plugin runners.
    """
    registry = self.context_instance.plugin_runners
    return [*registry.values()]
[docs]
def get_primitives(self) -> list[Definition]:
    """
    Fetch the definitions whose root key is "primitive".

    Returns:
        The list returned by ``get_definitions_by_root`` for the "primitive" root key.
    """
    primitive_definitions = self.get_definitions_by_root("primitive")
    return primitive_definitions
[docs]
def get_python_type_from_primitive(self, primitive_name: str) -> str:
    """
    Resolve a primitive name to the Python type recorded on its definition.

    Args:
        primitive_name (str): Name of the primitive type.

    Returns:
        The ``python_type`` value stored on the single matching definition's instance.

    Raises:
        LanguageError: When the name matches no definition, or matches more than one.
    """
    matches = self.get_definitions_by_name(primitive_name)
    match_count = len(matches)

    if match_count < 1:
        raise LanguageError(
            message=f"Could not find primitive type: {primitive_name}",
            location="No file to reference"
        )
    if match_count > 1:
        raise LanguageError(
            message=f"Could not find unique primitive type: {primitive_name} - discovered {[primitive.name for primitive in matches]}",
            location=matches[0].source.uri
        )

    only_match = matches[0]
    return only_match.instance.python_type
[docs]
def is_extension_of(self, check_me: Definition, package: str, name: str) -> bool:
    """
    Report whether a definition is among the definitions of a given type.

    Args:
        check_me (Definition): The definition being tested.
        package (str): The package of the type to test against.
        name (str): The name of the type to test against.

    Returns:
        True when ``check_me`` is contained in the result of ``get_definitions_of_type(package, name)``, otherwise False.
    """
    return check_me in self.get_definitions_of_type(package, name)
def _recurse_type_inheritance(self, check_me: Definition) -> list[Definition]:
    """
    Gather every definition a definition extends, directly or transitively.

    Each entry of the instance's ``extends`` is resolved through the
    fully-qualified-name lookup using "package.name"; the parent is recorded,
    followed immediately by the parent's own ancestors.

    Args:
        check_me (Definition): The definition whose ancestry is collected.

    Returns:
        The ancestor definitions in depth-first order; empty when the instance has no ``extends`` attribute.
    """
    if "extends" not in vars(check_me.instance):
        return []

    ancestors = []
    for parent_reference in check_me.instance.extends:
        parent_key = f"{parent_reference.package}.{parent_reference.name}"
        parent = self.context_instance.fully_qualified_name_to_definition[parent_key]
        ancestors.append(parent)
        ancestors += self._recurse_type_inheritance(parent)
    return ancestors
[docs]
def get_definitions_of_type(self, package: str, name: str) -> list[Definition]:
    """
    Find the definition for a type together with every definition that inherits from it.

    Args:
        package (str): The package name of the type.
        name (str): The name of the type.

    Returns:
        A list that starts with the type's own definition, followed by each loaded definition whose inheritance chain contains that type.

    Raises:
        LanguageError: When no definition is registered under "package.name".
    """
    known_definitions = self.context_instance.fully_qualified_name_to_definition

    # The type itself has to be known before its descendants can be collected.
    if f"{package}.{name}" not in known_definitions:
        raise LanguageError(f"Could not find definition for {package}.{name}", "No file to reference")

    qualified_name = f"{package}.{name}"
    matches = [known_definitions[qualified_name]]

    # Keep every definition that has the requested type somewhere in its ancestry.
    for candidate in self.get_definitions():
        ancestors = self._recurse_type_inheritance(candidate)
        if any(
            f"{ancestor.package}.{ancestor.name}" == qualified_name
            for ancestor in ancestors
        ):
            matches.append(candidate)
    return matches
def _traverse_field_chain(
    self, structure_list: list, field_chain: list[str]
) -> list:
    """
    Follow a chain of field names through nested structures and collect the final values.

    At every link of the chain each current structure is looked up by the field
    name. Values found at the last link are collected; values found earlier become
    the structures searched at the next link. List values are flattened one level
    in both cases.

    Args:
        structure_list (list): The structures to start searching from.
        field_chain (list[str]): The field names to follow, outermost first.

    Returns:
        The values found at the end of the chain; empty when nothing matches or the chain is empty.
    """
    from collections.abc import Mapping

    values_found = []
    current_structures = structure_list
    last_depth = len(field_chain) - 1

    for depth, field_name in enumerate(field_chain):
        next_structures = []
        for structure in current_structures:
            # Only mappings can be searched by field name. Scalars, None and nested
            # lists that were carried over from the previous level are skipped
            # instead of being indexed (a str would even pass the `in` test by
            # substring match and then fail on indexing).
            if not isinstance(structure, Mapping) or field_name not in structure:
                continue

            value = structure[field_name]
            # The last link of the chain yields results; earlier links feed the next level.
            destination = values_found if depth == last_depth else next_structures
            if isinstance(value, list):
                destination.extend(value)
            else:
                destination.append(value)
        current_structures = next_structures
    return values_found
[docs]
def get_values_by_field_chain(self, search_term: str) -> list:
    """
    Look up values in the language context with a dot-notation field chain.

    The first segment of the chain selects definitions by root key; the whole
    chain, including that first segment, is then followed through each selected
    definition's structure.

    Args:
        search_term (str): A dot-notation field chain (i.e. root.definition.field).

    Returns:
        A list of the values found at the end of the chain.
    """
    field_chain = search_term.split(".")
    matching_definitions = self.get_definitions_by_root(field_chain[0])
    structures = [definition.structure for definition in matching_definitions]
    return self._traverse_field_chain(structures, field_chain)