Source code for simprov.provenance

import random
import uuid
from dataclasses import dataclass, field, asdict
from typing import List, Tuple, Set, Dict
from uuid import UUID

from networkx import DiGraph, set_node_attributes, bfs_tree
from networkx.algorithms.dag import has_cycle

# Seed the global ``random`` generator with a fixed value so that the ids produced by
# ``_random_uuid`` form the same sequence on every run (as long as nothing else draws
# from or re-seeds ``random`` in between).
random.seed("simprov")


def _random_uuid():
    """
    Creates a version 4 UUID from the module-level ``random`` generator.

    Sixteen 8-bit values are drawn one after another from ``random`` and used as the raw bytes
    of the identifier, so the result depends on the current state of that generator.

    :rtype: UUID
    :return: The generated identifier.
    """
    raw_bytes = bytes([random.getrandbits(8) for _ in range(16)])
    return uuid.UUID(bytes=raw_bytes, version=4)


@dataclass
class Activity:
    """
    Represents a provenance activity.

    :ivar str name: The name of the activity, e.g., "Specifying Simulation Model", or
        "Executing Simulation Experiment".
    :ivar List[Entity] used_entities: A list of all entities that were used by the activity.
    :ivar List[Entity] generated_entities: A list of all entities that were generated by the activity.
    :ivar List[Agent] associated_agents: A list of all agents associated with the activity.
    :ivar UUID id: The unique identifier of the activity.
    :ivar list user_generated_edges: The ids of the nodes this activity is connected to through
        dependencies that were added by the user.
    """

    name: str
    used_entities: List['Entity'] = field(default_factory=list)
    generated_entities: List['Entity'] = field(default_factory=list)
    associated_agents: List['Agent'] = field(default_factory=list)
    id: UUID = field(default_factory=_random_uuid)
    user_generated_edges: list = field(default_factory=list)

    @property
    def entities(self) -> List['Entity']:
        """
        Returns the used entities followed by the generated entities of the activity.

        :rtype: List[Entity]
        :return: A new list containing all entities of the activity.
        """
        all_entities = self.used_entities + self.generated_entities
        return all_entities

    def todict(self) -> dict:
        """
        Returns all activity attributes as a dictionary.

        The dictionary is built with :func:`dataclasses.asdict` and additionally carries the
        key `type` with the value "Activity".

        :rtype: Dict
        :return: The node attributes.
        """
        return {**asdict(self), "type": "Activity"}
@dataclass
class Entity:
    """
    Represents a provenance entity.

    The entity can be indexed like a mapping; ``entity[key]`` reads from and writes to
    ``attributes``.

    :ivar str name: The name of the entity, e.g., simulation model or simulation experiment.
    :ivar Tuple primary_key: The primary key of the entity (``None`` if it has none).
    :ivar Dict attributes: Holds all the attribute values of the entity, e.g., a file path and a
        specification.
    :ivar UUID id: The unique identifier of the entity.
    :ivar Dict meta_information: The meta information.
    """

    name: str
    primary_key: Tuple | None = None
    attributes: dict = field(default_factory=dict)
    id: UUID = field(default_factory=_random_uuid)
    meta_information: dict = field(default_factory=dict)

    def __getitem__(self, key):
        """Returns the attribute value stored under `key`; raises ``KeyError`` if it is missing."""
        return self.attributes[key]

    def __setitem__(self, key, value):
        """Stores `value` under `key` in the attributes of the entity."""
        self.attributes[key] = value

    def todict(self):
        """
        Returns all entity attributes as a dictionary.

        The dictionary is built with :func:`dataclasses.asdict` and additionally carries the
        key `type` with the value "Entity".

        :rtype: Dict
        :return: The node attributes.
        """
        return {**asdict(self), "type": "Entity"}
@dataclass
class Agent:
    """
    Represents a provenance agent.

    The agent can be indexed like a mapping; ``agent[key]`` reads from and writes to
    ``attributes``.

    :ivar str name: The name of the agent, e.g., simulator or python environment.
    :ivar Tuple primary_key: The primary key of the agent (``None`` if it has none).
    :ivar Dict attributes: Holds all the attribute values of the agent, e.g., a file path and a
        specification.
    :ivar UUID id: The unique identifier of the agent.
    :ivar Dict meta_information: The meta information.
    """

    name: str
    primary_key: Tuple | None = None
    attributes: dict = field(default_factory=dict)
    id: UUID = field(default_factory=_random_uuid)
    meta_information: dict = field(default_factory=dict)

    def __getitem__(self, key):
        """Returns the attribute value stored under `key`; raises ``KeyError`` if it is missing."""
        return self.attributes[key]

    def __setitem__(self, key, value):
        """Stores `value` under `key` in the attributes of the agent."""
        self.attributes[key] = value

    def todict(self):
        """
        Returns all agent attributes as a dictionary.

        The dictionary is built with :func:`dataclasses.asdict` and additionally carries the
        key `type` with the value "Agent".

        :rtype: Dict
        :return: The node attributes.
        """
        return {**asdict(self), "type": "Agent"}
class ProvenanceGraph:
    """
    Represents a provenance graph made of activities, entities and agents.

    Edges point from an activity to every entity it used and to every agent it is associated
    with, and from an entity to the activity that generated it.

    :ivar DiGraph graph: The graph holding all nodes. Every node is keyed by its id and carries
        the result of the node's ``todict()`` as node attributes.
    :ivar Dict[Tuple,Entity] last_entities_map: A mapping from the primary keys of the entities to
        the entity instance that was registered last for that key.
    :ivar Dict[Tuple,Agent] last_agents_map: A mapping from the primary keys of the agents to the
        agent instance that was registered last for that key.
    :ivar Dict[UUID,Union[Entity,Activity,Agent]] node_map: A mapping from the node ids to the
        corresponding entities, activities and agents.
    :ivar Set user_generated_dependencies: A set of ``(source_id, target_id)`` pairs tracking the
        dependencies generated by the user.
    :ivar Set hidden_nodes: The ids of all nodes that are currently hidden.
    :ivar Dict visibility_affected_nodes: A mapping from the id of a node that was hidden to the
        list of node ids that were hidden together with it.
    :ivar Dict splitted_agents_table: A fallback lookup used by :meth:`node_data` for ids that are
        not in ``node_map``. It is not filled by this class.
    """

    def __init__(self) -> None:
        super().__init__()
        self.graph: DiGraph = DiGraph()
        self.last_entities_map: Dict = {}
        self.last_agents_map: Dict = {}
        self.node_map: Dict = {}
        self.user_generated_dependencies: Set = set()
        self.hidden_nodes: Set = set()
        self.visibility_affected_nodes = {}
        self.splitted_agents_table: Dict = {}

    @staticmethod
    def _latest_version(latest_by_key: Dict, node):
        """Returns the latest node registered under the primary key of `node`, else `node` itself."""
        # A node without a primary key cannot be identified as a version of another node.
        # Looking up ``None`` would replace it with an unrelated key-less node.
        if node.primary_key is None:
            return node
        return latest_by_key.get(node.primary_key, node)

    def chain_provenance_activity(self, activity: Activity):
        """
        Chains an activity with the provenance graph.

        The entities used by the activity and the agents associated with it are replaced by the
        "latest" instance registered under the same primary key. A node that cannot be resolved
        (unknown primary key or no primary key at all) is kept as it is. Finally, the activity is
        added to the provenance graph.

        :param Activity activity: The activity. Its ``used_entities`` and ``associated_agents``
            are replaced in place.
        """
        activity.used_entities = [
            self._latest_version(self.last_entities_map, used_entity)
            for used_entity in activity.used_entities
        ]
        activity.associated_agents = [
            self._latest_version(self.last_agents_map, associated_agent)
            for associated_agent in activity.associated_agents
        ]
        self.add_activity(activity)

    def _register_entity(self, entity: Entity):
        """Adds `entity` if it is unknown; otherwise only marks it as the latest for its key."""
        if entity.id in self.node_map:
            self.last_entities_map[entity.primary_key] = entity
        else:
            self.add_entity(entity)

    def _register_agent(self, agent: Agent):
        """Adds `agent` if it is unknown; otherwise only marks it as the latest for its key."""
        if agent.id in self.node_map:
            # Agents belong in the agents map. Writing them to the entities map would let
            # chain_provenance_activity() substitute an entity with an agent.
            self.last_agents_map[agent.primary_key] = agent
        else:
            self.add_agent(agent)

    def add_activity(self, activity: Activity):
        """
        Adds an activity to the provenance graph.

        All entities and agents referenced by the activity are added as well if they are not yet
        part of the graph, and the corresponding edges are created.

        :param Activity activity: The activity.
        """
        self.node_map[activity.id] = activity
        self.graph.add_node(activity.id, **activity.todict())

        for used_entity in activity.used_entities:
            self._register_entity(used_entity)
            self.graph.add_edge(activity.id, used_entity.id)

        for generated_entity in activity.generated_entities:
            self._register_entity(generated_entity)
            self.graph.add_edge(generated_entity.id, activity.id)

        for associated_agent in activity.associated_agents:
            self._register_agent(associated_agent)
            self.graph.add_edge(activity.id, associated_agent.id)

    def add_entity(self, entity: Entity):
        """
        Adds an entity to the provenance graph.

        :param Entity entity: The entity.
        """
        self.last_entities_map[entity.primary_key] = entity
        self.node_map[entity.id] = entity
        self.graph.add_node(entity.id, **entity.todict())

    def add_agent(self, agent: Agent):
        """
        Adds an agent to the provenance graph.

        :param Agent agent: The agent.
        """
        self.last_agents_map[agent.primary_key] = agent
        self.node_map[agent.id] = agent
        self.graph.add_node(agent.id, **agent.todict())

    def _nodes_of_type(self, type_name: str) -> list:
        """Returns the node objects whose graph attribute `type` equals `type_name`."""
        return [
            self.node_map[node_id]
            for node_id, node_data in self.graph.nodes(data=True)
            if node_data["type"] == type_name
        ]

    @property
    def entities(self) -> List[Entity]:
        """The list of all entities in the provenance graph."""
        return self._nodes_of_type("Entity")

    @property
    def activities(self) -> List[Activity]:
        """A list of all activities in the provenance graph."""
        return self._nodes_of_type("Activity")

    def update_entity_attributes(self, entity_id: UUID, changes: Dict):
        """
        Updates the attributes of an entity.

        Every changed value is written to the ``attributes`` of the entity object, to the
        top level of the node data in the graph, and to the ``attributes`` snapshot stored in
        the node data.

        :param UUID entity_id: The id of the entity.
        :param Dict changes: A dictionary of attribute-value mapping.
        :raises KeyError: If the id is unknown.
        """
        node: Entity = self.node_map[entity_id]
        for changed_attribute, value in changes.items():
            node.attributes[changed_attribute] = value
            graph_data = self.graph.nodes[entity_id]
            graph_data[changed_attribute] = value
            # The node data holds a copy of the attributes taken when the node was added.
            # Keep that copy in sync so consumers of the graph data do not see stale values.
            attribute_snapshot = graph_data.get("attributes")
            if isinstance(attribute_snapshot, dict):
                attribute_snapshot[changed_attribute] = value

    def update_activity_dependencies(self, activity_id: UUID, changes: Dict):
        """
        Updates the dependencies of an activity.

        Each change is a mapping with the keys `source` and `target` (node ids as strings) and
        the optional flags `user-generated` (add the dependency) and `user-removed` (remove the
        dependency). Adding an existing dependency or removing a missing one is a no-op.

        :param UUID activity_id: The id of the activity.
        :param Dict changes: The changes to apply.
        :raises KeyError: If the activity or one of the referenced nodes is unknown.
        """
        node: Activity = self.node_map[activity_id]
        for dependency in changes:
            from_uuid = UUID(dependency["source"])
            to_uuid = UUID(dependency["target"])
            # The activity remembers the "other end" of each of its user-generated edges.
            other_end = to_uuid if activity_id == from_uuid else from_uuid

            if dependency.get("user-generated", False):
                self.add_dependency(from_uuid, to_uuid)
                self.user_generated_dependencies.add((from_uuid, to_uuid))
                self.graph.add_edge(from_uuid, to_uuid)
                if other_end not in node.user_generated_edges:
                    node.user_generated_edges.append(other_end)

            if dependency.get("user-removed", False):
                # Each step tolerates a missing item, so removing a dependency that was not
                # user-generated cannot abort halfway and leave the structures out of sync.
                self.remove_dependency(from_uuid, to_uuid)
                self.user_generated_dependencies.discard((from_uuid, to_uuid))
                if self.graph.has_edge(from_uuid, to_uuid):
                    self.graph.remove_edge(from_uuid, to_uuid)
                if other_end in node.user_generated_edges:
                    node.user_generated_edges.remove(other_end)

    def add_dependency(self, source_node_id: UUID, target_node_id: UUID):
        """
        Adds a dependency between two nodes.

        An activity-to-entity dependency registers the entity as used by the activity, an
        entity-to-activity dependency registers the entity as generated by the activity. Other
        combinations are ignored. Only the node objects are updated, not the graph.

        :param UUID source_node_id: The id of the source node.
        :param UUID target_node_id: The id of the target node.
        :raises KeyError: If one of the ids is unknown.
        """
        from_node = self.node_map[source_node_id]
        to_node = self.node_map[target_node_id]
        if isinstance(from_node, Activity) and isinstance(to_node, Entity):
            if to_node not in from_node.used_entities:
                from_node.used_entities.append(to_node)
        elif isinstance(from_node, Entity) and isinstance(to_node, Activity):
            if from_node not in to_node.generated_entities:
                to_node.generated_entities.append(from_node)

    def remove_dependency(self, source_node_id: UUID, target_node_id: UUID):
        """
        Removes a dependency between two nodes.

        This is the counterpart of :meth:`add_dependency`. Removing a dependency that does not
        exist is a no-op. Only the node objects are updated, not the graph.

        :param UUID source_node_id: The id of the source node.
        :param UUID target_node_id: The id of the target node.
        :raises KeyError: If one of the ids is unknown.
        """
        from_node = self.node_map[source_node_id]
        to_node = self.node_map[target_node_id]
        if isinstance(from_node, Activity) and isinstance(to_node, Entity):
            if to_node in from_node.used_entities:
                from_node.used_entities.remove(to_node)
        elif isinstance(from_node, Entity) and isinstance(to_node, Activity):
            if from_node in to_node.generated_entities:
                to_node.generated_entities.remove(from_node)

    def cytoscape_data(self) -> List[Dict]:
        """
        Returns the provenance graph for Cytoscape.

        The `data` of a node element is the attribute dictionary stored in the graph itself (not
        a copy); its `hidden` flag is refreshed by this call.

        :rtype: List[Dict]
        :returns: Cytoscape-Representation of the graph.
        """
        elements = []
        for node in self.graph.nodes:
            node_data = self.graph.nodes[node]
            node_data["hidden"] = node in self.hidden_nodes
            elements.append({"group": "nodes", "data": node_data})

        for source_id, target_id in self.graph.edges:
            edge_data = {
                "source": source_id,
                "target": target_id,
                "user-generated": (source_id, target_id) in self.user_generated_dependencies,
            }
            elements.append({"group": "edges", "data": edge_data})
        return elements

    def node_data(self, node_id: UUID) -> dict:
        """
        Returns the data of a node.

        :param UUID node_id: The id of the node.
        :rtype: Dict
        :return: The node attributes.
        :raises KeyError: If the id is neither in ``node_map`` nor in ``splitted_agents_table``.
        """
        base = self.node_map.get(node_id)
        if base is None:
            base = self.splitted_agents_table[node_id]
        return base.todict()

    def is_entity(self, node_id: UUID) -> bool:
        """
        Checks whether a node is an entity.

        :param UUID node_id: The id of the node.
        :rtype: bool
        :return: `True` if node is an entity, `False` otherwise
        """
        return isinstance(self.node_map.get(node_id), Entity)

    def is_agent(self, node_id: UUID) -> bool:
        """
        Checks whether a node is an agent.

        :param UUID node_id: The id of the node.
        :rtype: bool
        :return: `True` if node is an agent, `False` otherwise
        """
        return isinstance(self.node_map.get(node_id), Agent)

    def is_activity(self, node_id: UUID) -> bool:
        """
        Checks whether a node is an activity.

        :param UUID node_id: The id of the node.
        :rtype: bool
        :return: `True` if node is an activity, `False` otherwise
        """
        return isinstance(self.node_map.get(node_id), Activity)

    def are_dependencies_are_forming_a_cycle(self, dependencies: Dict) -> bool:
        """
        Checks whether the changed dependencies lead to a cycle in the provenance graph.

        The check runs on a structural copy of the graph; the provenance graph itself is not
        modified. Dependencies flagged with `user-removed` are removed from the copy, all other
        dependencies are added to it.

        :param Dict dependencies: The dependencies, each with the keys `source` and `target`
            (node ids as strings).
        :rtype: bool
        :return: `True` if changes leads to a cycle; `False` otherwise
        """
        # Only the structure matters for the cycle check, so node attributes are not copied.
        candidate_graph = DiGraph()
        candidate_graph.add_nodes_from(self.graph.nodes)
        candidate_graph.add_edges_from(self.graph.edges)

        for edge in dependencies:
            source_id = UUID(edge["source"])
            target_id = UUID(edge["target"])
            if edge.get("user-removed", False):
                # An edge that is about to be removed cannot take part in a cycle.
                if candidate_graph.has_edge(source_id, target_id):
                    candidate_graph.remove_edge(source_id, target_id)
            else:
                candidate_graph.add_edge(source_id, target_id)
        return has_cycle(candidate_graph)

    def propagate_visibility_information(self, node_id: UUID, hide_node: bool = False):
        """
        Hides/Unhides all nodes from a given node onwards.

        When hiding, the affected nodes are remembered for the node. When unhiding, the
        remembered nodes, the node itself and its direct successors and predecessors become
        visible again.

        :param UUID node_id: The id of the node the change starts from.
        :param bool hide_node: `True` to hide the nodes, `False` to unhide them.
        """
        if hide_node:
            affected_nodes = self._find_nodes_to_hide(node_id)
            self.hidden_nodes.update(affected_nodes)
            self.visibility_affected_nodes[node_id] = affected_nodes
            return

        # Query the graph first: an unknown node raises before any state is changed.
        neighbours = [*self.graph.successors(node_id), *self.graph.predecessors(node_id)]
        remembered_nodes = self.visibility_affected_nodes.pop(node_id, [])
        self.hidden_nodes.difference_update(remembered_nodes, neighbours, [node_id])

    def _find_nodes_to_hide(self, node_id):
        """
        Collects the nodes that have to be hidden together with a node.

        These are the nodes found by a breadth-first search from the node against the edge
        direction (including the node itself), followed by every other node that is left
        without any neighbour once those nodes are taken away.

        :param UUID node_id: The id of the node to hide.
        :rtype: list
        :return: The ids of the nodes to hide.
        """
        reached_nodes = list(bfs_tree(self.graph, node_id, reverse=True).nodes)
        reached = set(reached_nodes)

        # A node is isolated after the removal exactly when all of its neighbours are removed.
        # Checking this directly avoids copying the whole graph.
        # NOTE(review): nodes that have no edges at all are collected as well - confirm intended.
        isolated_nodes = [
            node
            for node in self.graph.nodes
            if node not in reached
            and all(predecessor in reached for predecessor in self.graph.predecessors(node))
            and all(successor in reached for successor in self.graph.successors(node))
        ]
        return reached_nodes + isolated_nodes