First, some general remarks.
One of the core difficulties in effectively solving problems with strong nonlinearity, strong discontinuities, strongly coupled multi-physics fields, or highly complex geometry is how to construct numerical tools that, in multiscale and nonlinear settings, can accurately identify, locate, capture, and separate the features of each scale, in particular small-scale local features; this includes decoupling the large-scale, low-order approximate solution from the small-scale, high-order truncation error. For strongly nonlinear problems such as turbulence, the crux is how to effectively strip small-scale features out of the large-scale approximate solution of interest. For strong discontinuities such as shocks, the crux is to resolve the shock front without spurious numerical oscillations while not introducing excessive numerical viscosity in smooth regions. For multi-field coupling problems, the difficulty is how to represent the solution accurately from sparse data and to transfer local detail features effectively. For complex geometry, the key is an efficient description of local geometric detail. For the requirement these problems share — the mathematical ability to extract and analyze the time-frequency localization of a function — wavelet multiresolution analysis provides a very effective tool: the magnitude of the wavelet detail coefficients identifies local large-gradient features and locally significant high-frequency content, and various filtering (thresholding) schemes yield a sparse representation of the numerical solution, from which adaptive wavelet numerical algorithms can be designed.
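As a minimal, self-contained sketch of that coefficient-magnitude idea (separate from the density-estimation code below; the signal and the threshold value are arbitrary choices made here only for illustration), one can decompose a signal containing a sharp local feature with PyWavelets, zero out the small detail coefficients, and reconstruct a sparse approximation:

import numpy as np
import pywt

# test signal: smooth trend plus a narrow local spike
t = np.linspace(0, 1, 256)
signal = np.sin(2 * np.pi * t) + 1.0 * (np.abs(t - 0.5) < 0.02)

# multilevel Haar decomposition
coeffs = pywt.wavedec(signal, 'haar', level=4)

# hard-threshold the detail coefficients (threshold value chosen ad hoc)
threshold = 0.1
coeffs_sparse = [coeffs[0]] + [pywt.threshold(c, threshold, mode='hard') for c in coeffs[1:]]

# reconstruct a sparse approximation that still keeps the localized feature
signal_sparse = pywt.waverec(coeffs_sparse, 'haar')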
With that out of the way, we start with one-dimensional density estimation on a Haar wavelet basis; the code is fairly simple.
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from scipy.stats import norm, uniform
from scipy.integrate import quad
import sys, os, time, fileinput
plt.style.use('default')
# sample data: a mixture of narrow "point source" peaks, a broad Gaussian, and a uniform background
N_data = 10000
# empty array that will accumulate all samples
x_data = np.array([])
# point source samples
Ng = 5
N_scales = uniform.rvs(size = Ng)          # random relative weights for the peaks
scales = 0.005 * np.ones(Ng)               # fixed narrow width for every peak
locs = 0.8 * uniform.rvs(size = Ng) + 0.1  # peak locations drawn from [0.1, 0.9]
for n in range(Ng):
    nm_small = norm(scale = scales[n], loc = locs[n])
    x_data_small = nm_small.rvs(size = int(N_scales[n] * N_data / 20))
    x_data = np.concatenate((x_data, x_data_small))
# gaussian samples
nm_large = norm(scale = 0.1, loc = 0.5)
x_data_large = nm_large.rvs(size = N_data)
x_data = np.concatenate((x_data, x_data_large))
# uniform samples
uni = uniform
x_data_uni = uni.rvs(size = int(N_data / 2))
x_data = np.concatenate((x_data, x_data_uni))
# plot histogram of distribution
counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
plt.xlim([0,1])
N = len(x_data)
# use PyWavelets (pywt) for the Haar wavelet functions and, later, the DWT/SWT
import pywt
wavelet_name = 'haar'
wavelet = pywt.Wavelet(wavelet_name)
phi, psi, x = wavelet.wavefun(level=9) # level only sets how finely phi/psi are sampled (does not affect fit timing)
L = int(x[-1] - x[0])
# rescale so the support becomes [0, 1] while preserving the unit L2 norm
x = x / L
phi = np.sqrt(L) * phi
psi = np.sqrt(L) * psi
# define standard haar wavelet and scaling function
def haar_mother_(t):
    # Haar mother wavelet, supported on [0, 1]
    return (np.heaviside(t, 0) - np.heaviside(t - 1, 0)) * np.interp(t, x, psi)
def haar_scale_(t):
    # Haar scaling function, supported on [0, 1]
    return (np.heaviside(t, 0) - np.heaviside(t - 1, 0)) * np.interp(t, x, phi)
x_p = np.linspace(-1,1,1000)
plt.plot(x_p, haar_scale_(x_p - 0), c = 'red', alpha = 0.5)
plt.plot(x_p, haar_scale_(2 * x_p - 1), c = 'lightblue', alpha = 0.5)
plt.plot(x_p, haar_scale_(2 * x_p - 0) - haar_scale_(2 * x_p - 1), c = 'purple')
plt.xlim([-0.25,1.25])
# define dyadic version of wavelet and scaling function
def psi_(x, j, k):
    # dyadic wavelet psi_{j,k}(x) = 2^{j/2} psi(2^j x - k)
    return 2**(j/2) * haar_mother_(2**j * x - k)
def phi_(x, j0, k):
    # dyadic scaling function phi_{j0,k}(x) = 2^{j0/2} phi(2^j0 x - k)
    return 2**(j0/2) * haar_scale_(2**j0 * x - k)
j1 = 7 # maximum scale (6 -> ~2 min , 7 -> ~9 min)
klist1 = np.arange(2**j1) # translations k = 0, ..., 2**j1 - 1
Nk1 = np.size(klist1)
scale_info = [j1, klist1]
# parameterize the scaling coefficients by angles on the unit hypersphere (this enforces normalization)
def angles_for_equal_components_(N):
    """Generate the angles in spherical coordinates corresponding to a
    vector whose components are all equal.
    N = number of angles required to specify the sphere"""
    arg_for_sqrt = np.arange(N + 1, 3 - 0.5, -1)   # N+1, N, ..., 3
    polar_list = np.arccos(1 / np.sqrt(arg_for_sqrt))
    azi_list = np.array([np.pi / 4])
    return np.concatenate((polar_list, azi_list))
def initial_scaling_angle_generator_(N, choice):
    """Generate a set of initial scaling angles on the sphere.
    N = dimension of the Euclidean space
    choice = type of scaling angles to generate:
        'random':    a random sample of angles on the sphere
        'unbiased':  all scaling coefficients have the same positive value
        'equiangle': all angles correspond to pi/4"""
    if choice == 'random':
        return np.concatenate((np.pi * np.random.random(size = N - 2), 2 * np.pi * np.random.random(size = 1)))
    elif choice == 'unbiased':
        return angles_for_equal_components_(N - 1)
    elif choice == 'equiangle':
        return np.concatenate((np.pi / 4 * np.ones(N - 2), np.pi / 4 * np.ones(1)))
def ct(r, arr):
    """Coordinate transformation from spherical to Cartesian coordinates."""
    a = np.concatenate((np.array([2 * np.pi]), arr))
    si = np.sin(a)
    si[0] = 1
    si = np.cumprod(si)    # running product of sines
    co = np.cos(a)
    co = np.roll(co, -1)   # align cosines: x_i = r * cos(theta_i) * prod_{j<i} sin(theta_j)
    return si * co * r
def scaling_coefficients_(scaling_angles):
    """Convert scaling angles on the hypersphere to scaling coefficients."""
    return ct(1, scaling_angles)
def sqrt_p_(x_data, scaling_angles):
    """
    Calculate the square root of the density estimate as the wavelet expansion
    with the scaling coefficients given by the scaling angles.
    """
    N = len(x_data)
    j1, klist1 = scale_info
    phi_arr = np.array([phi_(x_data, j1, klist1[nk]) for nk in range(Nk1)])
    scaling_coefficients = scaling_coefficients_(scaling_angles)
    scaling_coefficients_mat = np.outer(scaling_coefficients, np.ones((N,)))
    scale_terms = scaling_coefficients_mat * phi_arr
    return np.sum(scale_terms, axis = 0)
def safe_log_(x):
    # guard against x = 0
    return np.log(x + 1e-323)
def unbinned_nll_(x):
    return -np.sum(safe_log_(x))
def unbinned_nll_sqrt_p_(scaling_angles):
    # per-event unbinned negative log-likelihood, with p = (sqrt_p)^2
    sqrt_p_arr = sqrt_p_(x_data, scaling_angles)
    p_arr = sqrt_p_arr * sqrt_p_arr
    return unbinned_nll_(p_arr) / N
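To spell out what the code above implements (this is the standard square-root density estimator on an orthonormal basis, not anything specific to this notebook): the square root of the density is expanded in the level-$j_1$ Haar scaling functions, and parameterizing the coefficient vector by angles on the unit hypersphere enforces normalization automatically, because the $\phi_{j_1,k}$ are orthonormal on $[0,1]$:

$$\sqrt{p(x)} = \sum_{k=0}^{2^{j_1}-1} c_k\,\phi_{j_1,k}(x), \qquad \sum_k c_k^2 = 1 \;\Longrightarrow\; \int_0^1 p(x)\,\mathrm{d}x = 1.$$

The quantity minimized below is the per-event unbinned negative log-likelihood in the $2^{j_1}-1$ angles $\theta$,

$$\mathrm{NLL}(\theta) = -\frac{1}{N}\sum_{i=1}^{N}\log p(x_i;\theta), \qquad p(x;\theta) = \Bigl(\sum_k c_k(\theta)\,\phi_{j_1,k}(x)\Bigr)^{2}.$$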
from iminuit import cost, Minuit
# unbiased initial scaling angles
initial_scaling_angles = initial_scaling_angle_generator_(Nk1, 'unbiased')
# initial_scaling_angles = initial_scaling_angle_generator_(Nk1, 'random')
# define iminuit object for minimization
m = Minuit(unbinned_nll_sqrt_p_, initial_scaling_angles)
# limited to sphere
limits_theta = [(0,np.pi) for n in range(Nk1-2)]
limits_phi = [(0,2*np.pi)]
limits = np.concatenate((limits_theta, limits_phi))
# set limits
m.limits = limits
# run fit
m.errordef = Minuit.LIKELIHOOD
m.migrad()
# Migrad fit summary: FCN = -0.3265, Nfcn = 10376, EDM = 8.51e-09 (goal: 0.0001), time = 544.2 s;
# valid minimum (below EDM threshold), SOME parameters at limit, Hesse ok, covariance accurate.
# plot the fit
x_plot = np.linspace(0,1,128)
scaling_angles_fit = m.values[:]
sqrt_p_plot = sqrt_p_(x_plot, scaling_angles_fit)
p_plot = sqrt_p_plot * sqrt_p_plot
plt.plot(x_plot, p_plot, label = 'fit')
counts, bins, _ = plt.hist(x_data, bins = 128, density = True, label = 'data')
plt.xlim([0,1])
plt.legend()
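The next step coarsens the fitted expansion. The identity behind it (standard multiresolution analysis, stated here only for orientation) is that the fine approximation space splits into a coarse approximation space plus the detail spaces in between,

$$V_{j_1} = V_{j_0} \oplus W_{j_0} \oplus W_{j_0+1} \oplus \cdots \oplus W_{j_1-1},$$

so an orthonormal Haar DWT of depth $j_1 - j_0$ applied to the fine-scale scaling coefficients returns exactly the coarse scaling coefficients (the approximation part) together with the wavelet detail coefficients of the intermediate levels.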
sqrt_p_plot_j1 = sqrt_p_plot
j0 = j1 - 4
level = j1 - j0
scaling_coefficients_fit = np.abs(scaling_coefficients_(scaling_angles_fit))
coeffs = pywt.wavedec(scaling_coefficients_fit, wavelet_name, level=level)
scaling_coefficients_j0 = coeffs[0]
wavelet_coefficients_j0 = coeffs[1:]
klist0 = np.arange(2**j0) # translations k = 0, ..., 2**j0 - 1
Nk0 = np.size(klist0)
scale_info = [j0, klist0]
def sqrt_p_coarse_(x_data, scaling_coefficients_j0):
    '''
    Coarse representation of the density estimate.
    '''
    N = len(x_data)
    j0, klist0 = scale_info
    phi_arr = np.array([phi_(x_data, j0, klist0[nk]) for nk in range(Nk0)])
    scaling_coefficients_mat = np.outer(scaling_coefficients_j0, np.ones((N,)))
    scale_terms = scaling_coefficients_mat * phi_arr
    return np.sum(scale_terms, axis = 0)
x_plot = np.linspace(0,1,128)
sqrt_p_plot_j0 = sqrt_p_coarse_(x_plot, scaling_coefficients_j0)
p_plot_j0 = sqrt_p_plot_j0 * sqrt_p_plot_j0
plt.plot(x_plot, p_plot_j0)
plt.plot(x_plot, p_plot)
counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
plt.xlim([0,1])
plt.legend(["Coarse", "Fine", "Data"])
# Fit summary using DWT
counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
plt.plot(x_plot, p_plot_j0, label = 'Coarse')
plt.plot(x_plot, p_plot, label = 'Fine')
plt.plot(x_plot, sqrt_p_plot**2 - sqrt_p_plot_j0**2, label = '$p_f - p_c$')
plt.xlim([0,1])
plt.xlabel('$x$')
plt.ylabel('$p(x)$')
plt.legend()
# stationary wavelet transform (test)
# Note: you cannot simply reuse the SWT approximation coefficients as scaling coefficients
# in the dyadic expansion; the transform has to be inverted (see the inverse-SWT sketch below).
# As written, this cell produces a shifted signal (the cause was not tracked down here;
# it is presumably related to the SWT's undecimated filtering).
sqrt_p_plot_j1 = sqrt_p_plot
j0 = j1 - 4
level = j1 - j0
scaling_coefficients_fit = np.abs(scaling_coefficients_(scaling_angles_fit))
coeffs = pywt.swt(scaling_coefficients_fit, wavelet_name, level=level, norm = True, trim_approx = False)
scaling_coefficients_j0 = coeffs[0][0]
wavelet_coefficients_j0 = coeffs[1:]
klist0 = np.arange(2**j1) # SWT keeps the full-length coefficient array (no downsampling)
Nk0 = np.size(klist0)
scale_info = [j1, klist1]
def sqrt_p_coarse_(x_data, scaling_coefficients_j0):
    # same expansion as above, but at the fine level j1 with the SWT approximation coefficients
    N = len(x_data)
    j1, klist1 = scale_info
    phi_arr = np.array([phi_(x_data, j1, klist0[nk]) for nk in range(Nk0)])
    scaling_coefficients_mat = np.outer(scaling_coefficients_j0, np.ones((N,)))
    scale_terms = scaling_coefficients_mat * phi_arr
    return np.sum(scale_terms, axis = 0)
x_plot = np.linspace(0,1,128)
sqrt_p_plot_j0 = sqrt_p_coarse_(x_plot, scaling_coefficients_j0)
p_plot_j0 = sqrt_p_plot_j0 * sqrt_p_plot_j0
plt.plot(x_plot, p_plot_j0)
plt.plot(x_plot, p_plot)
counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
plt.xlim([0,1])
plt.legend(["Coarse", "Fine", "Data"])