Source code for jitcsim.models.kuramoto_ode

import os
import os.path
import numpy as np
from numpy import pi
from os.path import join
from jitcode import jitcode, jitcode_lyap, y
from symengine import sin, cos, Symbol, symarray
from jitcsim.utility import (order_parameter as _order,
                             local_order_parameter as _local_order)
os.environ["CC"] = "clang"



class Kuramoto_Base:
    """
    Base class for the Kuramoto model.

    Parameters
    ----------
    N : int
        number of nodes
    adj : 2d array
        adjacency matrix
    t_initial : float, int
        initial time of integration
    t_final : float, int
        final time of integration
    t_transition : float, int
        transition time
    interval : float
        time interval for sampling
    alpha : float
        frustration
    omega : float
        initial angular frequencies
    initial_state : array of size N
        initial phase of oscillators
    integration_method : str
        name of the integrator.
        One of the following (or a new method supported by either backend):

        * `"dopri5"` - Dormand's and Prince's explicit fifth-order method via `ode`
        * `"RK45"` - Dormand's and Prince's explicit fifth-order method via `solve_ivp`
        * `"dop853"` - DoP853 (explicit) via `ode`
        * `"DOP853"` - DoP853 (explicit) via `solve_ivp`
        * `"RK23"` - Bogacki's and Shampine's explicit third-order method via `solve_ivp`
        * `"BDF"` - implicit backward-differentiation formula via `solve_ivp`
        * `"lsoda"` - LSODA (implicit) via `ode`
        * `"LSODA"` - LSODA (implicit) via `solve_ivp`
        * `"Radau"` - the implicit Radau method via `solve_ivp`
        * `"vode"` - VODE (implicit) via `ode`

        The `solve_ivp` methods are usually slightly faster for large
        differential equations, but they come with a massive overhead that
        makes them considerably slower for small differential equations.
        Implicit solvers are slower than explicit ones, except for stiff
        problems. If you don't know what to choose, start with `"dopri5"`.
    control : list of str
        control parameters
    use_omp : boolean
        if `True`, allow the use of OpenMP
    output : str
        output directory
    verbose : boolean
        if `True`, some information about the process will be displayed.
    """

    def __init__(self, par) -> None:

        for name, value in par.items():
            if name not in par['control']:
                setattr(self, name, value)

        self.control_pars = []
        for i in self.control:
            if i == "omega":
                name = i
                value = symarray(name, self.N)
                setattr(self, name, value)
                self.control_pars.extend(value)
            else:
                name = i
                value = Symbol(name)
                setattr(self, name, value)
                self.control_pars.append(value)

        if not os.path.exists(self.output):
            os.makedirs(self.output)

        if "modulename" not in par.keys():
            self.modulename = "km"

    # ---------------------------------------------------------------

    def set_modulename(self, modulename):
        self.modulename = modulename

    def compile(self, modulename="km", **kwargs):
        """
        Compile the model and produce a shared library.
        Translates the derivative to C code using SymEngine's
        `C-code printer <https://github.com/symengine/symengine/pull/1054>`_.

        Parameters
        ----------
        kwargs : (key, value)
            passed to `generate_f_C`, including:

            - simplify : boolean or None
            - do_cse : boolean
                Whether SymPy's `common-subexpression detection
                <http://docs.sympy.org/dev/modules/rewriting.html#module-sympy.simplify.cse_main>`_
                should be applied before translating to C code.
                It is almost always better to let the compiler do this
                (unless you want to set the compiler optimisation to `-O2`
                or lower): for simple differential equations this should not
                make any difference to the compiler's optimisations; for
                large ones, it may make a difference but also take long.
                As this requires all entries of `f` at once, it may void
                advantages gained from using generator functions as an input.
                Also, this feature uses SymPy and not SymEngine.
            - chunk_size : int
                If the number of instructions in the final C code exceeds
                this number, it will be split into chunks of this size.
                See `Handling very large differential equations
                <http://jitcde-common.readthedocs.io/#handling-very-large-differential-equations>`_
                on why this is useful and how to best choose this value.
                It is also used for parallelization with OpenMP to determine
                task scheduling. If smaller than 1, no chunking will happen.
        """

        self.set_modulename(modulename)

        I = jitcode(self.rhs, n=self.N, control_pars=self.control_pars)
        I.generate_f_C(**kwargs)
        I.compile_C(omp=self.use_omp, modulename=self.modulename)
        I.save_compiled(overwrite=True, destination=join(self.output, ''))
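
    # Example (a minimal sketch; ``model`` stands for any instance of a
    # subclass, and the ``chunk_size`` value is an illustrative assumption):
    #
    #     model.compile(modulename="km", chunk_size=100)   # kwargs go to generate_f_C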

    # ---------------------------------------------------------------

    def set_initial_state(self, x0):

        assert len(x0) == self.N
        self.initial_state = x0

    # ---------------------------------------------------------------

    def simulate(self, par, **integrator_params):
        '''
        Integrate the system of equations and return the coordinates and times.

        Parameters
        ----------
        par : list
            list of values for the control parameters, in order of
            appearance in `control`.

        Return : dict(t, x)

            - **t** times
            - **x** coordinates.
        '''

        I = jitcode(n=self.N, control_pars=self.control_pars,
                    module_location=join(self.output, self.modulename + ".so"))
        I.set_integrator(name=self.integration_method, **integrator_params)
        I.set_parameters(par)
        I.set_initial_value(self.initial_state, time=self.t_initial)

        times = self.t_transition + \
            np.arange(self.t_initial,
                      self.t_final - self.t_transition,
                      self.interval)
        phases = np.zeros((len(times), self.N))

        for i in range(len(times)):
            phases[i, :] = I.integrate(times[i]) % (2 * np.pi)

        return {"t": times, "x": phases}
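
    # Example (a sketch): ``par`` follows the order of ``control``. Assuming the
    # model was built with ``control=["coupling"]``:
    #
    #     data = model.simulate([0.5])          # coupling = 0.5
    #     t, phases = data["t"], data["x"]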
    # ---------------------------------------------------------------

    def order_parameter(self, phases):
        """
        Calculate the Kuramoto order parameter.

        Parameters
        ----------
        phases : 2D numpy array [nstep by N]
            phase of oscillators
        """

        order = _order(phases)
        return order
    # ---------------------------------------------------------------

    def local_order_parameter(self, phases, indices):
        """
        Calculate the local Kuramoto order parameter for given node indices.

        Parameters
        ----------
        phases : float numpy array
            phase of each oscillator.
        indices : int numpy array
            indices of the given nodes.
        """

        order = _local_order(phases, indices)
        return order
# ---------------------------------------------------------------

class Kuramoto_II(Kuramoto_Base):
    """
    **Kuramoto model type II**

    .. math::
        \\frac{d\\theta_i}{dt} = \\omega_i + \\sum_{j=0}^{N-1} a_{i,j} \\sin(y_j - y_i - \\alpha)

    Parameters are the same as for :class:`Kuramoto_Base`.
    """

    def __init__(self, par) -> None:
        super().__init__(par)

    # ---------------------------------------------------------------

    def rhs(self):
        '''
        **Kuramoto model of type II**

        .. math::
            \\frac{d\\theta_i}{dt} = \\omega_i + \\sum_{j=0}^{N-1} a_{i,j} \\sin(y_j - y_i - \\alpha)
        '''

        for i in range(self.N):
            sumj = sum(sin(y(j) - y(i) - self.alpha)
                       for j in range(self.N) if self.adj[i, j])
            yield self.omega[i] + self.coupling * sumj
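
# Example usage of Kuramoto_II (a minimal sketch; the network, frequencies and
# numerical values below are illustrative assumptions, not defaults of this
# module):
#
#     import numpy as np
#     from numpy.random import uniform, normal
#     from jitcsim.models.kuramoto_ode import Kuramoto_II
#
#     N = 30
#     parameters = {
#         "N": N,
#         "adj": np.ones((N, N)) - np.eye(N),        # all-to-all network
#         "t_initial": 0.0,
#         "t_transition": 2.0,
#         "t_final": 100.0,
#         "interval": 0.2,                           # sampling interval
#         "alpha": 0.0,                              # frustration
#         "omega": normal(0.0, 0.1, N),              # natural frequencies
#         "initial_state": uniform(-np.pi, np.pi, N),
#         "integration_method": "dopri5",
#         "control": ["coupling"],                   # "coupling" becomes a symbol
#         "use_omp": False,
#         "output": "output",
#     }
#
#     model = Kuramoto_II(parameters)
#     model.compile()
#     data = model.simulate([0.5 / N])               # value of the control parameter
#     R = model.order_parameter(data["x"])           # order parameter over time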
# ---------------------------------------------------------------

class Kuramoto_I(Kuramoto_Base):
    """
    **Kuramoto model type I**

    .. math::
        \\frac{d\\theta_i}{dt} = \\omega_i + 0.5 \\sum_{j=0}^{N-1} a_{i,j} \\Big(1 - \\cos(y_j - y_i - \\alpha) \\Big)

    Parameters are the same as for :class:`Kuramoto_Base`.
    """

    def __init__(self, par) -> None:
        super().__init__(par)

    def rhs(self):
        '''
        **Kuramoto model of type I**

        .. math::
            \\frac{d\\theta_i}{dt} = \\omega_i + 0.5 \\sum_{j=0}^{N-1} a_{i,j} \\Big(1 - \\cos(y_j - y_i - \\alpha) \\Big)

        Return :
            right hand side of the Kuramoto model
        '''

        for i in range(self.N):
            sumj = 0.5 * sum(1 - cos(y(j) - y(i) - self.alpha)
                             for j in range(self.N) if self.adj[j, i])
            yield self.omega[i] + self.coupling * sumj


class SOKM_SingleLayer(Kuramoto_Base):
    """
    **Second-order Kuramoto model for a single-layer network**

    .. math::
        m \\frac{d^2 \\theta_i(t)}{dt^2} + \\frac{d\\theta_i(t)}{dt} =
        \\omega_i + \\frac{\\lambda}{\\langle k \\rangle}
        \\sum_{j=1}^N \\sin \\Big[ \\theta_j(t) - \\theta_i(t) \\Big]

    Reference:

    Kachhvah, A.D. and Jalan, S., 2017. Multiplexing induced explosive
    synchronization in Kuramoto oscillators with inertia.
    EPL (Europhysics Letters), 119(6), p.60005.
    """

    def __init__(self, par) -> None:
        super().__init__(par)

    def rhs(self):
        # state vector: y(0..N-1) are phases, y(N..2N-1) are angular velocities
        for i in range(self.N):
            yield y(i + self.N)

        for i in range(self.N):
            sumj = sum(sin(y(j) - y(i))
                       for j in range(self.N) if self.adj[j, i])
            yield (-y(i + self.N) + self.omega[i] +
                   self.coupling * sumj) * self.inv_m

    def compile(self, **kwargs):

        I = jitcode(self.rhs, n=2 * self.N, control_pars=self.control_pars)
        I.generate_f_C(**kwargs)
        I.compile_C(omp=self.use_omp, modulename=self.modulename)
        I.save_compiled(overwrite=True, destination=join(self.output, ''))

    def set_initial_state(self, x0):

        assert len(x0) == 2 * self.N
        self.initial_state = x0

    def simulate(self, par, **integrator_params):
        '''
        Integrate the system of equations and return the coordinates and times.

        Return : dict(t, x)

            - **t** times
            - **x** coordinates.
        '''

        I = jitcode(n=2 * self.N, control_pars=self.control_pars,
                    module_location=join(self.output, self.modulename + ".so"))
        I.set_integrator(name=self.integration_method, **integrator_params)
        I.set_parameters(par)
        I.set_initial_value(self.initial_state, time=self.t_initial)

        times = self.t_transition + \
            np.arange(self.t_initial,
                      self.t_final - self.t_transition,
                      self.interval)
        phases = np.zeros((len(times), 2 * self.N))

        for i in range(len(times)):
            phases[i, :] = I.integrate(times[i])
            phases[i, :self.N] = phases[i, :self.N] % (2 * np.pi)

        return {"t": times, "x": phases}
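
# Example for SOKM_SingleLayer (a sketch continuing the parameter dict from the
# Kuramoto_II example above; this class additionally reads ``inv_m``, the inverse
# oscillator mass, and its state vector has length 2N: phases followed by angular
# velocities; the numbers are illustrative assumptions):
#
#     parameters["inv_m"] = 1.0
#     som = SOKM_SingleLayer(parameters)
#     som.compile()
#     x0 = np.concatenate([uniform(-np.pi, np.pi, N),   # initial phases
#                          np.zeros(N)])                # initial angular velocities
#     som.set_initial_state(x0)
#     data = som.simulate([0.4])                        # value(s) of the control parameter(s)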


class Lyap_Kuramoto_II(Kuramoto_Base):

    def __init__(self, par) -> None:
        super().__init__(par)

        if "modulename" not in par.keys():
            self.modulename = "lyap_km"

        try:
            self.verbose = par['verbose']
        except KeyError:
            self.verbose = False

    def rhs(self):
        '''
        **Kuramoto model of type II**

        .. math::
            \\frac{d\\theta_i}{dt} = \\omega_i + \\sum_{j=0}^{N-1} a_{i,j} \\sin(y_j - y_i - \\alpha)

        Return :
            right hand side of the Kuramoto model.
        '''

        for i in range(self.N):
            sumj = sum(sin(y(j) - y(i) - self.alpha)
                       for j in range(self.N) if self.adj[j, i])
            yield self.omega[i] + self.coupling * sumj

    def compile(self, **kwargs):

        I = jitcode_lyap(self.rhs, n=self.N, n_lyap=self.n_lyap,
                         control_pars=self.control_pars)
        I.generate_f_C(**kwargs)
        I.compile_C(omp=self.use_omp, modulename=self.modulename,
                    verbose=self.verbose)
        I.save_compiled(overwrite=True, destination=join(self.output, ''))

    def simulate(self, par, **integrator_params):
        '''
        Integrate the system of equations and calculate the Lyapunov exponents.

        Parameters
        ----------
        par : list of int, float
            values for the control parameter(s).

        Return : dict(t, lyap)

            - **t** times
            - **lyap** Lyapunov exponents.
        '''

        I = jitcode_lyap(n=self.N, n_lyap=self.n_lyap,
                         control_pars=self.control_pars,
                         module_location=join(self.output, self.modulename + ".so"))
        I.set_integrator(name=self.integration_method, **integrator_params)
        I.set_parameters(par)
        I.set_initial_value(self.initial_state, time=self.t_initial)

        times = np.arange(self.t_initial, self.t_final, self.interval)
        lyaps = np.zeros((len(times), self.n_lyap))

        for i in range(len(times)):
            lyaps[i, :] = I.integrate(times[i])[1]

        return {"t": times, "lyap": lyaps}

    # ---------------------------------------------------------------
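
# Example for Lyap_Kuramoto_II (a sketch; ``n_lyap`` must be supplied in the
# parameter dict, and the values below are illustrative assumptions):
#
#     parameters["n_lyap"] = 3                  # number of Lyapunov exponents
#     lyap_model = Lyap_Kuramoto_II(parameters)
#     lyap_model.compile()
#     data = lyap_model.simulate([0.5 / N])
#     exponents = data["lyap"][-1]              # estimates at the final time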