Spark Streaming is one of the most widely used frameworks for real time processing in the world with Apache Flink, Apache Storm and Kafka Streams. However, when compared to the others, Spark Streaming has more performance problems and its process is through time windows instead of event by event, resulting in delay.

After developing several real-time projects with Spark and Apache Kafka as input data, in Stratio we have found that many of these performance problems come from not being aware of key details. For example, a good configuration, installation, and development may make the application 10 to 20 times faster.

Below you will find the key aspects to optimize application implementation using Spark Streaming:

Kafka direct implementation

In the Spark API – 2.X version there are two implementations available to receive data from Kafka: one, based in a compatible receiver with kafka 0.8.x, and another, called Direct, compatible only with versions 0.10.X and superiors. The implementation based on Receiver is less parallelizing and not compatible with TLS security. In order to parallelize the process you need to create several DStreams which read differents topics. Given that the data from kafka is only received by one executor, this data will be stored in the Block Manager of Spark, and then will be used one at the time in the transformations made by the executors. To obtain HA of the streaming application the checkpointing must be activated.

    val kafkaStream = KafkaUtils.createStream(
         streamingContext,
         [ZK quorum],
         [consumer group id],
         [per-topic number of Kafka partitions to consume]
    )

The direct implementation is completely parallelizable since every Spark executor is able to receive data from Kafka. It doesn’t matter if the information comes from one or more topics. This is compatible with TLS and includes its own management of the offsets, that is to say, you don’t need to activate the checkpointing.

     val kafkaStream = KafkaUtils.createDirectStream[String, String]
(ssc,locationStrategy, consumerStrategy)

Because of this, and as shown below, it is better to use the implementation based on direct.

The offsets management

The offsets indicate where the groupld assigned to the Spark consumer is reading from. This is very important because it guarantees the HA during the streaming process and avoids losing data if there is an error.

In the  Kafka version 0.10.x and the Spark 2.x version the offsets management is included, which uses Kafka to manage it through maintenance topics. To obtain HA during the Streaming process and avoid losing data, there are three options when using direct implementation:

  1. Activating the checkpointing during the creation of the Spark context can cause performance problems. Let’s see:
          val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  2. Making our own implementation keeping the offsets in zookeeper or in Hbase.
  3. Using the new API to manage the offsets is not necessary neither to make the implementation nor to activate the checkpoint when associated with low performance moments. With the following code lines you can keep the offsets in Kafka at any time because the data has been processed correctly:
  stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

Partitions and computing distribution

Making partitions in Kafka over the topics which are going to be consumed is very important, hence this will allow you to parallelize the reception of the events in different Spark executors. Creating a relation partition-executor will make every executor receive a chunk of data from the Kafka topic. Below we can see how to build a topic with three partitions and the configuration needed to run the Spark job with three executors using two cores each and 2GB of RAM.

./bin/kafka-topics.sh --create --replication-factor 1 --partitions 3 --zookeeper zookeeper:2181 
--topic topicname

 conf.set("spark.cores.max", 6)
 conf.set("spark.executor.cores", 2)
 conf.set("spark.executor.memory", 2GB)

To know more, this is an interesting article about topics/partitions in a kafka cluster.

Spark Streaming configurations

There are three configurations that have a direct impact on the streaming application, namely:


1. Spark locality wait

Optimize the executor election when Spark compute one task, this have direct impact into the Scheduling Delay.

     conf.set("spark.locality.wait", 100)


2. Spark Streaming BackPressure

Activating the BackPressure resulted in the stable performance of a streaming application. You should activate it in Spark production environments with Kafka:

    conf.set("spark.streaming.backpressure.enabled", true)

3. Spark Kafka consumer poll timeout

The most important configuration parameter assigned to the Kafka consumer is through the SparkContext. The variable is the timeout in the .poll(timeout) function. This function is very delicate because it is the one which returns the records to Spark requested by Kafka by a .seek. If after two attempts there is a timeout, the Task is FAILED and sent to another Spark executor causing a delay in the streaming window.

If the timeout is too low and the Kafka brokers need more time to answer there will be many “TASK FAILED”s, which causes a delay in the preparation of the assigned tasks into the Executors. However, if this timeout is too high the Spark executor wastes a lot of time doing nothing and produces large delays (pollTimeout + task scheduling in the new executor).

The default value of the Spark version 2.x. was 512ms, which could be a good start. If the Spark jobs cause many “TASK FAILED”s you would need to raise that value and investigate why the Kafka brokers took so long to send the records to the poll.

    conf.set("spark.streaming.kafka.consumer.poll.ms", 512)

A problem that can result from this delay in the “poll” is that Spark uses the management of the offsets to guarantee the right reception of the events one by one. This means that it is not using the poll function to receive a list of events but goes one at a time. For example, if in a window it has to read the offset 100 to 200 it is going to generate 100 calls to the poll function instead of just one indicating that it wants to read the offset 100 to 200.

Spark Kafka consumer configurations

There are several important parameters when configuring the Kafka consumer. Let’s see some of them with their default values:

   session.timeout.ms, 10000 in Kafka +0.10.1 and 30000 in Kafka -0.10.0

This can be incremented in order to avoid generating consumers losses, only if it is in the range of the properties group.min.session.timeout.ms and group.max.session.timeout.ms

   heartbeat.interval.ms, 3000

It must be ⅓ of the time of session.timeout.ms and you should increase it as incrementing session.timeout.ms maintaining the proportion.

   max.poll.interval.ms, 300000

The maximum time between polls before generating the consumer loss.

   request.timeout.ms, 305000

It must be bigger than max.poll.interval.ms

connections.max.idle.ms, 540000

When the connection is inactive, it would be closed after these miliseconds

Parameters to be configured in Kafka brokers:

num.network.threads, 3

You should increment the number of threads in production systems with several partitions per topic.

Quoting directly from the Spark documentation:

“If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this will require changing group.max.session.timeout.ms”.

Serialization

The serialization of the events sent to Kafka is very important. They reduce the traffic between the data of the internal net and the Kafka brokers and also between these and the Spark executors. For this it is recommended to use Avro.

The Kafka producers must send the data to the Kafka cluster serialized by Avro, having include the scheme in the event. This can be used as a support for the data that changes its morphology, but the Spark jobs have to be adapted to deal with the evolution of the scheme.

In Stratio we recommend implementing a serializer just for the Kafka consumer which is being executed in the executors. By doing this, these Avro events can be transformed into the type of event needed in Spark, it does not matter if it is its own class or a GenericRowWithSchema, which includes the same Avro schema and will allow you to have a dynamic scheme.

Example of a producer using the Twitter Bijection library:

       val schemaJson = """{
            | "namespace": "post.avro",
            | "type": "record", 
            | "foo": "var", 
            | "fields": [ {"name": "foo", "type": "string"} ]

            | }""".stripMargin
      val schema = parser.parse(jsonSchema)
      val record = GenericAvroCodecs.toBinary(schema)
      
      val avroRecord = new GenericData.Record(schema) 
      avroRecord.put("foo", "var")

      val bytesToSend = record.apply(avroRecord)

      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-host:9092")
      props.put("key.serializer", StringSerializer)
      props.put("value.serializer", DefaultSerializer)
                    
      val producer = new KafkaProducer[String, String](props) 
      val record = new ProducerRecord[String, Array[Byte]](topic, bytesToSend)
      
      producer.send(record)

Example of the consumer with a user serializer:

     val serializers = Map( 
         "key.deserializer" -> classOf[StringDeserializer],
         "value.deserializer" -> classOf[RowDeserializer] 
     ) 
     
     val consumerStrategy = ConsumerStrategies.Subscribe[String, Row](
         extractTopics,
         autoCommit ++ autoOffset ++ serializers ++ metaDataBrokerList ++ groupId ++ partitionStrategy,
         offsets
     )

     val kafkaStream = KafkaUtils.createDirectStream[String, Row](ssc, locationStrategy, consumerStrategy)
 
     //some time after
     kafkaStream.foreachRDD { rdd =>
         //if all the events have the same schema in the same spark window 

         val schema = rdd.first.schema
         val sparkDataframe = sqlContext.createDataFrame(rdd, schema) 

         sparkDataframe.createOrReplaceTempView("kafkaEvents")

         val queryResult = sqlContext.sql("SELECT foo from kafkaEvents")
         
         //log the results queryResult.collect().foreach(row => log.info(s"Event received: $row"))
         //save the results to MongoDB
         queryResult.write
             .format("com.stratio.datasource.mongodb")
             .mode(Append)
             .options(mongoDBOptions)
             .save()

         //save the results to HDFS in Parquet format
         queryResult.write
             .mode(Append)
             .options(parquetOptions)
             .parquet("$path/$tableName") 
     }

The serialization of the data inside Spark is also important. Spark recommends using Kryo serialization to reduce the traffic and the volume of the RAM and the disc used to execute the tasks. This may increase the performance 10x of a Spark application 10 when computing the execution of RDD DAG.

In order to use the Kryo serialization in Spark the “classes” should be registered in the application and the Kryo serialization should be activated.

      conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      conf.registerKryoClasses(Array(classOf[Foo], classOf[Var]))
      conf.set("spark.kryo.registrationRequired", "true")

JVM configuration

Driving Java arguments to the executors and to the driver when executing the Spark application allows you to use JVM properly.

Optimizing the Garbage Collector can reduce the length of time. Comparing:

Spark batch applications

     -XX:+UseConcMarkSweepGC with --XX:+UseG1GC


Spark Streaming applications

      -XX:+UseConcMarkSweepGC


Configuring it in Spark Context

      conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")

It is very important to adjust the memory portion dedicated to the data structure and to the JVM heap, especially if there are too many pauses or they are too long due to GC.

Author

Big Data Architect @Stratio. Passionate about cycling, and enjoying a nice Alhambra 1925 after a hard day of work!

7 Comments

  1. Did you try comparing Spark Structured Streaming and Direct Streams?

    • Jose Carlos García Serrano Reply

      Hi,

      now in Stratio we are migrating components to Structured Streaming, but complex stateful applications use the Direct Streams with mapWithState implementation.

      In the Databricks blog you can find one benchmark comparing batch, streaming and other technologies.

      Regards

    • Late response, but that issue only occurs if you don’t set a max rate limit.
      Explanation: Backpressure tunes to optimize the processing rate. The problem is that sometimes the processing time of each batch is too great for your needs.
      Solution: To prevent this, make sure you tune your runtimes and max executor settings if using dynamic allocation.
      –conf spark.streaming.kafka.maxRatePerPartition=50000

  2. Hi, The RowDeserializer that you’ve mentioned, is that a custom class? If not can you let me know the import? Thanks!

    • Jose Carlos García Serrano Reply

      Hi Arty,

      Thanks for your comments.

      This deserializer is a custom class that we implement in the proyect “Stratio Sparta”. The implementation is not difficult and you can extends with more functions as json, avro, or other formats.

      Regards Jose Carlos.

  3. Laszlo Takacs Reply

    Hi,
    I tried to build a consumer-group based solution with Subscribe but firstly when I start two executors with two partitions the second one kills the first one with illegalstateexception. What is started first get the two partitions and when the second one try to start during revoking the first lose the partition. I also tried assign as a manual dedication method but int his case the issue is that the offset not committed back to kafka. That means if the job abort then the next starting will start from the beginning again.
    Have you experienced similar behavior in Spark? Do you have any practice or advice how I can step over on this obstacle?
    I appreciate your kind help in advance,
    Laszlo

Write A Comment