import inspect
import itertools
import logging
import random
import typing
from functools import partial
from ggsolver import util
from ggsolver.graph import NodePropertyMap, EdgePropertyMap, Graph, SubGraph
from tqdm import tqdm
# try:
# import ggsolver.logic.pl as pl
# except ImportError as err:
# import traceback
# logging.error(util.ColoredMsg.error(f"[ERROR] logic.pl could not be loaded. Logic functionality will not work. "
# f"\nError: {err.with_traceback(None)}"))
# ==========================================================================
# DECORATOR FUNCTIONS.
# ==========================================================================
def register_property(property_set: set):
def register_function(func):
if func.__name__ in property_set:
print(util.BColors.WARNING, f"[WARN] Duplicate property: {func.__name__}.", util.BColors.ENDC)
property_set.add(func.__name__)
return func
return register_function
# ==========================================================================
# BASE CLASS.
# ==========================================================================
class GraphicalModel:
NODE_PROPERTY = set()
EDGE_PROPERTY = set()
GRAPH_PROPERTY = set()
def __init__(self, is_deterministic=True, is_probabilistic=False, **kwargs):
# Types of Graphical Models. Acceptable values:
self._is_deterministic = is_deterministic
self._is_probabilistic = is_probabilistic
# Input domain (Expected value: Name of the function that returns an Iterable object.)
self._input_domain = kwargs["input_domain"] if "input_domain" in kwargs else None
# Pointed model
self._init_state = kwargs["init_state"] if "init_state" in kwargs else None
# Caching variables during serializing and deserializing the model.
self.__graph = None
self.__is_graphified = False
self.__states = list()
self.__state2node = dict()
def __str__(self):
return f"<{self.__class__.__name__} object at {id(self)}>"
def __setattr__(self, key, value):
# If key is any non "__xxx" variable, set `is_graphified` to False.
if key != "__is_graphified" and hasattr(self, "__is_graphified"):
if key[0:2] != "__":
self.__is_graphified = False
super(GraphicalModel, self).__setattr__(key, value)
# ==========================================================================
# PRIVATE FUNCTIONS.
# ==========================================================================
def _clear_cache(self):
self.__states = dict()
self.__is_graphified = False
def _gen_edges(self, delta, state, inp):
next_states = delta(state, inp)
edges = set()
# There are three types of graphical models. Handle each separately.
# If model is deterministic, next states is a single state.
if self.is_deterministic():
if next_states is None:
logging.warning(
util.ColoredMsg.warn(f"[WARN] {self.__class__.__name__}._graphify_unpointed(): "
f"No edge(s) added to graph for state={state}, input={inp}, "
f"next_state={next_states}.")
)
return set()
edges.add((state, next_states, inp, None))
# If model is non-deterministic, next states is an Iterable of states.
elif not self.is_deterministic() and not self.is_probabilistic():
for next_state in next_states:
if next_state is None:
logging.warning(
util.ColoredMsg.warn(f"[WARN] {self.__class__.__name__}._graphify_unpointed(): "
f"No edge(s) added to graph for state={state}, input={inp}, "
f"next_state={next_state}.")
)
continue
edges.add((state, next_state, inp, None))
# If model is stochastic, next states is a Distribution of states.
elif not self.is_deterministic() and self.is_probabilistic():
# FIXME. I have doubts that following implementation is correct.
# If support is empty, the code under `if` will not execute.
for next_state in next_states.support():
if next_state is None:
logging.warning(
util.ColoredMsg.warn(f"[WARN] {self.__class__.__name__}._graphify_unpointed(): "
f"No edge(s) added to graph for state={state}, input={inp}, "
f"next_state={next_state}.")
)
continue
edges.add((state, next_state, inp, next_states.pmf(next_state)))
else:
raise TypeError("Graphical Model is neither deterministic, nor non-deterministic, nor stochastic! "
f"Check the values: is_deterministic: {self.is_deterministic()}, "
f"self.is_quantitative:{self.is_probabilistic()}.")
return edges
def _gen_underlying_graph_unpointed(self, graph):
"""
Programmer's notes:
1. Caches states (returned by `self.states()`) in self.__states variable.
2. Assumes all states to be hashable.
3.
"""
# Get states
states = getattr(self, "states")
states = list(states())
# Add states to graph
node_ids = list(graph.add_nodes(len(states)))
# Cache states as a dictionary {state: uid}
self.__states = dict(zip(states, node_ids))
# Node property: state
np_state = NodePropertyMap(graph=graph)
np_state.update(dict(zip(node_ids, states)))
graph["state"] = np_state
# Logging and printing
logging.info(util.ColoredMsg.ok(f"[INFO] Processed node property: states. Added {len(node_ids)} states. [OK]"))
# Get input function
input_func = getattr(self, self._input_domain)
logging.info(util.ColoredMsg.ok(f"[INFO] Input domain function detected as '{self._input_domain}'. [OK]"))
# Graph property: input domain (stores the name of edge property that represents inputs)
graph["input_domain"] = self._input_domain
logging.info(util.ColoredMsg.ok(f"[INFO] Processed graph property: input_domain. [OK]"))
# Get input domain
inputs = list(input_func())
# Edge properties: input, prob,
ep_input = EdgePropertyMap(graph=graph)
ep_prob = EdgePropertyMap(graph=graph, default=None)
# Generate edges
delta = getattr(self, "delta")
for state, inp in tqdm(itertools.product(self.__states.keys(), inputs),
total=len(self.__states) * len(inputs),
desc="Unpointed graphify adding edges"):
new_edges = self._gen_edges(delta, state, inp)
# Update graph edges
uid = self.__states[state]
for _, t, _, prob in new_edges:
vid = self.__states[t]
key = graph.add_edge(uid, vid)
ep_input[uid, vid, key] = inp
ep_prob[uid, vid, key] = prob
# Add edge properties to graph
graph["input"] = ep_input
graph["prob"] = ep_prob
logging.info(util.ColoredMsg.ok(f"[INFO] Processed edge property: input. [OK]"))
logging.info(util.ColoredMsg.ok(f"[INFO] Processed graph property: prob. [OK]"))
def _gen_underlying_graph_pointed(self, graph):
logging.info(util.ColoredMsg.ok(f"[INFO] Running graphify UNPOINTED."))
# Get input function
input_func = getattr(self, self._input_domain)
logging.info(util.ColoredMsg.ok(f"[INFO] Input domain function detected as '{self._input_domain}'. [OK]"))
# Graph property: input domain (stores the name of edge property that represents inputs)
graph["input_domain"] = self._input_domain
logging.info(util.ColoredMsg.ok(f"[INFO] Processed graph property: input_domain. [OK]"))
# Get input domain
inputs = list(input_func())
# Node property: state
np_state = NodePropertyMap(graph=graph)
# Edge properties: input, prob,
ep_input = EdgePropertyMap(graph=graph)
ep_prob = EdgePropertyMap(graph=graph, default=None)
# BFS traversal until all reachable states are visited.
s0 = self.init_state()
uid = graph.add_node()
self.__states[s0] = uid
np_state[uid] = s0
queue = [s0]
visited = set()
# Generate edges
delta = getattr(self, "delta")
with tqdm(total=1, desc="Pointed graphify adding edges") as progress_bar:
while len(queue) > 0:
# Update progress_bar
progress_bar.total = len(queue) + len(visited)
progress_bar.update(1)
# Visit a state. Add to graph. Update cache. Update node property `state`.
state = queue.pop()
visited.add(state)
uid = self.__states[state]
np_state[uid] = state
# Apply all inputs to state
for inp in inputs:
# Get successors: set of (from_st, to_st, inp, prob)
new_edges = self._gen_edges(delta, state, inp)
for _, to_state, inp, prob in new_edges:
# If to_state was added to queue in the past, its id will be cached.
# Otherwise, add new node, cache it and queue it for exploration.
if to_state in self.__states:
vid = self.__states[to_state]
else:
vid = graph.add_node()
self.__states[to_state] = vid
# np_state[uid] = state
queue.append(to_state)
# Add edge to graph
key = graph.add_edge(uid, vid)
# Set edge properties
ep_input[uid, vid, key] = inp
ep_prob[uid, vid, key] = prob
# Add node properties to graph
graph["state"] = np_state
# Add edge properties to graph
graph["input"] = ep_input
graph["prob"] = ep_prob
logging.info(util.ColoredMsg.ok(f"[INFO] Processed edge property: input. [OK]"))
logging.info(util.ColoredMsg.ok(f"[INFO] Processed graph property: prob. [OK]"))
def _add_node_prop_to_graph(self, graph, p_name, default=None):
"""
Adds the node property called `p_name` to the graph.
Requires: `p_name` should be a function in self that inputs a single parameter: state.
Assumes: self._add_nodes_to_graph() is called before.
"""
if graph.has_property(p_name):
logging.warning(util.ColoredMsg.warn(f"[WARN] Duplicate property is ignored: {p_name}. [IGNORED]"))
return
try:
p_map = NodePropertyMap(graph=graph, default=default)
p_func = getattr(self, p_name) # self.NODE_PROPERTY[p_name]
if not (inspect.isfunction(p_func) or inspect.ismethod(p_func)):
raise TypeError(f"Node property {p_func} is not a function.")
# for uid in range(len(self.__states)):
# p_map[uid] = p_func(self.__states[uid])
#
for uid in range(graph.number_of_nodes()):
p_map[uid] = p_func(graph["state"][uid])
graph[p_name] = p_map
logging.info(util.ColoredMsg.ok(f"[INFO] Processed node property: {p_name}. [OK]"))
except NotImplementedError:
logging.warning(util.ColoredMsg.warn(f"[WARN] Node property function not implemented: {p_name}. [IGNORED]"))
except AttributeError:
logging.warning(util.ColoredMsg.warn(f"[WARN] Node property function is not defined: {p_name}. [IGNORED]"))
def _add_edge_prop_to_graph(self, graph, p_name, default=None):
"""
Adds an edge property called `p_name` to the graph.
Requires: `p_name` should be a function in self that inputs three parameters: state, inp, next_state .
Assumes: self._add_nodes_to_graph() is called.
Assumes: self._add_edges_to_graph() is called.
"""
if graph.has_property(p_name):
logging.warning(util.ColoredMsg.warn(f"[WARN] Duplicate property: {p_name}. [IGNORED]"))
return
try:
p_map = EdgePropertyMap(graph=graph, default=default)
p_func = getattr(self, p_name)
if not (inspect.isfunction(p_func) or inspect.ismethod(p_func)):
raise TypeError(f"Edge property {p_func} is not a function.")
for uid, vid, key in graph.edges():
p_map[(uid, vid, key)] = p_func(graph["states"][uid],
graph["input"][(uid, vid, key)],
graph["states"][vid])
graph[p_name] = p_map
logging.info(util.ColoredMsg.ok(f"[INFO] Processed edge property: {p_name}. [OK]"))
except NotImplementedError:
logging.warning(util.ColoredMsg.warn(f"[WARN] Edge property not implemented: {p_name}. [IGNORED]"))
except AttributeError:
logging.warning(util.ColoredMsg.warn(f"[WARN] Node property function is not defined: {p_name}. [IGNORED]"))
def _add_graph_prop_to_graph(self, graph, p_name):
"""
Adds a graph property called `p_name` to the graph.
Requires: `p_name` should be a function in self that inputs no parameters or a non-callable value.
"""
if graph.has_property(p_name):
logging.warning(util.ColoredMsg.warn(f"[WARN] Duplicate property: {p_name}. [IGNORED]"))
return
try:
p_func = getattr(self, p_name)
if inspect.ismethod(p_func) or (inspect.isfunction(p_func) and p_func.__name__ == "<lambda>"):
graph[p_name] = p_func()
logging.info(util.ColoredMsg.ok(f"[INFO] Processed graph property: {p_name}. [OK]"))
elif inspect.isfunction(p_func):
if len(inspect.signature(p_func).parameters) == 0:
graph[p_name] = p_func()
else:
graph[p_name] = p_func(self)
logging.info(util.ColoredMsg.ok(f"[INFO] Processed graph property: {p_name}. [OK]"))
# elif inspect.ismethod(p_func):
# graph[p_name] = p_func()
# logging.warning(util.ColoredMsg.ok(f"[INFO] Processed graph property: {p_name}. OK"))
else:
raise TypeError(f"Graph property {p_name} is neither a function nor a method.")
except NotImplementedError:
logging.warning(util.ColoredMsg.warn(f"[WARN] Graph property is not implemented: {p_name}. [IGNORED]"))
except AttributeError:
logging.warning(util.ColoredMsg.warn(f"[WARN] Node property function is not defined: {p_name}. [IGNORED]"))
# ==========================================================================
# FUNCTIONS TO BE IMPLEMENTED BY USER.
# ==========================================================================
def states(self):
"""
Defines the states component of the graphical model.
:return: (list/tuple of JSONifiable object). List or tuple of states.
"""
raise NotImplementedError(f"{self.__class__.__name__}.states() is not implemented.")
def delta(self, state, inp) -> typing.Union[util.Distribution, typing.Iterable, object]:
pass
# ==========================================================================
# PUBLIC FUNCTIONS.
# ==========================================================================
def initialize(self, state):
"""
Sets the initial state of the graphical model.
.. note:: The function does NOT check if the given state is valid.
"""
self._init_state = state
def graphify(self, pointed=False, base_only=False):
"""
Constructs the underlying graph of the graphical model.
:param pointed: (bool) If pointed is `True`, the :py:meth:`TSys.graphify_pointed()` is called, which constructs
a pointed graphical model containing only the states reachable from the initial state. Otherwise,
:py:meth:`TSys.graphify_unpointed()` is called, which constructs the complete transition system.
:return: (:class:`ggsolver.graph.Graph` object) An equivalent graph representation of the graphical model.
"""
# Clear cached information
self._clear_cache()
# Input parameter validation
if pointed is True and self._init_state is None:
raise ValueError(f"{self.__class__.__name__} is not initialized. "
f"Did you forget to call {self.__class__.__name__}.initialize() function?")
# Initialize graph object
graph = Graph()
# Glob node, edge and graph properties
node_props = getattr(self, "NODE_PROPERTY")
edge_props = getattr(self, "EDGE_PROPERTY")
graph_props = getattr(self, "GRAPH_PROPERTY")
# Warn about duplication
logging.info(util.ColoredMsg.header(f"[INFO] Globbed node properties: {node_props}"))
logging.info(util.ColoredMsg.header(f"[INFO] Globbed edge properties: {edge_props}"))
logging.info(util.ColoredMsg.header(f"[INFO] Globbed graph properties: {graph_props}"))
logging.info(util.ColoredMsg.header(f"[INFO] Duplicate node, edge properties: "
f"{set.intersection(node_props, edge_props)}"))
logging.info(util.ColoredMsg.header(f"[INFO] Duplicate edge, graph properties: "
f"{set.intersection(edge_props, graph_props)}"))
logging.info(util.ColoredMsg.header(f"[INFO] Duplicate graph, node properties: "
f"{set.intersection(graph_props, node_props)}"))
# Construct underlying graph for pointed construction
if pointed is True:
self._gen_underlying_graph_pointed(graph)
# Construct underlying graph for unpointed construction
else:
self._gen_underlying_graph_unpointed(graph)
if not base_only:
# Add node properties
for p_name in node_props:
self._add_node_prop_to_graph(graph, p_name)
# Add edge properties
for p_name in edge_props:
self._add_edge_prop_to_graph(graph, p_name)
# Add graph properties
for p_name in graph_props:
self._add_graph_prop_to_graph(graph, p_name)
else:
print(util.BColors.WARNING, f"[WARN] Ignoring node, edge and graph (base_only: True)", util.BColors.ENDC)
print(util.BColors.OKGREEN, f"[SUCCESS] {graph} generated.", util.BColors.ENDC)
return graph
def serialize(self):
"""
Serializes the underlying graph of the graphical model into a dictionary with the following format.
The state properties are saved as node properties, transition properties are stored are edge properties
and model properties are stored as graph properties in the underlying graph::
{
"graph": {
"nodes": <number of nodes>,
"edges": {
uid: {vid: key},
...
}
"node_properties": {
"property_name": {
"default": <value>,
"dict": {
"uid": <property value>,
...
}
},
...
},
"edge_properties": {
"property_name": {
"default": <value>,
"dict": [{"edge": [uid, vid, key], "pvalue": <property value>} ...]
},
...
},
"graph_properties": {
"property_name": <value>,
...
}
}
}
:return: (dict) Serialized graphical model.
"""
# 1. Graphify
# 2. Serialize the graph
# 3. Return a dict
raise NotImplementedError
def save(self, fpath, pointed=False, overwrite=False, protocol="json"):
"""
Saves the graphical model to file.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param pointed: (bool) If pointed is `True`, the :py:meth:`TSys.graphify_pointed()` is called, which constructs
a pointed graphical model containing only the states reachable from the initial state. Otherwise,
:py:meth:`TSys.graphify_unpointed()` is called, which constructs the complete transition system.
:param overwrite: (bool) Specifies whether to overwrite the file, if it exists. [Default: False]
:param protocol: (str) The protocol to use to save the file. Options: {"json" [Default], "pickle"}.
.. note:: Pickle protocol is not tested.
"""
# 1. Graphify
graph = self.graphify(pointed=pointed)
# 2. Save the graph
graph.save(fpath, overwrite=overwrite, protocol=protocol)
@classmethod
def deserialize(cls, obj_dict):
"""
Constructs a graphical model from a serialized graph object. The node properties are deserialized as state
properties, the edge properties are deserialized as transition properties, and the graph properties are
deserialized as model properties. All the deserialized properties are represented as a function in the
GraphicalModel class. See example #todo.
The format is described in :py:meth:`GraphicalModel.serialize`.
:return: (Sub-class of GraphicalModel) An instance of the `cls` class. `cls` must be a sub-class of
`GraphicalModel`.
"""
# 1. Construct a graph from obj_dict.
# 2. Define functions from graph
# 3. Create cls() instance.
# 4. Update __dir__ with new methods
# 5. Return instance
raise NotImplementedError
@classmethod
def load(cls, fpath, protocol="json"):
"""
Loads the graphical model from file.
:param fpath: (str) Path to which the file should be saved. Must include an extension.
:param protocol: (str) The protocol to use to save the file. Options: {"json" [Default], "pickle"}.
.. note:: Pickle protocol is not tested.
"""
# Load game graph
graph = Graph.load(fpath, protocol=protocol)
# Create object
obj = cls()
# Add graph properties
for gprop, gprop_value in graph.graph_properties.items():
func_code = f"""def {gprop}():\n\treturn {gprop_value}"""
exec(func_code)
func = locals()[gprop]
setattr(obj, gprop, func)
# Construct inverse state mapping
for node in graph.nodes():
state = graph["state"][node]
if isinstance(state, list):
state = tuple(state)
obj.__state2node[state] = node
# Add node properties
def get_node_property(state, name):
return graph.node_properties[name][obj.__state2node[state]]
for nprop, nprop_value in graph.node_properties.items():
setattr(obj, nprop, partial(get_node_property, name=nprop))
# TODO. Add edge properties (How to handle them is unclear).
# Reconstruct delta function
def delta(state, act):
# Get node from state
node = obj.__state2node[state]
# Get out_edges from node in graph
out_edges = graph.out_edges(node)
# Iterate over each out edge to match action.
successors = set()
for uid, vid, key in out_edges:
action_label = graph["act"][(uid, vid, key)]
if action_label == act:
successors.add(vid)
# If model is deterministic, then return single state.
if not graph["is_stochastic"]:
return graph["state"][successors.pop()]
# If model is stochastic and NOT quantitative, then return list of states.
elif graph["is_stochastic"] and not graph["is_quantitative"]:
return [graph["state"][vid] for vid in successors]
# If model is stochastic and quantitative, then return distribution.
else:
successors = [graph["state"][vid] for vid in successors]
prob = [graph["prob"][uid] for uid in successors]
return util.Distribution(successors, prob)
obj.delta = delta
# Return reconstructed object
return obj
@classmethod
def from_graph(cls, graph):
# Create object
obj = cls()
# Save graph
# TODO. This is inefficent because we are storing `__states` and `__graph`. Remove redundancy.
obj.__graph = graph
obj.__is_graphified = True
# Add graph properties
for gprop, gprop_value in graph.graph_properties.items():
# func_code = f"""def {gprop}():\n\treturn {gprop_value}"""
func = lambda: obj.__graph.graph_properties[gprop]
# exec(func_code)
# func = locals()[gprop]
setattr(obj, gprop, func)
# Construct inverse state mapping
for node in graph.nodes():
state = graph["state"][node]
if isinstance(state, list):
state = tuple(state)
obj.__state2node[state] = node
# Add node properties
def get_node_property(state, name):
return graph.node_properties[name][obj.__state2node[state]]
for nprop, nprop_value in graph.node_properties.items():
setattr(obj, nprop, partial(get_node_property, name=nprop))
# TODO. Add edge properties (How to handle them is unclear).
# Reconstruct delta function
def delta(state, act):
# Get node from state
node = obj.__state2node[state]
# Get out_edges from node in graph
out_edges = graph.out_edges(node)
# Iterate over each out edge to match action.
successors = set()
for uid, vid, key in out_edges:
action_label = graph["act"][(uid, vid, key)]
if action_label == act:
successors.add(vid)
# If model is deterministic, then return single state.
if not graph["is_stochastic"]:
return graph["state"][successors.pop()]
# If model is stochastic and NOT quantitative, then return list of states.
elif graph["is_stochastic"] and not graph["is_quantitative"]:
return [graph["state"][vid] for vid in successors]
# If model is stochastic and quantitative, then return distribution.
else:
successors = [graph["state"][vid] for vid in successors]
prob = [graph["prob"][uid] for uid in successors]
return util.Distribution(successors, prob)
obj.delta = delta
# Return reconstructed object
return obj
def is_nondeterministic(self):
""" Returns `True` if the graphical model is non-deterministic. Else, returns `False`. """
return not self._is_deterministic and not self._is_probabilistic
@register_property(GRAPH_PROPERTY)
def init_state(self):
"""
Returns the initial state of the graphical model.
"""
return self._init_state
@register_property(GRAPH_PROPERTY)
def is_deterministic(self):
"""
Returns `True` if the graphical model is deterministic. Else, returns `False`.
"""
return self._is_deterministic
@register_property(GRAPH_PROPERTY)
def is_probabilistic(self):
""" Returns `True` if the graphical model is probabilistic. Else, returns `False`. """
return self._is_probabilistic
# ==========================================================================
# USER MODELS.
# ==========================================================================
[docs]class TSys(GraphicalModel):
"""
Represents a transition system [Principles of Model Checking, Def. 2.1].
.. math::
TSys = (S, A, T, AP, L)
In the `TSys` class, each component is represented as a function.
- The set of states :math:`S` is represented by `TSys.states` function,
- The set of actions :math:`A` is represented by `TSys.actions` function,
- The transition function :math:`T` is represented by `TSys.delta` function,
- The set of atomic propositions is represented by `TSys.atoms` function,
- The labeling function :math:`L` is represented by `TSys.label` function.
All of the above functions are marked abstract.
The recommended way to use `TSys` class is by subclassing it and implementing its component functions.
A transition system can be either deterministic or non-deterministic or probabilistic.
To define a **deterministic** transition system, provide a keyword argument `is_deterministic=True`
to the constructor. To define a **nondeterministic** transition system, provide a keyword argument
`is_deterministic=False` to the constructor. To define a **probabilistic** transition system, provide
a keyword arguments `is_deterministic=False, is_probabilistic=True` to the constructor.
The design of `TSys` class closely follows its mathematical definition.
Hence, the signatures of `delta` function for deterministic, nondeterministic, probabilistic
transition systems are different.
- **deterministic:** `delta(state, act) -> single state`
- **non-deterministic:** `delta(state, act) -> a list of states`
- **probabilistic:** `delta(state, act) -> a distribution over states`
An important feature of `TSys` class is the `graphify()` function. It constructs a `Graph` object that is equivalent to the transition system. The nodes of the `Graph` represent the states of `TSys`, the edges of the `Graph` are defined by the set of `actions` and the `delta` function. The atomic propositions, labeling function are stored as `node, edge` and `graph` properties. By default, every `Graph` returned a `TSys.graphify()` function have the following (node/edge/graph) properties:
- `state`: (node property) A Map from every node to the state of transition system it represents.
- `actions`: (graph property) List of valid actions.
- `input`: (edge property) A map from every edge `(uid, vid, key)` to its associated action label.
- `prob`: (edge property) The probability associated with the edge `(uid, vid, key)`.
- `atoms`: (graph property) List of valid atomic propositions.
- `label`: (node property) A map every node to the list of atomic propositions true in the state represented by that node.
- `init_state`: (graph property) Initial state of transition system.
- `is_deterministic`: (graph property) Is the transition system deterministic?
- `is_probabilistic`: (graph property) Is the transition system probabilistic?
**Note:** Some features of probabilistic transition system are not tested. If you are trying to implement a probabilistic transition system, reach out to Abhishek Kulkarni (a.kulkarni2@ufl.edu).
Next, we demonstrate how to use `TSys` class to define a deterministic, non-deterministic and probabilistic transition system.
"""
NODE_PROPERTY = GraphicalModel.NODE_PROPERTY.copy()
EDGE_PROPERTY = GraphicalModel.EDGE_PROPERTY.copy()
GRAPH_PROPERTY = GraphicalModel.GRAPH_PROPERTY.copy()
[docs] def __init__(self, is_deterministic=True, is_probabilistic=False, **kwargs):
"""
Constructs a transition system.
:param is_deterministic: (bool). If `True` then the transition system is deterministic. Otherwise,
it is either non-deterministic or probabilistic.
:param is_probabilistic: (bool). If `is_deterministic` is `False`, then if `is_probabilistic` is `True`
then the transition system is probabilistic. Otherwise, it is non-deterministic.
:param input_domain: (optional, str). Name of the member function of TSys class that defines the inputs to the
transition system. [Default: "actions"]
:param init_state: (optional, JSON-serializable object). The initial state of the transition system.
"""
kwargs["input_domain"] = "actions" if "input_domain" not in kwargs else kwargs["input_domain"]
kwargs["is_deterministic"] = is_deterministic
kwargs["is_probabilistic"] = is_probabilistic
super(TSys, self).__init__(**kwargs)
# ==========================================================================
# FUNCTIONS TO BE IMPLEMENTED BY USER.
# ==========================================================================
[docs] @register_property(GRAPH_PROPERTY)
def actions(self):
"""
Defines the actions component of the transition system.
:return: (list/tuple of str). List or tuple of action labels.
"""
raise NotImplementedError(f"{self.__class__.__name__}.actions() is not implemented.")
[docs] def delta(self, state, act):
"""
Defines the transition function of the transition system.
:param state: (object) A valid state.
:param act: (str) An action.
:return: (object/list(object)/util.Distribution object). Depending on the type of transition system, the return
type is different.
- Deterministic: returns a single state.
- Non-deterministic: returns a list/tuple of states.
- Probabilistic: returns a distribution over all state.
"""
raise NotImplementedError(f"{self.__class__.__name__}.delta() is not implemented.")
[docs] @register_property(GRAPH_PROPERTY)
def atoms(self):
"""
Defines the atomic propositions component of the transition system.
:return: (list/tuple of str). List or tuple of atomic proposition labels.
"""
raise NotImplementedError(f"{self.__class__.__name__}.atoms() is not implemented.")
[docs] @register_property(NODE_PROPERTY)
def label(self, state):
"""
Defines the labeling function of the transition system.
:param state: (object) A valid state.
:return: (list/tuple of str). A list/tuple of atomic propositions that are true in the given state.
"""
raise NotImplementedError(f"{self.__class__.__name__}.label() is not implemented.")
[docs]class Game(TSys):
"""
Represents a game transition system, hereafter referred simply as a game.
A `Game` can represent two mathematical structures commonly used in literature, namely
.. math::
G = (S, A, T, AP, L, formula)
.. math::
G = (S, A, T, F, WinCond)
In the `Game` class, each component is represented as a function. By defining the relevant functions, a `Game`
class may represent either of the two mathematical structures.
- The set of states :math:`S` is represented by `Game.states` function,
- The set of actions :math:`A` is represented by `Game.actions` function,
- The transition function :math:`T` is represented by `Game.delta` function,
- The set of atomic propositions :math:`AP` is represented by `Game.atoms` function,
- The labeling function :math:`L` is represented by `Game.label` function.
- When the winning condition is represented by a logic formula :math:`formula`, we define `Game.formula` function.
- When the winning condition is represented by a final states :math:`F`, we define `Game.final` function.
In this case, we must also specify the acceptance condition.
- The winning condition :math:`WinCond` is represented by `Game.win_cond` function.
All of the above functions are marked abstract.
The recommended way to use `Game` class is by subclassing it and implementing the relevant component functions.
**Categorization of a Game:** A game is categorized by three types:
- A game can be either deterministic or non-deterministic or probabilistic.
To define a **deterministic** transition system, provide a keyword argument `is_deterministic=True` to the
constructor. To define a **nondeterministic** transition system, provide a keyword argument `is_deterministic=False`
to the constructor. To define a **probabilistic** transition system, provide a keyword arguments
`is_deterministic=False, is_probabilistic=True` to the constructor.
The design of `Game` class closely follows its mathematical definition.
Hence, the signatures of `delta` function for deterministic, nondeterministic, probabilistic games are different.
- **deterministic:** `delta(state, act) -> single state`
- **non-deterministic:** `delta(state, act) -> a list of states`
- **probabilistic:** `delta(state, act) -> a distribution over states`
- A game can be turn-based or concurrent. To define a **concurrent** game, provide a keyword argument
`is_turn_based=False`. The game is `turn_based` by default.
- A game can be a 1/1.5/2/2.5-player game. A one-player game models a deterministic motion planning-type problem in
a static environment. A 1.5-player game is an MDP. A two-player game models a deterministic interaction between
two strategic players. And, a 2.5-player game models a stochastic interaction between two strategic players.
If a game is one or two player, then the :py:meth:`Game.delta` is `deterministic`.
If a game is 1.5 or 2.5 player, then the :py:meth:`Game.delta` is either `non-deterministic` (when
transition probabilities are unknown), and `probabilistic` (when transition probabilities are known).
Every state in a turn-based game is controlled by a player. To define which player controls which state, define
a game component :py:meth:`Game.turn` which takes in a state and returns a value between 0 and 3 to indicate
which player controls the state.
An important feature of `Game` class is the `graphify()` function. It constructs a `Graph` object that is
equivalent to the game. The nodes of the `Graph` represent the states of `Game`,
the edges of the `Graph` are defined by the set of `actions` and the `delta` function.
The atomic propositions, labeling function are stored as `node, edge` and `graph` properties.
By default, every `Graph` returned a `Game.graphify()` function have the following (node/edge/graph) properties:
- `state`: (node property) A Map from every node to the state of transition system it represents.
- `input_domain`: (graph property) Name of function that defines input domain of game (="actions").
- `actions`: (graph property) List of valid actions.
- `input`: (edge property) A map from every edge `(uid, vid, key)` to its associated action label.
- `prob`: (edge property) The probability associated with the edge `(uid, vid, key)`.
- `atoms`: (graph property) List of valid atomic propositions.
- `label`: (node property) A map every node to the list of atomic propositions true in the state represented by that node.
- `init_state`: (graph property) Initial state of transition system.
- `is_deterministic`: (graph property) Is the transition system deterministic?
- `is_probabilistic`: (graph property) Is the transition system probabilistic?
- `is_turn_based`: (graph property) Is the transition system turn-based?
- `final`: (node property) Returns an integer denoting the acceptance set the state belongs to.
- `win_cond`: (graph property) The winning condition of the game.
- `formula`: (graph property) A logic formula representing the winning condition of the game.
- `turn`: (node property) A map from every node to an integer (0/1/2) that denotes which player controls the node.
- `p1_acts`: (graph property) A subset of actions accessible to P1.
- `p2_acts`: (graph property) A subset of actions accessible to P2.
**Note:** Some features of probabilistic transition system are not tested.
If you are trying to implement a probabilistic transition system, reach out to Abhishek Kulkarni
(a.kulkarni2@ufl.edu).
"""
NODE_PROPERTY = TSys.NODE_PROPERTY.copy()
EDGE_PROPERTY = TSys.EDGE_PROPERTY.copy()
GRAPH_PROPERTY = TSys.GRAPH_PROPERTY.copy()
[docs] def __init__(self, is_turn_based=True, is_deterministic=True, is_probabilistic=False, **kwargs):
kwargs["is_deterministic"] = is_deterministic
kwargs["is_probabilistic"] = is_probabilistic
super(Game, self).__init__(**kwargs)
self._is_turn_based = is_turn_based
# Process keyword arguments
if "states" in kwargs:
def states_():
return list(kwargs["states"])
self.states = states_
if "actions" in kwargs:
def actions_():
return list(kwargs["actions"])
self.actions = actions_
if "trans_dict" in kwargs:
def delta_(state, inp):
return kwargs["trans_dict"][state][inp]
self.delta = delta_
if "atoms" in kwargs:
def atoms_():
return list(kwargs["atoms"])
self.atoms = atoms_
if "label" in kwargs:
def label_(state):
return list(kwargs["label"][state])
self.label = label_
if "init_state" in kwargs:
self.initialize(kwargs["init_state"])
if "turn" in kwargs:
def turn_(state):
return kwargs["turn"][state]
self.turn = turn_
if "final" in kwargs:
def final_(state):
return 0 if state in kwargs["final"] else -1
self.final = final_
# ==========================================================================
# FUNCTIONS TO BE IMPLEMENTED BY USER.
# ==========================================================================
[docs] @register_property(NODE_PROPERTY)
def final(self, state):
"""
Defines whether the given state is a final state.
The structure of final state is based on the winning condition
[See Automata, Logics and Infinite Games (Ch. 2)].
:param state: (object) A valid state.
:return: (int or a list of ints). The integer denotes the acceptance set the state belongs to.
"""
raise NotImplementedError(f"{self.__class__.__name__}.final() is not implemented.")
[docs] @register_property(GRAPH_PROPERTY)
def is_turn_based(self):
""" Is the game turn based? """
return self._is_turn_based
[docs] @register_property(NODE_PROPERTY)
def turn(self, state):
"""
Defines the player who controls the given state.
:param state: (object) A valid state.
:return: (int). In turn-based game, turn can be 1 for player 1 or 2 for player 2.
In concurrent games, the turn must be 0.
.. note:: For concurrent games, the turn function can be left unimplemented.
"""
raise NotImplementedError
[docs] @register_property(GRAPH_PROPERTY)
def p1_acts(self):
""" A subset of actions accessible to P1. """
raise NotImplementedError
[docs] @register_property(GRAPH_PROPERTY)
def p2_acts(self):
""" A subset of actions accessible to P2. """
raise NotImplementedError
[docs] @register_property(GRAPH_PROPERTY)
def win_cond(self):
""" Winning condition of the game. """
raise NotImplementedError
[docs]class Solver:
"""
Represents a game solver that computes the winning regions and strategies for the players
under a fixed solution concept.
:param graph: (Graph or SubGraph instance) Graph or subgraph representing the game on a graph.
"""
def __init__(self, graph, **kwargs):
# Load and validate graph
self._graph = graph
self._solution = SubGraph(self._graph)
# Associate node and edge properties with solution
self._node_winner = NodePropertyMap(self._solution, default=-1) # Values denote which player wins from node.
self._edge_winner = EdgePropertyMap(self._solution, default=-1) # Values denote which player wins from edge.
self._solution["node_winner"] = self._node_winner
self._solution["edge_winner"] = self._edge_winner
# Status variables
self._is_solved = False
# Cache variables
self._state2node = {self._solution["state"][uid]: uid for uid in self._solution.nodes()}
def __str__(self):
return f"<Solver for {self._graph}>"
[docs] def graph(self):
""" Returns the input game graph. """
return self._graph
[docs] def state2node(self, state):
""" Helper function to get the node id associated with given state. """
return self._state2node[state]
[docs] def is_solved(self):
""" Returns if the game is solved or not. """
return self._is_solved
[docs] def solution(self):
"""
Returns the solved game graph.
The graph contains two special properties:
- `node_winner` (node property): Maps each node to the id of player (1/2) who wins from that node.
- `edge_winner` (edge property): Maps each edge to the id of player (1/2) who wins using that edge.
"""
if not self.is_solved():
raise ValueError(f"{self} is not solved.")
return self._solution
[docs] def solve(self):
""" Abstract method."""
raise NotImplementedError
[docs] def winner(self, state):
""" Returns the player who wins from the given state. """
uid = self.state2node(state)
return self._node_winner[uid]
[docs] def win_acts(self, state):
""" Retuns the list of winning actions from the given state. """
# Get the input domain (name of function) of game
ep_input = self._solution["input"]
# Get node id for the state
uid = self.state2node(state)
# Determine which player has a winning strategy at the state
player = self._node_winner[uid]
# Identify all winning actions.
win_acts = set()
for _, vid, key in self._graph.out_edges(uid):
if self._edge_winner[uid, vid, key] == player:
# win_acts.add(self._graph[ep_input][uid, vid, key])
win_acts.add(ep_input[uid, vid, key])
# Convert to list and return
return list(win_acts)
[docs] def win_region(self, player):
""" Returns the winning region for the player. """
# return [self._solution["state"][uid] for uid in self._solution.nodes() if self._node_winner[uid] == player]
return [self._solution["state"][uid] for uid in self._solution.nodes() if self._node_winner[uid] == player]
[docs] def reset(self):
""" Resets the solver. """
self._solution = SubGraph(self._graph)
self._node_winner = NodePropertyMap(self._solution, default=-1) # Values denote which player wins from node.
self._edge_winner = EdgePropertyMap(self._solution, default=-1) # Values denote which player wins from edge.
self._solution["node_winner"] = self._node_winner
self._solution["edge_winner"] = self._edge_winner
class Strategy:
"""
Defines a strategy for a player.
Allows customization of losing behavior by specifying a function that maps a losing node to an action.
By default, losing state is mapped to None.
"""
def __init__(self, graph, player, losing_behavior=None, **kwargs):
assert "node_winner" in graph.node_properties(), "graph must have node property called 'node_winner'. " \
"Ensure the graph is the solution generated by a Solver."
assert "edge_winner" in graph.edge_properties(), "graph must have edge property called 'edge_winner'. " \
"Ensure the graph is the solution generated by a Solver."
assert losing_behavior is None or callable(losing_behavior), \
"losing behavior should be a function that takes a state as input and returns either None or an action."
# Instance variables
self._graph = graph
self._player = player
self._strategy = dict()
self._losing_behavior = (lambda st: None) if losing_behavior is None else losing_behavior
# Generate and cache strategy
self._gen_strategy()
def _gen_strategy(self):
raise NotImplementedError
[docs]class DeterministicStrategy(Strategy):
"""
Represents a deterministic strategy. That is, the strategy at a given state always selects the same winning action.
Allows customization of losing behavior by specifying a function that maps a losing node to an action.
By default, losing state is mapped to None.
"""
[docs] def __call__(self, state):
""" Returns the action selected by strategy at given state. """
return self._strategy[state]
def _gen_strategy(self):
# Generate a deterministic strategy by choosing an action at every winning state.
# States that are losing for the player return according to `losing_behavior` function.
ep_input = self._graph["input"]
win_edges = self._graph["edge_winner"]
for uid in self._graph.nodes():
state = self._graph["state"][uid]
for _, vid, key in self._graph.out_edges(uid):
if win_edges[uid, vid, key] == self._player:
self._strategy[state] = self._graph[ep_input][uid, vid, key]
break
self._strategy[state] = self._losing_behavior(state)
[docs]class NonDeterministicStrategy(Strategy):
"""
Represents a non-deterministic strategy. Uniformly samples an action from winning actions at that state.
Allows customization of losing behavior by specifying a function that maps a losing node to an action.
By default, losing state is mapped to None.
"""
[docs] def __call__(self, state):
""" Returns the action selected by strategy at given state. """
actions = self._strategy[state]
try:
return random.choice(actions)
except IndexError: # In case the list is empty.
return None
def _gen_strategy(self):
# Generate a deterministic strategy by choosing an action at every winning state.
# States that are losing for the player return according to `losing_behavior` function.
ep_input = self._graph["input"]
win_edges = self._graph["edge_winner"]
# Visit all states
for uid in self._graph.nodes():
state = self._graph["state"][uid]
# Initialize strategy as empty set
self._strategy[state] = set()
# Visit all outgoing edges from that state
for _, vid, key in self._graph.out_edges(uid):
# If they are winning for the player, include the action
if win_edges[uid, vid, key] == self._player:
self._strategy[state].add(self._graph[ep_input][uid, vid, key])
# If no actions were winning, follow losing behavior
if len(self._strategy[state]) == 0:
# User must ensure that losing behavior returns an iterable
self._strategy[state] = self._losing_behavior(state)
# Store the valid actions as a list
self._strategy[state] = list(self._strategy[state])
# Currently (v0.1.5) I do not include randomized strategies with customizable probability distribution on each state.
# class RandomizedStrategy(NonDeterministicStrategy):
# def __init__(self, graph, player, losing_behavior=None, **kwargs):
# raise NotImplementedError("Need to figure out how to select/customize distributions at each state.")
#
# def __call__(self, state):
# pass