Python: Statistinis klaidingų duomenų taškų eliminavimas

nekorektiski_taskaiPrieš mėnesį prie RaspberryPi prisijungiau porą DHT22 oro temperatūros ir drėgmės jutiklių ir pradėjau automatizuotai iš jų surinktus duomenis saugoti MySQL duomenų bazėje (plačiau čia). Dar programuodamas duomenų surinkimą, pastebėjau, kad laikas nuo laiko DHT22 duomenyse pasitaiko klaidų, pav. vieno iš 10 matavimų rezultatas yra keliais laipsniais didesnis nei likusių ar pan. Tokias klaidas bandžiau eliminuoti suvidurkindamas kelių matavimų duomenis, tačiau tai padėjo tik iš dalies, nes net vienas klaidingas rezultatas gan stipriai iškreipia vidurkio rezultatą (žr. grafiką viršuje). Nusprendžiau matavimų klaidas eliminuoti panaudodamas statistinį „nukrypėlių“ eliminavimo metodą.

Tinkamo metodo ieškojau ilgai. Pirmiausia į galvą atėjo pasinaudoti vidutiniu kvadratiniu nuokrypiu ar Stjudento kriterijumi. Po to atradau „Dixon’s Q“ testą. Kažkiek pažaidęs, visus šiuos metodus atmečiau, nes radau daug paprastesnį ir labai lengvai python’e realizuojamą „Interquartile range“ metodą.

iqrMetodo esmė yra ta, kad iš didėjimo tvarka surūšiuoto duomenų masyvo yra atmetamos labai mažos ir labai didelės reikšmės. Tiksliau, atmetamos tos reikšmės, kuriuos yra per daug nutolusios nuo masyvo medianos. Tokių reikšmių atmetimui yra apskaičiuojami 25% ir 75% kvartiliai Q1 ir Q3, jų skirtumas IRQ=Q3-Q1 ir atstumai nuo medianos: L=Q1-1,5*IRQ ir H=Q3+1,5*IRQ. Reikšmės patenkančios už L ir H rėžių yra laikomos per daug nutolusiomis ir yra atmetamos.

Kaip tai atrodo realybėje, yra labai gerai paaiškinta čia:

Kaip tai yra realizuojama python’e;

import numpy as np
myList=[10,11,11,11,12,12,13,14,14,15,17,22]
MLS = sorted(myList)
Q1 = np.percentile(MLS,25)
Q3 = np.percentile(MLS,75)
IRQ=Q3-Q1
L=Q1-1.5*IRQ
H=Q1+1.5*IRQ
print "---"
print "myList: %s" %(myList)
print "MLS: %s" %(MLS)
print "Q1=%s Q3=%s IRQ=%s L=%s H=%s" %(Q1,Q3,IRQ,L,H)

Kode, kvartiliai Q1 ir Q3 yra apskaičiuojami NumPy bibliotekos funkcija percentile(dataList,percentile), kuriai paduodamas didėjimo tvarka surūšiuotas masyvas MLS. Likusi matematik matematika – elementari. Kodo rezultatas:

irq_pythonPagal metodiką, visos masyvo reikšmės mažesnės, nei L=6,135 ir didesnės nei H=15,875 yra tikėtinai klaidingos ir tolimesniems skaičiavimams netinkamos.

Savo DHT22 nuskaitymo reikmėms panaudodamas šį metodą parašiau funkciją, kuri ne tik atmeta tikėtinai klaidingas matavimo reikšmes bet ir iš kart apskaičiuoja matavimo rezultatų vidurkį. Funkciją pavadinau „sanity_test(dataList)„:

import numpy as np

def sanity_test(dataList):
    print "myList: %s" %(dataList)
    MLS = sorted(dataList)
    print "MLS: %s" %(MLS)
    Q1 = np.percentile(MLS,25)
    Q3 = np.percentile(MLS,75)
    IRQ=Q3-Q1
    L=Q1-1.5*IRQ
    H=Q1+1.5*IRQ
    print "L: %s H: %s" %(L,H)
    print "----"
    data = 0
    count = 0
    for i in range(0,len(MLS)):
        if (MLS[i] > L and MLS[i] < H):
            data = data + MLS[i]
            count = count + 1
            print "+ i: %s MLS[%s]: %s" %(i,i,MLS[i])
        else:
            count = count
            print "- i: %s MLS[%s]: %s" %(i,i,MLS[i])
        
    data_avg = round(data/count,1)
    print "----"
    print "data_avg=%s" %(data_avg)
    return data_avg
    
myList=[24.2,24.2,24.2,24.2,24.2,24.2,24.1,29.9,24.1,24.1,24.1,24.1,20.1]
sanity_test(myList)

Funkcija veikia gan paprastai. Pirmiausiai funkcijai paduotas masyvas yra surūšiuojamas didėjimo tvarka. Po to, kaip ir prieš tai pateiktame kode, yra apskaičiuojami kvartiliai ir išvedamos ribos L ir H. Po to, cikle yra tikrinamas kiekvienas masyvo elementas ir, atmetant sąlygos netenkinančias reikšmes, yra išvedamas vidurkis. Vidurkis yra gražinamas į išorę. Specialiai kode prirašiau daug print’ų, kad kodo rezultatas būtų kuo vaizdesnis:

sanity_testIš „screenshot“ matome, kad funkcija apskaičiavo, jog „tinkamos“ masyvo elementų reikšmių ribos L=23,95 ir H=24,25. Ciklas, eidamas per visus masyvo elementus, atmetė elementus, kurie yra mažesnė ar didesni nei šios ribos: MLS[0]=20,1 < 23,95 ir MLS[12]=29,9 > 24,25. Iš sąlygas tenkinančių reikšmių, ciklas išvedė vidurkį data_avg=24,2. Neišmetus netinkamų verčių, vidurkis gaunasi 24,3.

Įsitikinęs, kad kodas tikrai veikia, ankstesniame straipsnyje aprašytą DHT22 duomenų surinkimo kodą patobulinau taip:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import MySQLdb as mdb
import sys
import arrow
import re
import subprocess
import os
import numpy as np
#import sys

import RPi.GPIO as GPIO

GPIO.setmode(GPIO.BCM) ## uses GPIO numbers NOT pin numbers
GPIO.setwarnings( False )
GPIO.setup(26, GPIO.OUT) ## 37 PIN / GPIO26 

os.chdir(os.path.abspath('/home/python/'))

#globals for temperature and humidity averaging
temp = 0
hum = 0

#smart averaging process witch eliminates outlier data points
def sanity_test(dataList):
    MLS = sorted(dataList)
    #print MLS
    Q1 = np.percentile(MLS,25)
    Q3 = np.percentile(MLS,75)
    IRQ=Q3-Q1
    L=Q1-1.5*IRQ
    H=Q1+1.5*IRQ
    #print "L: %s H: %s" %(L,H)

    data = 0
    count = 0
    for i in range(0,len(MLS)):
        if (MLS[i] >= L and MLS[i] <= H):
            data = data + MLS[i]
            count = count + 1
            #print "+ i: %s MLS[%s]: %s count: %s" %(i,i,MLS[i],count)
        else:
            count = count
            #print "- i: %s MLS[%s]: %s count: %s" %(i,i,MLS[i],count)
        
    data_avg = round(data/count,1)
    #print data_avg
    return data_avg

def read_dht22 ( PiPin ):
    success = 0
    output = subprocess.check_output(["./Adafruit_DHT", "2302", str(PiPin)])
    matches = re.search("Temp =\s+([-+]?[0-9.]+)", output)
    if (matches):
        global temp
        temp = matches.group(1)
        success = 1
        #print temp
    else:
        success = 0
        #print "fail"
    matches = re.search("Hum =\s+([0-9.]+)", output)
    if (matches):
        global hum
        hum = matches.group(1)
        #print hum
    else:
        success = 0
        #print "fail"
    return success

try:
    con = mdb.connect('localhost', 'USERNAME', 'PASSWORD', 'warehouse');

    cur = con.cursor()
    cur.execute("SELECT VERSION()")

    ver = cur.fetchone()
    sys_date = arrow.now().format('YYYY-MM-DD HH:mm:ss')
    
    print "Database version : %s " % ver
    print sys_date

    GPIO.output(26,True) #Turn LED ON, while reading DHT2 sensors
    
    count_1 = 0
    temperature_1 = []
    humidity_1 = []
    for x in range (0, 10):
        if (read_dht22(19)):
            count_1 = count_1 + 1
            temperature_1.append(float(temp))
            humidity_1.append(float(hum))
            #print "[19] Temp: %s Hum: %s Count: %s" %(temp,hum,count_1)
        else: count_1 = count_1 #print "echem.."

    if (count_1>0):
        temperature_1 = sanity_test(temperature_1)
        humidity_1=sanity_test(humidity_1)
    else: count_1=count_1
            
    print "[19] Temperature: %s Humidity: %s" %(temperature_1,humidity_1) 

    count_2 = 0
    temperature_2 = []
    humidity_2 = []
    for x in range (0, 10):
        if (read_dht22(13)):
            count_2 = count_2 + 1
            temperature_2.append(float(temp))
            humidity_2.append(float(hum))
            #print "[1] Temp: %s Hum: %s Count: %s" %(temp,hum,count_2)
        else: count_2 = count_2 #print "echem.."

    if (count_2>0):
        temperature_2 = sanity_test(temperature_2)
        humidity_2 = sanity_test(humidity_2)
    else: count_2=count_2
            
    print "[13] Temperature: %s Humidity: %s" %(temperature_2,humidity_2)         
 
    if (count_1 > 0 and count_2 > 0):
            cur.execute("""INSERT INTO duomenys (D_ActualDate, D_Temperature_1, D_Humidity_1 ,D_Temperature_2, D_Humidity_2, D_Info)
            VALUES ( %s, %s,%s,%s,%s, 'CronJob')""",
            [sys_date,temperature_1,humidity_1,temperature_2,humidity_2])

    elif (count_1 == 0 and count_2 > 0):
            cur.execute("""INSERT INTO duomenys (D_ActualDate, D_Temperature_1, D_Humidity_1 ,D_Temperature_2, D_Humidity_2, D_Info)
            VALUES ( %s, NULL,NULL,%s,%s, 'CronJob')""",
            [sys_date,temperature_2,humidity_2])

    elif (count_1 > 0 and count_2 == 0):
            cur.execute("""INSERT INTO duomenys (D_ActualDate, D_Temperature_1, D_Humidity_1 ,D_Temperature_2, D_Humidity_2, D_Info)
            VALUES ( %s,%s,%s,NULL,NULL, 'CronJob')""",
            [sys_date,temperature_1,humidity_1])

    else :
            cur.execute("""INSERT INTO duomenys (D_ActualDate, D_Temperature_1, D_Humidity_1 ,D_Temperature_2, D_Humidity_2, D_Info)
            VALUES ( %s, NULL,NULL,NULL,NULL, 'CronJob')""",
            [sys_date])

                
    con.commit()

    GPIO.output(26,False)
    
except mdb.Error, e:
    if con:
        con.rollback()
        
    print "Error %d: %s" % (e.args[0],e.args[1])
    sys.exit(1)
    
finally:    
        
    if con:    
        con.close()
    GPIO.cleanup()    

Visą kodą, kartu su Adafruit_DHT driveriu, galite paršisiųsti iš čia: dht22_mysql_2.zip

Naujausią kodo versiją rasite mano GitHub saugykloje!

Naudinga literatūra:

1 komentaras

  1. Atgalinis pranešimas: Avietė: DHT22 -> MySQL | Paulius Bautrėnas

Parašykite komentarą

El. pašto adresas nebus skelbiamas.