Compute Extreme Precipitation Indices using Python

Compute extreme precipitation indices
Evaluation of extreme indices
In order to provide some insight into changes in extreme climate events (e.g., dry or wet spells or cold/warm spells, etc.), a set of descriptive indices of extremes could be developed.
These indices are helpful for monitoring climate change itself and can be used as benchmarks for evaluating climate change scenarios (see the STARDEX project, 2004; Gachon et al., 2005). The indices describe particular characteristics of extremes, including frequency, amplitude/intensity and persistence/duration.
In the present post, a set of eight frequency and intensity precipitation indices will be computed (see table bellow).
-
The mean precipitation intensity is defined simply by the amount of total precipitation falling in a wet day (using the threshold ≥ 1 mm/day to define a wet day).
-
The maximum number of consecutive dry days is used to characterize the length of dry spells.
-
The maximum number of consecutive wet days is used to characterize the length of wet spells.
-
The greatest three days total rainfall describes extremes in precipitation amount.
-
For a very wet day, the 90th percentile value is obtained from all non-zero total precipitation events (i.e. ≥ 1 mm/day).
Datasets used
We will compute these extreme precipitation indices using daily precipitation data from the second generation homogenized dataset of Environment and Climate Change Canada developed by Vincent et al. 2012.
We will first load and clean precipitation timeseries folowing this post: https://www.guillaumedueymes.com/post/eccc_temp/
1- Let’s import Python librairies
import pandas as pd
import os
from datetime import date
import calendar
import numpy as np
from dateutil.relativedelta import relativedelta
import warnings
warnings.filterwarnings("ignore")
We will first work with one station from this document:
dataframe = pd.read_excel("./Adj_Precipitation_Stations.xls", skiprows = range(0, 3))
station = dataframe.iloc[0]
station
Prov BC
nom de la station AGASSIZ
stnid 1100120
année déb. 1890
mois déb. 1
année fin. 2017
mois fin. 12
lat (deg) 49.2431
long (deg) -121.76
élév (m) 15
stns jointes No
Name: 0, dtype: object
2 - Loading source file
Let’s load time serie ofr AGASSIZ’s station and clean the data. We will load daily precipitation from 1981 to 2017:
stnid = '1100120'
path = 'F:/DATA/Donnees_Stations/2nd_generation_V2019/Adj_Daily_Total_v2017'
varin = 'dt'
f1 = open(path+'/'+str(varin)+str(stnid)+'.txt', 'r')
f2 = open('./tmp.txt', 'w')
for line in f1:
for word in line:
if word == 'M':
f2.write(word.replace('M', ' '))
elif word == 'a':
f2.write(word.replace('a', ' '))
else:
f2.write(word)
f1.close()
f2.close()
df_station = pd.read_csv('./tmp.txt', delim_whitespace=True, skiprows = range(0, 4))
df_station.columns = ['Year', 'Month', 'D1','D2','D3','D4','D5','D6','D7','D8','D9','D10',
'D11','D12','D13','D14','D15','D16','D17','D18','D19','D20',
'D21','D22','D23','D24','D25','D26','D27','D28','D29','D30','D31']
os.remove("./tmp.txt")
# nettoyage des valeurs manquantes
try:
df_station = df_station.replace({'E':''}, regex=True)
except:
pass
try:
df_station = df_station.replace({'a':''}, regex=True)
except:
pass
try:
df_station = df_station.replace({'-9999.9':''}, regex=True)
except:
pass
try:
df_station = df_station.replace({-9999.9:''}, regex=True)
except:
pass
for col in df_station.columns[2:]:
df_station[col] = pd.to_numeric(df_station[col], errors='coerce')
We will clean this dataset in order to only have daly values timeserie:
yearmin = 1980
yearmax = 2017
m_start = df_station['Month'].loc[(df_station['Year'] == yearmin)].min()
m_end = df_station['Month'].loc[(df_station['Year'] == yearmax)].max()
d_end = calendar.monthrange(yearmax, m_end)[1]
tmp_pr = [ ]
for year in range(yearmin,yearmax+1): ### Loop over years
for month in range(1,13):
df = []
last_day = calendar.monthrange(year, month)[1]
tmin = df_station.loc[(df_station["Year"] == year) & (df_station["Month"] == month)].iloc[:,2:last_day+2].values
if len(tmin) == 0:
a = np.empty((calendar.monthrange(year,month)[1]))
a[:] = np.nan
df=pd.DataFrame(a)
else:
df=pd.DataFrame(tmin.T)
start = date(year, month, 1)
end = date(year, month, last_day)
delta=(end-start)
nb_days = delta.days + 1
rng = pd.date_range(start, periods=nb_days, freq='D')
df['datetime'] = rng
df.index = df['datetime']
tmp_pr.append(df)
tmp_pr = pd.concat(tmp_pr)
preacc = pd.DataFrame({'datetime': tmp_pr['datetime'], 'Pr': tmp_pr.iloc[:,0]}, columns = ['datetime','Pr'])
preacc.index = preacc['datetime']
preacc = preacc.drop(["datetime"], axis=1)
Quick plot:
import matplotlib.pylab as plt
import datetime
plt.rcParams["figure.figsize"]=[10,6]
plt.plot(preacc.index, preacc[:], label='Preciputation Station', linewidth=2, c='r')
plt.title('Daily precipitation: from ' + datetime.date(yearmin, 1, 1).strftime('%Y')+ ' et ' + datetime.date(yearmax, 1, 1).strftime('%Y'), fontsize=15, color='black', weight='semibold')
plt.xlabel('Time', fontsize=15, color='black', weight='semibold')
plt.ylabel('mm', fontsize=15, color='black', weight='semibold')
plt.show()
3 - Compute monthly extreme precipitation indices
We will compute extreme precipitation indices presented in previous table.
# Total cumulated precipitation
def PrecTOT(S):
import numpy as np
ind_PrecTOT=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
if ((N2/N) < 0.8):
ind_PrecTOT = np.empty(1)
ind_PrecTOT = np.nan
else:
ind_PrecTOT = np.nansum(S_no_nan)
return ind_PrecTOT
# Mean precipitation value of input signal
def MOY(S):
import numpy as np
ind_moy=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
if ((N2/N) < 0.8):
ind_moy = np.empty(1)
ind_moy = np.nan
else:
ind_moy = np.nanmean(S_no_nan)
return ind_moy
# No of wet days (precipitation ≥ 1 mm) [%]
def Prcp1(S):
import numpy as np
ind_Prcp1=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
if (N2 == 0):
N2=1
if ((N2/N) < 0.8):
ind_Prcp1 = np.empty(1)
ind_Prcp1 = np.nan
else:
ind_Prcp1 = 0
for row in S_no_nan:
if row >= 1 :
ind_Prcp1 += 1
ind_Prcp1 = 100 * (ind_Prcp1/N2)
return ind_Prcp1
# Calculates the 90th percentile of precipitation (mm/day).
def Prec90p(S):
import numpy as np
ind_Prec90p=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
# condition on available data >= 80%
if ((N2/N) < 0.8):
ind_Prec90p = np.empty(1)
ind_Prec90p = np.nan
else:
SS = S[S > 1]
# For Prec90p, we need more than 15 values
if len(SS) < 15 :
ind_Prec90p = np.empty(1)
ind_Prec90p = np.nan
else :
s_sorted = np.sort(SS)
# the rank "m" is calculated from the semi-empirical probability formula of Cunnane (1978)
m=(len(SS)+0.2)*0.9 +0.4
# if rank "m" is not an integer, we have to perform a lineare interpolation
if np.remainder(m,1) != 0:
m_floor=np.floor(m)
m_ceil=np.ceil(m)
slope=(s_sorted[int(m_ceil)-1]-s_sorted[int(m_floor)-1])/(m_ceil-m_floor)
ind_Prec90p=s_sorted[int(m_floor)-1]+slope*(m-m_floor)
else:
ind_Prec90p=s_sorted[int(m)-1];
return ind_Prec90p
# Calculates the simple daily intensity of precipitation of the input signal.
def SDII(S):
import numpy as np
ind_SDII=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
if ((N2/N) < 0.8):
ind_SDII = np.empty(1)
ind_SDII = np.nan
else:
SS = S[S > 1]
ind_SDII = np.nanmean(SS)
return ind_SDII
# Maximum number of dry consecutive days
def CDD(S):
import numpy as np
ind_CDD=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
if ((N2/N) < 0.8):
ind_CDD = np.empty(1)
ind_CDD = np.nan
else:
temp = 0
ind_CDD = 0
j =0
while (j < N2):
while (j < N2 ) and (S_no_nan[j] < 1.0 ):
j += 1
temp +=1
if ind_CDD < temp:
ind_CDD = temp
temp = 0
j += 1
return ind_CDD
# Maximum number of wet consecutive days
def CWD(S):
import numpy as np
ind_CWD=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
if ((N2/N) < 0.8):
ind_CWD = np.empty(1)
ind_CWD = np.nan
else:
temp = 0
ind_CWD = 0
j =0
while (j < N2):
while (j < N2 ) and (S_no_nan[j] > 1.0 ):
j += 1
temp +=1
if ind_CWD < temp:
ind_CWD = temp
temp = 0
j += 1
return ind_CWD
# Calculates the maximum total precipitation cumulated in 3 consecutive days (mm).
def R3d(S):
import numpy as np
ind_R3d=[]
S_no_nan = S[~np.isnan(S)]
N = len(S)
N2 = len(S_no_nan)
if ((N2/N) < 0.8):
ind_R3d = np.empty(1)
ind_R3d = np.nan
else:
temp = 0
ind_R3d = 0
for i in range(0,N-2):
if (~np.isnan(S[i])) and (~np.isnan(S[i+1])) and (~np.isnan(S[i+2])):
temp = S[i] + S[i+1] + S[i+2]
if ind_R3d < temp:
ind_R3d = temp
return ind_R3d
We can put all this fonctions in a common module, it will be easier to load an apply functions:
import Indices_Precipitation
list_indices = ['PrecTOT','MOY','SDII' ,'Prcp1','CWD','CDD','Prec90p','R3d']
resamp_preacc = []
for ind in list_indices:
if ind == 'PrecTOT':
indice = Indices_Precipitation.PrecTOT
indice_out = 'PrecTOT'
elif ind == 'SDII':
indice = Indices_Precipitation.SDII
indice_out = 'SDII'
elif ind == 'Prcp1':
indice = Indices_Precipitation.Prcp1
indice_out = 'Prcp1'
elif ind == 'CWD':
indice = Indices_Precipitation.CWD
indice_out = 'CWD'
elif ind == 'MOY':
indice = Indices_Precipitation.MOY
indice_out = 'MOY'
elif ind == 'CDD':
indice = Indices_Precipitation.CDD
indice_out = 'CDD'
elif ind == 'Prec90p':
indice = Indices_Precipitation.Prec90p
indice_out = 'Prec90p'
elif ind == 'R3d':
indice = Indices_Precipitation.R3d
indice_out = 'R3d'
resamp_preacc.append(preacc.resample('M').agg([indice]))
monthly_indices = pd.concat(resamp_preacc, axis=1)
monthly_indices.columns = monthly_indices.columns.droplevel(0)
- We now can make some quick plots:
import matplotlib.pylab as plt
import datetime
plt.rcParams["figure.figsize"]=[10,6]
plt.plot(monthly_indices.index, monthly_indices['PrecTOT'], label='Preciputation Station', linewidth=2, c='b')
plt.title('Monthly cumulated precipitation: from ' + datetime.date(yearmin, 1, 1).strftime('%Y')+ ' et ' + datetime.date(yearmax, 1, 1).strftime('%Y'), fontsize=15, color='black', weight='semibold')
plt.xlabel('Time', fontsize=15, color='black', weight='semibold')
plt.ylabel('mm', fontsize=15, color='black', weight='semibold')
plt.show()
import seaborn as sns
import matplotlib.pyplot as plt
ax = plt.axes()
sns.boxplot(x=monthly_indices.index.month, y="Prcp1", data=monthly_indices, palette="Set1")
ax.set_title('Number of days with precipitation in AGASSIZ from 1980 to 2017')
ax.set_ylabel('%')
figure = ax.get_figure()
figure.set_size_inches(12, 8)
plt.show()
4 - Compute saisonal extreme precipitation indices
In this section, we will compute seasonal total precipitation for each year from 1981 to 2017 using Indices_Precipitation.PrecTOT fonction.
#################################### Calcul des indices saisonniers
djf = []
mam = []
son=[]
jja=[]
incr= date(1981, 1, 1)
end = date(2017, 12, 31)
while incr <= end:
current_year = str(incr.year)
last_year = str(incr.year-1)
try:
dec = preacc[last_year][np.in1d(preacc[last_year].index.month, [12])]
except:
rng = pd.date_range(last_year, periods=31, freq='D')
dec = pd.DataFrame({'datetime': rng, 'variable': [np.nan]*31}, columns = ['datetime','variable'])
j_f = preacc[current_year][np.in1d(preacc[current_year].index.month, [1,2])]
djf.append(Indices_Precipitation.PrecTOT(dec.append(j_f)))
mam.append(Indices_Precipitation.PrecTOT((preacc[current_year][np.in1d(preacc[current_year].index.month, [3,4,5])])))
jja.append(Indices_Precipitation.PrecTOT((preacc[current_year][np.in1d(preacc[current_year].index.month, [6,4,8])])))
son.append(Indices_Precipitation.PrecTOT((preacc[current_year][np.in1d(preacc[current_year].index.month, [9,10,11])])))
incr = incr + relativedelta(years=1)
TIME=[]
for y in range(1981,2017+1,1):
TIME.append(y)
df_season = pd.DataFrame({'Date': TIME,'Indice DJF': djf, 'Indice MAM':mam, 'Indice JJA':jja, 'Indice SON':son},
columns = ['Date','Indice DJF', 'Indice MAM', 'Indice JJA', 'Indice SON'])
df_season.set_index("Date", inplace = True)
df_season[["Indice DJF", "Indice MAM", "Indice JJA", "Indice SON"]].plot(kind="bar", stacked=True)
ax = plt.axes()
ax.set_title("Total precipitation / season: Agassiz")
ax.set_ylabel('mm')
ax.set_xlabel('Time')
figure = ax.get_figure()
figure.set_size_inches(12, 8)
plt.show()
5 - Compute annual extreme precipitation indices
In this last exemple, we will compute the maximum number of wet consecutive days for each year in our Agassiz’s daily precipitation time serie.
We will call Indices_Precipitation.SDII fonction.
annual = []
df_annual = []
incr= date(1980, 1, 1)
end = date(2017, 12, 31)
while incr <= end:
current_year = str(incr.year)
annual.append(Indices_Precipitation.SDII(preacc[current_year].values))
incr = incr + relativedelta(years=1)
TIME=[]
for y in range(1980,2017+1,1):
TIME.append(y)
df_annual = pd.DataFrame({'Date': TIME,'Consecutive Wet Days': annual}, columns = ['Date','Consecutive Wet Days'])
ax = plt.axes()
plt.plot( 'Date', 'Consecutive Wet Days', data=df_annual, marker='o', color='mediumvioletred')
ax.set_title('Annual Maximum of Consecutive Wet Days in AGASSIZ from 1980 to 2017')
ax.set_ylabel('Days')
figure = ax.get_figure()
figure.set_size_inches(12, 8)
plt.show()