Sample Code for Source Plugin

The following code implements a source plugin for Kafka and serves as an example of how to write a source plugin. The topic Reading from Kafka describes the functionality of this plugin.

Note: This sample code works for "message-based" sources, which deliver data in discrete messages or buffers with natural breaks. For sources without such breaks, be sure to make the appropriate changes to the segments indicated in the topic Writing an Extensible Common Data Framework Plugin.
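For orientation only, the sketch below shows one way the fetch loop might look for a source that delivers a continuous byte stream rather than discrete messages. It reuses the BuffersInputStream calls that appear in the Kafka sample (getConnection, getCurrentWithSize, getBuffer, submit, sendEOS). The rawStream parameter, the CHUNK_SIZE constant, and the MyStreamSourceData type are hypothetical placeholders, and the inputStream and canceled fields are assumed to be set up as in the sample; this is a sketch under those assumptions, not part of the SDK.

// Sketch only: a reader loop for a source without message breaks.
// CHUNK_SIZE, MyStreamSourceData, and rawStream are hypothetical names.
void readContinuousStream(java.io.InputStream rawStream) throws Exception
{
    try (BuffersInputStream.Connection<MyStreamSourceData> connection =
             inputStream.getConnection()) {
        byte[] chunk = new byte[CHUNK_SIZE];
        while (!canceled) {
            // Read whatever bytes are currently available; there is no message boundary.
            int n = rawStream.read(chunk);
            if (n < 0) {
                break;                      // end of stream
            }
            MyStreamSourceData bufferInfo = connection.getCurrentWithSize(n);
            if (bufferInfo == null) {
                break;                      // the server is shutting the source down
            }
            // Copy the chunk into the server-managed buffer and hand it to the parser.
            ByteBuffer buf = bufferInfo.getBuffer();
            System.arraycopy(chunk, 0, buf.array(), buf.position(), n);
            connection.submit();
        }
    } finally {
        inputStream.sendEOS();              // signal that this source has no more data
    }
}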

/*

// Aspen dataflow server

// Copyright (C) 2014-2017 SQLstream, Inc.

*/

 

package com.sqlstream.aspen.namespace.kafka;

 

import java.nio.ByteBuffer;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Collections;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.logging.Level;

import java.util.logging.Logger;

 

import com.sqlstream.aspen.namespace.common.AbstractDataInputSource;

import com.sqlstream.aspen.namespace.common.BuffersInputStream;

import com.sqlstream.aspen.namespace.common.CommonDataConstants;

import com.sqlstream.aspen.namespace.common.OptionsUtils;

import com.sqlstream.aspen.namespace.common.RowInserter;

import com.sqlstream.aspen.namespace.common.SourceData;

import com.sqlstream.aspen.namespace.common.SourceDataFactory;

 

import kafka.api.FetchRequest;

import kafka.api.FetchRequestBuilder;

import kafka.api.PartitionOffsetRequestInfo;

import kafka.common.ErrorMapping;

import kafka.common.TopicAndPartition;

import kafka.javaapi.FetchResponse;

import kafka.javaapi.OffsetResponse;

import kafka.javaapi.PartitionMetadata;

import kafka.javaapi.TopicMetadata;

import kafka.javaapi.TopicMetadataRequest;

import kafka.javaapi.consumer.SimpleConsumer;

import kafka.message.MessageAndOffset;

 

/**

*

* @author Hunter Payne

* @version $Id: //depot/aspen/doc/booksource/IntGuideSource/Topics/int_Sample_Code_for_Source_Plugin.xml#10 $

**/

public class KafkaInputSource extends AbstractDataInputSource

{
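    // Reads messages from a Kafka topic with the SimpleConsumer API, running one
    // reader thread per partition, and pushes each message payload into the
    // server's BuffersInputStream for parsing.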

   protected final static String [] requiredParameters = { "TOPIC" };

   protected final static String [] optionalParameters = {

       "STARTING_TIME",

       "STARTING_OFFSET",

       "PARTITION_OFFSET_QUERY",

       "SEED_BROKERS",

       "PORT",

       "PARTITION",

       "BUFFER_SIZE",

       "FETCH_SIZE",

       "MAX_WAIT",

       "MIN_BYTES",

       "BACKOFF_WAIT",

       "CLIENT_ID",

       "METRICS_PER_PARTITION",

       "REQUIRE_N_READERS_BEFORE_START",

       CommonDataConstants.PARSER_QUEUE_SIZE,

       CommonDataConstants.OPTIONS_QUERY

   };

 

   protected final Properties initProps = new Properties();

 

   protected final static Logger tracer =

       Logger.getLogger("com.sqlstream.aspen.namespace.kafka");

 

   protected ExecutorService service;

 

   protected volatile boolean canceled;

   protected int runningReaderCount;

 

   protected int port;

   protected String topic;

   protected String partitionString;

   protected int firstPartition;

   protected int lastPartition;

   protected long startingTime;

   protected long startingOffset;

   protected int consumerBufferSize;

   protected int fetchSize;

   protected int maxWait;

   protected int minBytes;

   protected int backoffWait;

   protected String seedBrokersString;

   protected String offsetsQuery;

   protected String clientId;

   protected boolean metricsPerPartition;

   protected boolean isOpen;

   protected int requireNReadersBeforeStart;

   protected static int runningKafkaInputSources;

   protected final static Object runningReadersLock = new Object();

 

   protected KafkaSourceDataFactory sourceDataFactory;

 

   public KafkaInputSource()

   {

       //parsedRows = new ArrayBlockingQueue<Object[]>(4096);

       canceled = false;

 

       service = Executors.newSingleThreadExecutor();

   }

 

    static class KafkaSourceData extends SourceData {
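         // Buffer descriptor that remembers the Kafka offset and partition of the
         // message it holds, so updateLocation can expose them through the OFFSET
         // and PARTITION columns.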

        final int offsetColumn;

        final int partitionColumn;

 

        long offset;

        int partition;

 

       public KafkaSourceData(KafkaSourceDataFactory factory, int[] columnNumbers)

        {

            super(factory, columnNumbers);

            offsetColumn = columnNumbers[0];

            partitionColumn = columnNumbers[1];

        }

 

         @Override

        public String locationDescription()

        {

            return "Offset " + offset;

        }

 

        @Override

        public void updateLocation(RowInserter inserter)

            throws SQLException

        {

            if (partitionColumn != -1) {

                inserter.setInt(partitionColumn, partition);

            }

            if (offsetColumn != -1) {

                inserter.setLong(offsetColumn, offset);

            }

        }

    }

   

    class KafkaSourceDataFactory extends SourceDataFactory<KafkaSourceData> {
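         // Creates the KafkaSourceData descriptors handed out by BuffersInputStream;
         // the superclass constructor registers OFFSET and PARTITION as metadata columns.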

 

        protected KafkaSourceDataFactory(int numBuffers, int numPartitions)

                throws SQLException

        {

            super(new String[] {"OFFSET", "PARTITION"}, context, numBuffers, 262144, true);

        }

 

        @Override

        public KafkaSourceData newBufferInfo()

        {

            return new KafkaSourceData(this, columnNumbers);

        }

    }

 

   long parseStartingTime(String startingTimeString) {
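       // Map the STARTING_TIME option to a Kafka offset request time:
       // LATEST, EARLIEST, or a literal timestamp value.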

       if (startingTimeString.equalsIgnoreCase("LATEST")) {

           return kafka.api.OffsetRequest.LatestTime();

       } else if (startingTimeString.equalsIgnoreCase("EARLIEST")) {

           return kafka.api.OffsetRequest.EarliestTime();

       }

       return Long.parseLong(startingTimeString);

   }

 

   public void init(Properties props) throws Exception

   {
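        // Read and validate the plugin options (TOPIC, SEED_BROKERS, PARTITION, and so on),
        // apply defaults for anything omitted, and create the KafkaSourceDataFactory.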

       OptionsUtils.filterInitProperties(

           props, requiredParameters, optionalParameters, initProps);

       Properties dynamic;

       try {

           Connection conn = context.getQueryConnection();

           dynamic = OptionsUtils.addDynamicOptions(

               initProps, conn);

        } catch (Exception e) {

            // Fall back to the static init properties if the dynamic options cannot be resolved.
            tracer.log(Level.WARNING, "Could not resolve dynamic options; using init properties", e);

            dynamic = initProps;

        }

       seedBrokersString = dynamic.getProperty("SEED_BROKERS", "localhost");

       port = Integer.parseInt(dynamic.getProperty("PORT", "9092"));

       topic = dynamic.getProperty("TOPIC");

       partitionString = dynamic.getProperty("PARTITION");

       startingTime = parseStartingTime(dynamic.getProperty(

           "STARTING_TIME", "LATEST"));

       startingOffset = Long.parseLong(dynamic.getProperty(

           "STARTING_OFFSET", "-1")) ;

       offsetsQuery = dynamic.getProperty(

           "PARTITION_OFFSET_QUERY") ;

       consumerBufferSize = Integer.parseInt(dynamic.getProperty(

           "BUFFER_SIZE", "1048576")) ;

       fetchSize = Integer.parseInt(dynamic.getProperty(

           "FETCH_SIZE", "1000000")) ;

       maxWait = Integer.parseInt(dynamic.getProperty(

           "MAX_WAIT", "500")) ;

       minBytes = Integer.parseInt(dynamic.getProperty(

           "MIN_BYTES", "64")) ;

       backoffWait = Integer.parseInt(dynamic.getProperty(

           "BACKOFF_WAIT", "500")) ;

       clientId = dynamic.getProperty("CLIENT_ID");

       metricsPerPartition = "true".equalsIgnoreCase(dynamic.getProperty("METRICS_PER_PARTITION"));

       if (clientId == null || clientId.trim().isEmpty()) {

           clientId = "client_" + topic;

           if (!metricsPerPartition && partitionString != null && !partitionString.isEmpty()) {

               clientId += '_' + partitionString;

           }

       }

       if (partitionString == null || partitionString.trim().length() == 0) {

           firstPartition = 0;

           lastPartition = -1;

       } else {

           String [] partitionEnds = partitionString.split("-");

           firstPartition = Integer.parseInt(partitionEnds[0]);

           if (partitionEnds.length == 1) {

               lastPartition = firstPartition;

           } else {

               lastPartition = Integer.parseInt(partitionEnds[1]);

           }

           if (partitionEnds.length > 2 || firstPartition > lastPartition) {

               throw new SQLException("Invalid partition range " + partitionString);

           }

       }

       int numPartitions = lastPartition - firstPartition + 1;

       int numBuffers = Integer.parseInt(props.getProperty(

           CommonDataConstants.PARSER_QUEUE_SIZE,

           numPartitions <= 1

               ? CommonDataConstants.DEFAULT_PARSER_QUEUE_SIZE

               : Integer.toString(numPartitions + 1)));

       if (numPartitions == 0) {

           numPartitions = 256;

       }

       sourceDataFactory = new KafkaSourceDataFactory(numBuffers, numPartitions);

       requireNReadersBeforeStart = Integer.parseInt(dynamic.getProperty("REQUIRE_N_READERS_BEFORE_START", "0"));

   }

 

   public Properties getInitProperties()

   {

       return initProps;

   }

 

   public static long getLastOffset(

       SimpleConsumer consumer, String topic, int partition,

       long whichTime, String clientName)

       throws SQLException

   {
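        // Ask the broker for the offset in effect at whichTime (earliest, latest,
        // or a timestamp) for the given topic partition.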

       TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);

       Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =

           new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

       requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

       kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(

           requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

       OffsetResponse response = consumer.getOffsetsBefore(request);

 

       if (response.hasError()) {

           tracer.warning(

               "Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));

           return 0;

       }

       long[] offsets = response.offsets(topic, partition);

       return offsets[0];

   }

 

   public static int findMaxPartition(

       int port, String topic,

       List<String> replicaBrokers)

       throws SQLException

   {
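        // Query the seed brokers for topic metadata and return the highest partition id;
        // used when no PARTITION option is supplied.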

       int maxPartition = 0;

       for (String seed : replicaBrokers) {

           SimpleConsumer consumer = null;

           try {

               consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");

               List<String> topics = Collections.singletonList(topic);

               TopicMetadataRequest req = new TopicMetadataRequest(topics);

               kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

 

               List<TopicMetadata> metaData = resp.topicsMetadata();

               for (TopicMetadata item : metaData) {

                   for (PartitionMetadata part : item.partitionsMetadata()) {

                       if (part.partitionId() > maxPartition) {

                           maxPartition = part.partitionId();

                       }

                   }

               }

           } catch (Exception e) {

               throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic

                       + ", " + "] Reason: " + e, e);

           } finally {

               if (consumer != null) consumer.close();

           }

       }

       return maxPartition;

   }

 

   public static PartitionMetadata findLeader(

       int port, String topic, int partition,

       List<String> replicaBrokers)

   throws SQLException

   {
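        // Locate the metadata for the requested partition and refresh the list of
        // replica brokers from it.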

       PartitionMetadata returnMetaData = null;

       loop:

       for (String seed : replicaBrokers) {

           SimpleConsumer consumer = null;

           try {

               consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");

               List<String> topics = Collections.singletonList(topic);

               TopicMetadataRequest req = new TopicMetadataRequest(topics);

               kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

 

               List<TopicMetadata> metaData = resp.topicsMetadata();

               for (TopicMetadata item : metaData) {

                   for (PartitionMetadata part : item.partitionsMetadata()) {

                       if (part.partitionId() == partition) {

                           returnMetaData = part;

                           break loop;

                       }

                   }

               }

           } catch (Exception e) {

               throw new SQLException("Error communicating with Broker [" + seed + "] to find Leader for [" + topic

                       + ", " + partition + "] Reason: " + e, e);

           } finally {

               if (consumer != null) consumer.close();

           }

       }

       if (returnMetaData != null) {

           replicaBrokers.clear();

           for (kafka.cluster.Broker replica : returnMetaData.replicas()) {

               replicaBrokers.add(replica.host());

           }

       }

       return returnMetaData;

   }

 

   public static String findNewLeader(

       String a_oldLeader,

       int port, String topic, int partition, List<String> replicaBrokers)

       throws SQLException {
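        // After a broker failure, retry findLeader a few times, pausing between
        // attempts, until a new lead broker appears.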

       for (int i = 0; i < 3; i++) {

           boolean goToSleep = false;

           PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);

           if (metadata == null) {

               goToSleep = true;

           } else if (metadata.leader() == null) {

               goToSleep = true;

           } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {

               // first time through if the leader hasn't changed give ZooKeeper a second to recover

               // second time, assume the broker did recover before failover, or it was a non-Broker issue

               //

               goToSleep = true;

           } else {

               return metadata.leader().host();

           }

           if (goToSleep) {

               try {

                   Thread.sleep(1000);

               } catch (InterruptedException ie) {

               }

           }

       }

       throw new SQLException("Unable to find new leader after Broker failure. Exiting");

   }

 

   public static String findLeadBroker(

       int port, String topic, int partition, List<String> replicaBrokers)

       throws SQLException

   {

       // find the meta data about the topic and partition we are interested in

       //

       PartitionMetadata metadata = findLeader(port, topic, partition, replicaBrokers);

       if (metadata == null) {

           tracer.warning("Can't find metadata for Topic and Partition. Exiting");

           return null;

       }

       if (metadata.leader() == null) {

          tracer.warning("Can't find Leader for Topic and Partition. Exiting");

           return null;

       }

       return metadata.leader().host();

   }

 

 

   static void kafkaInputSourceStarted() {
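        // Track how many Kafka readers are running in this server so that
        // REQUIRE_N_READERS_BEFORE_START can be honored.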

       synchronized (runningReadersLock) {

           runningKafkaInputSources++;

           runningReadersLock.notifyAll();

       }

   }

   static void kafkaInputSourceStopped() {

       synchronized (runningReadersLock) {

           runningKafkaInputSources--;

       }

   }

 

   class Impl implements Runnable {
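        // One Impl instance runs on its own thread for each partition being read.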

       //int rowtimeColumn, int offsetColumn, int payloadColumn,

       final String topic;

       final int partition;

       String leadBroker;

     

       final int port;

       final String clientName;

       final BuffersInputStream inputStream;

     

 

       private List<String> replicaBrokers = new ArrayList<>();

     

       private Impl(

           String topic, int partition, long startingOffset, int port,

           List<String> seedBrokers)

           throws SQLException

       {

           this.topic = topic;

           this.partition = partition;

           this.port = port;

           if (metricsPerPartition) {

               this.clientName = clientId + '_' + partition;

           } else {

               this.clientName = clientId;

           }

           replicaBrokers.clear();

           replicaBrokers.addAll(seedBrokers);

           this.leadBroker = findLeadBroker(port, topic, partition, replicaBrokers);

           this.inputStream = context.getInputStream();

       }

 

 

   public void run()

   {
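        // Main fetch loop: optionally wait for the required number of readers, then
        // repeatedly fetch a batch of messages, copy each payload into a buffer,
        // record its offset and partition, and submit the buffer downstream.
        // Broker errors trigger a leader re-lookup with backoff.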

       tracer.fine("starting kafka");

       Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());

       kafkaInputSourceStarted();

       if (requireNReadersBeforeStart > 0) {

           synchronized (runningReadersLock) {

               while (requireNReadersBeforeStart > runningKafkaInputSources && !canceled) {

                   tracer.config("waiting for " + (requireNReadersBeforeStart - runningKafkaInputSources) + "more readers to start");

                   try {

                       runningReadersLock.wait(1000);

                   } catch (InterruptedException e) {

                       Thread.currentThread().interrupt();

                   }

               }

               tracer.config("requred number of readers running");

           }

       }

       try (BuffersInputStream.Connection<KafkaSourceData> connection = inputStream.getConnection()) {

           long readOffset = startingOffset;

           int retries = 0;

           while (!canceled) {

               SimpleConsumer consumer = null;

               try {

                   consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);

                   if (readOffset < 0) {

                       readOffset = getLastOffset(consumer,topic, partition, startingTime, clientName);

                   }

                   tracer.fine("starting with offset " + readOffset);

                   int numErrors = 0;

                   while (!canceled) {

                       if (consumer == null) {

                           consumer = new SimpleConsumer(leadBroker, port, 1000, consumerBufferSize, clientName);

                       }

                       FetchRequest req = new FetchRequestBuilder()

                               .clientId(clientName)

                               .addFetch(topic, partition, readOffset, fetchSize)

                               .maxWait(maxWait)

                               .minBytes(minBytes)

                               .build();

                       //tracer.finest("about to fetch");

                       FetchResponse fetchResponse = consumer.fetch(req);

                       //tracer.finest("back from fetch");

     

                       if (fetchResponse.hasError()) {

                           numErrors++;

                           // Something went wrong!

                           short code = fetchResponse.errorCode(topic, partition);

                           tracer.warning("Error fetching data from the Broker:" + leadBroker + " Reason: " + code + " topic: " + topic + " partition: " + partition);

                           if (numErrors > 10) {

                               canceled = true;

                               tracer.severe("exiting kafka reader topic: " + topic + " partition: " + partitionString);

                               return;

                           }

                           if (code == ErrorMapping.OffsetOutOfRangeCode())  {

                               // We asked for an invalid offset. For simple case ask for the last element to reset

                               readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);

                               continue;

                           }

                           consumer.close();

                           consumer = null;

                            if (numErrors > 1) {

                                // exponential backoff before retrying
                                Thread.sleep(1L << numErrors);

                            }

                           leadBroker = findNewLeader(

                               leadBroker, port, topic, partition, replicaBrokers);

                           continue;

                       }

                       numErrors = 0;

                       int numRead = 0;

                       for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {

                           numRead++;

                           long currentOffset = messageAndOffset.offset();

                           if (currentOffset < readOffset) {

                               tracer.warning("Found an old offset: " + currentOffset + " Expecting: " + readOffset + " topic: " + topic + " partition: " + partition);

                               continue;

                           }

                           long nextOffset = messageAndOffset.nextOffset();

                           if (nextOffset > readOffset) {

                               readOffset = nextOffset;

                           }

                           ByteBuffer payload = messageAndOffset.message().payload();

     

                           KafkaSourceData bufferInfo = connection.getCurrentWithSize(payload.remaining());

                           if (bufferInfo == null) {

                               canceled = true;

                               break;

                           }

                           ByteBuffer buf = bufferInfo.getBuffer();

                           payload.get(buf.array(), buf.position(), payload.remaining());

                           if (tracer.isLoggable(Level.FINE)) {

                               tracer.fine("offset=" + currentOffset + " nextOffset=" + readOffset + " got msg ");

                           }

                           if (canceled) {

                               break;

                           }

                           bufferInfo.offset = currentOffset;

                           bufferInfo.partition = partition;

                           connection.submit();

                           retries = 0;

                       }

                       if (numRead == 0) {

                           connection.flush();

                           Thread.sleep(backoffWait);

                       }

                   }

                } catch (Throwable e) {

                    retries++;

                    tracer.log(Level.SEVERE, "Error during kafka processing topic: " + topic + " partition: " + partition, e);

                    if (retries > 1) {

                        // exponential backoff before looking for a new lead broker
                        Thread.sleep(1L << retries);

                   }

                   leadBroker = findNewLeader(

                       leadBroker, port, topic, partition, replicaBrokers);

               } finally {

                   if (consumer != null) {

                       try {

                           consumer.close();

                       } catch (Throwable e) {

                           tracer.log(Level.SEVERE, "Error during while closing consumer", e);

                       }

                       consumer = null;

                   }

                   if (retries > 10) {

                       tracer.log(Level.SEVERE, "Error during kafka processing - exiting topic: " + topic + " partition: " + partitionString);

                       canceled = true;

                       break;

                   }

               }

           }

       } catch (Throwable e) {

           tracer.log(Level.SEVERE, "Exception in kafka topic: " + topic + " partition: " + partition, e);

       } finally {

           tracer.config("exiting kafka reader topic: " + topic + " partition: " + partition);

           canceled = true;

           kafkaInputSourceStopped();

           synchronized (KafkaInputSource.this) {

               runningReaderCount --;

               if (runningReaderCount == 0) {

                   inputStream.sendEOS();

               }

           }

       }

   }

   }

 

   public void closeAllocation()

   {

       tracer.fine("closing");

       canceled = true;

   }

 

   @Override

   public SourceDataFactory<?> getSourceDataFactory()

       throws SQLException

   {

       return sourceDataFactory;

   }

 

   @Override

   public void start()

       throws SQLException

   {
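        // Resolve each partition's starting offset (optionally via PARTITION_OFFSET_QUERY),
        // discover the partition range if none was given, and launch one reader
        // thread per partition.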

       Map<Integer, Long> partitionOffsets = new HashMap<>();

       if (offsetsQuery != null && offsetsQuery.trim().length() != 0) {

           try (PreparedStatement pstmt = context.getQueryConnection().prepareStatement(offsetsQuery)) {

               try (ResultSet rs = pstmt.executeQuery()) {

                   while (rs.next()) {

                       int partition = rs.getInt("PARTITION");

                       long offset = rs.getLong("OFFSET");

                       partitionOffsets.put(partition, offset);

                   }

               }

           }

       }

       List<String> seedBrokers =

           Arrays.asList(seedBrokersString.split(","));

       if (lastPartition == -1) {

           lastPartition = findMaxPartition(port, topic, seedBrokers);

       }

       runningReaderCount = lastPartition - firstPartition + 1;

       for (int partition = firstPartition; partition <= lastPartition; partition++) {

           long partitionOffset = partitionOffsets.containsKey(partition)

               ? partitionOffsets.get(partition) : startingOffset;

           Impl r = new Impl(topic, partition, partitionOffset, port, seedBrokers);

           Thread t = new Thread(r);

           t.setName("kafkaReader." + topic + '.' + partition);

           t.start();

       }

   }

}

 

// End KafkaInputSource.java