Commit cbf233de authored by czb5793's avatar czb5793
Browse files

1. Add a callback function for evaluating time used for updates.

2. Reduce the frequency to run the backend, backend only start once loop closures occur and miss the consistency between the measurement and state estimate
parent a39509b6
from threading import Thread, Lock
import time
from supervisor.slam.graph.landmark_graph import *
from supervisor.slam.Slam import Slam
from utils.math_util import normalize_angle
......@@ -10,27 +10,27 @@ optimize_allowed = True
class GraphBasedSLAM(Slam):
def __init__(self, supervisor_interface, slam_cfg, step_time):
def __init__(self, supervisor_interface, slam_cfg, step_time, callback = None):
"""
Initializes an object of the GraphBasedSLAM class
:param supervisor_interface: The interface to interact with the robot supervisor
:param slam_cfg: The configuration for the SLAM algorithm
:param step_time: The discrete time that a single simulation cycle increments
:param callback: callback function
"""
# Bind the supervisor interface
self.supervisor = supervisor_interface
# Extract relevant configurations
self.dt = step_time
self.distance_threshold = slam_cfg["graph_based_slam"]["distance_threshold"]
self.robot_state_size = slam_cfg["robot_state_size"]
self.landmark_state_size = slam_cfg["landmark_state_size"]
self.sensor_noise = np.diag([slam_cfg["graph_based_slam"]["sensor_noise"]["x"],
slam_cfg["graph_based_slam"]["sensor_noise"]["y"]])**2
self.motion_noise = np.diag([slam_cfg["graph_based_slam"]["motion_noise"]["x"],
slam_cfg["graph_based_slam"]["motion_noise"]["y"],
np.deg2rad(slam_cfg["graph_based_slam"]["motion_noise"]["theta"])]) ** 2
self.min_distance_threshold = slam_cfg["graph_based_slam"]["distance_threshold"]
self.min_distance_threshold = slam_cfg["graph_based_slam"]["distance_threshold"] # minimum distance for data assosiation
self.frontend_density = slam_cfg["graph_based_slam"]["frontend_density"]
self.optimization_interval = slam_cfg["graph_based_slam"]['optimization_interval'] # the number interval of pose-vertices added that the graph optimization is executed.
self.frontend_interval = slam_cfg["graph_based_slam"]['frontend_interval'] # the timestep interval of executing the frontend part.
......@@ -38,12 +38,12 @@ class GraphBasedSLAM(Slam):
self.mu = np.zeros((self.robot_state_size, 1))
self.Sigma = np.zeros((self.robot_state_size, self.robot_state_size)) # The state covariance, initially set to absolute certainty of the initial robot pose
self.step_counter = 0
self.max_range = self.supervisor.proximity_sensor_max_range()
self.min_range = self.supervisor.proximity_sensor_min_range()
self.graph = LMGraph()
self.odom_pose = self.__reset_odometry_measurement() # accumulative odometry estimation from wheel encoders
self.fix_hessian = 0 # number of fixed vertices while the graph optimzation.
self.__init_first_step() # initialize the first step
self.flg_optim = False
self.callback = callback
def __init_first_step(self):
""" add the initial robot pose as the first pose-vertex """
......@@ -93,6 +93,8 @@ class GraphBasedSLAM(Slam):
:param u: Motion command, A single item is a vector of [[translational_velocity], [angular_velocity]]
:param z: List of measurements. A single item is a tuple of (range, angle, landmark_id)
"""
start_time = time.time()
self.mu = self.motion_model(self.mu, u, self.dt) # Calculate next step
J = self.jaco_motion_model(self.mu, u, self.dt) # Calculate Jacobian matrix
self.Sigma = J @ self.Sigma @ J.T + self.motion_noise # Update covariance matrix, propagate it while robot is moving
......@@ -101,11 +103,15 @@ class GraphBasedSLAM(Slam):
""" Update the Graph """
if self.step_counter % self.frontend_interval == 0: # create a vertex each n steps
self.__front_end(z)
num_poses, _ = self.graph.count_vertices()
num_vertices = len(self.graph.vertices)
if num_vertices > 0 and self.flg_optim == True:
print ("Graph-based SLAM starts the backend-optimization with " +
str(num_vertices) + " vertices")
self.flg_optim = False
self.__back_end()
if num_poses % self.optimization_interval == 0 \
and num_poses > 0 or num_poses == self.optimization_interval//2:
self.__back_end() # graph optimization
if self.callback is not None:
self.callback(str(self), time.time() - start_time) # time used for updating
def __front_end(self, z):
"""
......@@ -116,20 +122,32 @@ class GraphBasedSLAM(Slam):
""" calculate the next vertex of poses """
vertex1 = self.graph.get_last_pose_vertex() # a previous vertex
vertex2 = PoseVertex(self.mu, self.Sigma) # a current vertex
self.graph.add_vertex(vertex2)
distance = (vertex1.pose[0, 0] - vertex2.pose[0, 0])**2 + (vertex1.pose[1, 0] - vertex2.pose[1, 0])**2
if distance < self.frontend_density**2: # keep the vertex density not high
return
self.graph.add_vertex(vertex2)
""" calculate landmark vertices """
for i, zi in enumerate(z):
for i, zi in enumerate(z): # zi = [x, y, id].T
lm_id = zi[2]
# Only execute if sensor has observed landmark
if not self.supervisor.proximity_sensor_positive_detections()[i] \
or zi[2] == -1: # not a feature
or lm_id == -1: # not a feature
continue
pos_lm = self.calc_landmark_position(self.mu, zi) # calculate x-y position from range-bearing in world coordinate
""" Data Association """
vertex3 = self.__data_association(zi) # vertex3 is a vertex of landmark
if vertex3 == None: # Detect a new landmark, this landmark has not been detected in the past
vertex3 = LandmarkVertex(pos_lm, self.sensor_noise, zi[2])
self.graph.add_vertex(vertex3)
vertex3 = self.__data_association(zi) # Vertex3 represents an estimated landmark
if vertex3 == None:
# Detect a new landmark: this landmark has not been detected in the past
vertex3 = LandmarkVertex(pos_lm, self.sensor_noise, lm_id) # create a new vertex representing a landmark
self.graph.add_vertex(vertex3) # add the vertex to the graph
else:
# Determine whether robot is revisiting a landmark, if yes, start backend.
# Calculate the distance between the estimated landmarks via slam and via actual measurement,
# if the distance is larger than the threshold, start backend
distance2 = (pos_lm[0, 0] - vertex3.pose[0, 0])**2 + (pos_lm[1, 0] - vertex3.pose[1, 0])**2
if distance2 > self.min_distance_threshold**2 and self.flg_optim == False:
self.flg_optim = True
measurement, info = PoseLandmarkEdge.encode_measurement(zi, self.sensor_noise)
self.graph.add_edge(vertex2, vertex3, measurement, info) # add an PoseLandmarkEdge edge
......@@ -138,14 +156,12 @@ class GraphBasedSLAM(Slam):
measurement, info = PosePoseEdge.encode_measurement(vertex1.pose, vertex2.pose, self.motion_noise)
self.graph.add_edge(vertex1, vertex2, measurement, info)
#lm_ids = [v.id for v in self.graph.get_estimated_landmark_vertices()]
#print ("graph based slam", lm_ids)
def __back_end(self):
"""
Back end part of the Graph based slam where the graph optimization is executed.
"""
self.graph.graph_optimization(number_fix=self.fix_hessian, damp_factor=5)
self.graph.graph_optimization(number_fix=self.fix_hessian, damp_factor=5, solver="cholesky")
last_vertex = self.graph.get_last_pose_vertex()
self.mu = np.copy(last_vertex.pose) # update current state
self.Sigma = np.copy(last_vertex.sigma)
......@@ -200,8 +216,6 @@ class GraphBasedSLAM(Slam):
[0, 0, 1]])
return G
@staticmethod
def motion_model(x, u, dt):
"""
......@@ -254,4 +268,7 @@ class GraphBasedSLAM(Slam):
"""
v = (v_r + v_l) * 0.5
w = (v_r - v_l) / width
return v, w
\ No newline at end of file
return v, w
def __str__(self):
return "Graph-based SLAM"
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment