Aug 22 2013 0

Rule definitions

In my previous post I detailed the changes to the data collecting Python script and an additional script fired by crontab. In this post I will describe the code behind the rule-engine.

Table to store rule definitions

-- Rule definitions used by the rule engine (one row per rule).
create table rules(
  rule_id         int not null auto_increment comment 'Internal ID'
, rule_desc       char(200) not null comment 'Description of the rule'
, rule_query      text not null comment 'Query to be used for the rule'
, rule_message    char(200) not null comment 'Format string to be used when rule is triggered (using printf format)'
, rule_active     char(1) not null default 'N' comment 'Should the rule be checked (Y/N)'
, rule_preproc    text comment 'Process to perform before checking rule (optional, results not checked)'
, rule_postproc   text comment 'Process to perform after checking rule (optional, results not checked)'
  -- Declared "null default null" on purpose: a bare TIMESTAMP column can be
  -- given DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP implicitly,
  -- which would change this value on every edit of the rule instead of only
  -- when the rule engine sets it. NULL means the rule has never been run.
, rule_last_used  timestamp null default null comment 'When was the rule last triggered'
, primary key (rule_id)
);
-- No separate index on rule_id: the primary key already indexes that column.

PHP Class - Rules.php

For a usage example, see my previous post. 1

<?php
/*
 * Rule engine.
 *
 * Loads a rule definition from the `rules` table, runs its optional
 * pre-process, the rule query and its optional post-process, and formats
 * the rule message when the rule query returns exactly one row.
 */
class Rules
{
    /* Connection to MySQL database */
    private $connection;

    /* Private properties */
    private $_description; /* Exposed via getDescription() */
    private $_preprocess;
    private $_postprocess;
    private $_query;
    private $_message;
    private $_output; /* Exposed via getOutput() */
    private $_active; /* Exposed via getActive() */

    /* Forget everything known about the previously loaded rule */
    private function reset()
    {
        $this->_description = NULL;
        $this->_preprocess = NULL;
        $this->_postprocess = NULL;
        $this->_query = NULL;
        $this->_message = NULL;
        $this->_output = NULL;
        $this->_active = NULL;
    }

    /* Open the MySQL connection used for all statements of this object */
    private function connect( $host, $username, $password, $database )
    {
        $this->connection = new mysqli( $host, $username, $password, $database );
    }


    /* Close the MySQL connection; safe to call when connecting failed */
    private function disconnect()
    {
        if ( $this->connection instanceof mysqli && !$this->connection->connect_errno ) {
            $this->connection->close();
        }
        $this->connection = NULL;
    }


    /* True when $value is a non-empty string (NULL-safe replacement for strlen() > 0) */
    private function has_text( $value )
    {
        return is_string( $value ) && $value !== '';
    }


    /*
     * Get rule data.
     *
     * Loads the definition of rule $ruleid into the private properties.
     * When the rule does not exist (or the statement fails) the properties
     * keep the NULL values set by reset().
     */
    private function get_rule_data( $ruleid )
    {
        /* The rule id is bound as a parameter, never pasted into the SQL text */
        $statement = $this->connection->prepare( "select rule_desc
                                                  ,      rule_query
                                                  ,      rule_message
                                                  ,      rule_active
                                                  ,      rule_preproc
                                                  ,      rule_postproc
                                                  from   rules
                                                  where  rule_id = ?" );
        if ( $statement === false ) {
            return;
        }

        $id = (int) $ruleid;
        $statement->bind_param( 'i', $id );

        if ( $statement->execute() ) {
            /* Buffer the row so the text columns are fetched completely */
            $statement->store_result();
            $statement->bind_result( $description, $query, $message, $active, $preprocess, $postprocess );

            /* fetch() returns NULL when there is no such rule */
            if ( $statement->fetch() === true ) {
                $this->_description = $description;
                $this->_query = $query;
                $this->_message = $message;
                $this->_active = $active;
                $this->_preprocess = $preprocess;
                $this->_postprocess = $postprocess;
            }
        }

        $statement->close();
    }


    /*
     * Run rule $ruleid.
     *
     * Returns true when the rule is active and its query returned exactly
     * one row, in which case the formatted message is available through
     * getOutput(). Returns false when the rule is unknown, inactive or not
     * triggered.
     */
    public function run_rule( $ruleid )
    {
        /* Reset (this also clears any previous output) */
        $this->reset();

        /* Get rule data */
        $this->get_rule_data( $ruleid );

        /* Unknown and inactive rules are never executed */
        if ( $this->_active !== 'Y' ) {
            return false;
        }

        /* Execute pre process */
        if ( $this->has_text( $this->_preprocess ) ) {
            $this->connection->query( $this->_preprocess );
        }

        /* Execute the actual rule */
        if ( $this->has_text( $this->_query ) ) {
            $rule_result = $this->connection->query( $this->_query );

            /* query() yields false on failure and true for statements without a result set */
            if ( $rule_result instanceof mysqli_result ) {
                /* Format the message; the row values are the printf arguments */
                if ( $rule_result->num_rows == 1 ) {
                    $this->_output = vsprintf( $this->_message, $rule_result->fetch_array( MYSQLI_NUM ) );
                }
                $rule_result->free();
            }
        }

        /* Execute post process */
        if ( $this->has_text( $this->_postprocess ) ) {
            $this->connection->query( $this->_postprocess );
        }

        /* Update last usage timestamp for rule */
        $statement = $this->connection->prepare( "update rules set rule_last_used=now() where rule_id = ?" );
        if ( $statement !== false ) {
            $id = (int) $ruleid;
            $statement->bind_param( 'i', $id );
            $statement->execute();
            $statement->close();
        }
        $this->connection->commit();

        /* Report whether a message was produced */
        return $this->has_text( $this->_output );
    }


    /* Return the value of the private property _description */
    public function getDescription()
    {
        return $this->_description;
    }


    /* Return the value of the private property _output */
    public function getOutput()
    {
        return $this->_output;
    }


    /* Return the value of the private property _active */
    public function getActive()
    {
        return $this->_active;
    }


    /* Default Constructor */
    public function __construct( $host, $username, $password, $database )
    {
        $this->connect( $host, $username, $password, $database );
    }


    /* Default Destructor */
    function __destruct()
    {
        $this->disconnect();
    }
}
?>

Python Module - Rules.py

For a usage example, see my previous post. 1

# -*- coding: utf-8 -*-

import MySQLdb as sql

class Rules:
    """Rule engine backed by the ``rules`` table.

    A rule consists of a query, a printf-style message and optional
    pre-/post-process statements. ``run_rule`` executes them in order and
    formats the message when the rule query returns exactly one row.
    """

    # Data of the rule loaded most recently by get_rule_data()
    __description = None
    __preprocess = None
    __postprocess = None
    __query = None
    __message = None
    __output = None
    __active = None

    def reset( self ):
        """Forget everything known about the previously loaded rule."""
        self.__description = None
        self.__preprocess = None
        self.__postprocess = None
        self.__query = None
        self.__message = None
        self.__output = None
        self.__active = None


    def connect( self, host, username, password, database ):
        """Open the database connection and the cursor used for all statements."""
        self.connection = sql.connect( host, username, password, database )
        self.cursor = self.connection.cursor()


    def disconnect( self ):
        """Close the database connection; does nothing when it was never opened."""
        connection = getattr( self, 'connection', None )
        if connection is not None:
            connection.close()
            self.connection = None


    def get_rule_data( self, ruleid ):
        """Load the definition of rule ``ruleid`` into the private attributes.

        When no such rule exists the attributes are left untouched (``None``
        after ``reset()``).
        """
        # The rule id is passed as a driver parameter, not pasted into the SQL text
        self.cursor.execute( "select rule_desc "
                             ",      rule_query "
                             ",      rule_message "
                             ",      rule_active "
                             ",      rule_preproc "
                             ",      rule_postproc "
                             "from   rules "
                             "where  rule_id = %s", ( ruleid, ) )

        row = self.cursor.fetchone()
        if row is None:
            return

        self.__description = row[0]
        self.__query = row[1]
        self.__message = row[2]
        self.__active = row[3]
        self.__preprocess = row[4]
        self.__postprocess = row[5]


    def run_rule( self, ruleid ):
        """Run rule ``ruleid``.

        Returns True when the rule is active and its query returned exactly
        one row; the formatted message is then available via ``getOutput()``.
        Returns False when the rule is unknown, inactive or not triggered.
        """
        # reset() also clears any previous output
        self.reset()
        self.get_rule_data( ruleid )

        # Unknown and inactive rules are never executed
        if self.__active != 'Y':
            return False

        # Execute pre process
        if self.__preprocess:
            self.cursor.execute( self.__preprocess )

        # Execute the actual rule
        self.cursor.execute( self.__query )

        # Format the message; the row values are the format arguments
        if self.cursor.rowcount == 1:
            self.__output = self.__message % ( self.cursor.fetchone() )

        # Execute post process
        if self.__postprocess:
            self.cursor.execute( self.__postprocess )

        # Update last usage timestamp for rule
        self.cursor.execute( "update rules set rule_last_used=now() where rule_id = %s", ( ruleid, ) )
        self.connection.commit()

        # Report whether a message was produced
        return bool( self.__output )


    def getDescription( self ):
        """Return the description of the rule loaded last."""
        return self.__description


    def getOutput( self ):
        """Return the formatted message of the last run, or None."""
        return self.__output


    def getActive( self ):
        """Return the active flag ('Y'/'N') of the rule loaded last."""
        return self.__active


    def __init__( self, host, username, password, database ):
        self.connect( host, username, password, database )


    def __del__( self ):
        self.disconnect()

Rule Definitions

Temperature Difference *** Query

/* Returns one row when the two newest readings of sensor 1 differ by more than 1. */
select abs( pair.current - pair.previous ) as diff
,      pair.current
,      pair.previous
from  (
        /* Collapse the two newest rows into a single row: the first list
           entry when sorted newest-first is "current", the first entry when
           sorted oldest-first is "previous"; +0 turns the text into a number. */
        select substring_index( group_concat( latest.value order by latest.timestamp desc ), ',', 1 ) + 0 as current
        ,      substring_index( group_concat( latest.value order by latest.timestamp asc ), ',', 1 ) + 0 as previous
        from  (
                select sd.value
                ,      sd.timestamp
                from   sensor_data sd
                where  sd.sensor_id = 1
                order by sd.timestamp desc
                limit 2
              ) latest
      ) pair
where abs( pair.current - pair.previous ) > 1;

Message

Difference of %.2fC between two readings (%.2fC - %.2fC)

Pre-process

None

Post-process

None

New Record *** This rule is currently only defined for the first sensor, because at the moment the Rules module expects exactly one row of data to be returned from the rule query. The pre- and post-process, however, do not have this limitation, so high/low values for all sensors will be determined and stored.

Query

/* Lists each sensor whose freshly computed low/high (tmp_high_low) beats the
   stored record (sensor_high_low), together with the resulting low and high. */
select sen.sensor_name
,      least( recent.min_value, stored.min_value )
,      greatest( recent.max_value, stored.max_value )
from   sensor_high_low stored
inner join tmp_high_low recent
        on recent.sensor_id = stored.sensor_id
inner join sensors sen
        on sen.sensor_id = stored.sensor_id
where  stored.min_value > recent.min_value
or     stored.max_value < recent.max_value;

Message

New record for %s, Low = %.2f / High = %.2f

Pre-process

call new_high_low()

Post-process

call update_high_low()

No Data Logged *** Query

/* Returns the formatted time of the newest reading when it is more than
   5 (whole) minutes old; returns no row otherwise or when there is no data. */
select date_format( newest.logged_at, '%M %d, %Y at %H:%i:%s' )
from   (
         select max( sd.timestamp ) as logged_at
         from   sensor_data sd
       ) newest
where  timestampdiff( MINUTE, newest.logged_at, now() ) > 5;

Message

No data has been logged for 5 minutes now. Last entry was on %s

Pre-process

None

Post-process

None

MySQL Procedures

On how to create a procedure in MySQL, please read the MySQL documentation on CREATE PROCEDURE and CREATE FUNCTION Syntax.

The 'New Record' rule makes use of two MySQL procedures, which are defined as detailed below.

Definition of the new_high_low procedure 1

-- Rebuilds tmp_high_low with the lowest and highest reading of every sensor
-- over the last 8 hours.
create procedure new_high_low()
begin
    -- Start from an empty work table on every run
    truncate table tmp_high_low;

    -- One row per sensor that logged data in the time window
    insert into tmp_high_low( sensor_id, min_value, max_value )
        select sd.sensor_id
        ,      min( sd.value )
        ,      max( sd.value )
        from   sensor_data sd
        where  sd.timestamp >= now() - interval 8 hour
        group by sd.sensor_id;

    -- Save changes
    commit;
end;

Definition of the update_high_low procedure 1

-- Merges the values in tmp_high_low into sensor_high_low: sensors without a
-- row are inserted, existing rows only change when a new low or high is found.
create procedure update_high_low()
begin
    insert into sensor_high_low( sensor_id, min_value, max_value )
        select fresh.sensor_id
        ,      fresh.min_value
        ,      fresh.max_value
        from   tmp_high_low fresh
    on duplicate key
        update min_value = case
                               when sensor_high_low.min_value > fresh.min_value then fresh.min_value
                               else sensor_high_low.min_value
                           end
        ,      max_value = case
                               when sensor_high_low.max_value < fresh.max_value then fresh.max_value
                               else sensor_high_low.max_value
                           end;

    -- Save changes
    commit;
end;

Previous post
DNSSEC Issues I recently noticed that when I was using Google Public DNS I could not reach my own domain at littlegemsoftware.com. Using other DNS services like
Next post
Additional Rules ##Unhealthy Humidity## Ever since we moved into our new apartment we noticed that the relative humidity is very low, sometimes even as low as 20%.
This blog is powered by Blot