| import numpy as np | |
| from IQ_Utils import L | |
| class SS_Decoder: | |
| def __init__(self, params, signals): | |
| # 10 Last samples used only on drawing. | |
| self.PSS_corr_magSQR_output_frame_main = [0] * 10 * params.analysis_frame_len | |
| self.SSS_corr_magSQR_output_frame_main = [0] * 10 * params.analysis_frame_len | |
| # We have found PSS singal | |
| self.PSS_peak_detected = 0 | |
| # We have found SSS signal | |
| self.SSS_peak_detected = 0 | |
| # Index in our buffer, previous index | |
| self.hold_PSS_max_idx = 0 | |
| self.hold_SSS_max_idx = 0 | |
| # Do Inverse Fast Fourier Transformation | |
| self.doIFFT = 0 | |
| # Index on our index current. | |
| self.PSS_Subframe_max_idx = 0 | |
| self.SSS_Subframe_max_idx = 0 | |
| self.params = params | |
| self.signals = signals | |
| self.PSS_Subframe_max_val = 0 | |
| self.SSS_Subframe_max_val = 0 | |
| # 3 Samples. | |
| self.IQ_ThreeframeBuffer_main = [0] * 3 * params.analysis_frame_len | |
| def pss_run(self, IQ_frame_main_norm): | |
| params = self.params | |
| # Keep feeding our 3 sample buffer, newest data to last | |
| self.IQ_ThreeframeBuffer_main[:2*params.analysis_frame_len] = self.IQ_ThreeframeBuffer_main[params.analysis_frame_len:] | |
| self.IQ_ThreeframeBuffer_main[2*params.analysis_frame_len:] = IQ_frame_main_norm | |
| #if (params.is_NR==1): | |
| if (0): | |
| # Reset 4 symbols of extension | |
| self.IQ_ThreeframeBuffer_main[3*params.analysis_frame_len-params.analysis_frame_overlap:] = [0] * params.analysis_frame_overlap | |
| PSS_TD_seq = self.signals.PSS_TD_complex_pad_fftSize_wShift_ifft # Local | |
| # Cross-correlation with our reference signal to find reference singal | |
| IQ_ThreeframeBuffer_PSScorr_main = np.correlate(self.IQ_ThreeframeBuffer_main,PSS_TD_seq,'full') | |
| #print [len(IQ_ThreeframeBuffer_PSScorr_main), len(self.IQ_ThreeframeBuffer_main), len(PSS_TD_seq)] | |
| # Get middle sample. | |
| IQ_frame_PSScorr_main = IQ_ThreeframeBuffer_PSScorr_main[params.analysis_frame_len:2*params.analysis_frame_len] | |
| # Convert our sample to abs. | |
| IQ_frame_ABS_PSScorr_main = np.absolute(IQ_frame_PSScorr_main) | |
| # Keep feeding our 10 sample buffer (10ms one radioframe). Add current sample to last. | |
| self.PSS_corr_magSQR_output_frame_main[:9*params.analysis_frame_len] = self.PSS_corr_magSQR_output_frame_main[params.analysis_frame_len:] | |
| self.PSS_corr_magSQR_output_frame_main[9*params.analysis_frame_len:] = IQ_frame_ABS_PSScorr_main | |
| self.PSS_Subframe_max_idx = np.argmax(IQ_frame_ABS_PSScorr_main) # Find maximun value from our data "Maximum Likehood" | |
| self.PSS_Subframe_max_val = IQ_frame_ABS_PSScorr_main[self.PSS_Subframe_max_idx] | |
| # Calculate average level | |
| PSS_Subframe_mean = np.mean(IQ_frame_ABS_PSScorr_main) | |
| # Find peak | |
| if (params.is_TDD == 1): | |
| PSS_threshold = 0.2 | |
| else: | |
| PSS_threshold = 0.1 | |
| if ((self.PSS_Subframe_max_val > 2.0*PSS_Subframe_mean)and(self.PSS_Subframe_max_val > PSS_threshold)): | |
| self.doIFFT = 1 | |
| self.PSS_peak_detected = 1 | |
| if (self.hold_PSS_max_idx == 0): | |
| self.hold_PSS_max_idx = self.PSS_Subframe_max_idx | |
| L.info( 'PSS peak idx: ' + str(self.PSS_Subframe_max_idx) ) | |
| L.info( 'PSS max: ' +str(self.PSS_Subframe_max_val) ) | |
| else: | |
| self.doIFFT = 0 | |
| self.PSS_peak_detected = 0 | |
| def nr_sss_run(self, fetch_SSS_seq): | |
| params = self.params | |
| SSS_TD_seq = self.signals.SSS_TD_complex_pad_fftSize_wShift_ifft # Local | |
| # Cross-correlation with our reference signal to find reference singal | |
| IQ_ThreeframeBuffer_SSScorr_main = np.correlate(self.IQ_ThreeframeBuffer_main,SSS_TD_seq,'full') | |
| # Get middle sample. | |
| IQ_frame_SSScorr_main = IQ_ThreeframeBuffer_SSScorr_main[params.analysis_frame_len:2*params.analysis_frame_len] | |
| # Convert our sample to abs. | |
| IQ_frame_ABS_SSScorr_main = np.absolute(IQ_frame_SSScorr_main) | |
| # Keep feeding our 10 sample buffer (10ms one radioframe). Add current sample to last. | |
| self.SSS_corr_magSQR_output_frame_main[:9*params.analysis_frame_len] = self.SSS_corr_magSQR_output_frame_main[params.analysis_frame_len:] | |
| self.SSS_corr_magSQR_output_frame_main[9*params.analysis_frame_len:] = IQ_frame_ABS_SSScorr_main | |
| self.SSS_Subframe_max_idx = np.argmax(IQ_frame_ABS_SSScorr_main) # Find maximun value from our data "Maximum Likehood" | |
| self.SSS_Subframe_max_val = IQ_frame_ABS_SSScorr_main[self.SSS_Subframe_max_idx] | |
| # Calculate average level | |
| SSS_Subframe_mean = np.mean(IQ_frame_ABS_SSScorr_main) | |
| # Find peak | |
| if (params.is_TDD == 1): | |
| SSS_threshold = 0.2 | |
| else: | |
| SSS_threshold = 0.1 | |
| if ((self.SSS_Subframe_max_val > 2.0*SSS_Subframe_mean)and(self.SSS_Subframe_max_val > SSS_threshold)): | |
| self.SSS_peak_detected = 1 | |
| L.info( 'SSS peak idx: ' + str(self.SSS_Subframe_max_idx) ) | |
| L.info( 'SSS max: ' +str(self.SSS_Subframe_max_val) ) | |
| else: | |
| self.SSS_peak_detected = 0 | |
| #Correlation Subframe | |
| SSS_corr = np.sum(np.multiply(fetch_SSS_seq,self.signals.d_sss)) | |
| SSS_corr_abs_sqr = np.square(np.absolute(SSS_corr)) | |
| return 0 | |
| def lte_sss_run(self, fetch_SSS_seq): | |
| # Deinterleave SSS | |
| SSS_seq_2k = fetch_SSS_seq[::2] | |
| SSS_seq_2kPlus1 = fetch_SSS_seq[1::2] | |
| # ***** 2k processing part ***** | |
| # Pre-generate seq for correlation with SSS (2k) | |
| s0c0_ = np.multiply(self.signals.s0_,self.signals.c0_) #For Subfame_0 | |
| s1c0_ = np.multiply(self.signals.s1_,self.signals.c0_) #For Subfame_5 | |
| # Correlation Subframe_0 | |
| SSS_corr_s0c0 = np.sum(np.multiply(SSS_seq_2k,s0c0_)) | |
| SSS_corr_abs_sqr_s0c0 = np.square(np.absolute(SSS_corr_s0c0)) | |
| # Correlation Subframe_5 | |
| SSS_corr_s1c0 = np.sum(np.multiply(SSS_seq_2k,s1c0_)) | |
| SSS_corr_abs_sqr_s1c0 = np.square(np.absolute(SSS_corr_s1c0)) | |
| #print [SSS_corr_abs_sqr_s0c0, SSS_corr_abs_sqr_s1c0] | |
| #print ['SSS subframe',np.argmax([SSS_corr_abs_sqr_s0c0, SSS_corr_abs_sqr_s1c0])] | |
| # ***** 2k+1 processing part ***** | |
| # Pre-generate seq for correlation with SSS (2k) | |
| #For Subfame_0 | |
| s1c1_ = np.multiply(self.signals.s1_,self.signals.c1_) | |
| s1c1z1m0 = np.multiply(s1c1_,self.signals.z1_m0) | |
| #For Subfame_5 | |
| s0c1_ = np.multiply(self.signals.s0_,self.signals.c1_) | |
| s0c1z1m1 = np.multiply(s0c1_,self.signals.z1_m1) | |
| # Correlation Subframe_0 | |
| SSS_corr_s1c1z1m0 = np.sum(np.multiply(SSS_seq_2kPlus1,s1c1z1m0)) | |
| SSS_corr_abs_sqr_s1c1z1m0 = np.square(np.absolute(SSS_corr_s1c1z1m0)) | |
| # Correlation Subframe_5 | |
| SSS_corr_s0c1z1m1 = np.sum(np.multiply(SSS_seq_2kPlus1,s0c1z1m1)) | |
| SSS_corr_abs_sqr_s0c1z1m1 = np.square(np.absolute(SSS_corr_s0c1z1m1)) | |
| #print [SSS_corr_abs_sqr_s1c1z1m0, SSS_corr_abs_sqr_s0c1z1m1] | |
| # Frame Timing Decision module | |
| SSS_metric_Subframe0 = SSS_corr_abs_sqr_s0c0 + SSS_corr_abs_sqr_s1c1z1m0 | |
| SSS_metric_Subframe5 = SSS_corr_abs_sqr_s1c0 + SSS_corr_abs_sqr_s0c1z1m1 | |
| #print[SSS_metric_Subframe0,SSS_metric_Subframe5] | |
| #ToDo: Revise threshold condition below | |
| if (SSS_metric_Subframe0 > 10.0*SSS_metric_Subframe5): | |
| start_half_frame = 0 | |
| #ToDo: Revise threshold condition below | |
| if (SSS_metric_Subframe5 > 10.0*SSS_metric_Subframe0): | |
| start_half_frame = 1 | |
| # else: | |
| # start_half_frame = 1 | |
| #print "half_frame = " + str(start_half_frame) | |
| half_subframe_offset = start_half_frame * 70 | |
| # SSS HACK!!! (Needed for LowRX) | |
| if (start_half_frame == 1): | |
| start_half_frame = 0 | |
| else: | |
| start_half_frame = 1 | |
| self.SSS_peak_detected = 0 | |
| return half_subframe_offset |