import random
import uuid
from dataclasses import dataclass, field, asdict
from typing import List, Tuple, Set, Dict
from uuid import UUID
from networkx import DiGraph, set_node_attributes, bfs_tree
from networkx.algorithms.dag import has_cycle
# Fixed seed for the module-global generator that `_random_uuid` draws from.
random.seed("simprov")


def _random_uuid():
    """Builds a version-4 UUID from the module-global :mod:`random` generator.

    Sixteen bytes are drawn one at a time with ``random.getrandbits(8)``, so the
    returned id depends on the current state of the global generator.

    :rtype: UUID
    :return: The generated identifier.
    """
    octets = bytearray()
    for _ in range(16):
        octets.append(random.getrandbits(8))
    return uuid.UUID(bytes=bytes(octets), version=4)
[docs]
@dataclass
class Activity:
    """Represents a provenance activity.

    :ivar str name:
        The name of the activity, e.g., "Specifying Simulation Model", or "Executing Simulation Experiment".
    :ivar List[Entity] used_entities:
        A list of all entities that were used by the activity.
    :ivar List[Entity] generated_entities:
        A list of all entities that were generated by the activity.
    :ivar List[Agent] associated_agents:
        A list of all agents associated with the activity.
    :ivar UUID id:
        The unique identifier of the activity; generated when not supplied.
    :ivar list user_generated_edges:
        Node ids recorded for the user-generated edges of this activity.
    """
    name: str
    used_entities: List['Entity'] = field(default_factory=list)
    generated_entities: List['Entity'] = field(default_factory=list)
    associated_agents: List['Agent'] = field(default_factory=list)
    id: UUID = field(default_factory=_random_uuid)
    user_generated_edges: list = field(default_factory=list)

    @property
    def entities(self) -> List['Entity']:
        """Returns a list of all the entities that were used and generated by the activity.

        The used entities come first, followed by the generated ones.

        :rtype: List[Entity]
        :return: The list of all entities.
        """
        return self.used_entities + self.generated_entities

    def todict(self) -> dict:
        """Returns all activity attributes as a dictionary.

        It also sets the key `type` to be "Activity" in the resulting dictionary.
        Nested dataclass instances (entities, agents) are converted to
        dictionaries as well, because ``dataclasses.asdict`` recurses into them.

        :rtype: Dict
        :return:
            The node attributes.
        """
        activity_dict = asdict(self)
        activity_dict["type"] = "Activity"
        return activity_dict
[docs]
@dataclass
class Entity:
    """Represents a provenance entity.

    :ivar str name:
        The name of the entity, e.g., simulation model or simulation experiment.
    :ivar Tuple primary_key:
        The primary key; `None` when not supplied.
    :ivar Dict attributes:
        Holds all the attribute values of an entity, e.g., a file path and a specification.
    :ivar UUID id:
        The unique identifier of the entity; generated when not supplied.
    :ivar Dict meta_information:
        The meta information.
    """
    name: str
    primary_key: Tuple | None = None
    attributes: dict = field(default_factory=dict)
    id: UUID = field(default_factory=_random_uuid)
    meta_information: dict = field(default_factory=dict)

    def todict(self):
        """Returns all entity attributes as a dictionary.

        It also sets the key `type` to be "Entity" in the resulting dictionary.

        :rtype: Dict
        :return:
            The node attributes.
        """
        entity_dict = asdict(self)
        entity_dict["type"] = "Entity"
        return entity_dict

    def __setitem__(self, key, value):
        """Stores `value` under `key` in :attr:`attributes`."""
        self.attributes[key] = value

    def __getitem__(self, key):
        """Returns the value stored under `key` in :attr:`attributes`.

        :raises KeyError: If `key` is not present in :attr:`attributes`.
        """
        return self.attributes[key]
@dataclass
class Agent:
    """Represents a provenance agent.

    :ivar str name:
        The name of the agent, e.g., simulator or python environment.
    :ivar Tuple primary_key:
        The primary key; `None` when not supplied.
    :ivar Dict attributes:
        Holds all the attribute values of the agent.
    :ivar UUID id:
        The unique identifier of the agent; generated when not supplied.
    :ivar Dict meta_information:
        The meta information.
    """
    name: str
    primary_key: Tuple | None = None
    attributes: dict = field(default_factory=dict)
    id: UUID = field(default_factory=_random_uuid)
    meta_information: dict = field(default_factory=dict)

    def __getitem__(self, key):
        """Reads the value stored under `key` in :attr:`attributes`."""
        return self.attributes[key]

    def __setitem__(self, key, value):
        """Writes `value` under `key` in :attr:`attributes`."""
        self.attributes[key] = value

    def todict(self):
        """Returns all agent attributes as a dictionary.

        The key `type` is set to "Agent" in the resulting dictionary.

        :rtype: Dict
        :return:
            The node attributes.
        """
        return {**asdict(self), "type": "Agent"}
[docs]
class ProvenanceGraph:
"""Represents a provenance graph.
It consists of activities, entities, and agents.
Agents are added through the activities they are associated with.
:ivar DiGraph graph:
The graph holding all the entities and activities.
:ivar Dict[Tuple,Entity] last_entities_map:
A mapping from the primary keys of the entities to an entity instance.
:ivar Dict[UUID,Union[Entity,Activity]] node_map:
A mapping from the node ids to the corresponding entities and activities.
:ivar Set user_generated_dependencies:
A set tracking the dependencies generated by the user.
"""
def __init__(self) -> None:
    """Creates an empty provenance graph together with its empty lookup tables."""
    super().__init__()
    # Directed graph whose nodes are keyed by node id.
    self.graph: DiGraph = DiGraph()
    # Primary key -> most recently registered entity / agent.
    self.last_entities_map: Dict = dict()
    self.last_agents_map: Dict = dict()
    # Node id -> node object.
    self.node_map: Dict = dict()
    # (source id, target id) pairs of edges added by the user.
    self.user_generated_dependencies: Set = set()
    # Ids of nodes currently flagged as hidden.
    self.hidden_nodes: Set = set()
    self.visibility_affected_nodes = dict()
    # Fallback lookup consulted by `node_data` for ids missing from `node_map`.
    self.splitted_agents_table: Dict = dict()
[docs]
def chain_provenance_activity(self, activity: Activity):
    """Chains an activity with the provenance graph.

    Every used entity and every associated agent of the activity is swapped
    for the instance currently registered under the same primary key, if
    there is one; otherwise the original object is kept. The activity is
    then added to the provenance graph.

    :param Activity activity:
        The activity. Its `used_entities` and `associated_agents` lists are replaced.
    """
    known_entities = self.last_entities_map
    resolved_entities = [
        known_entities.get(candidate.primary_key, candidate)
        for candidate in activity.used_entities
    ]

    known_agents = self.last_agents_map
    resolved_agents = [
        known_agents.get(candidate.primary_key, candidate)
        for candidate in activity.associated_agents
    ]

    activity.used_entities = resolved_entities
    activity.associated_agents = resolved_agents
    self.add_activity(activity)
[docs]
def add_activity(self, activity: 'Activity'):
    """Adds an activity to the provenance graph.

    The activity is registered as a node, and so is every used entity,
    generated entity and associated agent that is not yet known. Nodes that
    are already known are re-registered as the latest instance for their
    primary key. Edges point from the activity to its used entities and
    associated agents, and from each generated entity to the activity.

    :param Activity activity:
        The activity.
    """
    self.node_map[activity.id] = activity
    self.graph.add_node(activity.id, **activity.todict())

    for used_entity in activity.used_entities:
        if used_entity.id not in self.node_map:
            self.add_entity(used_entity)
        else:
            self.last_entities_map[used_entity.primary_key] = used_entity
        self.graph.add_edge(activity.id, used_entity.id)

    for generated_entity in activity.generated_entities:
        if generated_entity.id not in self.node_map:
            self.add_entity(generated_entity)
        else:
            self.last_entities_map[generated_entity.primary_key] = generated_entity
        self.graph.add_edge(generated_entity.id, activity.id)

    for associated_agent in activity.associated_agents:
        if associated_agent.id not in self.node_map:
            self.add_agent(associated_agent)
        else:
            # Agents belong in the agent map. Writing them into the entity map
            # would let an agent be resolved in place of an entity that shares
            # its primary key (including the default key `None`).
            self.last_agents_map[associated_agent.primary_key] = associated_agent
        self.graph.add_edge(activity.id, associated_agent.id)
[docs]
def add_entity(self, entity: Entity):
    """Adds an entity to the provenance graph.

    The entity becomes the latest instance for its primary key, is stored in
    the node map, and is added as a graph node carrying its `todict()` data.

    :param Entity entity:
        The entity.
    """
    key = entity.primary_key
    self.last_entities_map[key] = entity

    entity_id = entity.id
    self.node_map[entity_id] = entity

    node_attributes = entity.todict()
    self.graph.add_node(entity_id, **node_attributes)
[docs]
def add_agent(self, agent: Agent):
    """Adds an agent to the provenance graph.

    The agent becomes the latest instance for its primary key, is stored in
    the node map, and is added as a graph node carrying its `todict()` data.

    :param Agent agent:
        The agent.
    """
    key = agent.primary_key
    self.last_agents_map[key] = agent

    agent_id = agent.id
    self.node_map[agent_id] = agent

    node_attributes = agent.todict()
    self.graph.add_node(agent_id, **node_attributes)
@property
def entities(self) -> List[Entity]:
    """The list of all entities in the provenance graph.

    Graph nodes whose `type` attribute is "Entity" are resolved to their
    objects through the node map, in graph iteration order.
    """
    graph_nodes = self.graph.nodes
    all_attributes = (graph_nodes[node_key] for node_key in graph_nodes)
    return [
        self.node_map[attributes["id"]]
        for attributes in all_attributes
        if attributes["type"] == "Entity"
    ]
@property
def activities(self) -> List[Activity]:
    """A list of all activities in the provenance graph.

    Graph nodes whose `type` attribute is "Activity" are resolved to their
    objects through the node map, in graph iteration order.
    """
    graph_nodes = self.graph.nodes
    all_attributes = (graph_nodes[node_key] for node_key in graph_nodes)
    return [
        self.node_map[attributes["id"]]
        for attributes in all_attributes
        if attributes["type"] == "Activity"
    ]
[docs]
def update_entity_attributes(self, entity_id: UUID, changes: Dict):
    """Updates the attributes of an entity.

    Each change is written to the `attributes` dictionary of the entity
    object and mirrored as a top-level attribute of the entity's graph node.

    :param UUID entity_id:
        The id of the entity.
    :param Dict changes:
        A dictionary of attribute-value mapping.
    :raises KeyError:
        If `entity_id` is unknown to the node map, or if `changes` is
        non-empty and the graph has no node for `entity_id`.
    """
    node: Entity = self.node_map[entity_id]
    for (changed_attribute, value) in changes.items():
        node.attributes[changed_attribute] = value
        # One direct write is enough: the former extra `set_node_attributes`
        # call assigned exactly this key/value on the same node.
        self.graph.nodes[entity_id][changed_attribute] = value
[docs]
def update_activity_dependencies(self, activity_id: UUID, changes: Dict):
    """Updates the dependencies of an activity.

    Every change names a `source` and a `target` node id as strings. A change
    flagged `user-generated` adds the edge; a change flagged `user-removed`
    removes it again. The activity's `user_generated_edges` list tracks the
    node on the other end of each such edge.

    :param UUID activity_id:
        The id of the activity.
    :param Dict changes:
        The changes. NOTE(review): the code iterates this as a sequence of
        dictionaries, not as a single mapping -- confirm against callers.
    """
    activity: Activity = self.node_map[activity_id]
    for change in changes:
        source_id = UUID(change["source"])
        target_id = UUID(change["target"])
        edge = (source_id, target_id)
        # The endpoint of the edge that is not the activity itself.
        peer_id = target_id if activity_id == source_id else source_id

        if change.get("user-generated", False):
            self.add_dependency(source_id, target_id)
            self.user_generated_dependencies.add(edge)
            self.graph.add_edge(source_id, target_id)
            activity.user_generated_edges.append(peer_id)

        if change.get("user-removed", False):
            self.remove_dependency(source_id, target_id)
            self.user_generated_dependencies.remove(edge)
            self.graph.remove_edge(source_id, target_id)
            activity.user_generated_edges.remove(peer_id)
[docs]
def add_dependency(self, source_node_id: UUID, target_node_id: UUID):
    """Adds a dependency between two nodes.

    An activity-to-entity pair records the entity as used by the activity;
    an entity-to-activity pair records the entity as generated by the
    activity. Entries already present are not duplicated, and any other
    combination of node kinds is ignored.

    :param UUID source_node_id:
        The id of the source node.
    :param UUID target_node_id:
        The id of the target node.
    """
    source = self.node_map[source_node_id]
    target = self.node_map[target_node_id]

    activity_uses_entity = isinstance(source, Activity) and isinstance(target, Entity)
    if activity_uses_entity and target not in source.used_entities:
        source.used_entities.append(target)

    entity_from_activity = isinstance(source, Entity) and isinstance(target, Activity)
    if entity_from_activity and source not in target.generated_entities:
        target.generated_entities.append(source)
[docs]
def remove_dependency(self, source_node_id: UUID, target_node_id: UUID):
    """Removes a dependency between two nodes.

    An activity-to-entity pair drops the entity from the activity's used
    entities; an entity-to-activity pair drops it from the activity's
    generated entities. Any other combination of node kinds is ignored.

    :param UUID source_node_id:
        The id of the source node.
    :param UUID target_node_id:
        The id of the target node.
    :raises ValueError:
        If the entity is not in the list it should be removed from.
    """
    source = self.node_map[source_node_id]
    target = self.node_map[target_node_id]

    source_is_activity = isinstance(source, Activity)
    if source_is_activity and isinstance(target, Entity):
        source.used_entities.remove(target)

    source_is_entity = isinstance(source, Entity)
    if source_is_entity and isinstance(target, Activity):
        target.generated_entities.remove(source)
[docs]
def cytoscape_data(self) -> List[Dict]:
    """Returns the provenance graph for Cytoscape.

    All node elements come first, followed by all edge elements. Each node's
    graph attribute dictionary is used directly as its `data` and receives a
    `hidden` flag as a side effect.

    :rtype: List[Dict]
    :returns: Cytoscape-Representation of the graph.
    """
    node_elements = []
    for node_key in self.graph.nodes:
        attributes = self.graph.nodes[node_key]
        # Written into the graph's own attribute dict, not a copy.
        attributes["hidden"] = node_key in self.hidden_nodes
        node_elements.append({"group": "nodes", "data": attributes})

    edge_elements = [
        {
            "group": "edges",
            "data": {
                "source": source_id,
                "target": target_id,
                "user-generated": (source_id, target_id) in self.user_generated_dependencies,
            },
        }
        for (source_id, target_id) in self.graph.edges
    ]
    return node_elements + edge_elements
[docs]
def node_data(self, node_id: UUID) -> dict:
    """Returns the data of a node.

    The node is looked up in the node map first; ids that are not found
    there are looked up in `splitted_agents_table` instead.

    :param UUID node_id:
        The id of the node.
    :rtype: Dict
    :return: The node attributes.
    :raises KeyError: If the id is found in neither table.
    """
    found = self.node_map.get(node_id)
    if found is not None:
        return found.todict()
    return self.splitted_agents_table[node_id].todict()
[docs]
def is_entity(self, node_id: UUID) -> bool:
    """Checks whether a node is an entity.

    :param UUID node_id:
        The id of the node.
    :rtype: bool
    :return: `True` if the id is known and refers to an entity, `False` otherwise
    """
    if node_id not in self.node_map:
        return False
    return isinstance(self.node_map[node_id], Entity)
[docs]
def is_agent(self, node_id: UUID) -> bool:
    """Checks whether a node is an agent.

    :param UUID node_id:
        The id of the node.
    :rtype: bool
    :return: `True` if the id is known and refers to an agent, `False` otherwise
    """
    if node_id not in self.node_map:
        return False
    return isinstance(self.node_map[node_id], Agent)
[docs]
def is_activity(self, node_id: UUID) -> bool:
    """Checks whether a node is an activity.

    :param UUID node_id:
        The id of the node.
    :rtype: bool
    :return: `True` if the id is known and refers to an activity, `False` otherwise
    """
    if node_id not in self.node_map:
        return False
    return isinstance(self.node_map[node_id], Activity)
def _find_nodes_to_hide(self, node_id):
    """Collects the nodes to hide together with the given node.

    The result starts with the nodes of the reversed breadth-first tree
    rooted at `node_id`. It is followed by every remaining node that has no
    edges left once those tree nodes are removed from a copy of the graph.

    :param node_id:
        The id of the node the search starts from.
    :rtype: list
    :return: The ids of the collected nodes.
    """
    reverse_tree = bfs_tree(self.graph, node_id, reverse=True)
    collected = list(reverse_tree.nodes)

    # Work on a copy so the provenance graph itself is left untouched.
    remainder = self.graph.copy()
    remainder.remove_nodes_from(reverse_tree.nodes)
    collected.extend(
        leftover for leftover in remainder if remainder.degree[leftover] == 0
    )
    return collected