Compute Extreme Precipitation Indices using Python

Compute extreme precipitation indices

extreme-rain.jpg

Evaluation of extreme indices

In order to provide some insight into changes in extreme climate events (e.g., dry or wet spells or cold/warm spells, etc.), a set of descriptive indices of extremes could be developed.

These indices are helpful for monitoring climate change itself and can be used as benchmarks for evaluating climate change scenarios (see the STARDEX project, 2004; Gachon et al., 2005). The indices describe particular characteristics of extremes, including frequency, amplitude/intensity and persistence/duration.

In the present post, a set of eight frequency and intensity precipitation indices will be computed (see table bellow).

  • The mean precipitation intensity is defined simply by the amount of total precipitation falling in a wet day (using the threshold ≥ 1 mm/day to define a wet day).

  • The maximum number of consecutive dry days is used to characterize the length of dry spells.

  • The maximum number of consecutive wet days is used to characterize the length of wet spells.

  • The greatest three days total rainfall describes extremes in precipitation amount.

  • For a very wet day, the 90th percentile value is obtained from all non-zero total precipitation events (i.e. ≥ 1 mm/day).

Datasets used

We will compute these extreme precipitation indices using daily precipitation data from the second generation homogenized dataset of Environment and Climate Change Canada developed by Vincent et al. 2012.

We will first load and clean precipitation timeseries folowing this post: https://www.guillaumedueymes.com/post/eccc_temp/

1- Let’s import Python librairies

import pandas as pd
import os
from datetime import date
import calendar
import numpy as np
from dateutil.relativedelta import relativedelta
import warnings
warnings.filterwarnings("ignore")

We will first work with one station from this document:

dataframe = pd.read_excel("./Adj_Precipitation_Stations.xls", skiprows = range(0, 3))
station = dataframe.iloc[0]
station
Prov                      BC
nom de la station    AGASSIZ
stnid                1100120
année déb.              1890
mois déb.                  1
année fin.              2017
mois fin.                 12
lat (deg)            49.2431
long (deg)           -121.76
élév (m)                  15
stns jointes              No
Name: 0, dtype: object

2 - Loading source file

Let’s load time serie ofr AGASSIZ’s station and clean the data. We will load daily precipitation from 1981 to 2017:

stnid = '1100120'   
path = 'F:/DATA/Donnees_Stations/2nd_generation_V2019/Adj_Daily_Total_v2017'
varin = 'dt'  
f1 = open(path+'/'+str(varin)+str(stnid)+'.txt', 'r')
f2 = open('./tmp.txt', 'w')
for line in f1:
    for word in line:
            if word == 'M':
                f2.write(word.replace('M', ' '))
            elif word == 'a':
                f2.write(word.replace('a', ' '))                    
            else:
                f2.write(word)
f1.close()
f2.close()      
df_station = pd.read_csv('./tmp.txt', delim_whitespace=True, skiprows = range(0, 4))
df_station.columns = ['Year', 'Month', 'D1','D2','D3','D4','D5','D6','D7','D8','D9','D10',
                                  'D11','D12','D13','D14','D15','D16','D17','D18','D19','D20',
                                  'D21','D22','D23','D24','D25','D26','D27','D28','D29','D30','D31']
     
os.remove("./tmp.txt")
   
   # nettoyage des valeurs manquantes 
try:  
    df_station = df_station.replace({'E':''}, regex=True)
except:
       pass
try: 
    df_station = df_station.replace({'a':''}, regex=True)
except:
       pass
try:     
    df_station = df_station.replace({'-9999.9':''}, regex=True)
except:
       pass
try:     
    df_station = df_station.replace({-9999.9:''}, regex=True)
except:
       pass    
    
for col in  df_station.columns[2:]:
       df_station[col] = pd.to_numeric(df_station[col], errors='coerce')  

We will clean this dataset in order to only have daly values timeserie:

yearmin = 1980                               
yearmax = 2017                                   
m_start =  df_station['Month'].loc[(df_station['Year'] == yearmin)].min()
m_end   =  df_station['Month'].loc[(df_station['Year'] == yearmax)].max()
d_end = calendar.monthrange(yearmax, m_end)[1]                     

tmp_pr = [ ] 
for year in range(yearmin,yearmax+1):    ### Loop over years
    for month in range(1,13):
        df = []
        last_day = calendar.monthrange(year, month)[1] 
        tmin = df_station.loc[(df_station["Year"] == year) & (df_station["Month"] == month)].iloc[:,2:last_day+2].values
           
        if len(tmin) == 0:
            a = np.empty((calendar.monthrange(year,month)[1]))
            a[:] = np.nan
            df=pd.DataFrame(a)
        else:
            df=pd.DataFrame(tmin.T)
               
        start = date(year, month, 1)
        end =   date(year, month, last_day)
        delta=(end-start) 
        nb_days = delta.days + 1 
        rng = pd.date_range(start, periods=nb_days, freq='D')          
        df['datetime'] = rng
        df.index = df['datetime']
        tmp_pr.append(df)
           
tmp_pr = pd.concat(tmp_pr) 
preacc = pd.DataFrame({'datetime': tmp_pr['datetime'], 'Pr': tmp_pr.iloc[:,0]}, columns = ['datetime','Pr']) 
preacc.index = preacc['datetime']
preacc = preacc.drop(["datetime"], axis=1)

Quick plot:

import matplotlib.pylab as plt
import datetime
plt.rcParams["figure.figsize"]=[10,6]        
plt.plot(preacc.index, preacc[:],  label='Preciputation Station', linewidth=2, c='r')
plt.title('Daily precipitation: from ' + datetime.date(yearmin, 1, 1).strftime('%Y')+ ' et '  + datetime.date(yearmax, 1, 1).strftime('%Y'), fontsize=15, color='black', weight='semibold')
plt.xlabel('Time', fontsize=15, color='black', weight='semibold')
plt.ylabel('mm', fontsize=15, color='black', weight='semibold')
plt.show()

png

3 - Compute monthly extreme precipitation indices

We will compute extreme precipitation indices presented in previous table.

# Total cumulated precipitation
def PrecTOT(S):
     import numpy as np
     ind_PrecTOT=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
     if ((N2/N) < 0.8):
         ind_PrecTOT = np.empty(1)
         ind_PrecTOT = np.nan
     else:     
         ind_PrecTOT = np.nansum(S_no_nan)        
     return ind_PrecTOT 
 
# Mean precipitation value of input signal 
def MOY(S):
     import numpy as np
     ind_moy=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
     if ((N2/N) < 0.8): 
         ind_moy = np.empty(1)
         ind_moy = np.nan
     else:    
         ind_moy = np.nanmean(S_no_nan)  
     return ind_moy      


# No of wet days (precipitation ≥ 1 mm) [%]
def Prcp1(S):
     import numpy as np
     ind_Prcp1=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
     if (N2 == 0):
         N2=1
         
     if ((N2/N) < 0.8): 
         ind_Prcp1 = np.empty(1)
         ind_Prcp1 = np.nan
     else:
         ind_Prcp1 = 0
         for row in S_no_nan:
             if row >= 1 :
                 ind_Prcp1 += 1 
                 
         ind_Prcp1 = 100 * (ind_Prcp1/N2)
     return ind_Prcp1        
 
# Calculates the 90th percentile of precipitation (mm/day).
def Prec90p(S):
     import numpy as np
     ind_Prec90p=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
# condition on available data >= 80%
     if ((N2/N) < 0.8): 
         ind_Prec90p = np.empty(1)
         ind_Prec90p = np.nan
     else:
         SS = S[S > 1]
# For Prec90p, we need more than 15 values
         if len(SS)  < 15 :
             ind_Prec90p = np.empty(1)
             ind_Prec90p = np.nan  
         else :        
             s_sorted = np.sort(SS)
# the rank "m" is calculated from the semi-empirical probability formula of Cunnane (1978)
             m=(len(SS)+0.2)*0.9 +0.4
# if rank "m" is not an integer, we have to perform a lineare interpolation           
             if np.remainder(m,1) != 0:
                 m_floor=np.floor(m)
                 m_ceil=np.ceil(m)
                 slope=(s_sorted[int(m_ceil)-1]-s_sorted[int(m_floor)-1])/(m_ceil-m_floor)
                 ind_Prec90p=s_sorted[int(m_floor)-1]+slope*(m-m_floor)
             else: 
                 ind_Prec90p=s_sorted[int(m)-1];
     return ind_Prec90p
 
    
# Calculates the simple daily intensity of precipitation of the input signal.
def SDII(S):
     import numpy as np
     ind_SDII=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
     if ((N2/N) < 0.8): 
         ind_SDII = np.empty(1)
         ind_SDII = np.nan
     else:
         SS = S[S > 1]        
         ind_SDII = np.nanmean(SS)
     return ind_SDII       

# Maximum number of dry consecutive days
def CDD(S):
     import numpy as np
     ind_CDD=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
     if ((N2/N) < 0.8): 
         ind_CDD = np.empty(1)
         ind_CDD = np.nan
     else:
         temp = 0
         ind_CDD = 0 
         j =0
         while (j < N2):
             while (j < N2 ) and (S_no_nan[j] < 1.0 ):
                 j += 1
                 temp +=1
             if ind_CDD < temp:
                 ind_CDD = temp
             temp = 0
             j += 1 
     return ind_CDD      
 
# Maximum number of wet consecutive days
def CWD(S):
     import numpy as np
     ind_CWD=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
     if ((N2/N) < 0.8): 
         ind_CWD = np.empty(1)
         ind_CWD = np.nan
     else:
         temp = 0
         ind_CWD = 0 
         j =0
         while (j < N2):
             while (j < N2 ) and (S_no_nan[j] > 1.0 ):
                 j += 1
                 temp +=1
             if ind_CWD < temp:
                 ind_CWD = temp
             temp = 0
             j += 1 
     return ind_CWD      
 
    
# Calculates the maximum total precipitation cumulated in 3 consecutive days (mm).
def R3d(S):
     import numpy as np
     ind_R3d=[]
     S_no_nan = S[~np.isnan(S)]
     N = len(S)
     N2 = len(S_no_nan)
     if ((N2/N) < 0.8): 
         ind_R3d = np.empty(1)
         ind_R3d = np.nan
     else:
         temp = 0
         ind_R3d = 0 
         for i in range(0,N-2):
             if (~np.isnan(S[i])) and  (~np.isnan(S[i+1]))  and  (~np.isnan(S[i+2])):
                 temp = S[i] + S[i+1] + S[i+2]
             if ind_R3d < temp:
                 ind_R3d = temp
     return ind_R3d          

We can put all this fonctions in a common module, it will be easier to load an apply functions:

import Indices_Precipitation
list_indices = ['PrecTOT','MOY','SDII' ,'Prcp1','CWD','CDD','Prec90p','R3d']   
resamp_preacc = []
for ind in list_indices:
    if ind == 'PrecTOT':
        indice = Indices_Precipitation.PrecTOT  
        indice_out = 'PrecTOT'     
    elif ind == 'SDII':
        indice = Indices_Precipitation.SDII
        indice_out = 'SDII'
    elif ind == 'Prcp1':
        indice = Indices_Precipitation.Prcp1
        indice_out = 'Prcp1'
    elif ind == 'CWD':
        indice = Indices_Precipitation.CWD
        indice_out = 'CWD'
    elif ind == 'MOY':
        indice = Indices_Precipitation.MOY
        indice_out = 'MOY'
    elif ind == 'CDD':
        indice = Indices_Precipitation.CDD
        indice_out = 'CDD'   
    elif ind == 'Prec90p':
        indice = Indices_Precipitation.Prec90p
        indice_out = 'Prec90p'
    elif ind == 'R3d':
        indice = Indices_Precipitation.R3d
        indice_out = 'R3d'
        
    resamp_preacc.append(preacc.resample('M').agg([indice]))
monthly_indices = pd.concat(resamp_preacc, axis=1)
monthly_indices.columns = monthly_indices.columns.droplevel(0)
  • We now can make some quick plots:
import matplotlib.pylab as plt
import datetime
plt.rcParams["figure.figsize"]=[10,6]        
plt.plot(monthly_indices.index, monthly_indices['PrecTOT'],  label='Preciputation Station', linewidth=2, c='b')
plt.title('Monthly cumulated precipitation: from ' + datetime.date(yearmin, 1, 1).strftime('%Y')+ ' et '  + datetime.date(yearmax, 1, 1).strftime('%Y'), fontsize=15, color='black', weight='semibold')
plt.xlabel('Time', fontsize=15, color='black', weight='semibold')
plt.ylabel('mm', fontsize=15, color='black', weight='semibold')
plt.show()

png

import seaborn as sns
import matplotlib.pyplot as plt 
ax = plt.axes()
sns.boxplot(x=monthly_indices.index.month, y="Prcp1", data=monthly_indices, palette="Set1") 
ax.set_title('Number of days with precipitation in AGASSIZ from 1980 to 2017')
ax.set_ylabel('%')

figure = ax.get_figure()    
figure.set_size_inches(12, 8) 
plt.show()

png

4 - Compute saisonal extreme precipitation indices

In this section, we will compute seasonal total precipitation for each year from 1981 to 2017 using Indices_Precipitation.PrecTOT fonction.

#################################### Calcul des indices saisonniers  
djf = []
mam = []
son=[]
jja=[]
incr= date(1981, 1, 1)
end = date(2017, 12, 31)
while incr <= end:
    current_year = str(incr.year)
    last_year = str(incr.year-1)
    try:
        dec = preacc[last_year][np.in1d(preacc[last_year].index.month, [12])]
    except:
        rng = pd.date_range(last_year, periods=31, freq='D')
        dec = pd.DataFrame({'datetime': rng, 'variable': [np.nan]*31}, columns = ['datetime','variable']) 
                
    j_f = preacc[current_year][np.in1d(preacc[current_year].index.month, [1,2])]
            
    djf.append(Indices_Precipitation.PrecTOT(dec.append(j_f)))            
    mam.append(Indices_Precipitation.PrecTOT((preacc[current_year][np.in1d(preacc[current_year].index.month, [3,4,5])])))
    jja.append(Indices_Precipitation.PrecTOT((preacc[current_year][np.in1d(preacc[current_year].index.month, [6,4,8])])))
    son.append(Indices_Precipitation.PrecTOT((preacc[current_year][np.in1d(preacc[current_year].index.month, [9,10,11])])))            
    
    incr = incr + relativedelta(years=1)
TIME=[]
for y in range(1981,2017+1,1):
    TIME.append(y)
df_season = pd.DataFrame({'Date': TIME,'Indice DJF': djf, 'Indice MAM':mam, 'Indice JJA':jja, 'Indice SON':son}, 
                         columns = ['Date','Indice DJF', 'Indice MAM', 'Indice JJA', 'Indice SON']) 

df_season.set_index("Date", inplace = True)
df_season[["Indice DJF", "Indice MAM", "Indice JJA", "Indice SON"]].plot(kind="bar", stacked=True)
ax = plt.axes()
ax.set_title("Total precipitation / season: Agassiz")
ax.set_ylabel('mm')
ax.set_xlabel('Time') 
figure = ax.get_figure()    
figure.set_size_inches(12, 8) 
plt.show()

png

5 - Compute annual extreme precipitation indices

In this last exemple, we will compute the maximum number of wet consecutive days for each year in our Agassiz’s daily precipitation time serie.

We will call Indices_Precipitation.SDII fonction.

annual = []
df_annual = []
incr= date(1980, 1, 1)
end = date(2017, 12, 31)
while incr <= end:
    current_year = str(incr.year)
    annual.append(Indices_Precipitation.SDII(preacc[current_year].values))         
    incr = incr + relativedelta(years=1)
TIME=[]
for y in range(1980,2017+1,1):
    TIME.append(y) 
df_annual = pd.DataFrame({'Date': TIME,'Consecutive Wet Days': annual}, columns = ['Date','Consecutive Wet Days']) 
ax = plt.axes()
plt.plot( 'Date', 'Consecutive Wet Days', data=df_annual, marker='o', color='mediumvioletred')
ax.set_title('Annual Maximum of Consecutive Wet Days in AGASSIZ from 1980 to 2017')
ax.set_ylabel('Days')
figure = ax.get_figure()    
figure.set_size_inches(12, 8) 
plt.show()

png


Avatar
Guillaume Dueymes
Data Scientist and Research Assistant

My research interests include data science, data management and climate science.