Skip to content

Commit

Permalink
Prototype version
Browse files Browse the repository at this point in the history
  • Loading branch information
ChadFulton committed Feb 15, 2013
1 parent 1371cf1 commit 8422696
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 0 deletions.
Empty file added __init__.py
Empty file.
26 changes: 26 additions & 0 deletions agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
class Agent(object):
"""Generic agent class
Implements the observer pattern.
"""

def __init__(self):
self._observers = []

def attach(self, observer):
if not observer in self._observers:
self._observers.append(observer)

def detach(self, observer):
try:
self._observers.remove(observer)
except ValueError:
pass

def notify(self, modifier=None):
for observer in self._observers:
if modifier != observer:
observer.update(self)

def update():
pass
87 changes: 87 additions & 0 deletions ant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from settings import *
from agent import Agent
from random import uniform

class Ant(Agent):
FORWARDS = 0
BACKWARDS = 1

def __init__(self, world, location, direction = None):
super(Ant, self).__init__()

self.world = world;

self.direction = direction or self.FORWARDS
self.history = [] # a list of trips
self.trip = [location] # a list of nodes forming a complete cycle
self.return_trip = [location]
self.return_length = None

def move(self):
"""Move the ant"""
#x = self.trip[:]
self.trip.append(self.choose())

#print map(reverse_node, x), '->', map(reverse_node, self.trip)
#print map(reverse_node, self.return_trip)

# Add the return trip information
if self.direction == self.FORWARDS:
self.return_trip.append(self.trip[-1])
# check for and remove cycles
index = self.return_trip.index(self.trip[-1])
if index != len(self.return_trip)-1:
self.return_trip = self.return_trip[:index+1]

# Let the observers know there the ant moved (only on the return journey)
if self.direction == self.BACKWARDS:
self.notify()

# If we're headed for the destination (FOOD)
if self.direction == self.FORWARDS and self.trip[-1] == self.world.TERMINAL_NODE:
self.direction = self.BACKWARDS
# need to remove the last node here, which is the TERMINAL_NODE by
# definition, since we're already there
self.return_trip.pop()
self.return_length = len(self.return_trip)

# If we made it back home (NEST)
if self.direction == self.BACKWARDS and self.trip[-1] == self.world.SOURCE_NODE:
self.direction = self.FORWARDS
self.history.append(self.trip)
self.trip = [self.trip[-1]]
self.return_trip = [self.trip[0]]
self.return_length = None

def choose(self):
"""Select the next node the ant travels to"""

if self.direction == self.FORWARDS:
# Get the pheremones on the arcs to each possible destination node
pheromones = self.world.get_pheromones(self.trip[-1])

# If there is more than one node available, don't go to the one you were
# just at
if len(self.trip) > 1 and self.trip[-2] in pheromones and len(pheromones) > 1:
del pheromones[self.trip[-2]]

nodes = pheromones.keys()
levels = map(lambda i: i**alpha, pheromones.values())
ranges = map(lambda i: float(sum(levels[:i+1])), range(len(levels)))

# Get a random draw from a standard uniform distribution
draw = uniform(0, sum(levels))

# Get the number of the destination node
index = [i for i in range(len(ranges)) if ranges[i] > draw][0]

node = nodes[index]

#if self.trip[-1] == NEST:
# print nodes, levels, ranges, draw, node

else:
node = self.return_trip.pop()

return node

27 changes: 27 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Define some constants
NEST = 1
FOOD = 2
LONG_BRANCH = 4

def reverse_node(node):
if node == NEST:
return 'NEST'
if node == FOOD:
return 'FOOD'
if node == LONG_BRANCH:
return 'LONG_BRANCH'

# Default parameter values
T = 1000 # number of steps
n = 2 # number of ants
deposit = 1 # amount of pheremone to deposit
autocatalysis = False # is deposit inversely related to found path length?
alpha = 1 # pheromone amplification
rho = 0.00 # evaporation

# Graphs
double_bridge = {
NEST: [FOOD, LONG_BRANCH],
LONG_BRANCH: [NEST, FOOD],
FOOD: [LONG_BRANCH, NEST]
}
39 changes: 39 additions & 0 deletions simulate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from settings import *
from world import World
from ant import Ant

def simulate(n):
"""Run an ant-colony optimization simulation"""

# Instantiate the world
world = World(double_bridge, source_node = NEST, terminal_node = FOOD)

# Create the ants
for i in range(n):
world.populate(Ant(world, world.SOURCE_NODE))

# Make the ants move until convergence
for t in range(T):
for i in range(n):
world.ants[i].move()
world.advance()

return world

def main(n):
p = []
for i in range(100):
world = simulate(n)
pheromones = world.memo_history[-1]
p.append(float(pheromones[NEST | FOOD])**alpha / (float(pheromones[NEST | FOOD])**alpha + float(pheromones[LONG_BRANCH | NEST])**alpha))
fail = [i for i in p if i <= 0.5]
success = [i for i in p if i >= 0.5]

print 'Convergence to long path: ', (float(len(fail)) / float(len(p)))

if __name__ == "__main__":
import sys
args = {
'n': int(sys.argv[1]) if len(sys.argv) > 1 else n,
}
main(**args)
58 changes: 58 additions & 0 deletions world.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from settings import *
from agent import Agent
from ant import Ant

class World(Agent):

def __init__(self, graph, source_node, terminal_node):
super(World, self).__init__()

# World population
self.ants = []

# World geography
self.graph = graph
self.SOURCE_NODE = source_node
self.TERMINAL_NODE = terminal_node
self.arcs = []
for start, ends in self.graph.iteritems():
for end in ends:
if start | end not in self.arcs:
self.arcs.append(start | end)

# World pheromone levels
self.base = { arc:0 for arc in self.arcs } # initial pheremone levels
self.current = self.base.copy() # pheromone changes for the period
self.history = [] # history of pheromone settings for each period
self.memo = { arc:1 for arc in self.arcs } # running tally of pheromones on nodes
self.memo_history = [] # history of runny tally for each period

def advance(self):
for arc, level in self.current.iteritems():
# add the new level
self.memo[arc] += level
# evaporate some pheremone
self.memo[arc] = (1 - rho) * self.memo[arc]

self.memo_history.append(self.memo.copy())
self.history.append(self.current)
self.current = self.base.copy()

def get_ant(self, i):
return self.ants[i]

def get_pheromones(self, node):
pheromones = {}
for destination in self.graph[node]:
pheromones[destination] = self.memo[node | destination]
return pheromones

def populate(self, ant):
self.ants.append(ant)
ant.attach(self)
return len(self.ants)

def update(self, ant):
#print reverse_node(ant.trip[-1]), '|', reverse_node(ant.trip[-2])
level = deposit / ant.return_length if autocatalysis else deposit
self.current[ant.trip[-1] | ant.trip[-2]] += level

0 comments on commit 8422696

Please sign in to comment.