Using a Custom Filter with ECDA Agent

<< Click to Display Table of Contents >>

Navigation:  Integrating Blaze with Other Systems > Reading Data into s-Server > Reading from Other Sources  > Reading Files in Remote Locations >

Using a Custom Filter with ECDA Agent

Previous pageReturn to chapter overviewNext page

At times, you may want to write your own filter for a particular type of file, such as a log file. Such a filter might exclude rows in the log file that fail to meet specific criteria. You can do so by calling an interface in the Extensible Common Data Agent JAR file called RowFilter, and implement this interface through the --filterClassname property of the ECDA Agent. See the topic Extensible Common Data Agent in this guide for more details.

To implement a custom filter, you need to create your own Java class as either a JAR file or .class file which implements the interface. Then, you need to update your classpath to add this new class. The class implements specific filtering logic, which is applied to each row in the file. For reach row, you return a Boolean value which determines whether the row is added to a stream or table in s-Server. The row is implemented as

This class needs to include two methods:

public void init(Properties tableProps) By default, this contains the options passed on the command line to the agent. They include all the values used to configure the agent including the rowType, the filterClassname, the streamName and everything else the agent uses to configure itself.  You can add your own custom properties that configure your custom filter. For example, you might write something like 'RECORD ("time" BIGINT, "ticker" VARCHAR(3), "shares" INTEGER, "price" REAL)'.
public boolean filterRow(Object[] row) This provides filtering logic, returning a Boolean value which determines whether or not the row is inserted. For example, you might write code indicating "if the first value in the array is less than 100, return TRUE, otherwise return FALSE." This filtering logic can reference the columns defined in tableProps. Row maps onto the set of columns parsed from the log file

This class would contain code along the following lines:

package <package.name>

import com.sqlstream.aspen.namespace.common.RowFilter;

Class <CustomersClassName> implements RowFilter

{

public void init(Properties tableProps){

--add properties containing rowtype value as a string representation of a rowtype.

public boolean filterRow(Object[] row) {

--add filtering code here that returns a Boolean value

   }

 

Implementing the Class

To implement the class for the RowFilter interface, you pass --filterClassname <classname> on the command line to the ECDA agent, where <classname> is the name of the class you created above. See the topic Extensible Common Data Agent in this guide for more details.

Doing so creates an instance of the filter class using the void arg constructor (no arguments).  The adapter then calls init() to get the properties of the stream/table it is filtering. By default, these properties are those defined for the agent, but you can also customize these through the tableProps property of the init method. It then calls filterRow for each row in the file. For each row, the agent either inserts it into the stream or table (TRUE) or discards it (FALSE), depending on the Boolean value returned by filterRow.

Example Implementation

For example, take a simple log file like the following:

07/Mar/2018:16:05:49, SQLS, 50, $50.00

07/Mar/2018:16:06:51, ORCL, 100, $84.00

07/Mar/2018:16:10:02, MSFT, 200, $78.00

07/Mar/2018:16:11:58, MSFT, 100, $78.00

 

With the Rowtype property defined as 'RECORD ("time" BIGINT, "ticker" VARCHAR(3), "shares" INTEGER, "price" REAL)', the parsed stream would appear like the following:

time

ticker

shares

price

07/Mar/2018:16:05:49

SQLS

50

$50.00

07/Mar/2018:16:06:51

ORCL

100

$84.00

07/Mar/2018:16:10:02

MSFT

200

$78.00

07/Mar/2018:16:11:58

MSFT

100

$78.00

 

You could then write code in the RowFilter object to insert only records with the ticker MSFT:

Class <CustomClassName> implements RowFilter {

....

public static int tickerOrdinal;

 

public void init(Properties tableProps) {

   -- code that sets the tickerOrdinal using tableProps

   if (....) {

       tickerOrdinal = 2;

   }

}

   public boolean filterRow(Object[] row) {

       if (row[tickerOrdinal].equals("MSFT")) {

           return true;      -- insert rows for MSFT to the stream.

       }

       return false;   -- discard all other rows

   }

}

 

Interface Code

This is the interface to implement:

/**

$Id: //depot/aspen/doc/booksource/IntGuideSource/Topics/int_Using_a_Custom_Filter_with_ECD.xml#5 $

**/

public interface RowFilter

{

   public void init(Properties tableProps) throws Exception;

-- true means send the row, false means filter it out

   public boolean filterRow(Object[] row);

}