Skip to content
Snippets Groups Projects
Commit 871adee3 authored by Lars Stietz's avatar Lars Stietz
Browse files

Merge branch '8-simulation-update' into 'main'

Resolve "Simulation update"

Closes #8

See merge request !11
parents 8bc1bf2e 92db0a37
No related branches found
No related tags found
1 merge request!11Resolve "Simulation update"
simulation_h_surface_periodic.png

61.5 KiB

from .swe_solver import SWESolver
\ No newline at end of file
from .swe_solver import SWESolver, rampFunc, gaussian_bathymetry
\ No newline at end of file
from time import time
import numpy as np
import dedalus.public as d3
from scipy import interpolate
from dedalus.core.domain import Domain
from time import time
from .utils.read_left_bc import leftbc
def gaussian_bathymetry(x: np.ndarray, params: tuple[float, float]) -> np.ndarray:
"""Gaussian bathymetry function.
......@@ -13,6 +15,23 @@ def gaussian_bathymetry(x: np.ndarray, params: tuple[float, float]) -> np.ndarra
"""
return 0.2*np.exp(-1/params[1]*(x-params[0])**2)
def rampFunc(x: np.ndarray) -> np.ndarray:
"""Exact bathymetry function from waterchannel experiment.
Args:
x: The x-coordinate.
Returns:
The bathymetry value at x."""
b_points = np.concatenate((np.zeros(4),
np.array([0, 0.024, 0.053, 0.0905, 0.133, 0.182, 0.2, 0.182, 0.133,
0.0905, 0.053, 0.024, 0]),
np.zeros(21)))
x_points = np.concatenate((np.arange(1.5, 3.5, 0.5),
np.array([3.4125, 3.5, 3.6, 3.7, 3.8, 3.9, 4, 4.1, 4.2, 4.3, 4.4, 4.5, 4.5875]),
np.arange(5, 15.5, 0.5)))
itp_bathy = interpolate.PchipInterpolator(x_points, b_points)
return itp_bathy(x)
class CustomDomain:
"""Domain for the shallow water equations.
......@@ -24,13 +43,26 @@ class CustomDomain:
dom: The domain.
x: The x-grid.
"""
def __init__(self, xbound: tuple[float, float], nx: int, dealias: float):
def __init__(self, xbound: tuple[float, float], nx: int, dealias: float,
basis_type=d3.RealFourier):
self.xcoord = d3.Coordinate('x')
self.dist = d3.Distributor(self.xcoord, dtype=np.float64)
self.xbasis = d3.RealFourier(self.xcoord, size=nx, bounds=xbound, dealias=dealias)
self.xbasis = basis_type(self.xcoord, size=nx, bounds=xbound,
dealias=dealias)
self.dom = Domain(self.dist, bases=[self.xbasis])
self.x = self.dist.local_grid(self.xbasis)
class PeriodicDomain(CustomDomain):
"""Domain for the periodic problem."""
def __init__(self, xbound: tuple[float, float], nx: int, dealias: float):
super().__init__(xbound, nx, dealias, basis_type=d3.RealFourier)
class WaterChannelDomain(CustomDomain):
"""Domain for the water channel problem."""
def __init__(self, xbound: tuple[float, float], nx: int, dealias: float):
super().__init__(xbound, nx, dealias, basis_type=d3.Chebyshev)
class InitialConditions:
"""Initial conditions for the shallow water equations.
......@@ -41,20 +73,68 @@ class InitialConditions:
b: The bathymetry.
H: The total water height.
"""
def __init__(self, domain: CustomDomain, xbound: tuple[float, float]):
def __init__(self, domain: CustomDomain):
self.h = domain.dist.Field(name='h', bases=domain.xbasis)
self.u = domain.dist.Field(name='u', bases=domain.xbasis)
self.t = domain.dist.Field()
self.b = domain.dist.Field(bases=domain.xbasis)
self.H = 0.3 + 0.5 * np.exp(-(domain.x - xbound[1] / 2) ** 2 / 2 ** 2) * 0.05 * np.sin(0.2 * np.pi * domain.x)
self.H = 0.3
def change_space_scales(self, scale: float):
"""Change the space scales of the fields.
Args:
scale: The scale factor.
"""
self.h.change_scales(scale)
self.u.change_scales(scale)
self.b.change_scales(scale)
def set_water_level(self, level: float):
"""Set the water level.
Args:
level: The water level.
"""
self.H = level
class PeriodicInitialConditions(InitialConditions):
"""Initial conditions for the periodic problem."""
def __init__(self, domain: CustomDomain, xbound: tuple[float, float]=(0., 10.)):
super().__init__(domain)
self.H = 0.3 + 0.5 * np.exp(-(domain.x - xbound[1] / 2) ** 2 / 2 ** 2)\
* 0.05 * np.sin(0.2 * np.pi * domain.x)
class WaterChannelInitialConditions(InitialConditions):
"""Initial conditions for the water channel problem."""
def __init__(self, domain: CustomDomain, tstart: float=32.):
super().__init__(domain)
self.bc_field = domain.dist.Field()
self.tau1 = domain.dist.Field(name='tau1')
self.tau2 = domain.dist.Field(name='tau2')
self.tstart = tstart
def change_space_scales(self, scale: float):
"""Change the space scales of the fields.
Args:
scale: The scale factor.
"""
self.h.change_scales(scale)
self.u.change_scales(scale)
self.b.change_scales(scale)
self.bc_field.change_scales(scale)
self.tau1.change_scales(scale)
self.tau2.change_scales(scale)
class Solver:
"""Solver for the shallow water equations.
"""Base solver for the shallow water equations.
Attributes:
dx: The x-derivative.
name_dict: The dictionary of names.
solver: The solver.
ic: The initial conditions.
domain: The domain.
"""
def __init__(self, domain: CustomDomain, initial_conditions: InitialConditions, params: dict):
self.dx = lambda A: d3.Differentiate(A, domain.xcoord)
......@@ -69,13 +149,76 @@ class Solver:
'dx': self.dx
}
problem = d3.IVP([initial_conditions.h, initial_conditions.u],
time=initial_conditions.t, namespace=self.name_dict)
self.ic = initial_conditions
self.domain = domain
def get_problem(self):
"""Get the solver for the problem.
Returns:
The solver.
"""
raise NotImplementedError("This method should be implemented by subclasses")
class PeriodicSolver(Solver):
"""Solver for the periodic problem."""
def get_problem(self):
"""Periodic boundary condition solver."""
problem = d3.IVP([self.ic.h, self.ic.u],
time=self.ic.t, namespace=self.name_dict)
problem.add_equation("dt(h) = -dx(h*u)")
problem.add_equation("dt(u) + g*dx(h) + kappa*u = - g*dx(b) - u*dx(u)")
solver = problem.build_solver(d3.RK443)
return solver
class WaterChannelSolver(Solver):
"""Solver for the water channel problem."""
def __init__(self, domain: WaterChannelDomain,
initial_conditions: WaterChannelInitialConditions,
params: dict, bcfile: str='/home/lars/DATA/Code/bayesian-inverse-problems/experiment_data/Data_Sensors_orig/With_Bathymetry/mean_bc.txt'):
super().__init__(domain, initial_conditions, params)
self.lbc = leftbc(bcfile)
def get_problem(self):
"""Water channel boundary condition solver."""
def hl_function(*args):
t = args[0].data
htemp = self.lbc.f(t + self.ic.tstart)
self.ic.bc_field["g"] = self.ic.H + htemp - self.ic.b['g'][0]
return self.ic.bc_field["g"]
def hl(*args, domain=self.ic.bc_field.domain, F=hl_function):
return d3.GeneralFunction(self.domain.dist, domain, layout='g', tensorsig=(),
dtype=np.float64, func=F, args=args)
lift_basis = self.domain.xbasis.derivative_basis(1)
lift = lambda A, n: d3.Lift(A, lift_basis, n)
tau1 = self.domain.dist.Field(name='tau1')
tau2 = self.domain.dist.Field(name='tau2')
self.name_dict['lift'] = lift
self.name_dict['tau1'] = tau1
self.name_dict['tau2'] = tau2
self.name_dict['hl'] = hl
# Problem
problem = d3.IVP([self.ic.h, self.ic.u, tau1, tau2], time=self.ic.t,
namespace=self.name_dict)
problem.add_equation("dt(h) + lift(tau1, -1) + dx(u) = - dx((h-1)*u)")
problem.add_equation("dt(u) + lift(tau2, -1) + g*dx(h) + kappa*u = - u*dx(u) - g*dx(b)")
problem.add_equation("h(x='left') = hl(t)")
problem.add_equation("u(x='right') = 0")
# Build solver
self.solver = problem.build_solver(d3.RK443)
solver = problem.build_solver(d3.RK443)
return solver
#! TODO: Implement left boundary condition
class SWESolver():
......@@ -85,7 +228,7 @@ class SWESolver():
xbound: The x-boundary.
dt: The time step.
nx: The number of grid points.
tend: The end time.
total_t: The end time.
g: The gravity constant.
kappa: The bottom friction coefficient.
dealias: The dealiasing factor.
......@@ -95,22 +238,30 @@ class SWESolver():
solve: Solve the shallow water equations.
"""
def __init__(self, xbound: tuple[float, float], dt: float, nx: int, tend: float,
g: float=9.81, kappa: float=0.2, dealias: float=2/3):
def __init__(self, xbound: tuple[float, float], dt: float, nx: int, total_t: float, tstart: float=32.,
g: float=9.81, kappa: float=0.2, dealias: float=2/3, problemtype: str='periodic'):
self.xbound = xbound
self.dt = dt
self.nx = nx
self.tend = tend
self.total_t = total_t
self.params = {'g': g, 'kappa': kappa, 'dealias': dealias}
self.problemtype = problemtype
# Initialize domain, initial conditions, and solver
self.domain = CustomDomain(self.xbound, self.nx, self.params['dealias'])
self.initial_conditions = InitialConditions(self.domain, self.xbound)
self.solver = Solver(self.domain, self.initial_conditions, self.params)
def solve(self, b_array: np.ndarray):
if self.problemtype == 'periodic':
self.domain = PeriodicDomain(self.xbound, self.nx, self.params['dealias'])
self.ic= PeriodicInitialConditions(self.domain)
self.solver = PeriodicSolver(self.domain, self.ic, self.params)
elif self.problemtype == 'waterchannel':
self.domain = WaterChannelDomain(self.xbound, self.nx, self.params['dealias'])
self.ic = WaterChannelInitialConditions(self.domain, tstart=tstart)
self.solver = WaterChannelSolver(self.domain, self.ic, self.params)
else:
raise ValueError("Invalid problem type")
def solve(self, b_array: np.ndarray, sensor_pos:np.ndarray = np.array([3.5, 5.5, 7.5])):
"""Solve the shallow water equations.
Args:
peak: The peak of the Gaussian bathymetry.
......@@ -121,50 +272,78 @@ class SWESolver():
t_list: The list of time.
"""
self.solver.solver.stop_wall_time = 15000
self.solver.solver.stop_iteration = int(self.tend/abs(self.dt))+1
self.solver.solver.stop_sim_time = self.tend - 1e-13
self.initial_conditions.b.change_scales(1)
self.initial_conditions.h.change_scales(1)
self.initial_conditions.u.change_scales(1)
self.initial_conditions.b['g'] = np.squeeze(b_array)
self.initial_conditions.h['g'] = self.initial_conditions.H\
- self.initial_conditions.b['g']
self.initial_conditions.u['g'] = 0
h_list = [np.copy(self.initial_conditions.h['g'])]
u_list = [np.copy(self.initial_conditions.u['g'])]
t_list = [self.solver.solver.sim_time]
while self.solver.solver.proceed:
self.solver.solver.step(self.dt)
if self.solver.solver.iteration % 1 == 0:
self.initial_conditions.h.change_scales(1)
h_list.append(np.copy(self.initial_conditions.h['g']))
self.initial_conditions.u.change_scales(1)
u_list.append(np.copy(self.initial_conditions.u['g']))
t_list.append(self.solver.solver.sim_time)
if np.max(self.initial_conditions.h['g']) > 100:
break
self.initial_conditions.b.change_scales(1)
self.initial_conditions.h.change_scales(1)
self.initial_conditions.u.change_scales(1)
return np.array(h_list), np.array(u_list), np.array(t_list)
# Set the initial conditions
self.ic.b['g'] = np.squeeze(b_array)
self.ic.h['g'] = self.ic.H - self.ic.b['g']
self.ic.u['g'] = 0
b_sensor = self.eval_at_sensor_positions(self.ic.b, sensor_pos)
temph_sensor = self.eval_at_sensor_positions(self.ic.h, sensor_pos) + b_sensor
self.ic.change_space_scales(1)
# Set parameters for the solver
solver = self.solver.get_problem()
solver.stop_wall_time = 15000
solver.stop_iteration = int(self.total_t/abs(self.dt))+1
solver.stop_sim_time = self.total_t - 1e-13
H_sensor_list = [temph_sensor]
h_complete_list = [np.copy(self.ic.h['g'])]
u_complete_list = [np.copy(self.ic.u['g'])]
t_list = [solver.sim_time]
while solver.proceed:
solver.step(self.dt)
temph_sensor = self.eval_at_sensor_positions(self.ic.h, sensor_pos) + b_sensor
H_sensor_list.append(temph_sensor)
self.ic.h.change_scales(1)
h_complete_list.append(np.copy(self.ic.h['g']))
self.ic.u.change_scales(1)
u_complete_list.append(np.copy(self.ic.u['g']))
t_list.append(solver.sim_time)
if np.max(self.ic.h['g']) > 100:
break
self.ic.change_space_scales(1)
H_sensor = np.array(H_sensor_list)
return H_sensor, np.array(t_list),\
np.array(h_complete_list), np.array(u_complete_list)
def eval_at_sensor_positions(self, sol, pos):
"""
Evaluates dedalus field at sensor positions.
Args:
sol : dist.Field
Field.
pos : list
Sensor positions.
Returns:
temp : np.ndarray
Field 'sol' evaluated at sensor positions 'pos'.
"""
temp = np.zeros_like(pos)
for i, sensor_pos in enumerate(pos):
sol_int = sol(x=sensor_pos)
temp[i] = float(sol_int.evaluate()['g'])
return temp
def main():
"""Main function."""
xbounds = (0., 10.)
nx = 64
tend = 10
total_t = 10
timestep = 1e-3
g = 9.81
kappa = 0.2
dealias = 3/2
solver = SWESolver(xbounds, timestep, nx, tend, g, kappa, dealias)
solver = SWESolver(xbounds, timestep, nx, total_t, g, kappa, dealias, problemtype='waterchannel')
b = gaussian_bathymetry(solver.domain.x, (5., 1.))
# time the solver
......@@ -176,4 +355,4 @@ def main():
if __name__ == "__main__":
main()
\ No newline at end of file
main()
from .read_left_bc import leftbc, data
\ No newline at end of file
import numpy as np
from scipy.interpolate import CubicSpline
class leftbc(object):
def __init__(self, filename):
self.heights = []
time = 0
with open(filename, 'r') as f:
for line in f.readlines()[1:]:
fields = line.split()
self.heights.append(float(fields[0])/100) # convert from cm to m
time += 0.01 # sampling rate in seconds
nt = np.shape(self.heights)[0]
taxis = np.linspace(0.0, time, nt)
self.final_time = time
self.f = CubicSpline(taxis, self.heights)
self.heights = np.array(self.heights)
class data(object):
def __init__(self, filename):
heights1 = []
heights2 = []
heights3 = []
time = 0
with open(filename, 'r') as f:
for line in f.readlines()[1:]:
fields = line.split()
heights1.append(float(fields[1])/100) # convert from cm to m
heights2.append(float(fields[2])/100) # convert from cm to m
heights3.append(float(fields[3])/100) # convert from cm to m
time += 0.01 # sampling rate in seconds
nt = np.shape(heights1)[0]
heights = np.zeros((nt,3))
heights[:,0] = heights1
heights[:,1] = heights2
heights[:,2] = heights3
taxis = np.linspace(0.0, time, nt)
self.final_time = time
self.f = []
for j in range(3):
self.f.append(CubicSpline(taxis, heights[:,j]))
def main():
data_dir = "experiment_data/Data_Sensors_orig/With_Bathymetry"
bc_array = np.zeros((10000, 20))
for i in range(1, 21):
print("Reading file: ", i)
filename = data_dir + f"/Heat{i}.txt"
bc = leftbc(filename)
bc_array[:,i-1] = bc.heights*100
mean_bc = np.mean(bc_array, axis=1)
with open(data_dir + "/mean_bc.txt", 'w') as f:
f.write("Sensor 1 [cm] \n")
for i in range(10000):
f.write(f"{mean_bc[i]}\n")
if __name__ == "__main__":
main()
import os
import h5py
import numpy as np
from swe_solver import SWESolver
from swe_wrapper import SWESolver, rampFunc
def main():
"""This script is used to generate the simulation data to use
for reconstruction of bathymetry."""
# Get the parent directory of the current file
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
path = os.path.join(parent_dir, "data/toy_measurement")
filename = "simulation_data_mu_s.h5"
full_path = os.path.join(path, filename)
path = os.path.join(current_dir, "data/toy_measurement")
# Ensure the directory exists
os.makedirs(path, exist_ok=True)
# Set up the parameters for measurement simulation (# for simulation in MCMC)
xbounds = (0., 10.)
nx = 64
tend = 10
timestep = 1e-3
xbounds = (1.5, 15.)
nx = 130
total_t = 10
tstart = 32.
timestep = 5e-5
g = 9.81
kappa = 0.2
dealias = 3/2
bathy_peak = (5,1)
problemtype = "waterchannel"
# Create SWE solver and calculate the solution
solver = SWESolver(xbounds, timestep, nx, tend, g, kappa, dealias)
h_list, u_list, t_list = solver.solve(bathy_peak)
h_array = np.array(h_list)
u_array = np.array(u_list)
t_array = np.array(t_list)
solver.initial_conditions.b.change_scales(1)
b_array = np.copy(solver.initial_conditions.b['g'])
solver = SWESolver(xbounds, timestep, nx, total_t,
tstart=tstart, g=g, kappa=kappa, dealias=dealias,
problemtype=problemtype)
x = solver.domain.x
bathy = rampFunc(x)
print("Start Simulation")
H_sensor, h_array, t_array, u_array = solver.solve(bathy)
print("Simulation Done")
dx = (xbounds[1]-xbounds[0])/nx
H_array = h_array + np.tile(b_array, (t_array.size, 1))
# filename
filename = f"simulation_data_{problemtype}.h5"
full_path = os.path.join(path, filename)
print("Saving results to: ", full_path)
with h5py.File(full_path, "w") as f:
f.create_dataset("y", data=H_array)
f.create_dataset("h", data=h_array)
f.create_dataset("u", data=u_array)
f.create_dataset("b_exact", data=b_array)
f.create_dataset("H_sensor", data=H_sensor)
f.create_dataset("b_exact", data=bathy)
f.create_dataset("xgrid", data=np.copy(solver.domain.x))
f.create_dataset("t_array", data=t_array)
f.attrs["T_N"] = tend
f.attrs["T_N"] = total_t
f.attrs["xmin"] = xbounds[0]
f.attrs["xmax"] = xbounds[1]
f.attrs["tstart"] = tstart
f.attrs["dt"] = timestep
f.attrs["dx"] = dx
f.attrs["g"] = g
f.attrs["k"] = kappa # Leave this "k" in hdf5 file,
# otherwise compute all old solutions again.
f.attrs["k"] = kappa
f.attrs["M"] = nx
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment