Source code for distro_tracker.core.utils.datastructures
# Copyright 2013 The Distro Tracker Developers
# See the COPYRIGHT file at the top-level directory of this distribution and
# at https://deb.li/DTAuthors
#
# This file is part of Distro Tracker. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
# including this file, may be copied, modified, propagated, or distributed
# except according to the terms contained in the LICENSE file.
"""Utility data structures for Distro Tracker."""
from collections import deque
from copy import deepcopy
[docs]class InvalidDAGException(Exception):
pass
[docs]class DAG(object):
"""
A class representing a Directed Acyclic Graph.
Allows clients to build up a DAG where the nodes of the graph are any type
of object which can be placed in a dictionary.
"""
[docs] class Node(object):
def __init__(self, id, original):
self.id = id
self.original = original
def __hash__(self):
return self.id.__hash__()
def __eq__(self, other):
return self.id == other.id
def __init__(self):
"""
Instantiates an empty DAG.
"""
#: Maps original node objects to their internal representation
self.nodes_map = {}
#: Represents the graph structure of the DAG as an adjacency list
self.graph = {}
#: Holds the in-degree of each node to allow constant-time lookups
#: instead of iterating through all nodes in the graph.
self.in_degree = {}
#: Represents the last id given to an inserted node.
self._last_id = 0
def _next_id(self):
"""
A private helper method which returns the next ID which can be given to
a node being inserted in the DAG.
"""
# NOTE: Not thread safe.
self._last_id += 1
return self._last_id
@property
def all_nodes(self):
"""
Returns a list of all nodes in the DAG.
"""
return list(self.nodes_map.keys())
[docs] def add_node(self, node):
"""
Adds a new node to the graph.
"""
dag_node = DAG.Node(self._next_id(), node)
self.nodes_map[node] = dag_node
self.in_degree[dag_node.id] = 0
self.graph[dag_node.id] = []
[docs] def replace_node(self, original_node, replacement_node):
"""
Replaces a node already present in the graph ``original_node`` by a new
object.
The internal representation of the DAG remains the same, except the new
object now takes the place of the original one.
"""
node = self.nodes_map[original_node]
del self.nodes_map[original_node]
node.original = replacement_node
self.nodes_map[replacement_node] = node
[docs] def remove_node(self, node):
"""
Removes a given node from the graph.
The ``node`` parameter can be either the internal Node type or the
node as the client sees them.
"""
if not isinstance(node, DAG.Node):
# Try to map it to a DAG Node
node = self.nodes_map[node]
node_to_remove = node
# Update the in degrees of its dependents
for node in self.graph[node_to_remove.id]:
self.in_degree[node] -= 1
# Finally remove it:
# From node mappings
del self.nodes_map[node_to_remove.original]
# From the graph
for dependent_nodes in self.graph.values():
if node_to_remove.id in dependent_nodes:
dependent_nodes.remove(node_to_remove.id)
del self.graph[node_to_remove.id]
# And the in-degree counter
del self.in_degree[node_to_remove.id]
[docs] def add_edge(self, node1, node2):
"""
Adds an edge between two nodes.
:raises InvalidDAGException: If the edge would introduce a cycle in the
graph structure.
"""
# Check for a cycle
if node1 in self.nodes_reachable_from(node2):
raise InvalidDAGException(
"Cannot add edge since it would create a cycle in the graph.")
# When everything is ok, create the new link
node1 = self.nodes_map[node1]
node2 = self.nodes_map[node2]
# If an edge already exists, adding it again does nothing
if node2.id not in self.graph[node1.id]:
self.graph[node1.id].append(node2.id)
self.in_degree[node2.id] += 1
def _get_node_with_no_dependencies(self):
"""
Returns an internal node which has no dependencies, i.e. that has an
in-degree of 0.
If there are multiple such nodes, it is not defined which one of them
them is returned.
"""
for node in self.nodes_map.values():
if self.in_degree[node.id] == 0:
return node
# If no node with a zero in-degree can be found, the graph is not a DAG
# NOTE: If edges are always added using the `add_edge` method, this
# will never happen since the cycle would be caught at that point
raise InvalidDAGException("The graph contains a cycle.")
[docs] def dependent_nodes(self, node):
"""
Returns all nodes which are directly dependent on the given node, i.e.
returns a set of all nodes N where there exists an edge(node, N) in the
DAG.
"""
node = self.nodes_map[node]
id_to_node_map = {
node.id: node
for node in self.nodes_map.values()
}
return [
id_to_node_map[dependent_node_id].original
for dependent_node_id in self.graph[node.id]
]
[docs] def topsort_nodes(self):
"""
Generator which returns DAG nodes in toplogical sort order.
"""
# Save the original nodes structure
original_nodes_map = deepcopy(self.nodes_map)
original_in_degree = deepcopy(self.in_degree)
original_graph = deepcopy(self.graph)
while len(self.nodes_map):
# Find a node with a 0 in degree
node = self._get_node_with_no_dependencies()
# We yield instances of the original node added to the graph, not
# DAG.Node as that is what clients expect.
yield node.original
# Remove this node from the graph and update the in-degrees
self.remove_node(node)
# Restore the original nodes structure so that a top sort is not a
# destructive operation
self.nodes_map = original_nodes_map
self.in_degree = original_in_degree
self.graph = original_graph
[docs] def nodes_reachable_from(self, node):
"""
Returns a set of all nodes reachable from the given node.
"""
node = self.nodes_map[node]
# Implemented in terms of a BFS to avoid recursion
queue = deque([node])
reachable_nodes = []
visited = set()
visited.add(node.id)
id_to_node_map = {
node.id: node
for node in self.nodes_map.values()
}
while len(queue):
current_node = queue.popleft()
for successor_id in self.graph[current_node.id]:
if successor_id not in visited:
visited.add(successor_id)
successor = id_to_node_map[successor_id]
queue.append(successor)
reachable_nodes.append(successor.original)
return set(reachable_nodes)