import numpy as np | |
from IQ_Utils import L | |
class SS_Decoder:
    """Detect PSS/SSS reference sequences in a stream of IQ analysis frames.

    The decoder keeps a sliding buffer with the three most recent analysis
    frames, cross-correlates that buffer with reference sequences taken from
    ``signals`` and records, per frame, the location and magnitude of the
    strongest correlation value together with a peak-detected flag.

    ``params`` must provide ``analysis_frame_len`` and ``is_TDD``.
    ``signals`` must provide the reference sequences read by the individual
    methods (see each method's docstring).

    All buffers are plain Python lists, as in the original implementation,
    so external code that reads them keeps working unchanged.
    """

    # Number of analysis frames held in the sliding IQ buffer.
    _IQ_BUFFER_FRAMES = 3
    # Number of analysis frames held in each correlation history buffer.
    _CORR_HISTORY_FRAMES = 10
    # A peak must exceed this multiple of the frame's mean correlation level.
    _PEAK_TO_MEAN_RATIO = 2.0
    # One LTE SSS metric must exceed the other by this factor to be decisive.
    _SSS_DECISION_RATIO = 10.0
    # Offset returned by lte_sss_run() when the second half-frame is selected.
    _HALF_FRAME_OFFSET = 70

    def __init__(self, params, signals):
        """Store the configuration and allocate zero-filled buffers.

        Args:
            params: configuration object; ``analysis_frame_len`` sizes every
                buffer allocated here.
            signals: container of reference sequences used for correlation.
        """
        self.params = params
        self.signals = signals

        frame_len = params.analysis_frame_len

        # Correlation-magnitude history covering the last ten analysis frames
        # (per the original author's note these are only used for drawing).
        # NOTE: despite "magSQR" in the names, the stored values are
        # np.absolute() outputs, i.e. magnitudes, not squared magnitudes.
        self.PSS_corr_magSQR_output_frame_main = (
            [0] * (self._CORR_HISTORY_FRAMES * frame_len))
        self.SSS_corr_magSQR_output_frame_main = (
            [0] * (self._CORR_HISTORY_FRAMES * frame_len))

        # 1 when the most recent run found a PSS / SSS peak, else 0.
        self.PSS_peak_detected = 0
        self.SSS_peak_detected = 0

        # Latched peak indices. hold_PSS_max_idx is written by pss_run() on a
        # detection while it is still 0; hold_SSS_max_idx is never written by
        # this class.
        self.hold_PSS_max_idx = 0
        self.hold_SSS_max_idx = 0

        # Set to 1 by pss_run() on a PSS detection and to 0 otherwise
        # (the original comment reads "Do Inverse Fast Fourier
        # Transformation"; the consumer is outside this class).
        self.doIFFT = 0

        # Index and value of the strongest correlation sample in the most
        # recently analysed frame.
        self.PSS_Subframe_max_idx = 0
        self.SSS_Subframe_max_idx = 0
        self.PSS_Subframe_max_val = 0
        self.SSS_Subframe_max_val = 0

        # Sliding buffer with the three most recent IQ frames, newest last.
        self.IQ_ThreeframeBuffer_main = (
            [0] * (self._IQ_BUFFER_FRAMES * frame_len))

    @staticmethod
    def _shift_in(buffer, frame, frame_len, n_frames):
        """Drop the oldest frame from ``buffer`` and append ``frame`` in place."""
        keep = (n_frames - 1) * frame_len
        buffer[:keep] = buffer[frame_len:]
        buffer[keep:] = frame

    def _correlate_middle_frame(self, reference_seq):
        """Return |correlation| of the IQ buffer with ``reference_seq``.

        Only samples ``[frame_len, 2 * frame_len)`` of the 'full'
        cross-correlation output are returned.
        """
        frame_len = self.params.analysis_frame_len
        full_corr = np.correlate(
            self.IQ_ThreeframeBuffer_main, reference_seq, 'full')
        return np.absolute(full_corr[frame_len:2 * frame_len])

    def _is_peak(self, max_val, mean_level):
        """Tell whether ``max_val`` passes the relative and absolute tests."""
        # The absolute threshold is stricter when params.is_TDD == 1.
        if self.params.is_TDD == 1:
            threshold = 0.2
        else:
            threshold = 0.1
        return (max_val > self._PEAK_TO_MEAN_RATIO * mean_level
                and max_val > threshold)

    @staticmethod
    def _corr_power(sequence, reference):
        """Return ``|sum(sequence * reference)| ** 2``."""
        return np.square(np.absolute(np.sum(np.multiply(sequence, reference))))

    def pss_run(self, IQ_frame_main_norm):
        """Feed one IQ frame and search it for a PSS correlation peak.

        Args:
            IQ_frame_main_norm: the newest analysis frame; it is expected to
                hold ``params.analysis_frame_len`` samples (the length is not
                checked).

        Side effects:
            Updates the IQ buffer, the PSS correlation history,
            ``PSS_Subframe_max_idx`` / ``PSS_Subframe_max_val``,
            ``PSS_peak_detected`` and ``doIFFT``; latches
            ``hold_PSS_max_idx`` on a detection while it is still 0.
        """
        frame_len = self.params.analysis_frame_len

        # Newest data goes to the end of the three-frame buffer.
        self._shift_in(self.IQ_ThreeframeBuffer_main, IQ_frame_main_norm,
                       frame_len, self._IQ_BUFFER_FRAMES)

        # NOTE: the original carried a permanently disabled (`if (0)`) branch
        # intended for params.is_NR == 1 that zeroed the last
        # params.analysis_frame_overlap samples of the buffer at this point.

        corr_mag = self._correlate_middle_frame(
            self.signals.PSS_TD_complex_pad_fftSize_wShift_ifft)

        self._shift_in(self.PSS_corr_magSQR_output_frame_main, corr_mag,
                       frame_len, self._CORR_HISTORY_FRAMES)

        # Strongest correlation sample of this frame.
        self.PSS_Subframe_max_idx = np.argmax(corr_mag)
        self.PSS_Subframe_max_val = corr_mag[self.PSS_Subframe_max_idx]

        if self._is_peak(self.PSS_Subframe_max_val, np.mean(corr_mag)):
            self.doIFFT = 1
            self.PSS_peak_detected = 1
            # Latch the first detected position only (an index of 0 counts
            # as "not latched yet").
            if self.hold_PSS_max_idx == 0:
                self.hold_PSS_max_idx = self.PSS_Subframe_max_idx
            L.info('PSS peak idx: ' + str(self.PSS_Subframe_max_idx))
            L.info('PSS max: ' + str(self.PSS_Subframe_max_val))
        else:
            self.doIFFT = 0
            self.PSS_peak_detected = 0

    def nr_sss_run(self, fetch_SSS_seq):
        """Search the current IQ buffer for an SSS correlation peak.

        The IQ buffer is not advanced here; it is whatever the last
        pss_run() call left behind.

        Args:
            fetch_SSS_seq: sequence multiplied element-wise with
                ``signals.d_sss``; the result is currently not used.

        Returns:
            Always 0.

        Side effects:
            Updates the SSS correlation history,
            ``SSS_Subframe_max_idx`` / ``SSS_Subframe_max_val`` and
            ``SSS_peak_detected``.
        """
        frame_len = self.params.analysis_frame_len

        corr_mag = self._correlate_middle_frame(
            self.signals.SSS_TD_complex_pad_fftSize_wShift_ifft)

        self._shift_in(self.SSS_corr_magSQR_output_frame_main, corr_mag,
                       frame_len, self._CORR_HISTORY_FRAMES)

        # Strongest correlation sample of this frame.
        self.SSS_Subframe_max_idx = np.argmax(corr_mag)
        self.SSS_Subframe_max_val = corr_mag[self.SSS_Subframe_max_idx]

        if self._is_peak(self.SSS_Subframe_max_val, np.mean(corr_mag)):
            self.SSS_peak_detected = 1
            L.info('SSS peak idx: ' + str(self.SSS_Subframe_max_idx))
            L.info('SSS max: ' + str(self.SSS_Subframe_max_val))
        else:
            self.SSS_peak_detected = 0

        # NOTE(review): this correlation against signals.d_sss is computed
        # but never consumed; it is kept so the call's behaviour (including
        # any shape errors it raises) is unchanged. Wire it up or remove it.
        SSS_corr = np.sum(np.multiply(fetch_SSS_seq, self.signals.d_sss))
        SSS_corr_abs_sqr = np.square(np.absolute(SSS_corr))
        return 0

    def lte_sss_run(self, fetch_SSS_seq):
        """Decide which half-frame an LTE SSS sequence belongs to.

        The sequence is de-interleaved into its even (2k) and odd (2k+1)
        samples. Each part is correlated against the two candidate reference
        products built from ``signals.s0_``, ``s1_``, ``c0_``, ``c1_``,
        ``z1_m0`` and ``z1_m1``, and the squared correlation magnitudes are
        summed into one metric per candidate ("subframe 0" / "subframe 5").

        Args:
            fetch_SSS_seq: interleaved SSS samples (must support slicing
                with a step).

        Returns:
            ``70`` when the subframe-5 metric exceeds ten times the
            subframe-0 metric, otherwise ``0``. The indecisive case (neither
            metric dominates by that factor) used to raise
            ``UnboundLocalError``; it now falls back to ``0``.

        Side effects:
            Resets ``SSS_peak_detected`` to 0.
        """
        signals = self.signals

        # De-interleave the SSS.
        even_part = fetch_SSS_seq[::2]
        odd_part = fetch_SSS_seq[1::2]

        # ***** 2k part: reference products for subframe 0 and subframe 5.
        ref_even_sf0 = np.multiply(signals.s0_, signals.c0_)
        ref_even_sf5 = np.multiply(signals.s1_, signals.c0_)

        # ***** 2k+1 part: reference products for subframe 0 and subframe 5.
        ref_odd_sf0 = np.multiply(
            np.multiply(signals.s1_, signals.c1_), signals.z1_m0)
        ref_odd_sf5 = np.multiply(
            np.multiply(signals.s0_, signals.c1_), signals.z1_m1)

        # Frame timing decision metrics.
        metric_sf0 = (self._corr_power(even_part, ref_even_sf0)
                      + self._corr_power(odd_part, ref_odd_sf0))
        metric_sf5 = (self._corr_power(even_part, ref_even_sf5)
                      + self._corr_power(odd_part, ref_odd_sf5))

        # TODO: revise the threshold condition (carried over from the
        # original). Half-frame 1 is chosen only on a decisive subframe-5
        # win; a decisive subframe-0 win and the indecisive case both give 0.
        if metric_sf5 > self._SSS_DECISION_RATIO * metric_sf0:
            start_half_frame = 1
        else:
            start_half_frame = 0

        half_subframe_offset = start_half_frame * self._HALF_FRAME_OFFSET

        # NOTE: the original "SSS HACK (needed for LowRX)" inverted the local
        # start_half_frame at this point, after its last use, so it had no
        # effect on the result and has been dropped.

        self.SSS_peak_detected = 0
        return half_subframe_offset