ofdm_demo.py
import numpy as np
from scipy import interpolate
import commpy as cpy
import ofdm_debug as ofdm_debug
class OFDMSystem:
def __init__(self, K=64, CP=None, P=8, pilotValue=3+3j, Modulation_type='QAM16', channel_type='random', SNRdb=25,debug=False):
# 设置OFDM子载波数量
self.K = K
# 设置循环前缀长度
if CP is not None:
self.CP = CP
else:
self.CP = self.K // 4 # 默认为K的四分之一
# 设置导频数量
self.P = P
# 设置导频值
self.pilotValue = pilotValue
# 设置调制方式
self.Modulation_type = Modulation_type
# 设置信道类型
self.channel_type = channel_type
# 设置信噪比(dB)
self.SNRdb = SNRdb
# 生成所有子载波的索引
self.allCarriers = np.arange(self.K)
# 计算导频位置(每间隔K//P个子载波一个导频)
self.pilotCarrier = self.allCarriers[::self.K // self.P]
# 将最后一个子载波也作为导频
self.pilotCarriers = np.hstack([self.pilotCarrier, np.array([self.allCarriers[-1]])])
# 更新导频数量
self.P += 1
# 调制方式到比特数的映射
self.m_map = {"BPSK": 1, "QPSK": 2, "8PSK": 3, "QAM16": 4, "QAM64": 6}
# 获取当前调制方式对应的比特数
self.mu = self.m_map[self.Modulation_type]
# 获取数据子载波的位置(排除导频位置)
self.dataCarriers = np.delete(self.allCarriers, self.pilotCarriers)
# 计算每个OFDM符号的有效载荷位数
self.payloadBits_per_OFDM = len(self.dataCarriers) * self.mu
if(debug==True):
ofdm_debug.print_paramter(self.payloadBits_per_OFDM, self.pilotCarriers, self.dataCarriers, self.P, self.Modulation_type, self.channel_type, self.SNRdb, self.mu)
def Modulation(self, bits):
if self.Modulation_type == "QPSK":
PSK4 = cpy.PSKModem(4)
return PSK4.modulate(bits)
elif self.Modulation_type == "QAM64":
QAM64 = cpy.QAMModem(64)
return QAM64.modulate(bits)
elif self.Modulation_type == "QAM16":
QAM16 = cpy.QAMModem(16)
return QAM16.modulate(bits)
elif self.Modulation_type == "8PSK":
PSK8 = cpy.PSKModem(8)
return PSK8.modulate(bits)
elif self.Modulation_type == "BPSK":
BPSK = cpy.PSKModem(2)
return BPSK.modulate(bits)
def DeModulation(self, symbol):
if self.Modulation_type == "QPSK":
PSK4 = cpy.PSKModem(4)
return PSK4.demodulate(symbol, demod_type='hard')
elif self.Modulation_type == "QAM64":
QAM64 = cpy.QAMModem(64)
return QAM64.demodulate(symbol, demod_type='hard')
elif self.Modulation_type == "QAM16":
QAM16 = cpy.QAMModem(16)
return QAM16.demodulate(symbol, demod_type='hard')
elif self.Modulation_type == "8PSK":
PSK8 = cpy.PSKModem(8)
return PSK8.demodulate(symbol, demod_type='hard')
elif self.Modulation_type == "BPSK":
BPSK = cpy.PSKModem(2)
return BPSK.demodulate(symbol, demod_type='hard')
def add_awgn(self, x_s):
data_pwr = np.mean(abs(x_s**2))
noise_pwr = data_pwr / (10 ** (self.SNRdb / 10))
noise = 1 / np.sqrt(2) * (np.random.randn(len(x_s)) + 1j * np.random.randn(len(x_s))) * np.sqrt(noise_pwr)
return x_s + noise, noise_pwr
def channel(self, in_signal):
channelResponse = np.array([1, 0, 0.3 + 0.3j])
if self.channel_type == "random":
convolved = np.convolve(in_signal, channelResponse)
return self.add_awgn(convolved)
elif self.channel_type == "awgn":
return self.add_awgn(in_signal)
def OFDM_symbol(self, QAM_payload):
symbol = np.zeros(self.K, dtype=complex)
symbol[self.pilotCarriers] = self.pilotValue
symbol[self.dataCarriers] = QAM_payload
return symbol
def IDFT(self, OFDM_data):
return np.fft.ifft(OFDM_data)
def addCP(self, OFDM_time):
cp = OFDM_time[-self.CP:]
return np.hstack([cp, OFDM_time])
def removeCP(self, signal):
return signal[self.CP:self.CP + self.K]
def DFT(self, OFDM_RX):
return np.fft.fft(OFDM_RX)
def channelEstimate(self, OFDM_demod):
# 提取导频位置的数据
pilots = OFDM_demod[self.pilotCarriers]
print(f"导频位置的1: {pilots}")
print(f"导频的值: {self.pilotValue}")
# LS信道估计
Hest_at_pilots = pilots / self.pilotValue # 假设pilotValue是已知的导频值
print(f"导频位置的信道估计: {Hest_at_pilots}")
# 插值扩展到所有子载波
Hest_abs = interpolate.interp1d(self.pilotCarriers, abs(Hest_at_pilots), kind='linear')(self.allCarriers)
Hest_phase = interpolate.interp1d(self.pilotCarriers, np.angle(Hest_at_pilots), kind='linear')(self.allCarriers)
Hest = Hest_abs * np.exp(1j * Hest_phase)
print(f"Hest result: {Hest}")
return Hest
def equalize(self, OFDM_demod, Hest):
return OFDM_demod / Hest
def simulate(self):
bits = np.random.binomial(n=1, p=0.5, size=(self.payloadBits_per_OFDM,))
ofdm_debug.print_bits(bits)
QAM_s = self.Modulation(bits)
ofdm_debug.print_qams(QAM_s)
OFDM_data = self.OFDM_symbol(QAM_s)
ofdm_debug.print_symble(OFDM_data)
OFDM_time = self.IDFT(OFDM_data)
ofdm_debug.print_ifft(OFDM_time)
OFDM_withCP = self.addCP(OFDM_time)
OFDM_TX = OFDM_withCP
ofdm_debug.print_tx(OFDM_TX)
OFDM_RX, _ = self.channel(OFDM_TX)
ofdm_debug.print_rx(OFDM_RX)
if(self.channel_type == "awgn"):
pass
else:
print("输入信号长度:N")
print("信道冲激响应长度:M")
print("卷积后输出长度:N+M-1")
print(f"self.channel 信道输出: diff={len(OFDM_RX) - len(OFDM_TX)}")
OFDM_RX_noCP = self.removeCP(OFDM_RX)
ofdm_debug.print_nocprx(OFDM_RX_noCP)
OFDM_demod = self.DFT(OFDM_RX_noCP) #它将时域信号转换为频域信号
ofdm_debug.print_DFT(OFDM_demod)
Hest = self.channelEstimate(OFDM_demod)
ofdm_debug.print_HST(Hest)
equalized_Hest = self.equalize(OFDM_demod, Hest)
QAM_est = equalized_Hest[self.dataCarriers]
bits_est = self.DeModulation(QAM_est)
ber = np.sum(abs(bits - bits_est)) / len(bits)
return ber
if __name__ == '__main__':
ofdm_system = OFDMSystem(K=64, CP=None, P=8, pilotValue=3+3j, Modulation_type='QAM16', channel_type='random', SNRdb=25)
ber = ofdm_system.simulate()
print(f"误比特率BER: {ber}")
ofdm_debug.py
import numpy as np
import matplotlib.pyplot as plt
from scipy import interpolate
import commpy as cpy
def print_paramter(payloadBits_per_OFDM,pilotCarriers,dataCarriers,P,Modulation_type,channel_type,SNRdb,mu):
print(f"有效载荷位数: {payloadBits_per_OFDM}")
print(f"导频位置: {pilotCarriers}")
print(f"数据位置: {dataCarriers}")
print(f"导频数: {P}")
print(f"调制方式: {Modulation_type}")
print(f"信道类型: {channel_type}")
print(f"信噪比: {SNRdb}")
print(f"dataCarriers num: { len(dataCarriers)}")
print(f"mu: {mu}")
def print_bits(bits):
# 每4个比特分为一组打印
print(f"bits: {len(bits)}")
for i in range(0, len(bits), 4):
print(f"{int(i//4)} {bits[i:i+4]}")
def print_qams(QAM_s):
# 每4个比特分为一组打印
print(f"QAM_s: {len(QAM_s)}")
for i in range(0, len(QAM_s)):
print(f"{i} {QAM_s[i]}")
def print_symble(symbol):
# 每4个比特分为一组打印
print(f"symbol: {len(symbol)}")
for i in range(0, len(symbol)):
print(f"{i} {symbol[i]}")
def print_ifft(ifft):
print(f"ifft: {len(ifft)}")
for i in range(0, len(ifft)):
print(f"{i} {ifft[i]}")
def print_tx(tx):
print(f"tx: {len(tx)}")
for i in range(0, len(tx)):
print(f"{i} {tx[i]}")
def print_rx(rx):
print(f"rx: {len(rx)}")
for i in range(0, len(rx)):
print(f"{i} {rx[i]}")
def print_nocprx(rx):
print(f"nocprx: {len(rx)}")
for i in range(0, len(rx)):
print(f"{i} {rx[i]}")
def print_DFT(DFT):
print(f"DFT: {len(DFT)}")
for i in range(0, len(DFT)):
print(f"{i} {DFT[i]}")
def print_HST(HST):
print(f"HST: {len(HST)}")
for i in range(0, len(HST)):
print(f"{i} {HST[i]}")
https://zhuanlan.zhihu.com/p/434928660