Commit f16d822f authored by czb5793's avatar czb5793
Browse files

Small changes

parent 979d3c01
......@@ -105,12 +105,15 @@ class GraphSlamPlotter(SlamPlotter):
def __init__(self, slam, viewer, radius, robot_config, frame_number):
SlamPlotter.__init__(self, slam, viewer, radius, robot_config, frame_number)
self.count = 0
def draw_slam_to_frame(self):
SlamPlotter.draw_slam_to_frame(self)
frame = self.viewer.current_frames[self.frame_number]
self.robot_estimated_traverse_path = self.slam.get_estimated_trajectory()
self.__draw_robot_estimated_traverse_path_to_frame(frame)
self.count += 1
if self.slam.draw_trajectory:
self.robot_estimated_traverse_path = self.slam.get_estimated_trajectory()
self.__draw_robot_estimated_traverse_path_to_frame(frame)
def __draw_robot_estimated_traverse_path_to_frame(self, frame):
# Draw traverse path of the robot estimated by the Graph-based SLAM
......
......@@ -24,7 +24,7 @@ import gi
from gi.repository import GLib
from plotters.SlamPlotter import *
from supervisor.slam.SlamEvaluation2 import SlamEvaluation2
from supervisor.slam.SlamEvaluation import SlamEvaluation2
from plotters.MappingPlotter import MappingPlotter
......@@ -43,6 +43,7 @@ from simulation.World import *
from plotters.WorldPlotter import *
from simulation.exceptions import CollisionException, GoalNotReachedException
import time
class Simulator:
......@@ -105,6 +106,9 @@ class Simulator:
# create the simulation world
self.world = World(self.period)
# time cost in a step [ms]
self.t = 0
# create the robot
robot = Robot(self.cfg["robot"])
# Assign supervisor to the robot
......@@ -243,8 +247,12 @@ class Simulator:
self.viewer.draw_frame() # render the frame
def _run_sim(self):
self.sim_event_source = GLib.timeout_add(int(self.period * 1000), self._run_sim)
x = 100
interval = max(self.t + 10, self.period*1000/x)
self.sim_event_source = GLib.timeout_add(int(interval), self._run_sim)
start_time = time.time()
self._step_sim()
self.t = int((time.time() - start_time)*1000) # time cost in a step [ms]
def _update_slam_accuracies(self):
# Only perform the SLAM evaluation on specific simulation cycles. The period is configurable.
......@@ -283,7 +291,9 @@ class Simulator:
if __name__ == "__main__":
filename = "config.yaml" if len(sys.argv) == 1 else sys.argv[1]
filename = "config_graph_based_slam.yaml" if len(sys.argv) == 1 else sys.argv[1]
# filename = "original_config.yaml" if len(sys.argv) == 1 else sys.argv[1]
with open(filename, 'r') as ymlfile:
cfg = yaml.safe_load(ymlfile)
Simulator(cfg)
......@@ -57,6 +57,6 @@ class RobotSupervisorInterface:
:param v_l: Velocity of left wheel
:param v_r: Velocity of right wheel
"""
v_l += normal(0, self.motor_noise[0])
v_r += normal(0, self.motor_noise[1])
v_l = v_l + normal(0, self.motor_noise[0])
v_r = v_r + normal(0, self.motor_noise[1])
self.robot.set_wheel_drive_rates(v_l, v_r)
......@@ -9,7 +9,7 @@ class FeatureDetector(Sensor):
def __init__(self):
"""
Identifiers: a dictionary that maps sensor identifiers to feature identifiers
identifiers: a dictionary that maps sensor identifiers to feature identifiers
"""
self.identifiers = dict()
......
......@@ -14,7 +14,7 @@ import matplotlib.pyplot as plt
class PoseVertex(Vertex):
def __init__(self, id, pose):
Vertex.__init__(self, pose, None, None)
Vertex.__init__(self, pose, None)
assert pose.shape[0] == 3
self.id = id
......@@ -24,8 +24,8 @@ class PoseVertex(Vertex):
class PosePoseEdge(Edge):
def __init__(self, id_vertex1, id_vertex2, z, information, list_vertices):
Edge.__init__(self, id_vertex1, id_vertex2, z, information, list_vertices)
def __init__(self, vertex1, vertex2, z, information):
Edge.__init__(self, vertex1, vertex2, z, information)
def calc_error_vector(self, x1, x2, z):
"""
......@@ -56,9 +56,7 @@ class PosePoseEdge(Edge):
B = np.array([[c, s, 0],
[-s, c, 0],
[0, 0, 1]])
e = self.calc_error_vector(x1, x2, z)
return e, A, B
return A, B
def __str__(self):
x, y, yaw = np.squeeze(self.z)
......
......@@ -235,7 +235,7 @@ class Supervisor:
# update LAM estimations
for slam in self.reg_slam:
if slam is not None:
slam.update(motion_command, zip(measured_distances, sensor_angles, landmark_ids))
slam.update(np.copy(motion_command), zip(measured_distances, sensor_angles, landmark_ids))
# update mapping estimations
for mapping in self.reg_mapping:
......
......@@ -62,6 +62,7 @@ class SupervisorStateMachine:
# change the goal if robot cannot reach it in time
if self.timer.not_reach_goal_in_time(self.supervisor.goal):
self.transition_to_state_go_to_goal()
self.timer.reset()
print ("GoalNotReachedException: Not reach the goal in time.")
raise GoalNotReachedException() # add a new goal not far from the robot
......
......@@ -69,7 +69,7 @@ class EKFSlam(Slam):
"""
Performs a full update cycle consisting of prediction and correction step
:param u: Motion command
:param z: List of sensor measurements. A single measurement is a tuple of measured distance and measured angle.
:param z: List of sensor measurements. A single measurement is a tuple of measured distance, measured angle and associated index of landmark.
"""
start_time = time.time()
......
import time
from supervisor.slam.graph.landmark_graph import *
from supervisor.slam.graph.landmark_graph import LMGraph, PoseLandmarkEdge, PosePoseEdge, PoseVertex, LandmarkVertex
from supervisor.slam.Slam import Slam
from utils.math_util import normalize_angle
from models.Pose import Pose
import numpy as np
from math import *
optimize_allowed = True
class GraphBasedSLAM(Slam):
def __init__(self, supervisor_interface, slam_cfg, step_time, callback = None):
......@@ -20,7 +18,7 @@ class GraphBasedSLAM(Slam):
"""
# Bind the supervisor interface
self.supervisor = supervisor_interface
self.draw_trajectory = slam_cfg["graph_based_slam"]["draw_trajectory"]
# Extract relevant configurations
self.dt = step_time
self.robot_state_size = slam_cfg["robot_state_size"]
......@@ -29,27 +27,24 @@ class GraphBasedSLAM(Slam):
self.motion_noise = np.diag([slam_cfg["graph_based_slam"]["motion_noise"]["x"],
slam_cfg["graph_based_slam"]["motion_noise"]["y"],
np.deg2rad(slam_cfg["graph_based_slam"]["motion_noise"]["theta"])]) ** 2
self.min_distance_threshold = slam_cfg["graph_based_slam"]["distance_threshold"] # minimum distance for data assosiation
# distance threshold for determining optimization
self.min_distance_threshold = slam_cfg["graph_based_slam"]["distance_threshold"]
self.frontend_pose_density = slam_cfg["graph_based_slam"]["frontend_pose_density"]
self.optimization_interval = slam_cfg["graph_based_slam"]['optimization_interval'] # the number interval of pose-vertices added that the graph optimization is executed.
self.frontend_interval = slam_cfg["graph_based_slam"]['frontend_interval'] # the timestep interval of executing the frontend part.
self.num_fixed_vertices = slam_cfg["graph_based_slam"]["num_fixed_vertexes"]
# The estimated combined state vector, initially containing the robot pose at the origin and no landmarks
self.mu = np.zeros((self.robot_state_size, 1))
self.Sigma = np.zeros((self.robot_state_size, self.robot_state_size)) # The state covariance, initially set to absolute certainty of the initial robot pose
self.step_counter = 0
self.graph = LMGraph()
self.fix_hessian = 0 # number of fixed vertices while the graph optimzation.
self.__init_first_step() # initialize the first step
self.flg_optim = False
self.flg_optim = False
self.callback = callback
def __init_first_step(self):
""" add the initial robot pose as the first pose-vertex """
# add the first node to the graph
vertex1 = PoseVertex(self.mu, np.eye(3))
self.graph.add_vertex(vertex1)
self.fix_hessian += 3 # fix this vertex
def get_estimated_pose(self):
"""
......@@ -164,7 +159,11 @@ class GraphBasedSLAM(Slam):
"""
Back end part of the Graph based slam where the graph optimization is executed.
"""
self.graph.graph_optimization(number_fix=self.fix_hessian, damp_factor=5, solver="cholesky")
fix_hessian = 0
for i in range(self.num_fixed_vertices): # fix the first 20 vertices
fix_hessian += self.graph.vertices[i].dim
self.graph.graph_optimization(number_fix=fix_hessian, damp_factor=5, solver="cholesky")
last_vertex = self.graph.get_last_pose_vertex()
self.mu = np.copy(last_vertex.pose) # update current state
self.Sigma = np.copy(last_vertex.sigma)
......
from math import sqrt
from matplotlib import pyplot as plt
from supervisor.slam.EKFSlam import EKFSlam
from supervisor.slam.FastSlam import FastSlam
from supervisor.slam.GraphBasedSLAM import GraphBasedSLAM
from matplotlib.ticker import MaxNLocator
from models.obstacles.FeaturePoint import FeaturePoint
import pickle
import pandas as pd
class SlamEvaluation2:
def __init__(self, list_slam, evaluation_cfg, robot):
"""
Initializes an object of the SlamEvaluation2 class
:param list_slam: a list of slam algorithms that will be evaluated
:param evaluation_cfg: The configurations for the class.
Currently only used to calculate number of simulation cycles
:param robot: robot object in the real world
"""
self.list_slam = list_slam
self.cfg = evaluation_cfg
self.average_distances_lm = [list() for _ in list_slam]
self.distances_robot = [list() for _ in list_slam]
self.robot = robot
self.reset_record()
self.sim_circle = 0
def reset_record(self):
self.head1 = ["sim_circle", "landmark_id", "estimated_landmark_position", "estimated_robot_pose",
"actual_landmark_position", "actual_robot_pose", "slam_name"]
self.data = list()
self.head2 = ["sim_circle", "name", "time_per_update"]
self.update_time_record = list()
def evaluate(self, obstacles):
"""
Evaluates the average distance of the estimated obstacle positions to the closest actual obstacle in the map.
The value is saved.
:param obstacles: The list of actual obstacles of the map
"""
for j, slam in enumerate(self.list_slam):
if slam is None:
continue
slam_obstacles = slam.get_landmarks()
distances = []
for i, slam_obstacle in enumerate(slam_obstacles):
if self.cfg["associate_id"] == True:
dist = self.__find_distance_by_landmark_id(slam_obstacle, obstacles)
else:
dist = self.__find_min_distance(slam_obstacle, obstacles)
distances.append(dist)
if (len(distances)) > 0:
self.average_distances_lm[j].append(sum(distances) / len(distances))
slam_pose = slam.get_estimated_pose()
self.distances_robot[j].append(self.__calc_squared_distance(slam_pose.sunpack(), self.robot.pose.sunpack()))
def record(self, sim_circle, obstacles):
"""
Record data in a list
:param sim_circle: simulation circle
"""
self.sim_circle = sim_circle
for j, slam in enumerate(self.list_slam):
if slam is None:
continue
slam_landmarks = slam.get_landmarks()
estimated_robot_position = slam.get_estimated_pose().sunpack()
actual_robot_position = self.robot.pose.sunpack()
slam_name = str(slam)
for i, slam_landmark in enumerate(slam_landmarks):
landmark_id = slam_landmark[2]
estimated_landmark_position = slam_landmark[:2]
actual_landmark_position = None
for obstacle in obstacles:
if type(obstacle) == FeaturePoint and obstacle.id == landmark_id:
actual_landmark_position = obstacle.pose.sunpack()
break
self.data.append([int(sim_circle), int(landmark_id), estimated_landmark_position,
estimated_robot_position, actual_landmark_position, actual_robot_position, slam_name])
if sim_circle % 1000 == 0:
print ("sim_circle", sim_circle)
def time_per_step(self, name, time):
"""
A callback function for recording time used per update step.
:param name: Name of slam or mapping algorithm
:param time: time cost by updating
"""
if self.sim_circle > 0:
self.update_time_record.append((self.sim_circle, name, time))
def __find_distance_by_landmark_id(self, slam_obstacle, obstacles):
"""
Finds the distance of the estimated obstacle to the actual obstacle regarding its identifiers
:param slam_obstacle: An estimated obstacle position of a SLAM algorithm
:param obstacles: The list of actual obstacles in the map
:return: Distance of estimated obstacle to the actual obstacle
"""
slam_obstacle_id = slam_obstacle[2]
for obstacle in obstacles:
if type(obstacle) == FeaturePoint and obstacle.id == slam_obstacle_id:
sq_dist = self.__calc_squared_distance(slam_obstacle[:2], obstacle.pose.sunpack())
return sqrt(sq_dist)
def __find_min_distance(self, slam_obstacle, obstacles):
"""
Finds the distance of the estimated obstacle to the the closest actual obstacle
:param slam_obstacle: An estimated obstacle position of a SLAM algorithm
:param obstacles: The list of actual obstacles in the map
:return: Distance of estimated obstacle to closest actual obstacle
"""
squared_distances = [self.__calc_squared_distance(slam_obstacle, obstacle.pose.sunpack()) for obstacle in obstacles]
return sqrt(min(squared_distances))
def plot(self):
"""
Produces a plot of how the average distance changed over the course of the simulation.
Saves the plot in a png file.
"""
self.save_infomation("./scripts/sobot_information")
fig, ax = plt.subplots(2, figsize=(9,8))
ax[0].grid()
line_styles = ['k-', 'k--', 'k:', 'k-.']
for i, slam in enumerate(self.list_slam):
if slam is None:
continue
name = str(slam)
# Calculates number of elapsed simulation cycles
sim_cycles = len(self.average_distances_lm[i]) * self.cfg["interval"]
ax[0].plot(range(0, sim_cycles, self.cfg["interval"]),
self.average_distances_lm[i],
line_styles[i%len(self.list_slam)], label = name)
ax[0].xaxis.set_major_locator(MaxNLocator(integer=True))
ax[0].legend()
ax[0].set(xlabel='Simulation cycles', ylabel='Average distance to true landmark in meters', title='Evaluation of SLAM')
plt.savefig('slam_landmark_evaluation.png')
ax[0].grid()
plt.show()
#fig, ax = plt.subplots()
ax[1].grid()
for i, slam in enumerate(self.list_slam):
if slam is None:
continue
name = str(slam)
# Calculates number of elapsed simulation cycles
sim_cycles = len(self.average_distances_lm[i]) * self.cfg["interval"]
ax[1].plot(range(0, sim_cycles, self.cfg["interval"]), self.distances_robot[i], line_styles[i%len(self.list_slam)], label = name)
ax[1].legend()
ax[1].set(xlabel='Simulation cycles', ylabel='Distance to true robot position in meters', title='Evaluation of SLAM')
plt.savefig('slam_robot_position_evaluation.png')
ax[1].grid()
plt.tight_layout()
plt.show()
def save_infomation(self, filename):
"""
Save the information
:param filename: The filename under which the information shall be stored
"""
df = pd.DataFrame(columns=self.head1, data=self.data)
df.to_csv(filename + "1.csv")
df = pd.DataFrame(columns=self.head2, data=self.update_time_record)
df.to_csv(filename + "2.csv")
class SlamEvaluation:
def __init__(self, slam, evaluation_cfg, robot):
"""
Initializes an object of the SlamEvaluation class
:param slam: The slam algorithm that will be evaluated
:param evaluation_cfg: The configurations for the class.
Currently only used to calculate number of simulation cycles
:param robot: robot object in the real world
"""
self.slam = slam
self.cfg = evaluation_cfg
self.average_distances_lm = []
self.distances_robot = []
self.robot = robot
def evaluate(self, obstacles):
"""
Evaluates the average distance of the estimated obstacle positions to the closest actual obstacle in the map.
The value is saved.
:param obstacles: The list of actual obstacles of the map
"""
slam_obstacles = self.slam.get_landmarks()
squared_distances = []
for i, slam_obstacle in enumerate(slam_obstacles):
slam_obstacle_id = slam_obstacle[2]
for obstacle in obstacles:
if type(obstacle) == FeaturePoint and obstacle.id == slam_obstacle_id:
sq_dist = self.__calc_squared_distance(slam_obstacle[:2], obstacle.pose.sunpack())
squared_distances.append(sq_dist)
self.average_distances_lm.append(sum(squared_distances) / len(squared_distances))
slam_pose = self.slam.get_estimated_pose()
self.distances_robot.append(self.__calc_squared_distance(slam_pose.sunpack(), self.robot.pose.sunpack()))
def plot(self):
"""
Produces a plot of how the average distance changed over the course of the simulation.
Saves the plot in a png file.
"""
fig, ax = plt.subplots()
# Calculates number of elapsed simulation cycles
sim_cycles = len(self.average_distances_lm) * self.cfg["interval"]
ax.plot(range(0, sim_cycles, self.cfg["interval"]), self.average_distances_lm)
ax.plot(range(0, sim_cycles, self.cfg["interval"]), self.distances_robot)
ax.grid()
if isinstance(self.slam, EKFSlam):
ax.set(xlabel='Simulation cycles', ylabel='Average distance to true landmark in meters',
title='Evaluation of EKF SLAM')
plt.savefig('ekf_slam_evaluation.png')
elif isinstance(self.slam, FastSlam):
ax.set(xlabel='Simulation cycles', ylabel='Average distance to true landmark in meters',
title='Evaluation of FastSLAM')
plt.savefig('fast_slam_evaluation.png')
elif isinstance(self.slam, GraphBasedSLAM):
ax.set(xlabel='Simulation cycles', ylabel='Average distance to true landmark in meters',
title='Evaluation of Graph-based Slam')
plt.savefig('graph_based_slam_evaluation.png')
ax.grid()
plt.show()
def __find_min_distance(self, slam_obstacle, obstacles):
"""
Finds the distance of the estimated obstacle to the the closest actual obstacle
:param slam_obstacle: An estimated obstacle position of a SLAM algorithm
:param obstacles: The list of actual obstacles in the map
:return: Distance of estimated obstacle to closest actual obstacle
"""
squared_distances = [self.__calc_squared_distance(slam_obstacle, obstacle.pose.sunpack()) for obstacle in obstacles]
return sqrt(min(squared_distances))
@staticmethod
def __calc_squared_distance(x, y):
"""
Calculates squared distance between two positions.
The squared distance is sufficient for finding the minimum distance.
:param x: First position
:param y: Second position
:return: squared distance between the two positions
"""
diff = (x[0] - y[0], x[1] - y[1])
return diff[0] ** 2 + diff[1] ** 2
@staticmethod
def __calc_squared_distance(x, y):
"""
Calculates squared distance between two positions.
The squared distance is sufficient for finding the minimum distance.
:param x: First position
:param y: Second position
:return: squared distance between the two positions
"""
diff = (x[0] - y[0], x[1] - y[1])
return diff[0] ** 2 + diff[1] ** 2
\ No newline at end of file
......@@ -9,6 +9,9 @@ class Timer:
"""
def __init__(self):
self.reset()
def reset(self):
# Timing how long does the robot not reach the goal?
self.previous_goal = [0, 0]
self.start_time_to_goal = time.time()
......
"""
The parent class of any types of edge
The Edge class has to be inherited and the following methods has to be implemented,
- calc_error_vector(self, x1, x2, z): In this method you are going to define the error of the constraint in the graph.
- linearize_constraint(self, x1, x2, z): In this method you are going to linearize the problem by calculating
the jacobian matrices of the error w.r.t x1 and x2
- calc_error_vector(self, x, m, z): In this method you are going to define the error of the constraint in the graph.
- linearize_constraint(self, x, m, z): In this method you are going to linearize the problem by calculating
the jacobian matrices of the error w.r.t x and m
"""
class Edge:
def __init__(self, id_vertex1, id_vertex2, z, information, list_vertices):
def __init__(self, vertex1, vertex2, z, information):
"""
A Edge class. It is a component of a graph
:param id_vertex1: id of previous vertex
:param id_vertex2: id of current vertex
:param vertex1: previous vertex
:param vertex2: icurrent vertex
:param z: actual measurement obtained by sensor
:param information: information matrix
:param list_vertices: list of vertices used to look up. a single vertex is a Vertex object.
"""
self.list_vertices = list_vertices # list of vertices in the graph
self.id1 = id_vertex1 # index of previous vertex
self.id2 = id_vertex2 # index of current vertex
self.vertex1 = self.find_vertice_by_id(id_vertex1)
self.vertex2 = self.find_vertice_by_id(id_vertex2)
self.vertex1 = vertex1
self.vertex2 = vertex2
self.z = z # a measurement from sensor, [r, phi] for range-bearing, [x_l, y_r] for wheel encoder
self.information = information # information matrix
self.error = None # error vector
......@@ -34,30 +30,18 @@ class Edge:
:param z: vector of actual measurement from sensors
:return:
error = z1 - z2
- z1 can be calculated through the states of 2 vertices x1, x2
- z1 can be calculated through the states of 2 vertices x, m
- z2 is obtained by sensors
"""
raise NotImplementedError()
def linearize(self):
"""
Linearize the constraint.
:return: an error vector, jacobian matrices A, B.
e error vector of the constraint.
A Jacobian wrt the pose vector of vertex 1.
B Jacobian wrt the pose vector of vertex 2.
"""
return self.linearize_constraint(self.vertex1.pose, self.vertex2.pose, self.z)
def linearize_constraint(self, x1, x2, z):
"""
Calculate error vector and jacobian matrices
:return:
error: an error vector
A: a jacobian matrix, A = d(error)/d(x1)
B: a jacobian matrix, B = d(error)/d(x2)
A: a jacobian matrix, A = d(error)/d(x)
B: a jacobian matrix, B = d(error)/d(m)
"""
raise NotImplementedError()
......@@ -72,18 +56,20 @@ class Edge:
self.error = self.calc_error_vector(self.vertex1.pose, self.vertex2.pose, self.z)
return (self.error.T @ self.information @ self.error)[0, 0]
def find_vertice_by_id(self, id):
def linearize(self):
"""
Look up a vertex with its id
:param id:
:return: vertex object
Linearize the constraint.
:return: an error vector, jacobian matrices A, B.
e error vector of the constraint.
A Jacobian wrt the pose vector of vertex 1.
B Jacobian wrt the pose vector of vertex 2.
"""
vertex = None
for v in self.list_vertices:
if v.id == id:
vertex = v
break
return vertex
A, B = self.linearize_constraint(self.vertex1.pose, self.vertex2.pose, self.z)
e = self.calc_error_vector(self.vertex1.pose, self.vertex2.pose, self.z)
return e, A, B