Rule definitions
In my previous post I detailed the changes to the data collecting Python script and an additional script fired by crontab. In this post I will describe the code behind the rule-engine.
Table to store rule definitions
create table rules(
rule_id int not null auto_increment comment 'Internal ID'
, rule_desc char(200) not null comment 'Description of the rule'
, rule_query text not null comment 'Query to be used for the rule'
, rule_message char(200) not null comment 'Format string to be used when rule is triggered (using pfrint format)'
, rule_active char(1) not null default 'N' comment 'Should the rule be checked (Y/N)'
, rule_preproc text comment 'Process to perform before checking rule (optional, results not checked)'
, rule_postproc text comment 'Process to perform after checking rule (optional, results not checked)'
, rule_last_used timestamp comment 'When as the rule last triggered'
, primary key (rule_id)
);
create index rule_idx on rules(rule_id);
PHP Class - Rules.php
For an usage example, see my previous post.
<?php
class Rules
{
/* Connection to MySQL database */
private $connection;
/* Private properties */
private $_description; /* Exposed via getDescription() */
private $_preprocess;
private $_postprocess;
private $_query;
private $_message;
private $_output; /* Exposed via getOutput() */
private $_active; /* Exposed via getActive() */
private function reset()
{
$this->_description = NULL;
$this->_preprocess = NULL;
$this->_postprocess = NULL;
$this->_query = NULL;
$this->_message = NULL;
$this->_output = NULL;
$this->_active = NULL;
}
private function connect( $host, $username, $password, $database )
{
$this->connection = new mysqli( $host, $username, $password, $database );
}
private function disconnect()
{
$this->connection->close();
}
/* Get rule data */
private function get_rule_data( $ruleid )
{
if ( $results = $this->connection->query( "select rule_desc
, rule_query
, rule_message
, rule_active
, rule_preproc
, rule_postproc
from rules
where rule_id = ${ruleid}" ) ) {
$row = mysqli_fetch_array( $results );
$this->_description = $row[0];
$this->_query = $row[1];
$this->_message = $row[2];
$this->_active = $row[3];
$this->_preprocess = $row[4];
$this->_postprocess = $row[5];
}
}
public function run_rule( $ruleid )
{
/* Reset */
$this->reset();
/* Get rule data */
$this->get_rule_data( $ruleid );
if ( $this->_active == 'Y' ) {
/* Clear any previous output */
$this->_output = NULL;
/* Execute pre process */
if ( strlen( $this->_preprocess ) > 0 ) {
$this->connection->query( $this->_preprocess );
}
/* Execute the actual rule */
$rule_result = $this->connection->query( $this->_query );
/* Format the message */
if ( mysqli_num_rows( $rule_result ) == 1 ) {
$this->_output = vsprintf( $this->_message, mysqli_fetch_array( $rule_result, MYSQLI_NUM ) );
}
/* Execute post process */
if ( strlen( $this->_postprocess ) > 0 ) {
$this->connection->query( $this->_postprocess );
}
/* Update last usage timestamp for rule */
$this->connection->query( "update rules set rule_last_used=now() where rule_id = ${ruleid}" );
$this->connection->commit();
/* Result result message */
return ( strlen( $this->_output ) > 0 );
}
/* Return failed indicator */
return false;
}
/* Return the value of the private property _ */
public function getDescription()
{
return $this->_description;
}
/* Return the value of the private property _output */
public function getOutput()
{
return $this->_output;
}
/* Return the value of the private property _active */
public function getActive()
{
return $this->_active;
}
/* Default Constructor */
public function __construct( $host, $username, $password, $database )
{
$this->connect( $host, $username, $password, $database );
}
/* Default Destructor */
function __destruct() {
$this->disconnect();
}
}
?>
Python Module - Rules.py
For an usage example, see my previous post.
# -*- coding: utf-8 -*-
import MySQLdb as sql
class Rules:
__description = None
__preprocess = None
__postprocess = None
__query = None
__message = None
__output = None
__active = None
def reset( self ):
self.__description = None
self.__preprocess = None
self.__postprocess = None
self.__query = None
self.__message = None
self.__output = None
self.__active = None
def connect( self, host, username, password, database ):
self.connection = sql.connect( host, username, password, database )
self.cursor = self.connection.cursor()
def disconnect( self ):
self.connection.close()
def get_rule_data( self, ruleid ):
self.cursor.execute( "select rule_desc \
, rule_query \
, rule_message \
, rule_active \
, rule_preproc \
, rule_postproc \
from rules \
where rule_id = {0}".format( ruleid ) )
row = self.cursor.fetchone()
self.__description = row[0];
self.__query = row[1];
self.__message = row[2];
self.__active = row[3];
self.__preprocess = row[4];
self.__postprocess = row[5];
def run_rule( self, ruleid ):
self.reset()
self.get_rule_data( ruleid )
if ( self.__active == 'Y' ):
# Clear any previous output
self.__output = None;
# Execute pre process
if ( self.__preprocess is not None and len( self.__preprocess ) > 0 ):
self.cursor.execute( self.__preprocess )
# Execute the actual rule
rule_result = self.cursor.execute( self.__query )
# Format the message
if ( self.cursor.rowcount == 1 ):
self.__output = self.__message % ( self.cursor.fetchone() )
# Execute post process
if ( self.__postprocess is not None and len( self.__postprocess ) > 0 ):
self.cursor.execute( self.__postprocess )
# Update last usage timestamp for rule
self.cursor.execute( "update rules set rule_last_used=now() where rule_id = {0}".format( ruleid ) )
self.connection.commit();
# Result result message
return ( self.__output is not None and len( self.__output ) > 0 )
# Return failed indicator
return false;
def getDescription( self ):
return self.__description
def getOutput( self ):
return self.__output
def getActive( self ):
return self.__active
def __init__( self, host, username, password, database ):
self.connect( host, username, password, database )
def __del__( self ):
self.disconnect()
Rule Definitions
Temperature Difference *** Query
select abs(r.current - r.previous) as diff
, r.current, r.previous
from ( select substring_index( group_concat( x.value order by x.timestamp desc ),',',1)+0 as current
, substring_index( group_concat( x.value order by x.timestamp ),',',1)+0 as previous
from ( select sd.*
from sensor_data sd
where sd.sensor_id = 1
order by sd.timestamp desc
limit 2
) x
) r
where abs(r.current - r.previous) > 1;
Message
Difference of %.2fC between two readings (%.2fC - %.2fC)
Pre-process
None
Post-process
None
New Record *** This rule is now only defined to the first sensor, because at the moment the Rules module only expects one row of data from the rule query to be returned. The pre- and post-process however do not have this limitation, so high/values for all sensors will be determined and stored.
Query
select sen.sensor_name
, least(new.min_value, old.min_value)
, greatest(new.max_value, old.max_value)
from sensor_high_low old
, tmp_high_low new
, sensors sen
where old.sensor_id = new.sensor_id
and old.sensor_id = sen.sensor_id
and ( old.min_value > new.min_value or old.max_value < new.max_value );
Message
New record for %s, Low = %.2f / High = %.2f
Pre-process
call new_high_low()
Post-process
call update_high_low()
No Data Logged *** Query
select date_format( x.ts, '%M %d, %Y at %H:%i:%s' )
from ( select max(timestamp) as ts
from sensor_data
) x
where timestampdiff( MINUTE, x.ts, now() ) > 5;
Message
No data has been logged for 5 minutes now. Last entry was on %s
Pre-process
None
Post-process
None
MySQL Procedures
On how to create a procedure in MySQL, please read the MySQL documentation on CREATE PROCEDURE and CREATE FUNCTION Syntax.
The ‘New Record’ rule makes use of two MySQL procedures and are defined as detailed below.
Definition of the new_high_low
procedure
create procedure new_high_low()
begin
truncate table tmp_high_low;
insert into tmp_high_low( sensor_id, min_value, max_value )
select sensor_id
, min(value)
, max(value)
from sensor_data
where timestamp >= date_sub(NOW(), interval 8 hour)
group by sensor_id;
-- Save changes
commit;
end;
Definition of the update_high_low
procedure
create procedure update_high_low()
begin
insert into sensor_high_low( sensor_id, min_value, max_value )
select x.sensor_id
, x.min_value
, x.max_value
from tmp_high_low x
on duplicate key
update min_value=if( sensor_high_low.min_value > x.min_value, x.min_value, sensor_high_low.min_value )
, max_value=if( sensor_high_low.max_value < x.max_value, x.max_value, sensor_high_low.max_value );
-- Save changes
commit;
end;