#!/usr/bin/python
# -*- coding: utf-8 -*-
'''Module time_series.py provides the TimeSeries class used in the ENIB ZG2 course.'''

import csv

try:
    import matplotlib.pyplot as plt
except ImportError:
    # Plotting is optional: loading, dumping and the numeric helpers only need
    # the standard library. TimeSeries.plot() raises ImportError when called
    # without matplotlib installed.
    plt = None

__author__ = "Gireg Desmeulles"
__email__ = "desmeulles@enib.fr"
__version__ = "2.0"


class TimeSeries:
    '''Handle, plot, dump and load time series for the ZG2 project.

    The data is a list of rows (lists). Methods that work on time
    (fuse, interpolate_missing_data, shift, truncate, compute_period,
    compute_area, plot) treat column 0 as the time stamp. Cells without a
    value hold the marker string TimeSeries.MISSING.
    '''

    # Marker stored in cells that have no value (written by fuse and
    # remove_outliers, replaced by interpolate_missing_data).
    MISSING = 'missing'

    # constructor
    def __init__(self, filename=None, time_stamp_column_number=0):
        '''Create an empty series, or load one from a CSV file.

        filename -- path of a comma-separated file whose first non-blank row
            holds the column labels and whose other rows hold values
            convertible to float. When falsy, the series starts empty.
        time_stamp_column_number -- index of the column holding the time
            stamps; when non-zero it is swapped with column 0.

        Raises ValueError (from float()) when a data cell is not numeric.
        '''
        self.__data = []
        self.__labels = []
        if filename:
            with open(filename, newline='') as csvfile:
                reader = csv.reader(csvfile, delimiter=',', quotechar='"')
                # Blank lines come back as empty rows; they carry no data and
                # would break the column swap below.
                rows = [row for row in reader if row]
            if rows:
                self.__labels = rows[0]
                self.__data = rows[1:]
            if time_stamp_column_number:
                self.swap_columns(0, time_stamp_column_number)
            self.__data = [[float(value) for value in row]
                           for row in self.__data]

    def __str__(self):
        '''Return the labels and the rows as comma-separated lines.'''
        lines = [','.join(str(label) for label in self.__labels)]
        lines.extend(','.join(str(value) for value in row)
                     for row in self.__data)
        return '\n'.join(lines)

    # accessors/mutators
    def get_data(self):
        '''Return a copy of the list of rows (the rows themselves are shared).'''
        return self.__data[:]

    def get_labels(self):
        '''Return a copy of the list of column labels.'''
        return self.__labels[:]

    def set_data(self, value):
        '''Replace the rows with value (stored by reference, not copied).'''
        self.__data = value

    def set_labels(self, value):
        '''Replace the column labels with value (stored by reference).'''
        self.__labels = value

    # public methods
    def swap_columns(self, c1, c2):
        '''Swap columns c1 and c2 in the labels and in every row.'''
        labels = self.__labels
        labels[c1], labels[c2] = labels[c2], labels[c1]
        for row in self.__data:
            row[c1], row[c2] = row[c2], row[c1]

    def plot(self, x_label=None, y_label="", title="", filename=None, show=True):
        '''Plot every column against column 0.

        x_label, y_label, title -- texts passed to the axes.
        filename -- when given, the figure is also saved to this path.
        show -- when true, display the figure with plt.show().

        Raises ImportError when matplotlib is not installed.
        '''
        if plt is None:
            raise ImportError("matplotlib is required by TimeSeries.plot()")
        nb_columns = len(self.__data[0])
        # Transpose the rows into one list per column.
        columns = [[] for _ in range(nb_columns)]
        for row in self.__data:
            for j, value in enumerate(row):
                columns[j].append(value)
        fig, ax = plt.subplots()
        for j in range(1, nb_columns):
            ax.plot(columns[0], columns[j], label=self.__labels[j])
        ax.set(xlabel=x_label, ylabel=y_label, title=title)
        ax.legend()
        ax.grid()
        if filename:
            fig.savefig(filename)
        if show:
            plt.show()

    def dump(self, filename):
        '''Write the labels, then every row, to filename as CSV.'''
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            writer.writerow(self.__labels)
            for row in self.__data:
                writer.writerow([str(value) for value in row])

    def compute_period(self, column_number):
        '''Return the mean time between successive local maxima of a column.

        A local maximum is a sample strictly greater than both neighbours.
        Returns None when fewer than two maxima exist, because no period can
        be measured in that case.
        '''
        rows = self.__data
        peaks = [i for i in range(1, len(rows) - 1)
                 if rows[i - 1][column_number]
                 < rows[i][column_number]
                 > rows[i + 1][column_number]]
        if len(peaks) < 2:
            return None
        periods = [rows[later][0] - rows[earlier][0]
                   for earlier, later in zip(peaks, peaks[1:])]
        return sum(periods) / len(periods)

    # New methods
    def clone(self):
        '''Return an independent copy of this series.

        Rows are copied too, so modifying the clone never alters the original.
        '''
        ts_out = TimeSeries()
        ts_out.__data = [list(row) for row in self.__data]
        ts_out.__labels = self.__labels[:]
        return ts_out

    def round_data(self, column_number, precision):
        '''Round every value of a column to precision decimal places.'''
        for row in self.__data:
            row[column_number] = round(row[column_number], precision)

    def fuse(self, ts):
        '''Merge this series with ts into a new TimeSeries.

        Both series must have their time stamps in column 0, in increasing
        order. Rows sharing a time stamp are joined; a row present in only
        one series gets TimeSeries.MISSING in the columns of the other.
        Neither input is modified.
        '''
        ts_out = TimeSeries()
        ts_out.__labels = self.__labels + ts.__labels[1:]
        rows_1 = self.__data
        rows_2 = ts.__data
        # Width of each series without its time column; fall back to the
        # labels when a series has no row to measure.
        width_1 = len(rows_1[0]) - 1 if rows_1 else max(len(self.__labels) - 1, 0)
        width_2 = len(rows_2[0]) - 1 if rows_2 else max(len(ts.__labels) - 1, 0)
        missing_1 = [self.MISSING] * width_1
        missing_2 = [self.MISSING] * width_2

        merged = []
        i1 = i2 = 0
        while i1 < len(rows_1) and i2 < len(rows_2):
            row_1, row_2 = rows_1[i1], rows_2[i2]
            if row_1[0] == row_2[0]:
                # Two samples at the same time stamp.
                merged.append(list(row_1) + list(row_2[1:]))
                i1 += 1
                i2 += 1
            elif row_1[0] < row_2[0]:
                # Next sample only exists in this series.
                merged.append(list(row_1) + missing_2)
                i1 += 1
            else:
                # Next sample only exists in ts.
                merged.append([row_2[0]] + missing_1 + list(row_2[1:]))
                i2 += 1
        # One of the series is exhausted; copy what is left of the other.
        merged.extend(list(row) + missing_2 for row in rows_1[i1:])
        merged.extend([row[0]] + missing_1 + list(row[1:]) for row in rows_2[i2:])

        ts_out.__data = merged
        return ts_out

    def interpolate_missing_data(self):
        '''Replace every TimeSeries.MISSING cell in place.

        Leading and trailing gaps take the nearest known value; inner gaps
        are interpolated linearly over time between the surrounding known
        values. A column with no known value at all is left untouched.
        '''
        rows = self.__data
        if not rows:
            return
        missing = self.MISSING
        for column_number in range(1, len(rows[0])):
            known = [i for i, row in enumerate(rows)
                     if row[column_number] != missing]
            if not known:
                continue
            first, last = known[0], known[-1]
            # Fill first values.
            for i in range(first):
                rows[i][column_number] = rows[first][column_number]
            # Fill last values.
            for i in range(last + 1, len(rows)):
                rows[i][column_number] = rows[last][column_number]
            # Interpolate each gap between two consecutive known samples.
            for left, right in zip(known, known[1:]):
                t_prev = rows[left][0]
                previous_value = rows[left][column_number]
                time_span = rows[right][0] - t_prev
                value_span = rows[right][column_number] - previous_value
                for i in range(left + 1, right):
                    if time_span == 0:
                        # Duplicate time stamps: no slope can be computed.
                        rows[i][column_number] = previous_value
                    else:
                        rows[i][column_number] = (
                            previous_value
                            + value_span * (rows[i][0] - t_prev) / time_span)

    def shift(self, time_shift):
        '''Add time_shift to every time stamp.'''
        for row in self.__data:
            row[0] += time_shift

    def truncate(self, t_min, t_max):
        '''Remove, in place, the rows whose time stamp is < t_min or > t_max.'''
        self.__data[:] = [row for row in self.__data
                          if not (row[0] < t_min or row[0] > t_max)]

    def compute_error(self, column_number_1, column_number_2,
                      error_label="error curve (rad/s)"):
        '''Append a column holding column_number_2 minus column_number_1.

        error_label -- label given to the new column.
        '''
        for row in self.__data:
            row.append(row[column_number_2] - row[column_number_1])
        self.__labels.append(error_label)

    def compute_area(self, column_number):
        '''Return the integral over time of the absolute value of a column.

        Each inner sample is weighted by half the time separating its two
        neighbours; the first and last samples are not counted.
        '''
        rows = self.__data
        area = 0
        for i in range(1, len(rows) - 1):
            d_t = (rows[i + 1][0] - rows[i - 1][0]) / 2.
            area += d_t * abs(rows[i][column_number])
        return area

    def pop(self, column_number):
        '''Remove a column from every row and from the labels.'''
        for row in self.__data:
            row.pop(column_number)
        self.__labels.pop(column_number)

    def remove_outliers(self, column_number):
        '''Mark as TimeSeries.MISSING the samples far from the next sample.

        A sample is an outlier when its ratio to the following sample is
        above 5.0 or below 0.2. The first and last rows are never tested.
        Pairs whose ratio cannot be computed (a cell already missing, or a
        following value equal to zero) are left untouched.
        '''
        rows = self.__data
        missing = self.MISSING
        for i in range(1, len(rows) - 1):
            current = rows[i][column_number]
            following = rows[i + 1][column_number]
            if current == missing or following == missing or following == 0:
                continue
            rate = current / following
            if rate > 5.0 or rate < 0.2:
                rows[i][column_number] = missing

    def apply_to_column(self, column_number, f):
        '''Replace every value of a column by f(value).'''
        for row in self.__data:
            row[column_number] = f(row[column_number])


if __name__ == '__main__':
    # ZG2 week 1
    def test_1():
        ts = TimeSeries('HCSR04_data4_ressort_2022_03_10.csv', 3)
        print(ts)
        ts.plot(y_label='distance(mm)', title='Courbe ZG2!!', filename='test.png')

    # ZG2 week 2
    def test_2():
        ts = TimeSeries('HCSR04_data4_ressort_2022_03_10.csv', 3)
        ts.dump('test.csv')

    # ZG2 week 3
    def test_3():
        ts1 = TimeSeries('ts1.csv')
        ts2 = TimeSeries('ts2.csv')
        ts1.round_data(0, 5)
        ts2.round_data(0, 5)
        ts3 = ts1.fuse(ts2)
        print("\nts3:")
        print(ts3)
        ts3.interpolate_missing_data()
        ts3.plot()

    print('test time_series.py')
    test_1()
    test_2()
    test_3()