Using a Custom Filter to Eliminate Invalid Rows

You can also use the Custom Filter option of the ECDA agent to filter out invalid rows.

Note: To filter duplicate rows, you need to use a SELECT DISTINCT on the server side. See the topic SELECT ALL and SELECT DISTINCT in the Streaming SQL Reference Guide.

To filter out invalid rows, you would write a custom filter along the lines of the code below. This code:

1. Creates a string called columns, which holds the value of the INTEGER_COLUMN_INDICES property that the agent reads. See the topic Extensible Common Data Agent for more details on agent properties.
2. Uses the Java StringTokenizer class to split that value into individual column indices. Indices are zero-based: if, for example, you want to check two columns, the 2nd and 4th, you set INTEGER_COLUMN_INDICES to "1,3". StringTokenizer breaks up 1,3 into 1 and 3.
3. Uses the filterRow method to check that each column listed in INTEGER_COLUMN_INDICES contains an integer. Rows in which any of these columns does not parse as an integer are discarded.
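
As an illustration of step 2 only, the following is a minimal standalone sketch of how a value such as "1,3" is split into indices. The class name ColumnIndexDemo and the method parseIndices are hypothetical names chosen for this example; they are not part of the agent or of s-Server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical demo class; not part of the ECDA agent.
class ColumnIndexDemo {

    // Splits a comma-separated value such as "1,3" into zero-based indices.
    // Returns an empty list when the property is not set.
    static List<Integer> parseIndices(String columns) {
        List<Integer> cols = new ArrayList<>();
        if (columns == null) {
            return cols;
        }
        StringTokenizer st = new StringTokenizer(columns, ",");
        while (st.hasMoreTokens()) {
            // Integer.parseInt throws NumberFormatException on a non-numeric token.
            cols.add(Integer.parseInt(st.nextToken()));
        }
        return cols;
    }

    public static void main(String[] args) {
        // "1,3" selects the 2nd and 4th columns of each row.
        System.out.println(parseIndices("1,3"));
    }
}
```

Note that this sketch, like the filter itself, does not trim whitespace, so a value such as "1, 3" would fail to parse.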

Code

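import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.sqlstream.aspen.namespace.common.RowFilter;

public class IntegerValidationFilter implements RowFilter
{
    // Name of the agent property that lists the zero-based column indices to check.
    public static final String COLUMNS_TO_PARSE = "INTEGER_COLUMN_INDICES";

    protected static final Logger tracer =
        Logger.getLogger(RowFilter.class.getName());

    // Column indices to validate; remains null if the property is not set.
    protected List<Integer> cols;

    // Reads INTEGER_COLUMN_INDICES (for example "1,3") and stores each index.
    public void init(Properties tableProps) throws Exception
    {
        String columns = tableProps.getProperty(COLUMNS_TO_PARSE);

        if (columns != null) {
            StringTokenizer st = new StringTokenizer(columns, ",");
            cols = new ArrayList<Integer>();

            while (st.hasMoreTokens()) {
                cols.add(Integer.parseInt(st.nextToken()));
            }
        }
    }

    // Returns true to keep the row, false to discard it.
    public boolean filterRow(Object[] row)
    {
        if (cols != null) {
            for (Integer col : cols) {
                try {
                    // Only the parse result matters; the value itself is not used.
                    Integer.parseInt((String)row[col.intValue()]);
                } catch (NumberFormatException nfe) {
                    // The column does not contain an integer: log it and discard the row.
                    tracer.log(Level.INFO, nfe.getMessage(), nfe);
                    return false;
                }
            }
        }

        // No columns configured, or every checked column parsed as an integer.
        return true;
    }
}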
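
To show which rows such a filter keeps and which it discards, the following standalone sketch applies the same check to two sample rows. It does not depend on the RowFilter interface; the class name RowCheckDemo, the method isValidRow, and the sample rows are assumptions made for this example only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; mirrors the check in filterRow without the agent.
class RowCheckDemo {

    // Returns true if every listed column parses as an integer, false otherwise.
    static boolean isValidRow(Object[] row, List<Integer> cols) {
        for (int col : cols) {
            try {
                Integer.parseInt((String) row[col]);
            } catch (NumberFormatException nfe) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Check the 2nd and 4th columns (zero-based indices 1 and 3).
        List<Integer> cols = Arrays.asList(1, 3);

        Object[] good = {"a", "10", "b", "20"};
        Object[] bad = {"a", "10", "b", "n/a"};

        System.out.println(isValidRow(good, cols)); // true: row is kept
        System.out.println(isValidRow(bad, cols));  // false: row is discarded
    }
}
```

As in the filter above, this check assumes each inspected column holds a String. A column of another type would raise a ClassCastException, and an index beyond the end of the row would raise an ArrayIndexOutOfBoundsException; neither is caught.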