Source code for jitcsim.models.kuramoto_sde

import os
import os.path
import numpy as np
from numpy import pi
from os.path import join
from jitcsde import jitcsde, y
from symengine import sin, cos, Symbol, symarray
from jitcsim.utility import (order_parameter as _order,
                             local_order_parameter as _local_order)
os.environ["CC"] = "clang"


[docs]class Kuramoto_Base: """ Base class for the Kuramoto model. Parameters ---------- N: int number of nodes adj: 2d array adjacency matrix t_initial: float, int initial time of integration t_final: float, int final time of integration t_transition: float, int transition time interval : float time interval for sampling sigma : float noise aplitude of normal distribution alpha : flaot frustration omega : float initial angular frequencies initial_state : array of size N initial phase of oscillators control : list of str control parameters use_omp : boolian if `True` allow to use OpenMP output : str output directory verbose: boolian if `True` some information about the process will be desplayed. """ def __init__(self, par) -> None: for item in par.items(): name = item[0] value = item[1] if name not in par['control']: setattr(self, name, value) self.control_pars = [] for i in self.control: if i == "omega": name = i value = symarray(name, self.N) setattr(self, name, value) self.control_pars.extend(value) else: name = i value = Symbol(name) setattr(self, name, value) self.control_pars.append(value) if not os.path.exists(self.output): os.makedirs(self.output) if not "modulename" in par.keys(): self.modulename = "km" self.integtaror_params_set = False self.SET_SEED = False self.seed = None # --------------------------------------------------------------- def set_seed(self, seed): self.SET_SEED = True self.seed = seed def compile(self, **kwargs): I = jitcsde(self.f_sym, self.g_sym, n=self.N, control_pars=self.control_pars, additive=True) # I.generate_f_C(**kwargs) I.compile_C(omp=self.use_omp, **kwargs) I.save_compiled(overwrite=True, destination=join(self.output, self.modulename)) # ---------------------------------------------------------------
[docs] def set_integrator_parameters(self, atol=1e-5, rtol=1e-2, min_step=1e-5, max_step=10.0): """! set properties for integrator """ self.integrator_params = {"atol": atol, "rtol": rtol, "min_step": min_step, "max_step": max_step } self.integtaror_params_set = True
# --------------------------------------------------------------- def set_initial_state(self, x0): assert(len(x0) == self.N) self.initial_state = x0 # ---------------------------------------------------------------
[docs] def simulate(self, par=[], mode_2pi=True): ''' integrate the system of equations and return the coordinates and times Parameters ----------- par : list list of values for control parameters in order of appearence in control Return : dict(t, x) - t times - x coordinates. ''' I = jitcsde(n=self.N, verbose=False, control_pars=self.control_pars, module_location=join(self.output, self.modulename+".so")) if self.SET_SEED: I.set_seed(self.seed) I.set_initial_value(self.initial_state, time=self.t_initial) I.set_parameters(par) if not self.integtaror_params_set: self.set_integrator_parameters() I.set_integration_parameters(**self.integrator_params) times = self.t_transition + \ np.arange(self.t_initial, self.t_final - self.t_transition, self.interval) phases = np.zeros((len(times), self.N)) for i in range(len(times)): if mode_2pi: phases[i, :] = I.integrate(times[i]) % (2*np.pi) else: phases[i, :] = I.integrate(times[i]) return {"t": times, "x": phases}
# --------------------------------------------------------------- def order_parameter(self, phases): order = _order(phases) return order # --------------------------------------------------------------- def local_order_parameter(self, phases, indices): order = _local_order(phases, indices) return order
# ---------------------------------------------------------------
[docs]class Kuramoto_II(Kuramoto_Base): """ **Kuramoto model with noise.** .. math:: \\frac{d\\theta_i}{dt} = \\omega_i + \\xi_i + \\sum_{j=0}^{N-1} a_{i,j} \\sin(y_j - y_i - \\alpha) Parameters ---------- N: int number of nodes adj: 2d array adjacency matrix t_initial: float, int initial time of integration t_final: float, int final time of integration t_transition: float, int transition time interval : float time interval for sampling sigma : float noise aplitude of normal distribution alpha : flaot frustration omega : float initial angular frequencies initial_state : array of size N initial phase of oscillators control : list of str control parameters use_omp : boolian if `True` allow to use OpenMP output : str output directory verbose: boolian if `True` some information about the process will be desplayed. """ def __init__(self, par) -> None: super().__init__(par) # ---------------------------------------------------------------
[docs] def g_sym(self): """ to do. """ for i in range(self.N): yield self.sigma
[docs] def f_sym(self): ''' **Kuramoto model of type II** .. math:: \\frac{d\\theta_i}{dt} = \\omega_i + \\xi_i + \\sum_{j=0}^{N-1} a_{i,j} \\sin(y_j - y_i - \\alpha) ''' for i in range(self.N): sumj = sum(sin(y(j)-y(i) - self.alpha) for j in range(self.N) if self.adj[j, i]) yield self.omega[i] + self.coupling * sumj
# --------------------------------------------------------------- # class Kuramoto_I(Kuramoto_Base): # def __init__(self, par) -> None: # super().__init__(par) # def rhs(self): # '''! # Kuramoto model of type I # \f$ # \frac{d\theta_i}{dt} = \omega_i + 0.5 * \sum_{j=0}^{N-1} a_{i,j} \Big(1 - \cos(y_j - y_i - alpha) \Big) \hspace{1cm} \text{for Type I}\\ # \f$ # @return right hand side of the Kuramoto model # ''' # for i in range(self.N): # sumj = 0.5 * np.sum(1-cos(y(j)-y(i) - self.alpha) # for j in range(self.N) if self.adj[i, j]) # yield self.omega[i] + self.coupling * sumj # class SOKM_SingleLayer(Kuramoto_Base): # """! # Second order Kuramoto Model for single layer network # \f$ # m \frac{d^2 \theta_i(t)}{dt^2}+\frac{d\theta_i(t)}{dt} = \omega_i + \frac{\lambda}{\langle k \rangle} \sum_{j=1}^N \sin \Big[ \theta_j(t) - \theta_i(t) \Big] # \f$ # Reference: # Kachhvah, A.D. and Jalan, S., 2017. Multiplexing induced explosive synchronization in Kuramoto oscillators with inertia. EPL (Europhysics Letters), 119(6), p.60005. # """ # def __init__(self, par) -> None: # super().__init__(par) # def rhs(self): # for i in range(self.N): # yield y(i+self.N) # for i in range(self.N): # sumj = sum(sin(y(j)-y(i)) # for j in range(self.N) # if self.adj[i, j]) # yield (-y(i+self.N) + self.omega[i] + # self.coupling * sumj) * self.inv_m # def compile(self, **kwargs): # I = jitcode(self.rhs, n=2 * self.N, # control_pars=self.control_pars) # I.generate_f_C(**kwargs) # I.compile_C(omp=self.use_omp, modulename=self.modulename) # I.save_compiled(overwrite=True, destination=join(self.output, '')) # def set_initial_state(self, x0): # assert(len(x0) == 2 * self.N) # self.initial_state = x0 # def simulate(self, par, **integrator_params): # '''! # integrate the system of equations and return the # coordinates and times # @return dict(t, x) # - **t** times # - **x** coordinates. # ''' # I = jitcode(n=2 * self.N, # control_pars=self.control_pars, # module_location=join(self.output, self.modulename+".so")) # I.set_integrator(name=self.integration_method, # **integrator_params) # I.set_parameters(par) # I.set_initial_value(self.initial_state, time=self.t_initial) # times = self.t_transition + \ # np.arange(self.t_initial, self.t_final - # self.t_transition, self.interval) # phases = np.zeros((len(times), 2 * self.N)) # for i in range(len(times)): # phases[i, :] = I.integrate(times[i]) # phases[i, :self.N] = phases[i, :self.N] % (2*np.pi) # return {"t": times, "x": phases} # class Lyap_Kuramoto_II(Kuramoto_Base): # def __init__(self, par) -> None: # super().__init__(par) # if not "modulename" in par.keys(): # self.modulename = "lyap_km" # try: # self.verbose = par['verbose'] # except: # self.verbose = False # def rhs(self): # '''! # Kuramoto model of type II # \f$ # \frac{d\theta_i}{dt} = \omega_i + \sum_{j=0}^{N-1} a_{i,j} \sin(y_j - y_i - alpha) \hspace{3.5cm} \text{for Type II}\\ # \f$ # @return right hand side of the Kuramoto model # ''' # for i in range(self.N): # sumj = np.sum(sin(y(j)-y(i) - self.alpha) # for j in range(self.N) if self.adj[i, j]) # yield self.omega[i] + self.coupling * sumj # def compile(self, **kwargs): # I = jitcode_lyap(self.rhs, n=self.N, n_lyap=self.n_lyap, # control_pars=self.control_pars) # I.generate_f_C(**kwargs) # I.compile_C(omp=self.use_omp, modulename=self.modulename, # verbose=self.verbose) # I.save_compiled(overwrite=True, destination=join(self.output, '')) # def simulate(self, par, **integrator_params): # '''! # integrate the system of equations and calculate the Lyapunov exponents. # @param par list of values for control parameter(s). # @return dict(t, x) # - **t** times # - **x** coordinates. # ''' # I = jitcode_lyap(n=self.N, n_lyap=self.n_lyap, # control_pars=self.control_pars, # module_location=join(self.output, # self.modulename+".so")) # I.set_integrator(name=self.integration_method, # **integrator_params) # I.set_parameters(par) # I.set_initial_value(self.initial_state, time=self.t_initial) # times = np.arange(self.t_initial, self.t_final, self.interval) # lyaps = np.zeros((len(times), self.n_lyap)) # for i in range(len(times)): # lyaps[i, :] = I.integrate(times[i])[1] # return {"t": times, "lyap": lyaps} # # ---------------------------------------------------------------