diff --git a/primo2/inference/exact.py b/primo2/inference/exact.py index 1a6724c..977ed93 100644 --- a/primo2/inference/exact.py +++ b/primo2/inference/exact.py @@ -25,6 +25,7 @@ from .factor import Factor from .order import Orderer +from .marginal import Marginal class VariableElimination(object): @@ -54,8 +55,8 @@ def naive_marginals(bn, variables, evidence=None): Returns ------- - Factor - A factor containing the desired marginals + Marginal + A Marginal object containing the desired marginals """ if not evidence: @@ -79,7 +80,7 @@ def naive_marginals(bn, variables, evidence=None): # Normalize to get conditional probability for the evidence resFactor.normalize() - return resFactor + return Marginal.from_factor(resFactor) @staticmethod @@ -111,8 +112,8 @@ def bucket_marginals(bn, variables, evidence=None, order=None): Returns ------- - Factor - A factor containing the desired marginals + Marginal + A Marginal object containing the desired marginals """ if not order: @@ -163,7 +164,7 @@ def bucket_marginals(bn, variables, evidence=None, order=None): # Normalize evidence buckets[-1].normalize() - return buckets[-1] + return Marginal.from_factor(buckets[-1]) class FactorTree(object): @@ -310,9 +311,9 @@ def set_evidence(self, evidence, softPosteriors=False): if softPosteriors: #Compute the old/naive marignals for the evidence values which are #required to compute the proper likelihood ratio factor below - self.calculate_messages() + self._calculate_messages() for e in evidence: - oldMarginals[e] = self.marginals([e]).potentials + oldMarginals[e] = self.marginals([e], returnFactor=True).potentials self.reset_factors() # Add evidence to buckets @@ -324,9 +325,9 @@ def set_evidence(self, evidence, softPosteriors=False): if e in nodeData["variables"]: nodeData["factor"] = nodeData["factor"] * evidenceFactor break - self.calculate_messages() + self._calculate_messages() - def marginals(self, variables): + def marginals(self, variables, returnFactor = False): """ Function to compute marginals for the given variables, potentially given some evidence that was set beforehand using set_evidence. @@ -344,15 +345,18 @@ def marginals(self, variables): if they are contained in any clique. To compute the joint marginals for a fixed instantiation, this instantiation can be set as evidence and its probability can be queried using - get_evidence_probability() + get_evidence_probability() + returnFactors: Boolean, optional + If set to true, marginals will return a Factor object instead + of a Marginal. Should only be used internally. Returns ------- - Factor - A factor containing the desired marginals + Marginal + A Marginal object containing the desired marginals """ if not self.tree.graph["messagesValid"]: - self.calculate_messages() + self._calculate_messages() # Determine clique containing variables: varSet = set(variables) @@ -360,7 +364,7 @@ def marginals(self, variables): if varSet.issubset(treeData["variables"]): resFactor = treeData["factor"].marginalize(treeData["variables"] - varSet) resFactor.normalize() - return resFactor + return Marginal.from_factor(resFactor) if not returnFactor else resFactor else: # No suitable clique found raise ValueError("No clique containing the variables {} was found.".format(variables)) @@ -369,7 +373,7 @@ def marginals(self, variables): def get_evidence_probability(self): raise NotImplementedError("We still need to implement this...") - def calculate_messages(self): + def _calculate_messages(self): """ Performs the two way (inward and outward) message passing with the first node as root. Is needed to validate the messages in the @@ -377,13 +381,13 @@ def calculate_messages(self): """ try: root = self.tree.nodes()[0] - self.pull_messages(self.tree, root, None) - self.push_messages(self.tree, root, None) + self._pull_messages(self.tree, root, None) + self._push_messages(self.tree, root, None) self.tree.graph["messagesValid"] = True except IndexError: pass - def pull_messages(self, tree, curNode, parent): + def _pull_messages(self, tree, curNode, parent): """ Performs the inward message passing from the given node to its parent according to Hugin's architecture. @@ -402,7 +406,7 @@ def pull_messages(self, tree, curNode, parent): #Let neighbors collect messages for neighbor in tree.neighbors_iter(curNode): if neighbor != parent: - self.pull_messages(tree, neighbor, curNode) + self._pull_messages(tree, neighbor, curNode) # Send message to parent if parent: @@ -412,7 +416,7 @@ def pull_messages(self, tree, curNode, parent): else: return - def push_messages(self, tree, curNode, parent): + def _push_messages(self, tree, curNode, parent): """ Performs the outwards message passing from the given node to its children other than the given parent according to Hugin's architecture. @@ -436,6 +440,6 @@ def push_messages(self, tree, curNode, parent): tree.node[neighbor]["factor"] = tree.node[neighbor]["factor"] * (newSeqFactor / tree[curNode][neighbor]["factor"]) tree[curNode][neighbor]["factor"] = newSeqFactor # Have neighbor pushing out further - self.push_messages(tree, neighbor, curNode) + self._push_messages(tree, neighbor, curNode) \ No newline at end of file diff --git a/primo2/inference/factor.py b/primo2/inference/factor.py index fa7a45b..69ca1d1 100644 --- a/primo2/inference/factor.py +++ b/primo2/inference/factor.py @@ -465,8 +465,8 @@ def get_potential(self, variables=None): else: index.append(range(len(self.values[v]))) - if len(variables) == 0: - return self.potentials +# if len(variables) == 0: +# return self.potentials return np.squeeze(np.copy(self.potentials[np.ix_(*index)])) diff --git a/primo2/inference/marginal.py b/primo2/inference/marginal.py new file mode 100644 index 0000000..c95122d --- /dev/null +++ b/primo2/inference/marginal.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +""" +Created on Thu Feb 23 13:37:05 2017 + +@author: jpoeppel +""" + +import numpy as np +import warnings + +class Marginal(object): + + """ + A class representing the inference results. This class holds the + (joint) probability distribution after performing inference. + + TODO: Consider adding potentially used evidence and it's probability + as well. + """ + + def __init__(self): + self.variables = [] + self.values = {} + self.probabilities = 0 + + + def copy(self): + """ + Creates a (deep) copy of this marginal. + + Returns + ------- + Marginal + The copied marginal + """ + res = Marginal() + res.variables = list(self.variables) + res.values = dict(self.values) + res.probabilities = np.copy(self.probabilities) + return res + + @classmethod + def from_factor(cls, factor): + """ + Creates a marginal from a factor. This method should only be used + internally as a factor does not make any guarantees about the kind + of potential it contains, thus calling it with factors not containing + marginal probabilities will result in invalid marginals! + + Parameters + ---------- + factor: Factor + The factor whose potential is used to construct the marginal. + + Returns + -------- + Marginal + The created marginal representing the (joint) posterior + marginal over all the variables in the factor. + """ + res = cls() + res.variables = factor.variableOrder + res.values = dict(factor.values) + res.probabilities = factor.potentials.copy() + return res + + def get_probabilities(self, variables=None, returnDict=False): + """ + Returns the probabilities for the specified variable(s), if specified, + either as a compact numpy array (default), ordered according to the + variable and corresponding value orders in self.variables and + self.values, or as a dictionary. + + If variables is not specified it will return the probabilities for + all included variables in the specfied form. + + Parameter + --------- + variables: Dict, RandomNode, String, optional. + Dictionary containing the desired variables (the actual RandomNode + or their Name) as keys and either an instantiation or a list + of instantiations of interest as values. An empty list will be + interpreted as ALL values for that variable. + For a marginal containing the binary variables A and B, + get_probabilities({"A":"True"}) and get_probabilities({"A":["True"]}) + will return the probabilties P(A=True, B=True) and + P(A=True, B=False). + Whereas get_probabilities({"A":"True", "B":"False"}) will only + return P(A=True, B=False). + + Any variable that is not part of the marginal will issue a + warning and will be ignored. A variable is also ignored if an + unknown instantiation was set for it. + + + returnDict: Boolean, optional (default: False) + Specifies if the probabilities should be returned as a dictionary + of the form {variable: {value: probabilities}} (if set to true) or as + a compact np.array with one dimension for each variable, according + to the order given in self.variables. The entries within each + dimension correspond to the values specified with the same + indices in self.values for that variable. + In the simple case where only one variable is desired and a + dictionary should be returned, the outer dictionary is omitted. + + Returns + ------- + dict or np.array + The probabilities for the desired variables and their instantiations. + See the optional returnDict parameter for more information about + the return type. + """ + + + + if not variables: + variables = {} + elif not isinstance(variables, dict): + try: + variables = {variables: self.values[variables]} + except KeyError: + variables = {variables: []} + + #Check variables in order to raise consistent warnings and make sure + #values are in unified form + for v in variables: + if not v in self.variables: + warnings.warn("The variable {} is not part of this marginal "\ + "and will be ignored.".format(v), + RuntimeWarning) + else: + #For compatibility with both python2 and python3, we need both checks + #since in python3 strings also have __iter__ which makes it not + #possible to distinguish between list-like objects and strings + #easily. + if not hasattr(variables[v], "__iter__") or isinstance(variables[v],str): + #Catch the case of {"A":"True"} + variables[v] = [variables[v]] + + if returnDict: + #If we want to return dicts, just call this method multiple + #times to construct the partial matrizes that we want + #TODO This is quite inefficient!!! + res = {} + for var in variables: + tmp = {} + for val in variables[var]: + tmpVariables = dict(variables) + tmpVariables[var] = [val] + tmp[val] = self.get_probabilities(tmpVariables) + res[var] = tmp + + if len(res) == 1: + #Omit outer dictionary in the trivial case + return res.values()[0] + return res + + index = [] + for v in self.variables: + if v in variables and len(variables[v]) > 0: + try: + #Otherwise we just take the indices of interest + index.append([self.values[v].index(value) for value in variables[v]]) + except ValueError: + warnings.warn("Unknown value ({}) for variable {}. "\ + "Ignoring this variable.".format(value, v), + RuntimeWarning) + index.append(range(len(self.values[v]))) + else: + #If a variable is not specified or we have an empty list, + #we use the entire slice + index.append(range(len(self.values[v]))) + + res = np.squeeze(np.copy(self.probabilities[np.ix_(*index)])) + + return res + + + def marginalize(self, variables): + """ + Allows to marginalize out one or multiple variables. + + Parameters + ---------- + variables: RandomNode, String, [RandomNode,], [String,] + The variable(s) that should be marginalized out. Variables + that were not part of the Marginal will be ignored but will + raise a Warning. + + Returns + -------- + Marginal + The marginal resulting in summing out the given variables. + """ + #For compatibility with both python2 and python3, we need both checks + #since in python3 strings also have __iter__ which makes it not + #possible to distinguish between list-like objects and strings + #easily. + if not hasattr(variables, "__iter__") or isinstance(variables,str): + variables = [variables] + + res = self.copy() + for v in variables: + try: + res.probabilities = np.sum(res.probabilities, axis=res.variables.index(v)) + del res.values[v] + res.variables.remove(v) + except ValueError: + warnings.warn("Variable {} will be ignored since it is not " \ + "contained in this marginal.".format(v), RuntimeWarning) + + return res \ No newline at end of file diff --git a/primo2/inference/mcmc.py b/primo2/inference/mcmc.py index 3c29eb5..1f25bef 100644 --- a/primo2/inference/mcmc.py +++ b/primo2/inference/mcmc.py @@ -22,6 +22,7 @@ import random from .factor import Factor +from .marginal import Marginal class MCMC(object): @@ -69,9 +70,9 @@ def marginals(self, variables, evidence=None): Returns ------- - Factor - A factor over the given variables representing their joint probability - given the evidence. + Maginal + A Marginal object over the given variables representing their + joint probability given the evidence. """ if not evidence: evidence = {} @@ -81,7 +82,7 @@ def marginals(self, variables, evidence=None): variableValues = {v: self.bn.get_node(v).values for v in variables} res = Factor.from_samples(sampleChain, variableValues) - return res + return Marginal.from_factor(res) diff --git a/primo2/tests/Inverence_test.py b/primo2/tests/Inverence_test.py index 0f32a8c..cba3641 100644 --- a/primo2/tests/Inverence_test.py +++ b/primo2/tests/Inverence_test.py @@ -96,50 +96,50 @@ def test_empty_cpt(self): bn.add_node(n2) bn.add_edge(n1,n2) res = VariableElimination.naive_marginals(bn, ["a"]) - np.testing.assert_array_almost_equal(res.get_potential(), np.array([0.0, 0.0])) + np.testing.assert_array_almost_equal(res.get_probabilities(), np.array([0.0, 0.0])) def test_naive_marginals(self): - resFactor = VariableElimination.naive_marginals(self.bn, ["winter"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.6, 0.4])) + resMarginal = VariableElimination.naive_marginals(self.bn, ["winter"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.6, 0.4])) def test_naive_marginal_evidence_trivial(self): - resFactor = VariableElimination.naive_marginals(self.bn, ["rain"], {"winter": "true"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.8, 0.2])) + resMarginal = VariableElimination.naive_marginals(self.bn, ["rain"], {"winter": "true"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.8, 0.2])) def test_naive_marginal_evidence_trivial_multiple_evidence(self): - resFactor = VariableElimination.naive_marginals(self.bn, ["wet_grass"], {"sprinkler": "true", "rain": "false"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.1, 0.9])) + resMarginal = VariableElimination.naive_marginals(self.bn, ["wet_grass"], {"sprinkler": "true", "rain": "false"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.1, 0.9])) def test_naive_marginal_evidence(self): - resFactor = VariableElimination.naive_marginals(self.bn, ["wet_grass"], {"winter": "true"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.668, 0.332])) + resMarginal = VariableElimination.naive_marginals(self.bn, ["wet_grass"], {"winter": "true"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.668, 0.332])) def test_naive_marginal_evidence_multiple_evidence(self): - resFactor = VariableElimination.naive_marginals(self.bn, ["wet_grass"], {"winter": "true", "rain": "false"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.02, 0.98])) + resMarginal = VariableElimination.naive_marginals(self.bn, ["wet_grass"], {"winter": "true", "rain": "false"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.02, 0.98])) def test_bucket_marginals(self): - resFactor = VariableElimination.bucket_marginals(self.bn, ["winter"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.6, 0.4])) + resMarginal = VariableElimination.bucket_marginals(self.bn, ["winter"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.6, 0.4])) # def test_bucket_marginal_evidence_trivial(self): - resFactor = VariableElimination.bucket_marginals(self.bn, ["rain"], {"wet_grass": "false"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.158858, 0.841142])) + resMarginal = VariableElimination.bucket_marginals(self.bn, ["rain"], {"wet_grass": "false"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.158858, 0.841142])) def test_bucket_marginal_evidence_trivial_multiple_evidence(self): - resFactor = VariableElimination.bucket_marginals(self.bn, ["wet_grass"], {"sprinkler": "true", "rain": "false"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.1, 0.9])) + resMarginal = VariableElimination.bucket_marginals(self.bn, ["wet_grass"], {"sprinkler": "true", "rain": "false"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.1, 0.9])) def test_bucket_marginal_evidence(self): - resFactor = VariableElimination.bucket_marginals(self.bn, ["wet_grass"], {"winter": "true"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.668, 0.332])) + resMarginal = VariableElimination.bucket_marginals(self.bn, ["wet_grass"], {"winter": "true"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.668, 0.332])) def test_bucket_marginal_evidence_multiple_evidence(self): - resFactor = VariableElimination.bucket_marginals(self.bn, ["wet_grass"], {"winter": "true", "rain": "false"}) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.02, 0.98])) + resMarginal = VariableElimination.bucket_marginals(self.bn, ["wet_grass"], {"winter": "true", "rain": "false"}) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.02, 0.98])) ### TODO check multiple marginals # def test_bucket_multiple_marginals(self): @@ -163,7 +163,7 @@ def test_empty_cpt(self): bn.add_edge(n1,n2) ft = FactorTree.create_jointree(bn) res = ft.marginals(["a"]) - np.testing.assert_array_almost_equal(res.get_potential(), np.array([0.0, 0.0])) + np.testing.assert_array_almost_equal(res.get_probabilities(), np.array([0.0, 0.0])) def test_create_jointree(self): order = ["slippery_road", "wet_grass", "sprinkler", "winter", "rain"] @@ -180,44 +180,44 @@ def test_create_jointree(self): def test_jointree_marginals(self): ft = FactorTree.create_jointree(self.bn) - resFactor = ft.marginals(["winter"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.6, 0.4])) + resMarginal = ft.marginals(["winter"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.6, 0.4])) def test_jointree_marginals2(self): ft = FactorTree.create_jointree(self.bn) - resFactor = ft.marginals(["slippery_road"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.364, 0.636])) + resMarginal = ft.marginals(["slippery_road"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.364, 0.636])) def test_jointree_marginals_trivial_evidence(self): ft = FactorTree.create_jointree(self.bn) ft.set_evidence({"slippery_road":"true"}) - resFactor = ft.marginals(["slippery_road"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([1.0, 0.0])) + resMarginal = ft.marginals(["slippery_road"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([1.0, 0.0])) def test_jointree_evidence_trivial(self): ft = FactorTree.create_jointree(self.bn) ft.set_evidence({"wet_grass": "false"}) - resFactor = ft.marginals(["rain"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.158858, 0.841142])) + resMarginal = ft.marginals(["rain"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.158858, 0.841142])) def test_jointree_marginal_evidence_trivial_multiple_evidence(self): ft = FactorTree.create_jointree(self.bn) ft.set_evidence({"sprinkler": "true", "rain": "false"}) - resFactor = ft.marginals(["wet_grass"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.1, 0.9])) + resMarginal = ft.marginals(["wet_grass"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.1, 0.9])) def test_jointree_marginal_evidence(self): ft = FactorTree.create_jointree(self.bn) ft.set_evidence({"winter": "true"}) - resFactor = ft.marginals(["wet_grass"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.668, 0.332])) + resMarginal = ft.marginals(["wet_grass"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.668, 0.332])) def test_jointree_marginal_evidence_multiple_evidence(self): ft = FactorTree.create_jointree(self.bn) ft.set_evidence( {"winter": "true", "rain": "false"}) - resFactor = ft.marginals(["wet_grass"]) - np.testing.assert_array_almost_equal(resFactor.get_potential(), np.array([0.02, 0.98])) + resMarginal = ft.marginals(["wet_grass"]) + np.testing.assert_array_almost_equal(resMarginal.get_probabilities(), np.array([0.02, 0.98])) def test_jointree_marginal_soft_evidence(self): bn = BayesianNetwork() @@ -236,8 +236,8 @@ def test_jointree_marginal_soft_evidence(self): tree = FactorTree.create_jointree(bn) tree.set_evidence({"cloth": np.array([0.7,0.25,0.05])}, softPosteriors=True) - np.testing.assert_array_almost_equal(tree.marginals(["cloth"]).get_potential(), np.array([0.7,0.25,0.05])) - np.testing.assert_array_almost_equal(tree.marginals(["sold"]).get_potential(), np.array([0.42,0.58])) + np.testing.assert_array_almost_equal(tree.marginals(["cloth"]).get_probabilities(), np.array([0.7,0.25,0.05])) + np.testing.assert_array_almost_equal(tree.marginals(["sold"]).get_probabilities(), np.array([0.42,0.58])) if __name__ == "__main__": #Workaround so that this script also finds the resource files when run directly diff --git a/primo2/tests/Marginal_test.py b/primo2/tests/Marginal_test.py new file mode 100644 index 0000000..6ae3aff --- /dev/null +++ b/primo2/tests/Marginal_test.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +""" +Created on Thu Feb 23 14:09:48 2017 + +@author: jpoeppel +""" + +import unittest +import warnings + +import numpy as np + +import random + +from primo2.inference.factor import Factor +from primo2.inference.marginal import Marginal +from primo2.nodes import DiscreteNode + +class MarginalTest(unittest.TestCase): + + def __init__(self, methodName, testFactor=None): + super(MarginalTest, self).__init__(methodName) + self.longMessage = True + self.factor = testFactor + + def test_create_from_factor(self): + m = Marginal.from_factor(self.factor) + + self.assertEqual(m.variables, self.factor.variableOrder) + for v in self.factor.values: + self.assertEqual(m.values[v], self.factor.values[v]) + np.testing.assert_array_equal(m.probabilities, self.factor.potentials) + pass + + def test_get_probabilities_entire_variable_str_dict(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + res = m.get_probabilities(varName, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + np.testing.assert_array_equal(res[k], self.factor.get_potential({varName:[k]})) + pass + + def test_get_probabilities_entire_variable_str_array(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + res = m.get_probabilities(varName, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, self.factor.potentials) + + def test_get_probabilities_entire_variable_list_dict(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + res = m.get_probabilities({varName: self.factor.values[varName]}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + np.testing.assert_array_equal(res[k], self.factor.get_potential({varName:[k]})) + + def test_get_probabilities_entire_variable_list_array(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + res = m.get_probabilities({varName: self.factor.values[varName]}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, self.factor.potentials) + + def test_get_probabilities_entire_variable_empty_list_array(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + res = m.get_probabilities({varName: []}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, self.factor.potentials) + + def test_get_probabilities_entire_variable_empty_list_dict(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + res = m.get_probabilities({varName: []}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + np.testing.assert_array_equal(res[k], self.factor.get_potential({varName: [k]})) + + def test_get_probabilities_part_variable_list_dict(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + #Use random values to avoid having to create tests for all possible outcomes + value = random.choice(self.factor.values[varName]) + res = m.get_probabilities({varName: [value]}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + np.testing.assert_array_equal(res[k], self.factor.get_potential({varName: [k]}), + err_msg="Failed for value: {} of variable {}".format(value, varName), + verbose=True) + + def test_get_probabilities_part_variable_list_array(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + #Use random values to avoid having to create tests for all possible outcomes + value = random.choice(self.factor.values[varName]) + res = m.get_probabilities({varName: [value]}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, self.factor.get_potential({varName: [value]}), + err_msg="Failed for value: {} of variable {}".format(value, varName), + verbose=True) + + def test_get_probabilities_part_variable_str_dict(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + #Use random values to avoid having to create tests for all possible outcomes + value = random.choice(self.factor.values[varName]) + res = m.get_probabilities({varName: value}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + np.testing.assert_array_equal(res[k], self.factor.get_potential({varName: [k]}), + err_msg="Failed for value: {} of variable {}".format(value, varName), + verbose=True) + + def test_get_probabilities_part_variable_str_array(self): + m = Marginal.from_factor(self.factor) + varName = self.factor.checkVar + #Use random values to avoid having to create tests for all possible outcomes + value = random.choice(self.factor.values[varName]) + res = m.get_probabilities({varName: value}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, self.factor.get_potential({varName: [value]}), + err_msg="Failed for value: {} of variable {}".format(value, varName), + verbose=True) + + def test_get_probabilitites_unknown_variable(self): + m = Marginal.from_factor(self.factor) + wrongVar = self.factor.wrongVar + with warnings.catch_warnings(record=True) as w: + # Cause all warnings to always be triggered. + warnings.simplefilter("always") + + m.get_probabilities(wrongVar) + self.assertEqual(len(w), 1) + self.assertEqual(w[0].category, RuntimeWarning) + self.assertEqual(str(w[0].message), "The variable {} is not part of this marginal and will be ignored.".format(wrongVar)) + + + with warnings.catch_warnings(record=True) as w: + # Cause all warnings to always be triggered. + warnings.simplefilter("always") + + m.get_probabilities({wrongVar:[]}) + self.assertEqual(len(w), 1) + self.assertEqual(w[0].category, RuntimeWarning) + self.assertEqual(str(w[0].message), "The variable {} is not part of this marginal and will be ignored.".format(wrongVar)) + + def test_get_probabilitites_unknown_value(self): + m = Marginal.from_factor(self.factor) + checkVar = self.factor.checkVar + with warnings.catch_warnings(record=True) as w: + # Cause all warnings to always be triggered. + warnings.simplefilter("always") + + res = m.get_probabilities({checkVar: "unknown"}) + + self.assertEqual(len(w), 1) + self.assertEqual(w[0].category, RuntimeWarning) + self.assertEqual(str(w[0].message), "Unknown value ({}) for variable {}. Ignoring this variable.".format("unknown", checkVar)) + np.testing.assert_array_equal(res, self.factor.potentials) + + + + def test_get_probability_simple(self): + pass + + def test_get_probability_under_specified(self): + pass + + def test_get_probability_fully_specified(self): + pass + + def test_marginalize(self): + m = Marginal.from_factor(self.factor) + #Use random variables to avoid having to create tests for all possible outcomes + checkVar = random.choice(self.factor.variableOrder) + resm = m.marginalize(checkVar) + checkList = list(m.variables) + checkList.remove(checkVar) + self.assertEqual(resm.variables, checkList) + np.testing.assert_array_equal(resm.probabilities, self.factor.marginalize(checkVar).potentials) + + def test_marginalize_missing(self): + m = Marginal.from_factor(self.factor) + #Use random variables to avoid having to create tests for all possible outcomes + checkVar = "NotThere" + resm = m.marginalize(checkVar) + checkList = list(m.variables) + checkList.remove(checkVar) + self.assertEqual(resm.variables, checkList) + np.testing.assert_array_equal(resm.probabilities, self.factor.marginalize(checkVar).potentials) + with warnings.catch_warnings(record=True) as w: + # Cause all warnings to always be triggered. + warnings.simplefilter("always") + res = m.marginalize(["A",checkVar]) + self.assertEqual(len(w),1) + self.assertEqual(w[0].category, RuntimeWarning) + self.assertEqual(str(w[0].message), "Variable {} will be ignored since it is not contained in this marginal.".format(checkVar)) + np.testing.assert_array_equal(res.probabilities, self.factor.marginalize("A").potentials) + +######### Will not be run with all factors in setUp_test_factors, but with specific factors ######### + def test_get_probabilitites_multiple_variables_simple_dict(self): + #Uses f2 + m = Marginal.from_factor(self.factor) + res = m.get_probabilities({"A": "2", "B": "Apples"}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + for v in res[k]: + np.testing.assert_array_equal(res[k][v], + self.factor.get_potential({"A": ["2"], "B":["Apples"]})) + + res = m.get_probabilities({"A": "2", "B": ["Apples"]}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + for v in res[k]: + np.testing.assert_array_equal(res[k][v], + self.factor.get_potential({"A": ["2"], "B":["Apples"]})) + + + def test_get_probabilitites_multiple_variables_complex_dict(self): + #Uses f2 + m = Marginal.from_factor(self.factor) + res = m.get_probabilities({"A": "2", "B": ["Apples", "Peaches"]}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + for v in res[k]: + if k == "A": + np.testing.assert_array_equal(res[k][v], + self.factor.get_potential({"A": ["2"], + "B":["Apples","Peaches"]})) + else: + np.testing.assert_array_equal(res[k][v], + self.factor.get_potential({"A": ["2"], "B":[v]})) + + + res = m.get_probabilities({"A": ["2","1"], "B": ["Apples", "Peaches"]}, returnDict=True) + self.assertIsInstance(res, dict) + for k in res: + for v in res[k]: + if k == "A": + np.testing.assert_array_equal(res[k][v], + self.factor.get_potential({"A": [v], + "B":["Apples","Peaches"]})) + else: + np.testing.assert_array_equal(res[k][v], + self.factor.get_potential({"A": ["2","1"], "B":[v]})) + + + def test_get_probabilitites_multiple_variables_simple_array(self): + #Uses f2 + m = Marginal.from_factor(self.factor) + res = m.get_probabilities({"A": "2", "B": "Apples"}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, self.factor.get_potential({"A": ["2"], "B":["Apples"]})) + + res = m.get_probabilities({"A": "2", "B": ["Apples"]}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, self.factor.get_potential({"A": ["2"], "B":["Apples"]})) + + + + def test_get_probabilitites_multiple_variables_complex_array(self): + #Uses f2 + m = Marginal.from_factor(self.factor) + res = m.get_probabilities({"A": "2", "B": ["Apples", "Peaches"]}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, + self.factor.get_potential({"A": ["2"], + "B":["Apples", "Peaches"]})) + + + res = m.get_probabilities({"A": ["2","1"], "B": ["Apples", "Peaches"]}, returnDict=False) + self.assertIsInstance(res, np.ndarray) + np.testing.assert_array_equal(res, + self.factor.get_potential({"A": ["2","1"], + "B":["Apples", "Peaches"]})) + + + def test_get_probabilities_node(self): + #Uses f + m = Marginal.from_factor(self.factor) + n = DiscreteNode("A") + res = m.get_probabilities(n) + np.testing.assert_array_equal(res, self.factor.potentials) + res = m.get_probabilities({n:[]}, returnDict=True) + for k in res: + np.testing.assert_array_equal(res[k], self.factor.get_potential({"A":[k]})) + +def setUp_test_factors(): + f = Factor() + f.variableOrder = ["A"] + f.values = {"A": ["True","False"]} + f.variables = {"A":0} + f.potentials = np.array([0.2,0.8]) + f.checkVar = "A" + f.wrongVar = "B" + + f2 = Factor() + f2.variableOrder = ["A", "B"] + f2.values = {"A": ["1","2","3"], "B":["Apples", "Peaches"]} + f2.variables = {"A":0, "B":1} + f2.potentials = np.array([[0.2,0.1], [0.15,0.05], [0.27,0.23]]) + f2.checkVar = "B" + f2.wrongVar = "C" + return [f,f2] + +def load_tests(loader, tests, pattern): + test_cases = unittest.TestSuite() + factors = setUp_test_factors() + for f in factors: + test_cases.addTest(MarginalTest("test_create_from_factor", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_entire_variable_str_dict", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_entire_variable_str_array", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_entire_variable_list_dict", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_entire_variable_list_array", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_entire_variable_empty_list_array", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_entire_variable_empty_list_dict", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_part_variable_list_dict", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_part_variable_list_array", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_part_variable_str_dict", f)) + test_cases.addTest(MarginalTest("test_get_probabilities_part_variable_list_dict", f)) + + test_cases.addTest(MarginalTest("test_get_probabilitites_unknown_variable", f)) + test_cases.addTest(MarginalTest("test_get_probabilitites_unknown_value", f)) +# test_cases.addTest(MarginalTest("test_get_probability_simple", f)) +# test_cases.addTest(MarginalTest("test_get_probability_under_specified", f)) +# test_cases.addTest(MarginalTest("test_get_probability_fully_specified", f)) + test_cases.addTest(MarginalTest("test_marginalize", f)) +# test_cases.addTest(MarginalTest("test_marginalize_missing", f)) + + test_cases.addTest(MarginalTest("test_get_probabilities_node", factors[0])) + test_cases.addTest(MarginalTest("test_get_probabilitites_multiple_variables_simple_dict", factors[1])) + test_cases.addTest(MarginalTest("test_get_probabilitites_multiple_variables_simple_array", factors[1])) + test_cases.addTest(MarginalTest("test_get_probabilitites_multiple_variables_complex_dict", factors[1])) + test_cases.addTest(MarginalTest("test_get_probabilitites_multiple_variables_complex_array", factors[1])) + return test_cases + + +if __name__ == "__main__": + unittest.main()