"""
ggsolver: graph.py
License goes here...
"""
import json
import os
import pickle
from functools import reduce
import networkx as nx
from ggsolver import util
class IGraph:
"""
Graph interface. A graphical model must implement a IGraph interface.
"""
def __init__(self):
self._graph = None
self._node_properties = dict()
self._edge_properties = dict()
self._graph_properties = dict()
def __getitem__(self, pname):
if pname in self._node_properties:
return self._node_properties[pname]
elif pname in self._edge_properties:
return self._edge_properties[pname]
elif pname in self._graph_properties:
return self._graph_properties[pname]
else:
raise KeyError(f"{pname} is not a valid node/edge/graph property.")
def __setitem__(self, pname, pmap):
# Check if the property exists
if pname in self._node_properties.keys():
assert isinstance(pmap, NodePropertyMap)
if pname in self._edge_properties.keys():
assert isinstance(pmap, EdgePropertyMap)
# Update property value
if isinstance(pmap, NodePropertyMap):
pmap.graph = self
self._node_properties[pname] = pmap
elif isinstance(pmap, EdgePropertyMap):
pmap.graph = self
self._edge_properties[pname] = pmap
else:
self._graph_properties[pname] = pmap
@property
def node_properties(self):
""" Returns the node properties as a dictionary of {"property name": NodePropertyMap object}. """
return self._node_properties
@property
def edge_properties(self):
""" Returns the edge properties as a dictionary of {"property name": EdgePropertyMap object}. """
return self._edge_properties
@property
def graph_properties(self):
""" Returns the graph properties as a dictionary of {"property name": property value}. """
return self._graph_properties
def add_node(self):
pass
def add_nodes(self, num_nodes):
pass
def add_edge(self, uid, vid):
pass
def add_edges(self, edges):
""" (uid, vid) pairs """
pass
def rem_node(self, uid):
pass
def rem_edge(self, uid, vid, key):
pass
def has_node(self, uid):
pass
def has_edge(self, uid, vid, key=None):
pass
def nodes(self):
pass
def edges(self):
pass
def successors(self, uid):
pass
def predecessors(self, uid):
pass
def neighbors(self, uid):
pass
def ancestors(self, uid):
pass
def descendants(self, uid):
pass
def in_edges(self, uid):
pass
def out_edges(self, uid):
pass
def number_of_nodes(self):
pass
def number_of_edges(self):
pass
def clear(self):
pass
def serialize(self):
pass
def deserialize(self, obj_dict):
pass
def has_property(self, pname):
if pname in self._node_properties or pname in self._edge_properties or pname in self._graph_properties:
return True
return False
def save(self, fpath, overwrite=False, protocol="json"):
pass
def load(self, fpath, protocol="json"):
pass
[docs]class NodePropertyMap(dict):
"""
Implements a default dictionary that maps a node ID to its property value. To store data efficiently,
only the non-default values are stored in the dictionary.
Raises an error if the node ID is invalid.
"""
[docs] def __init__(self, graph, default=None):
super(NodePropertyMap, self).__init__()
self.graph = graph
self.default = default
def __repr__(self):
return f"<NodePropertyMap graph={repr(self.graph)}>"
def __missing__(self, node):
if self.graph.has_node(node):
return self.default
raise ValueError(f"[ERROR] NodePropertyMap.__missing__:: {repr(self.graph)} does not contain node {node}.")
def __getitem__(self, node):
try:
return super(NodePropertyMap, self).__getitem__(node)
except KeyError:
return self.__missing__(node)
def __setitem__(self, node, value):
assert self.graph.has_node(node), f"Node {node} not in {self.graph}."
if value != self.default:
super(NodePropertyMap, self).__setitem__(node, value)
def serialize(self):
return {
"default": self.default,
"dict": {k: v for k, v in self.items()}
}
def deserialize(self, obj_dict):
self.clear()
self.default = obj_dict["default"]
# Explicitly deserialize to ensure all keys are valid nodes.
for k, v in obj_dict["dict"].items():
self[int(k)] = v
[docs]class EdgePropertyMap(dict):
"""
Implements a default dictionary that maps an edge (uid, vid, key) to its property value. To store data efficiently,
only the non-default values are stored in the dictionary.
Raises an error if the edge (uid, vid, key) is invalid.
"""
[docs] def __init__(self, graph, default=None):
super(EdgePropertyMap, self).__init__()
self.graph = graph
self.default = default
def __repr__(self):
return f"<EdgePropertyMap graph={repr(self.graph)}>"
def __missing__(self, edge):
if self.graph.has_edge(*edge):
return self.default
raise ValueError(f"[ERROR] EdgePropertyMap.__missing__:: {repr(self.graph)} does not contain node {edge}.")
def __getitem__(self, edge):
try:
return dict.__getitem__(self, edge)
except KeyError:
return self.__missing__(edge)
def __setitem__(self, node, value):
if value != self.default:
super(EdgePropertyMap, self).__setitem__(node, value)
def serialize(self):
return {
"default": self.default,
"dict": [{"edge": edge, "pvalue": pvalue} for edge, pvalue in self.items()]
}
def deserialize(self, obj_dict):
self.clear()
self.default = obj_dict["default"]
# Explicitly deserialize to ensure all keys are valid edges.
for item in obj_dict["dict"]:
self[tuple(item["edge"])] = item["pvalue"]
[docs]class Graph(IGraph):
"""
A MultiDiGraph class represented as a 5-tuple (nodes, edges, node_properties, edge_properties, graph_properties).
In addition, the graph implements a serialization protocol, save-load and drawing functionality.
"""
[docs] def __init__(self):
super(Graph, self).__init__()
self._graph = nx.MultiDiGraph()
def __str__(self):
return f"<Graph with |V|={self.number_of_nodes()}, |E|={self.number_of_edges()}>"
def base_graph(self):
return self._graph
[docs] def add_node(self):
"""
Adds a new node to the graph.
:warning: Duplication is NOT checked.
:return: (int) ID of the added node.
"""
uid = self._graph.number_of_nodes()
self._graph.add_node(uid)
return uid
[docs] def add_nodes(self, num_nodes):
"""
Adds multiple nodes to the graph.
:warning: Duplication is NOT checked.
:param num_nodes: (int) Number of nodes to be added.
:return: (list) IDs of added nodes.
"""
return [self.add_node() for _ in range(num_nodes)]
[docs] def add_edge(self, uid, vid):
"""
Adds a new edge between the give nodes.
:warning: Duplication is NOT checked. Hence, calling the function twice adds two parallel edges between
the same nodes.
:return: (int) Key of the added edge. Key = 0 means the first edge was added between the given nodes.
If Key = k, then (k+1)-th edge was added.
"""
return self._graph.add_edge(uid, vid)
[docs] def add_edges(self, edges):
"""
Adds multiple edge between the give nodes.
:warning: Duplication is NOT checked. Hence, calling the function twice adds two parallel edges between
the same nodes.
:return: (int) Key of the added edge. Key = 0 means the first edge was added between the given nodes.
If Key = k, then (k+1)-th edge was added.
"""
return [self.add_edge(uid, vid) for uid, vid in edges]
[docs] def rem_node(self, uid):
"""
Removal of nodes is NOT supported. Use filtering instead.
"""
raise NotImplementedError("Removal of nodes is not supported. Use SubGraph instead.")
[docs] def rem_edge(self, uid, vid, key):
"""
Removal of edges is NOT supported. Use filtering instead.
"""
raise NotImplementedError("Removal of nodes is not supported. Use SubGraph instead.")
[docs] def has_node(self, uid):
"""
Checks whether the graph has the given node or not.
:param uid: (int) Node ID to be checked for containment.
:return: (bool) True if given node is in the graph, else False.
"""
return self._graph.has_node(uid)
[docs] def has_edge(self, uid, vid, key=None):
"""
Checks whether the graph has the given edge or not.
:param uid: (int) Source node ID.
:param vid: (int) Target node ID.
:param key: If provided, checks whether the edge (u, v, k) is in the graph or not. Otherwise, checks if there
exists an edge between nodes represented by uid and vid.
:type key: int, optional
:return: (bool) True if given edge is in the graph, else False.
"""
return self._graph.has_edge(uid, vid, key)
[docs] def nodes(self):
"""
List of all nodes in the graph.
"""
return list(self._graph.nodes())
[docs] def edges(self):
"""
List of all edges in the graph. Each edge is represented as a 3-tuple (uid, vid, key).
"""
return list(self._graph.edges(keys=True))
[docs] def successors(self, uid):
"""
List of all successors of the node represented by uid.
"""
return list(self._graph.successors(uid))
[docs] def predecessors(self, uid):
"""
List of all predecessors of the node represented by uid.
"""
return list(self._graph.predecessors(uid))
[docs] def neighbors(self, uid):
"""
List of all (in and out) neighbors of the node represented by uid.
"""
return list(self._graph.neighbors(uid))
[docs] def ancestors(self, uid):
"""
List of all nodes from which the node represented by uid is reachable.
"""
return list(nx.ancestors(self._graph, uid))
[docs] def descendants(self, uid):
"""
List of all nodes that can be reached from the node represented by uid.
"""
return list(nx.descendants(self._graph, uid))
[docs] def in_edges(self, uid):
"""
List of all in edges to the node represented by uid.
"""
return self._graph.in_edges(uid, keys=True)
def is_isomorphic_to(self, other: 'Graph'):
"""
Checks if the graph is isomorphic to the `other` graph.
:param other: (:class:`Graph` object) Graph to be checked for isomorphism with current graph.
:return: (bool) `True`, if graphs are isomorphic. Else, `False`.
"""
return nx.is_isomorphic(self._graph, other._graph)
[docs] def out_edges(self, uid):
"""
List of all out edges from the node represented by uid.
"""
return self._graph.out_edges(uid, keys=True)
[docs] def number_of_nodes(self):
"""
The number of nodes in the graph.
"""
return self._graph.number_of_nodes()
[docs] def number_of_edges(self):
"""
The number of edges in the graph.
"""
return self._graph.number_of_edges()
[docs] def clear(self):
"""
Clears all nodes, edges and the node, edge and graph properties.
"""
self._graph.clear()
self._node_properties = dict()
self._edge_properties = dict()
self._graph_properties = dict()
[docs] def serialize(self):
"""
Serializes the graph into a dictionary with the following format::
{
"graph": {
"nodes": <number of nodes>,
"edges": {
uid: {vid: key},
...
}
"node_properties": {
"property_name": {
"default": <value>,
"dict": {
"uid": <property value>,
...
}
},
...
},
"edge_properties": {
"property_name": {
"default": <value>,
"dict": [{"edge": [uid, vid, key], "pvalue": <property value>} ...]
},
...
},
"graph_properties": {
"property_name": <value>,
...
}
}
}
:return: (dict) Serialized graph
"""
# Initialize a graph dictionary
graph = dict()
# Add nodes
graph["nodes"] = self.number_of_nodes()
# Add edges
graph["edges"] = dict()
for uid in range(self.number_of_nodes()):
successors = list(self.successors(uid))
if len(list(successors)) == 0:
continue
graph["edges"][uid] = dict()
for vid in successors:
graph["edges"][uid].update({vid: self._graph.number_of_edges(uid, vid)})
# Add node properties
# graph["node_properties"] = self._node_properties
# graph["edge_properties"] = {
# prop_name: [
# {
# "edge": edge,
# "pvalue": pvalue
# }
# for edge, pvalue in prop_value.items()
# ]
# for prop_name, prop_value in self._edge_properties.items()
# }
graph["node_properties"] = {p_name: prop.serialize() for p_name, prop in self._node_properties.items()}
graph["edge_properties"] = {p_name: prop.serialize() for p_name, prop in self._edge_properties.items()}
graph["graph_properties"] = self._graph_properties
# # Warn about any properties that were ignored.
# ignored_attr = set(self.__dict__.keys()) - set(self._graph_properties.keys())
# print(util.BColors.WARNING, f"[WARN] Attributes {ignored_attr} were not serialized because they are not "
# f"node/edge/graph properties.", util.BColors.ENDC)
# TODO. Add metadata such as time of serialization, serializer version etc.
obj_dict = {"graph": graph}
# Return serialized object
return obj_dict
[docs] @classmethod
def deserialize(cls, obj_dict):
"""
Constructs a graph from a serialized graph object. The format is described in :py:meth:`Graph.serialize`.
:return: (Graph) A new :class:`Graph` object..
"""
# Instantiate new object
obj = cls()
# Get serialized graph object
graph_dict = obj_dict["graph"]
# Add nodes
obj.add_nodes(num_nodes=int(graph_dict["nodes"]))
# Add edges
edges = graph_dict["edges"]
for uid in edges:
for vid in edges[uid]:
for key in range(edges[uid][vid]):
obj._graph.add_edge(int(uid), int(vid), key=int(key))
# Add properties
for node_prop, np_value in graph_dict["node_properties"].items():
np_map = NodePropertyMap(graph=obj)
# np_map.update({int(k): v for k, v in np_value.items()})
np_map.deserialize(np_value)
obj[node_prop] = np_map
for graph_prop, gp_value in graph_dict["graph_properties"].items():
obj[graph_prop] = gp_value
for edge_prop, ep_value in graph_dict["edge_properties"].items():
ep_map = EdgePropertyMap(graph=obj)
ep_map.deserialize(ep_value)
obj[edge_prop] = ep_map
# Return constructed object
return obj
[docs] def save(self, fpath, overwrite=False, protocol="json"):
"""
Saves the graph to file.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param overwrite: (bool) Specifies whether to overwrite the file, if it exists. [Default: False]
:param protocol: (str) The protocol to use to save the file. Options: {"json" [Default], "pickle"}.
.. note:: Pickle protocol is not tested.
"""
if not overwrite and os.path.exists(fpath):
raise FileExistsError("File already exists. To overwrite, call Graph.save(..., overwrite=True).")
graph_dict = self.serialize()
if protocol == "json":
with open(fpath, "w") as file:
json.dump(graph_dict, file, indent=2)
elif protocol == "pickle":
with open(fpath, "wb") as file:
pickle.dump(graph_dict, file)
else:
raise ValueError(f"Graph.save() does not support '{protocol}' protocol. One of ['json', 'pickle'] expected")
[docs] @classmethod
def load(cls, fpath, protocol="json"):
"""
Loads the graph from file.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param protocol: (str) The protocol to use to save the file. Options: {"json" [Default], "pickle"}.
.. note:: Pickle protocol is not tested.
"""
if not os.path.exists(fpath):
raise FileNotFoundError("File does not exist.")
if protocol == "json":
with open(fpath, "r") as file:
obj_dict = json.load(file)
graph = cls.deserialize(obj_dict)
elif protocol == "pickle":
with open(fpath, "rb") as file:
obj_dict = pickle.load(file)
graph = cls.deserialize(obj_dict)
else:
raise ValueError(f"Graph.load() does not support '{protocol}' protocol. One of ['json', 'pickle'] expected")
return graph
[docs] def to_png(self, fpath, nlabel=None, elabel=None):
"""
Generates a PNG image of the graph.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param nlabel: (list of str) Specifies the node properties to use to annotate a node in image.
:param elabel: (list of str) Specifies the edge properties to use to annotate an edge in image.
:warning: If the node labels are not unique, the generated figure may contain 0, 1, 2, ...
that avoid duplication.
"""
max_nodes = 500
if self._graph.number_of_nodes() > max_nodes:
raise ValueError(f"Cannot draw a graph with more than {max_nodes} nodes.")
g = self._graph
# If node properties to displayed are specified, process them.
if nlabel is not None:
g = nx.MultiDiGraph()
# If more than one property is selected, then display as tuple.
if len(nlabel) == 1:
node_state_map = {n: self[prop][n] for prop in nlabel for n in self._graph.nodes()}
else:
node_state_map = {n: tuple(self[prop][n] for prop in nlabel) for n in self._graph.nodes()}
# Add nodes to dummy graph
for n in node_state_map.values():
g.add_node(str(n))
# If edge labels to be displayed are specified, process them.
if elabel is not None:
for u, v, k in self._graph.edges(keys=True):
if len(elabel) == 1:
g.add_edge(str(node_state_map[u]), str(node_state_map[v]),
label=self[elabel[0]][(u, v, k)])
else:
g.add_edge(str(node_state_map[u]), str(node_state_map[v]),
label=tuple(self[prop][(u, v, k)] for prop in elabel))
else:
for u, v, k in self._graph.edges(keys=True):
g.add_edge(str(node_state_map[u]), str(node_state_map[v]))
dot_graph = nx.nx_agraph.to_agraph(g)
dot_graph.layout("dot")
dot_graph.draw(fpath)
[docs] def is_isomorphic_to(self, other: 'Graph'):
"""
Checks if the graph is isomorphic to the `other` graph.
:param other: (:class:`Graph` object) Graph to be checked for isomorphism with current graph.
:return: (bool) `True`, if graphs are isomorphic. Else, `False`.
"""
return nx.is_isomorphic(self._graph, other._graph)
def base_graph(self):
return self._graph
def reverse(self):
"""
Return a SubGraph.
"""
return self._graph.reverse(copy=False)
def bfs_layers(self, sources):
return nx.bfs_layers(self._graph, sources)
def reverse_bfs(self, sources):
rev_graph = self._graph.reverse()
reachable_nodes = set(reduce(set.union, list(map(set, nx.bfs_layers(rev_graph, sources)))))
return reachable_nodes
class SubGraph(Graph):
"""
A MultiDiGraph class represented as a 5-tuple (nodes, edges, node_properties, edge_properties, graph_properties).
In addition, the graph implements a serialization protocol, save-load and drawing functionality.
"""
def __init__(self, graph, hidden_nodes=None, hidden_edges=None):
super(SubGraph, self).__init__()
# Internal representation
self._base_graph = graph
self._graph = nx.subgraph_view(self._base_graph.base_graph(), self.is_node_visible, self.is_edge_visible)
# self._hidden_nodes = set() if hidden_nodes is None else set(hidden_nodes)
# self._hidden_edges = set() if hidden_edges is None else set(hidden_edges)
self._hidden_nodes = NodePropertyMap(self._base_graph, default=False)
self._hidden_edges = EdgePropertyMap(self._base_graph, default=False)
self._base_graph["hidden_nodes"] = self._hidden_nodes
self._base_graph["hidden_edges"] = self._hidden_edges
# Initialize hidden nodes and edges
if hidden_nodes is not None:
for uid in hidden_nodes:
self._hidden_nodes[uid] = True
if hidden_edges is not None:
for edge in hidden_edges:
self._hidden_edges[edge] = True
# Map node, edge and graph properties
self._node_properties = graph._node_properties
self._edge_properties = graph._edge_properties
self._graph_properties = graph._graph_properties
def __str__(self):
return f"<SubGraph of {self._graph}>"
def is_node_visible(self, uid):
"""
Is the node included in the subgraph?
"""
return not self._hidden_nodes[uid]
def is_edge_visible(self, uid, vid, key):
"""
Is the node included in the subgraph?
"""
return not self._hidden_edges[(uid, vid, key)]
def hide_node(self, uid):
"""
Removes the node from subgraph.
Raises error if `uid` is not in base graph.
If `uid` was hidden then no change is made.
"""
self._hidden_nodes[uid] = True
def show_node(self, uid):
"""
Adds the node back to subgraph.
Raises error if `uid` is not in base graph.
If `uid` was already visible, then no change is made.
"""
self._hidden_nodes[uid] = False
def hide_nodes(self, ulist):
"""
Removes multiples nodes from subgraph.
"""
map(self.hide_node, ulist)
def show_nodes(self, ulist):
""" Adds multiple nodes to subgraph. """
map(self.show_node, ulist)
def hidden_nodes(self):
""" Gets the list of nodes in base graph that are not in subgraph. """
return [uid for uid, value in self._hidden_nodes.items() if value is True]
def visible_nodes(self):
# return [uid for uid, value in self._hidden_nodes.items() if value is False]
return list(set(self.nodes()) - set(self.hidden_nodes()))
def number_of_visible_nodes(self):
""" Gets the number of nodes in subgraph. """
return self.number_of_nodes() - len(self.hidden_nodes())
def hide_edge(self, uid, vid, key):
""" Removes the edge from subgraph. No changes are made to base graph. """
self._hidden_edges[(uid, vid, key)] = True
def show_edge(self, uid, vid, key):
""" Adds the edge to subgraph. The edge must be a valid edge in base graph. """
self._hidden_edges[(uid, vid, key)] = False
def hide_edges(self, elist):
""" Removes multiple edge from subgraph. No changes are made to base graph. """
map(self.hide_edge, elist)
def show_edges(self, elist):
""" Adds multiple edges to subgraph. The edge must be a valid edge in base graph. """
map(self.show_edge, elist)
def hidden_edges(self):
""" Gets the list of edges from base graph that are not in subgraph. """
return [edge for edge, value in self._hidden_edges.items() if value is True]
def visible_edges(self):
# return [edge for edge, value in self._hidden_edges.items() if value is False]
return list(set(self.edges()) - set(self.hidden_edges()))
def number_of_visible_edges(self):
""" Gets the number of edges in subgraph. """
return self.number_of_edges() - len(self.hidden_edges())
def add_node(self):
"""
Raises error. Nodes cannot be added to subgraph.
See :meth:`SubGraph.hide_nodes` and :meth:`SubGraph.show_nodes`.
"""
raise PermissionError("Cannot add nodes to a SubGraph.")
def add_nodes(self, num_nodes):
"""
Raises error. Nodes cannot be added to subgraph.
See :meth:`SubGraph.hide_nodes` and :meth:`SubGraph.show_nodes`.
"""
raise PermissionError("Cannot add nodes to a SubGraph.")
def add_edge(self, uid, vid):
"""
Raises error. Edges cannot be added to subgraph.
See :meth:`SubGraph.hide_edges` and :meth:`SubGraph.show_edges`.
"""
raise PermissionError("Cannot add edges to a SubGraph.")
def add_edges(self, edges):
"""
Raises error. Edges cannot be added to subgraph.
See :meth:`SubGraph.hide_edges` and :meth:`SubGraph.show_edges`.
"""
raise PermissionError("Cannot add edges to a SubGraph.")
def rem_node(self, uid):
"""
Raises error. Nodes cannot be removed to subgraph.
See :meth:`SubGraph.hide_nodes` and :meth:`SubGraph.show_nodes`.
"""
raise NotImplementedError("Removal of nodes is not supported. Use SubGraph.hide_node() instead.")
def rem_edge(self, uid, vid, key):
"""
Raises error. Edges cannot be removed to subgraph.
See :meth:`SubGraph.hide_edges` and :meth:`SubGraph.show_edges`.
"""
raise NotImplementedError("Removal of nodes is not supported. Use SubGraph instead.")
def has_node(self, uid):
"""
Checks whether the subgraph has the given node or not. Checks whether the node exists and is visible.
:param uid: (int) Node ID to be checked for containment.
:return: (bool) True if given node is in the graph, else False.
"""
return self._graph.has_node(uid)
def has_edge(self, uid, vid, key=None):
"""
Checks whether the graph has the given edge or not. Checks whether the edge exists and is visible.
:param uid: (int) Source node ID.
:param vid: (int) Target node ID.
:param key: If provided, checks whether the edge (u, v, k) is in the graph or not. Otherwise, checks if there
exists an edge between nodes represented by uid and vid.
:type key: int, optional
:return: (bool) True if given edge is in the graph, else False.
"""
return self._graph.has_edge(uid, vid, key)
def nodes(self):
"""
List of all nodes in the **subgraph**.
"""
return list(self._graph.nodes())
def edges(self):
"""
List of all edges in the **subgraph**. Each edge is represented as a 3-tuple (uid, vid, key).
"""
return list(self._graph.edges(keys=True))
def successors(self, uid):
"""
List of all successors of the node represented by uid.
Includes only visible nodes reachable via visible edges.
"""
return list(self._graph.successors(uid))
def predecessors(self, uid):
"""
List of all predecessors of the node represented by uid.
Includes only visible nodes reachable via visible edges.
"""
return list(self._graph.predecessors(uid))
def neighbors(self, uid):
"""
List of all (in and out) neighbors of the node represented by uid.
Includes only visible nodes reachable via visible edges.
"""
return list(self._graph.neighbors(uid))
def ancestors(self, uid):
"""
List of all nodes from which the node represented by uid is reachable.
Includes only visible nodes reachable via visible edges.
"""
return list(nx.ancestors(self._graph, uid))
def descendants(self, uid):
"""
List of all nodes that can be reached from the node represented by uid.
Includes only visible nodes reachable via visible edges.
"""
return list(nx.descendants(self._graph, uid))
def in_edges(self, uid):
"""
List of all in edges to the node represented by uid.
Includes only visible edges.
"""
return self._graph.in_edges(uid, keys=True)
def out_edges(self, uid):
"""
List of all out edges from the node represented by uid.
Includes only visible edges.
"""
return self._graph.out_edges(uid, keys=True)
def number_of_nodes(self):
"""
The number of nodes in the **base** graph.
"""
return self._graph.number_of_nodes()
def number_of_edges(self):
"""
The number of edges in the **base** graph.
"""
return self._graph.number_of_edges()
def clear(self):
"""
Clears all nodes, edges and the node, edge and graph properties.
.. warning:: The function is untested.
# todo
"""
self._graph.clear()
self._node_properties = dict()
self._edge_properties = dict()
self._graph_properties = dict()
def serialize(self):
"""
Serializes the graph into a dictionary with the following format::
{
"graph": {
"nodes": <number of nodes>,
"edges": {
uid: {vid: key},
...
}
"node_properties": {
"property_name": {
"default": <value>,
"dict": {
"uid": <property value>,
...
}
},
...
},
"edge_properties": {
"property_name": {
"default": <value>,
"dict": [{"edge": [uid, vid, key], "pvalue": <property value>} ...]
},
...
},
"graph_properties": {
"property_name": <value>,
...
}
}
}
:return: (dict) Serialized graph
"""
obj_dict = self._base_graph.serialize()
return obj_dict
@classmethod
def deserialize(cls, obj_dict):
"""
Constructs a graph from a serialized graph object. The format is described in :py:meth:`Graph.serialize`.
:return: (Graph) A new :class:`Graph` object..
.. warning:: The function is untested.
# todo
"""
# Instantiate new object
obj = cls()
# Get serialized graph object
graph_dict = obj_dict["graph"]
# Add nodes
obj.add_nodes(num_nodes=int(graph_dict["nodes"]))
# Add edges
edges = graph_dict["edges"]
for uid in edges:
for vid in edges[uid]:
for key in range(edges[uid][vid]):
obj._graph.add_edge(int(uid), int(vid), key=int(key))
# Add properties
for node_prop, np_value in graph_dict["node_properties"].items():
np_map = NodePropertyMap(graph=obj)
# np_map.update({int(k): v for k, v in np_value.items()})
np_map.deserialize(np_value)
obj[node_prop] = np_map
for graph_prop, gp_value in graph_dict["graph_properties"].items():
obj[graph_prop] = gp_value
for edge_prop, ep_value in graph_dict["edge_properties"].items():
ep_map = EdgePropertyMap(graph=obj)
ep_map.deserialize(ep_value)
obj[edge_prop] = ep_map
# Return constructed object
return obj
def save(self, fpath, overwrite=False, protocol="json"):
"""
Saves the graph to file.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param overwrite: (bool) Specifies whether to overwrite the file, if it exists. [Default: False]
:param protocol: (str) The protocol to use to save the file. Options: {"json" [Default], "pickle"}.
.. note:: Pickle protocol is not tested.
"""
if not overwrite and os.path.exists(fpath):
raise FileExistsError("File already exists. To overwrite, call Graph.save(..., overwrite=True).")
graph_dict = self.serialize()
if protocol == "json":
with open(fpath, "w") as file:
json.dump(graph_dict, file, indent=2)
elif protocol == "pickle":
with open(fpath, "wb") as file:
pickle.dump(graph_dict, file)
else:
raise ValueError(f"Graph.save() does not support '{protocol}' protocol. One of ['json', 'pickle'] expected")
@classmethod
def load(cls, fpath, protocol="json"):
"""
Loads the graph from file.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param protocol: (str) The protocol to use to save the file. Options: {"json" [Default], "pickle"}.
.. note:: Pickle protocol is not tested.
.. warning:: The function is untested.
# todo
"""
if not os.path.exists(fpath):
raise FileNotFoundError("File does not exist.")
if protocol == "json":
with open(fpath, "r") as file:
obj_dict = json.load(file)
graph = cls.deserialize(obj_dict)
elif protocol == "pickle":
with open(fpath, "rb") as file:
obj_dict = pickle.load(file)
graph = cls.deserialize(obj_dict)
else:
raise ValueError(f"Graph.load() does not support '{protocol}' protocol. One of ['json', 'pickle'] expected")
return graph
def to_png(self, fpath, nlabel=None, elabel=None):
"""
Generates a PNG image of the graph.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param nlabel: (list of str) Specifies the node properties to use to annotate a node in image.
:param elabel: (list of str) Specifies the edge properties to use to annotate an edge in image.
:warning: If the node labels are not unique, the generated figure may contain 0, 1, 2, ...
that avoid duplication.
.. warning:: The function is untested.
# todo
"""
max_nodes = 500
if self._graph.number_of_nodes() > max_nodes:
raise ValueError(f"Cannot draw a graph with more than {max_nodes} nodes.")
g = self._graph
# If node properties to displayed are specified, process them.
if nlabel is not None:
g = nx.MultiDiGraph()
# If more than one property is selected, then display as tuple.
if len(nlabel) == 1:
node_state_map = {n: self[prop][n] for prop in nlabel for n in self._graph.nodes()}
else:
node_state_map = {n: tuple(self[prop][n] for prop in nlabel) for n in self._graph.nodes()}
# Add nodes to dummy graph
for n in node_state_map.values():
g.add_node(str(n))
# If edge labels to be displayed are specified, process them.
if elabel is not None:
for u, v, k in self._graph.edges(keys=True):
if len(elabel) == 1:
g.add_edge(str(node_state_map[u]), str(node_state_map[v]),
label=self[elabel[0]][(u, v, k)])
else:
g.add_edge(str(node_state_map[u]), str(node_state_map[v]),
label=tuple(self[prop][(u, v, k)] for prop in elabel))
else:
for u, v, k in self._graph.edges(keys=True):
g.add_edge(str(node_state_map[u]), str(node_state_map[v]))
dot_graph = nx.nx_agraph.to_agraph(g)
dot_graph.layout("dot")
dot_graph.draw(fpath)
def is_isomorphic_to(self, other: 'Graph'):
"""
Checks if the graph is isomorphic to the `other` graph.
:param other: (:class:`Graph` object) Graph to be checked for isomorphism with current graph.
:return: (bool) `True`, if graphs are isomorphic. Else, `False`.
.. warning:: The function is untested.
# todo
"""
return nx.is_isomorphic(self._graph, other._graph)