-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsearch.py
162 lines (110 loc) · 6.36 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from heapq import heappush, heappop, heapify
import itertools
from collections import deque
from ops import *
class Search:
    """Breadth-first, depth-first and A* search over puzzle states.

    The class relies on two collaborators that are defined outside this file:

    * ``State`` -- node type built as ``State(state, None, None, 0, 0, key)``
      for the root; the methods below read its ``map``, ``state``, ``depth``,
      ``cost``, ``move`` and ``key`` attributes.
    * ``ops_obj`` -- object providing ``expand``, ``expand_bfs``, ``h`` and
      ``set_gn``.

    Every search stores the node that matched ``goal_state`` in
    ``self.goal_node`` and keeps two statistics up to date:
    ``max_frontier_size`` and ``max_search_depth``.
    """

    # Immutable template for the default goal; copied per instance so that
    # instances never share (and cannot accidentally mutate) one list object.
    _DEFAULT_GOAL = (0, 1, 2, 3, 4, 5, 6, 7, 8)

    def __init__(self, initial_state, ops_obj, goal_state=None):
        """Store the search configuration and reset the statistics.

        :param initial_state: state the search is expected to start from
        :param ops_obj: helper object supplying expand/expand_bfs/h/set_gn
        :param goal_state: state compared against ``node.state`` with ``==``;
            defaults to a fresh ``[0, 1, 2, 3, 4, 5, 6, 7, 8]`` list
        """
        super().__init__()
        if goal_state is None:
            goal_state = list(self._DEFAULT_GOAL)
        self.goal_state = goal_state
        self.ops_obj = ops_obj
        self.initial_state = initial_state
        self.max_frontier_size = 0
        self.goal_node = 0  # replaced by the goal State once a search succeeds
        self.max_search_depth = 0

    def set_goal_node(self):
        """Hand the goal node and the collected statistics to ``ops_obj``.

        Calls ``ops_obj.set_gn(goal_node, max_frontier_size, max_search_depth)``
        so that the operations object can complete the backtrace.

        :return: None
        """
        self.ops_obj.set_gn(self.goal_node, self.max_frontier_size, self.max_search_depth)

    def bfs(self, start_state):
        """Run a breadth-first search starting at ``start_state``.

        Instead of a per-node 'color' attribute, the ``explored`` set records
        every state map that has been queued or visited.

        :param start_state: usually the initial state
        :return: the remaining frontier (a ``deque``) when the goal is found;
            ``None`` when the frontier is exhausted without reaching the goal
        """
        explored, frontier = set(), deque([State(start_state, None, None, 0, 0, 0)])
        print("IN BFS")
        max_n = 0  # number of nodes taken from the frontier so far
        while frontier:
            max_n += 1
            node = frontier.popleft()  # FIFO: oldest queued node first
            explored.add(node.map)
            if node.state == self.goal_state:
                print("Reached goal")
                self.goal_node = node
                return frontier
            for neighbor in self.ops_obj.expand_bfs(node):
                if neighbor.map not in explored:
                    frontier.append(neighbor)
                    # Mark on enqueue so the same state is never queued twice.
                    explored.add(neighbor.map)
                    if neighbor.depth > self.max_search_depth:
                        self.max_search_depth = neighbor.depth
            if len(frontier) > self.max_frontier_size:
                self.max_frontier_size = len(frontier)
        # Only reached when the whole frontier was consumed without a match.
        print(max_n)  # max tries until crash
        # 181440

    def dfs(self, start_state):
        """Run a depth-first search starting at ``start_state``.

        Tracks visited state maps, the deepest node pushed and the largest
        stack size seen; the two statistics do not influence the search.

        :param start_state: usually the initial state
        :return: the remaining stack (a ``list``) when the goal is found;
            ``None`` when the stack is exhausted without reaching the goal
        """
        explored, stack = set(), [State(start_state, None, None, 0, 0, 0)]
        while stack:
            node = stack.pop()  # LIFO: most recently pushed node first
            explored.add(node.map)  # guards against revisiting and looping
            if node.state == self.goal_state:
                self.goal_node = node
                return stack
            # Push in reverse so the first expanded neighbor is popped first.
            for neighbor in reversed(self.ops_obj.expand(node)):
                if neighbor.map not in explored:
                    stack.append(neighbor)
                    explored.add(neighbor.map)
                    if neighbor.depth > self.max_search_depth:
                        self.max_search_depth = neighbor.depth
            if len(stack) > self.max_frontier_size:
                self.max_frontier_size = len(stack)

    def ast(self, start_state):
        """Run an A* search starting at ``start_state`` using a binary heap.

        Heap entries are ``(key, move, node)`` tuples where
        ``key = node.cost + ops_obj.h(node.state)``, i.e. f(n) = g(n) + h(n).
        NOTE(review): entries that tie on ``(key, move)`` fall through to
        comparing the State objects themselves -- this presumes State is
        orderable; confirm against its definition.

        :param start_state: usually the initial state
        :return: the remaining heap (a ``list``) when the goal is found;
            ``None`` when the heap is exhausted without reaching the goal
        """
        mlen = 0  # number of nodes popped from the heap so far
        explored, heap, heap_entry = set(), [], {}
        key = self.ops_obj.h(start_state)  # heuristic estimate for the root
        root = State(start_state, None, None, 0, 0, key)
        entry = (key, 0, root)
        heappush(heap, entry)
        heap_entry[root.map] = entry  # state map -> latest entry pushed for it

        while heap:
            mlen += 1
            node = heappop(heap)[2]  # entry with the smallest key
            explored.add(node.map)
            if node.state == self.goal_state:
                self.goal_node = node
                return heap
            for neighbor in self.ops_obj.expand(node):
                neighbor.key = neighbor.cost + self.ops_obj.h(neighbor.state)
                entry = (neighbor.key, neighbor.move, neighbor)
                if neighbor.map not in explored:
                    heappush(heap, entry)
                    explored.add(neighbor.map)
                    heap_entry[neighbor.map] = entry
                    if neighbor.depth > self.max_search_depth:
                        self.max_search_depth = neighbor.depth
                elif (neighbor.map in heap_entry
                        and neighbor.key < heap_entry[neighbor.map][2].key):
                    # A cheaper route to an already-seen state was found.
                    stale = heap_entry[neighbor.map]
                    try:
                        position = heap.index(stale)
                    except ValueError:
                        # The old entry was already popped, so there is
                        # nothing to overwrite: re-open the state instead of
                        # crashing on the failed lookup.
                        heappush(heap, entry)
                    else:
                        heap[position] = entry
                        heapify(heap)  # restore the heap invariant
                    heap_entry[neighbor.map] = entry
            if len(heap) > self.max_frontier_size:
                self.max_frontier_size = len(heap)
        # Only reached when the heap was emptied without a match.
        print("A star: ", mlen)