/Documentation/Examples

QoS Example

This example shows how flexible a system built on a rule-based engine can be. The main idea is to simulate a Quality of Service (QoS) monitoring system. We assume that two types of tasks, GBUILD and GUNIT, can run in parallel. However, the number of processes in memory at the same time must be controlled: all forked processes stay in a queue until the system is able to run them. This is a simulation, and a simple Swing application lets you manage the limit for each task type and shows the current queue. Note how easily the logic of the QoS environment is decoupled from the code that serves the GUI and the logging.

Sources:

TaskQueueManager.java

This class instantiates a rule session whose code is written in Jython. MetaRBE uses a special XML dialect, RSXML, to describe rule sessions and rule actions in scripting languages. At the moment Jython is the only choice. Actions can be written partly in Java and partly in RSXML, as this example demonstrates.

			package net.sf.metarbe.example.qos;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.xml.parsers.ParserConfigurationException;

import net.sf.metarbe.ActionEvent;
import net.sf.metarbe.Rule;
import net.sf.metarbe.RuleAction;
import net.sf.metarbe.RuleActivationListener;
import net.sf.metarbe.RuleSemanticModel;
import net.sf.metarbe.RuleSession;
import net.sf.metarbe.RuleSessionEvent;
import net.sf.metarbe.RuleSetManager;
import net.sf.metarbe.impl.scripting.ScriptRuleSetManagerImpl;
import net.sf.metarbe.rsxml.RuleSetXmlParser;
import net.sf.metarbe.scripting.IncompatibIeInterpreterException;
import net.sf.metarbe.scripting.ScriptExecutionFailedException;
import net.sf.metarbe.scripting.ScriptRuleSession;

import org.python.core.PyDictionary;
import org.python.core.PyInteger;
import org.python.core.PyString;
import org.xml.sax.SAXException;

/**
 * Entry point of the QoS example.
 * <p>
 * Loads a rule set from an RSXML resource, adds two rules whose actions are
 * written in Java (they forward "task:started" and "task:finished" events to
 * the registered {@link TaskListener}s), and exposes the per-task-type limits
 * that are shared with the rule session under the name "limits".
 */
public class TaskQueueManager {
	/** Name of the session parameter (and of the script variable) holding the limits. */
	private static final String LIMITS = "limits";
	/** Key of the GBuild limit inside the limits map. */
	private static final String GBUILD_KEY = "gbuild";
	/** Key of the GUnit limit inside the limits map. */
	private static final String GUNIT_KEY = "gunit";

	private final ScriptRuleSetManagerImpl ruleSetManager;
	/** Registered listeners; every access is guarded by synchronizing on the list itself. */
	private final List<TaskListener> listeners = new LinkedList<TaskListener>();
	private RuleActivationLogTableModel logTableModel;
	
	/**
	 * Builds the manager from a rule-set definition.
	 * <p>
	 * Any parsing or script failure is reported with {@code printStackTrace()}
	 * and construction continues, so after a failure the instance may be only
	 * partially configured.
	 *
	 * @param aRuleSetXml name of the RSXML resource, looked up with
	 *        {@code TaskQueueManager.class.getResourceAsStream}
	 */
	public TaskQueueManager(String aRuleSetXml) {
		ruleSetManager = new ScriptRuleSetManagerImpl();
		try {
			new RuleSetXmlParser(ruleSetManager).parse(TaskQueueManager.class.getResourceAsStream(aRuleSetXml), false);
			
			RuleSetManager innerRuleSetManager = ruleSetManager.getRuleSetManager();
			RuleSemanticModel semanticModel = ruleSetManager.getRuleSession().getRuleSemanticModel();
			
			// Java-side rule: notify listeners when a task is reported as started.
			innerRuleSetManager.createRule("launch task", 
				semanticModel.getRuleContext("task:started"), 
				new RuleAction(){
					public void onActionEvent(ActionEvent evt) {
						Task task = (Task)evt.getRuleSessionEvent().getMatchedValue();
						fireTaskStartedEvent ( task );
					}
				});
			
			// Java-side rule: notify listeners when a task is reported as finished.
			innerRuleSetManager.createRule("task has finished", 
				semanticModel.getRuleContext("task:finished"), 
				new RuleAction(){
					public void onActionEvent(ActionEvent evt) {
						Task task = (Task)evt.getRuleSessionEvent().getMatchedValue();
						fireTaskFinishedEvent ( task );
					}
				});
			
			// Run the global script before the limits are read back from the interpreter below.
			ruleSetManager.getGlobalScript().execute(ruleSetManager.getScriptInterpreter());
			RuleSession rs = ruleSetManager.getRuleSession();
			
			// Forward every rule activation to the log table model, once one has been set.
			rs.getRuleActivator().addRuleActivationListener(new RuleActivationListener(){
				public void apply(RuleSessionEvent anEvent, Rule aRule) {
					if(logTableModel!=null){
						logTableModel.addLogRecord(anEvent, aRule);
					}
				}
			});
			
			// Publish the current limits to the session as a plain Java map so that
			// setLimit() can change them later.
			Map<String, Integer> limits = new HashMap<String, Integer>();
			limits.put(GBUILD_KEY, getLimit(GBuild.class));
			limits.put(GUNIT_KEY, getLimit(GUnit.class));
			rs.bindParameter(LIMITS, limits);
		} catch (ParserConfigurationException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (SAXException e) {
			e.printStackTrace();
		} catch (IncompatibIeInterpreterException e) {
			e.printStackTrace();
		} catch (ScriptExecutionFailedException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Sets the model that receives a log record for every rule activation.
	 *
	 * @param logTableModel the model to log into; {@code null} disables logging
	 */
	public void setLogTableModel(RuleActivationLogTableModel logTableModel) {
		this.logTableModel = logTableModel;
	}
	
	/**
	 * @return the model currently receiving rule-activation records, or
	 *         {@code null} if none has been set
	 */
	public RuleActivationLogTableModel getLogTableModel() {
		return logTableModel;
	}
	
	/**
	 * Changes the limit for one task type in the map bound to the session.
	 * Does nothing when no "limits" value is bound to the session.
	 *
	 * @param aTaskClass {@code GBuild.class} selects the "gbuild" limit; any
	 *        other class selects the "gunit" limit
	 * @param limit the new limit
	 */
	public void setLimit(Class<?> aTaskClass, int limit){
		Map<String, Integer> limits = boundLimits();
		if (limits == null) {
			return;
		}
		limits.put(limitKey(aTaskClass), limit);
	}
	
	/**
	 * Returns the limit for one task type.
	 * <p>
	 * The value comes from the map bound to the session when there is one;
	 * otherwise it is read from the {@code limits} dictionary held by the
	 * script interpreter.
	 *
	 * @param aTaskClass {@code GBuild.class} selects the "gbuild" limit; any
	 *        other class selects the "gunit" limit
	 * @return the current limit
	 */
	public int getLimit(Class<?> aTaskClass){
		String key = limitKey(aTaskClass);
		Map<String, Integer> limits = boundLimits();
		if (limits != null) {
			return limits.get(key);
		}
		PyDictionary scriptLimits = (PyDictionary)((ScriptRuleSession)ruleSetManager.getRuleSession()).getScriptInterpreter().get(LIMITS);
		return ((PyInteger)scriptLimits.get(new PyString(key))).getValue();
	}
	
	/** Maps a task class to its key in the limits map; everything but GBuild is "gunit". */
	private static String limitKey(Class<?> aTaskClass) {
		return aTaskClass.equals(GBuild.class) ? GBUILD_KEY : GUNIT_KEY;
	}
	
	/** Returns the limits map bound to the rule session, or null when nothing is bound. */
	@SuppressWarnings("unchecked") // the only value bound under this name here is a Map<String, Integer>
	private Map<String, Integer> boundLimits() {
		return (Map<String, Integer>)ruleSetManager.getRuleSession().getValue(LIMITS);
	}
	
	/**
	 * Registers a listener for task started/finished notifications.
	 *
	 * @param aTaskListener the listener to add
	 */
	public void addTaskListener(TaskListener aTaskListener){
		synchronized (listeners) {
			listeners.add ( aTaskListener );
		}
	}
	
	/**
	 * Unregisters a previously added listener.
	 *
	 * @param aTaskListener the listener to remove
	 */
	public void removeTaskListener(TaskListener aTaskListener){
		synchronized (listeners) {
			listeners.remove ( aTaskListener );
		}
	}
	
	/** Copies the listener list under its lock so callbacks run without holding it. */
	private TaskListener[] snapshotListeners() {
		synchronized (listeners) {
			return listeners.toArray(new TaskListener[listeners.size()]);
		}
	}
	
	/**
	 * Notifies every registered listener that a task has started.
	 * Iterates over a snapshot, so listeners may add or remove listeners
	 * (including themselves) from inside the callback.
	 *
	 * @param aTask the task that started
	 */
	protected void fireTaskStartedEvent(Task aTask){
		for (TaskListener listener : snapshotListeners()) {
			listener.taskStarted(aTask);
		}
	}
	
	/**
	 * Notifies every registered listener that a task has finished.
	 * Iterates over a snapshot, so listeners may add or remove listeners
	 * (including themselves) from inside the callback.
	 *
	 * @param aTask the task that finished
	 */
	protected void fireTaskFinishedEvent(Task aTask){
		for (TaskListener listener : snapshotListeners()) {
			listener.taskFinished(aTask);
		}
	}
	
	/**
	 * Runs the rule session against a task given by name.
	 *
	 * @param aTask the value handed to the rule session
	 */
	public void manageTask(String aTask){
		ruleSetManager.getRuleSession().executeRules ( aTask );
	}
	
	/**
	 * Runs the rule session against a task object.
	 *
	 * @param aTask the task handed to the rule session
	 */
	public void manageTask(Task aTask){
		ruleSetManager.getRuleSession().executeRules ( aTask );
	}
	
	/**
	 * @return the scripting rule-set manager created by the constructor
	 */
	public ScriptRuleSetManagerImpl getRuleSetManager() {
		return ruleSetManager;
	}
	
	/** Submits a fixed demo sequence of "gbuild" and "gunit" task names to the rule session. */
	public void executeQueue(){
		manageTask("gbuild");
		manageTask("gunit");
		manageTask("gunit");
		manageTask("gunit");
		manageTask("gunit");
		manageTask("gbuild");
		manageTask("gunit");
		manageTask("gunit");
	}
	
	/**
	 * Runs the demo sequence against the rules in the "qosrules.xml" resource.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		new TaskQueueManager("qosrules.xml").executeQueue();
	}
}

		

qosrules_ui.xml

This definition is loaded by TaskQueueManager and contains the session code and a few actions.

			<?xml version="1.0" ?>
<ruleset language="jython" description="Describes rules for QoS aware system">
    <!-- Initialisation section: global script, layout declarations and event factories. -->
    <init>
                <!-- Global Jython script. It defines the default limits (GBUILD_LIMIT = 2,
                     GUNIT_LIMIT = 3), the limits and activeTasks dictionaries keyed by
                     'gbuild' and 'gunit', and the pending-task list named queue.
                     start_task / finish_task increment / decrement activeTasks for the
                     task's name and print a message. init_limits copies the 'gbuild' and
                     'gunit' entries of the session value 'limits' into the limits dictionary. -->
                <global>
                        <script>
                            GBUILD_LIMIT = 2
                            GUNIT_LIMIT = 3
                            limits={ 'gbuild' : GBUILD_LIMIT, 'gunit' : GUNIT_LIMIT }
                            activeTasks={'gbuild':0,'gunit':0}
                            queue = []
                            def start_task(t):
                                activeTasks[t.name]+=1
                                print 'task %s is started' % t
                            def finish_task(t):
                                activeTasks[t.name]-=1
                                print 'task %s is finished' % t
                            def init_limits(s):
                                limits['gbuild'] = s.getValue('limits').get('gbuild')
                                limits['gunit'] = s.getValue('limits').get('gunit')
                        </script>
                </global>
                
            <!-- Contexts and states used by this rule set. The events and rules below
                 refer to them as context:state (for example task:started). -->
            <layout>
                <context name="task"/>
                <context name="queue"/>
                <state name="selected"/>
                <state name="started"/>
                <state name="finished"/>
                <state name="discovered"/>
                <state name="checking"/>
            </layout>        
                                
        <!-- Event factories. For each listed context:state the script builds a
             ScriptSessionEvent from the context, the session value 'source' and the
             session value 'task', and returns it in r. -->
        <events>
                <event context="task:selected,started,finished,discovered" session="s">
                        <script return="r">r = ScriptSessionEvent(context, s.getValue('source'), s.getValue('task'))</script>
                </event>
                 <event context="queue:checking" session="s">
                        <script return="r">r = ScriptSessionEvent(context, s.getValue('source'), s.getValue('task'))</script>
                </event>
        </events>
    </init>
        
    <!-- Session script. Inside the script the session is referred to as s and the
         submitted object as t (names taken from the name/source attributes).
         Steps:
           1. Bind t as 'source' and refresh the script limits via init_limits(s).
           2. Unless t.name is 'checkQueue', bind t as 'task'.
           3. If t.state is 'waiting': emit queue:checking; emit task:discovered
              (skipped for 'checkQueue'); then for every task in queue bind it as
              'task' and emit task:selected. If activeTasks for that task's name grew
              during the emit, the task is remembered and task:started is emitted.
              Remembered tasks are removed from queue after the loop, not during it.
           4. Otherwise: call finish_task(t) and emit task:finished. -->
    <session name="s" source="t">       
            <script><![CDATA[
            s.bindParameter('source',t)
            
            init_limits ( s )
            
            if t.name!='checkQueue':
                s.bindParameter('task',t)
            
            if t.state=='waiting':
              emit ( 'queue:checking' )
              if t.name!='checkQueue':
                  emit ( 'task:discovered' )
              rq = []
              for task in queue:
                  s.bindParameter('task',task)
                  n = activeTasks[task.name]
                  emit ( 'task:selected' )
                  if activeTasks[task.name]>n:
                      rq.append ( task )
                      emit ( 'task:started' )
              for t in rq:
                queue.remove ( t )
              rq = None    
            else:
                finish_task ( t )
                emit ( 'task:finished' )
           ]]></script>        
        </session>

     <!-- Rule "Run task": for a task:selected event, the clause is true when the
          number of active tasks with this task's name is below the limit for that
          name. The action then calls start_task on the event's matched value,
          which increments activeTasks for that name. -->
     <rule name="Run task">
        <clause context="task:selected" data="task">
            <script return="r">
                r = activeTasks[task.name] &lt; limits[task.name]
            </script>
        </clause>    
        <action event="e">
                <script>
                   task = e.getRuleSessionEvent().getMatchedValue()
                   start_task ( task ) 
                </script>    
        </action>
     </rule>
    
    <!-- Rule "Put task in a queue": bound to task:discovered. The clause has no
         script, so presumably it matches every such event (confirm against the
         RSXML semantics). The action appends the event's matched value to queue. -->
    <rule name="Put task in a queue">
        <clause context="task:discovered"/>
        <action event="e">
            <script>
                task = e.getRuleSessionEvent().getMatchedValue()
                queue.append ( task )
             </script>    
        </action>
    </rule>
</ruleset>