|
import random |
|
import numpy as np |
|
import threading |
|
import panel as pn |
|
pn.extension(template='bootstrap') |
|
import holoviews as hv |
|
import time |
|
import pandas as pd |
|
from holoviews.streams import Stream |
|
hv.extension('bokeh', logo=False) |
|
|
|
|
|
class Particle(): |
|
|
|
|
|
def __init__(self, initial): |
|
self.position = [] |
|
self.velocity = [] |
|
self.initial = initial |
|
self.best_position = [] |
|
self.best_error = float('inf') |
|
self.error = float('inf') |
|
self.num_dimensions = 2 |
|
for i in range(0, self.num_dimensions): |
|
self.velocity.append(random.uniform(-1, 1)) |
|
self.position.append(initial[i]) |
|
|
|
|
|
def update_velocity(self, global_best_position, max_iter, iter_count): |
|
c1_start = 2.5 |
|
c1_end = 0.5 |
|
c2_start = 0.5 |
|
c2_end = 2.5 |
|
w = 0.7298 |
|
|
|
c1 = c1_start - (c1_start - c1_end) * (iter_count / max_iter) |
|
c2 = c2_start + (c2_end - c2_start) * (iter_count / max_iter) |
|
|
|
for i in range(0, self.num_dimensions): |
|
r1 = random.random() |
|
r2 = random.random() |
|
|
|
cog_vel = c1 * r1 * (self.best_position[i] - self.position[i]) |
|
social_vel = c2 * r2 * (global_best_position[i] - self.position[i]) |
|
self.velocity[i] = w * self.velocity[i] + cog_vel + social_vel |
|
|
|
|
|
def update_position(self, bounds): |
|
for i in range(0, self.num_dimensions): |
|
self.position[i] = self.position[i] + self.velocity[i] |
|
|
|
if self.position[i] > bounds[i][1]: |
|
self.position[i] = bounds[i][1] |
|
|
|
if self.position[i] < bounds[i][0]: |
|
self.position[i] = bounds[i][0] |
|
|
|
|
|
def evaluate_fitness(self, number, target, function): |
|
if number == 1: |
|
self.error = fitness_function(self.position, target) |
|
else: |
|
self.error = cost_function(self.position, function) |
|
|
|
if self.error < self.best_error: |
|
self.best_position = self.position[:] |
|
self.best_error = self.error |
|
|
|
|
|
def get_error(self): |
|
return self.error |
|
|
|
|
|
def get_best_pos(self): |
|
return self.best_position[:] |
|
|
|
|
|
def get_best_error(self): |
|
return self.best_error |
|
|
|
|
|
def get_pos(self): |
|
return self.position[:] |
|
|
|
|
|
def get_velocity(self): |
|
return self.velocity[:] |
|
|
|
|
|
def fitness_function(particle_position, target): |
|
x_pos, y_pos = float(target[0]), float(target[1]) |
|
return (x_pos - particle_position[0])**2 + (y_pos - particle_position[1])**2 |
|
|
|
|
|
import sympy as sp |
|
|
|
def cost_function(particle_position, function_str): |
|
x, y = sp.symbols('x y') |
|
function = sp.sympify(function_str) |
|
return function.subs({x: particle_position[0], y: particle_position[1]}) |
|
|
|
|
|
class Interactive_PSO(): |
|
|
|
|
|
def __init__(self): |
|
self._running = False |
|
self.max_iter = 500 |
|
self.num_particles = 25 |
|
self.initial = [5, 5] |
|
self.bounds = [(-500, 500), (-500, 500)] |
|
self.x_axis = [] |
|
self.y_axis = [] |
|
self.target = [5] * 2 |
|
self.global_best_error = float('inf') |
|
self.update_particles_position_lists_with_random_values() |
|
self.global_best_position = [0, 0] |
|
|
|
|
|
|
|
def swarm_initialization(self, number, max_iter): |
|
swarm = [] |
|
self.global_best_position = [0, 0] |
|
self.global_best_error = float('inf') |
|
self.gamma = 0.0001 |
|
function = function_select.value |
|
|
|
for i in range(0, self.num_particles): |
|
swarm.append(Particle([self.x_axis[i], self.y_axis[i]])) |
|
|
|
iter_count = 0 |
|
while self._running: |
|
if self.global_best_error <= 0.00001: |
|
break |
|
|
|
for j in range(0, self.num_particles): |
|
swarm[j].evaluate_fitness(number, self.target, function) |
|
|
|
if swarm[j].get_error() < self.global_best_error: |
|
self.global_best_position = swarm[j].get_best_pos() |
|
self.global_best_error = swarm[j].get_best_error() |
|
|
|
for j in range(0, self.num_particles): |
|
swarm[j].update_velocity(self.global_best_position, max_iter, iter_count) |
|
swarm[j].update_position(self.bounds) |
|
self.x_axis[j] = swarm[j].get_pos()[0] |
|
self.y_axis[j] = swarm[j].get_pos()[1] |
|
|
|
|
|
time.sleep(0.05) |
|
|
|
iter_count += 1 |
|
|
|
|
|
update_table = True |
|
hv.streams.Stream.trigger(table_dmap.streams) |
|
|
|
self.initial = self.global_best_position |
|
self._running = False |
|
print('Best Position:', self.global_best_position) |
|
print('Best Error:', self.global_best_error) |
|
print('Function:', function) |
|
|
|
|
|
|
|
def terminate(self): |
|
self._running = False |
|
|
|
|
|
def starting(self): |
|
self._running = True |
|
|
|
|
|
def isrunning(self): |
|
return self._running |
|
|
|
|
|
def get_num_particles(self): |
|
return self.num_particles |
|
|
|
|
|
def update_num_particles(self, new_value): |
|
self.num_particles = new_value |
|
|
|
|
|
def get_xaxis(self): |
|
return self.x_axis[:] |
|
|
|
|
|
def get_yaxis(self): |
|
return self.y_axis[:] |
|
|
|
|
|
def set_target(self, x, y): |
|
self.target = [x, y] |
|
|
|
|
|
def get_target(self): |
|
return self.target[:] |
|
|
|
|
|
def update_particles_position_lists(self, updated_num_particles): |
|
old_x_value = self.x_axis[0] |
|
old_y_value = self.y_axis[0] |
|
if updated_num_particles > self.num_particles: |
|
for i in range(self.num_particles, updated_num_particles): |
|
self.x_axis.append(old_x_value) |
|
self.y_axis.append(old_y_value) |
|
else: |
|
for i in range((self.num_particles) - 1, updated_num_particles - 1, -1): |
|
self.x_axis.pop(i) |
|
self.y_axis.pop(i) |
|
|
|
|
|
def update_particles_position_lists_with_random_values(self): |
|
self.x_axis = random.sample(range(-500, 500), self.num_particles) |
|
self.y_axis = random.sample(range(-500, 500), self.num_particles) |
|
|
|
pso_swarm = Interactive_PSO() |
|
pso_computation_swarm = Interactive_PSO() |
|
|
|
update_table = False |
|
|
|
|
|
def start_finding_the_target(): |
|
pso_swarm.swarm_initialization(1, pso_swarm.max_iter) |
|
|
|
|
|
def start_computation(): |
|
pso_computation_swarm.swarm_initialization(2, pso_computation_swarm.max_iter) |
|
|
|
|
|
def create_target_element(x, y): |
|
pso_swarm.terminate() |
|
if x is not None: |
|
pso_swarm.set_target(x, y) |
|
return hv.Points((x, y, 1), label='Target').opts(color='red', marker='^', size=10) |
|
|
|
|
|
def update(): |
|
x_axis = pso_swarm.get_xaxis() |
|
y_axis = pso_swarm.get_yaxis() |
|
data = (x_axis, y_axis, np.random.random(size=len(x_axis))) |
|
pop_scatter = hv.Scatter(data, vdims=['y_axis', 'z']) |
|
pop_scatter.opts(size=8, color='z', cmap='Coolwarm_r') |
|
return pop_scatter |
|
|
|
|
|
def computational_update(): |
|
x_axis = pso_computation_swarm.get_xaxis() |
|
y_axis = pso_computation_swarm.get_yaxis() |
|
data = (x_axis, y_axis, np.random.random(size=len(x_axis))) |
|
pop_scatter1 = hv.Scatter(data, vdims=['y_axis', 'z']) |
|
pop_scatter1.opts(size=8, color='z', cmap='Coolwarm_r') |
|
return pop_scatter1 |
|
|
|
|
|
def update_num_particles_event(event): |
|
if population_slider.value == pso_swarm.get_num_particles(): |
|
return |
|
pso_swarm.terminate() |
|
pso_computation_swarm.terminate() |
|
time.sleep(1) |
|
updated_num_particles = population_slider.value |
|
pso_swarm.update_particles_position_lists(updated_num_particles) |
|
pso_swarm.update_num_particles(updated_num_particles) |
|
pso_computation_swarm.update_num_particles(updated_num_particles) |
|
pso_computation_swarm.update_particles_position_lists_with_random_values() |
|
pso_swarm.update_particles_position_lists_with_random_values() |
|
hv.streams.Stream.trigger(pso_scatter1.streams) |
|
hv.streams.Stream.trigger(pso_scatter.streams) |
|
|
|
|
|
def trigger_streams(): |
|
global update_table |
|
hv.streams.Stream.trigger(pso_scatter.streams) |
|
hv.streams.Stream.trigger(pso_scatter1.streams) |
|
if update_table: |
|
update_table = False |
|
hv.streams.Stream.trigger(table_dmap.streams) |
|
|
|
|
|
tap.event(x=pso_swarm.get_target()[0], y=pso_swarm.get_target()[1]) |
|
|
|
|
|
time.sleep(0.05) |
|
|
|
|
|
def hunting_button_event(event): |
|
if not pso_swarm.isrunning(): |
|
pso_swarm.starting() |
|
threading.Thread(target=start_finding_the_target).start() |
|
|
|
|
|
def computation_button_event(event): |
|
if not pso_computation_swarm.isrunning(): |
|
pso_computation_swarm.starting() |
|
threading.Thread(target=start_computation).start() |
|
|
|
def table(): |
|
position = pso_computation_swarm.global_best_position |
|
df = pd.DataFrame({ |
|
'x_position': [round(position[0])], |
|
'y_position': [round(position[1])] |
|
}) |
|
|
|
|
|
hv_table = hv.Table(df).opts(width=300, height=100) |
|
|
|
return hv_table |
|
|
|
|
|
|
|
def update_function(event): |
|
pso_computation_swarm.terminate() |
|
time.sleep(1) |
|
pso_computation_swarm.update_particles_position_lists_with_random_values() |
|
|
|
|
|
|
|
pso_scatter = hv.DynamicMap(update, streams=[Stream.define('Next')()]).opts(xlim=(-500, 500), ylim=(-500, 500), |
|
title="Plot 2 : PSO for target finding ") |
|
pso_scatter1 = hv.DynamicMap(computational_update, streams=[Stream.define('Next')()]).opts(xlim=(-500, 500), |
|
ylim=(-500, 500), |
|
title="Plot 1 : PSO for a mathematical computation") |
|
|
|
|
|
tap = hv.streams.SingleTap(x=pso_swarm.get_target()[0], y=pso_swarm.get_target()[1]) |
|
target_dmap = hv.DynamicMap(create_target_element, streams=[tap]) |
|
|
|
|
|
custom_style = { |
|
'background': '##4287f5', |
|
'border': '1px solid black', |
|
'padding': '8px', |
|
'box-shadow': '5px 5px 5px #bcbcbc' |
|
} |
|
|
|
|
|
table_dmap = hv.DynamicMap(table,streams=[hv.streams.Stream.define('Next')()]) |
|
table_label = pn.pane.Markdown("Once an optimal solution is found in plot 1 it is updated in the below table") |
|
|
|
|
|
start_hunting_button = pn.widgets.Button(name=' Click to find target for plot 2 ', width=50) |
|
start_hunting_button.on_click(hunting_button_event) |
|
|
|
|
|
start_finding_button = pn.widgets.Button(name=' Click to start computation for plot 1', width=50) |
|
start_finding_button.on_click(computation_button_event) |
|
|
|
|
|
update_num_particles_button = pn.widgets.Button(name='Update number of particles', width=50) |
|
update_num_particles_button.on_click(update_num_particles_event) |
|
|
|
|
|
pn.state.add_periodic_callback(trigger_streams, 3) |
|
|
|
|
|
population_slider = pn.widgets.IntSlider(name='Number of praticles', start=10, end=100, value=25) |
|
|
|
|
|
function_select = pn.widgets.Select(name='Select', options=['x^2+(y-100)^2','(x-234)^2+(y+100)^2', 'x^3 + y^3 - 3*x*y', 'x^2 * y^2']) |
|
function_select.param.watch(update_function,'value') |
|
|
|
|
|
plot_for_finding_the_target = pso_scatter*target_dmap |
|
|
|
|
|
dashboard = pn.Column(pn.Row(pn.Row(pso_scatter1.opts(width=500, height=500)), pn.Column(plot_for_finding_the_target.opts(width=500, height=500)), |
|
pn.Column(pn.Column(table_label, table_dmap, styles=custom_style), start_finding_button, start_hunting_button, update_num_particles_button, population_slider,function_select))) |
|
|
|
pn.panel(dashboard).servable(title='Swarm Particles Visualization') |