PROJET AUTOBLOG


Planet-Libre

source: Planet-Libre

⇐ retour index

Littlewing : Utilisation de spark en mode streaming

jeudi 10 décembre 2015 à 10:42

Après avoir intégré toutes les briques dans docker, il ne me “restait” plus qu’à coder la brique permettant de me connecter à une queue MQTT et d’insérer des données dans Elasticsearch.

 

Spark-logo-192x100px

J’ai décidé d’utiliser Apache Spark. Ce composant remplace Hadoop pour les traitements map reduce et permet d’exécuter ce genre de traitement dans plusieurs typologies d’environnement ( batch, cluster spark, cluster hadoop,…).

J’aime bien ce framework car il offre différentes possibilités et permet via un simple batch ( commande java -jar ) de lancer des traitements map reduce performants avec différents langages (scala, java,python, R).

Pour ce faire j’ai aussi décidé d’utiliser le langage SCALA. Spark est développé sur ce langage et fournit plus de fonctionnalités en scala que sur les autres.

Connexion à une queue MQTT

class MQTTStreaming(val sc: StreamingContext) {

  val conf = ConfigFactory.load()

  val logger = Logger.getLogger(this.getClass)

  def extractFromMQTT(): DStream[Log] = {
    val receiver = MQTTUtils.createStream(sc, conf.getString(ConfigurationKeys.MQTT_URL.toString),
      conf.getString(ConfigurationKeys.MQTT_TOPIC.toString))
    return receiver.map(r => (new Log(UUID.randomUUID().toString, r, Date.from(Instant.now()))))
  }
}

Sauvegarde dans Elasticsearch

Elasticsearch fournit une API permettant à des dérivés d’Hadoop de se connecter à un cluster Elasticsearch. L’utilisation est des plus simples.

def saveToES(array: Array[Log]): Unit = {
  EsSpark.saveToEs(sparkContext.makeRDD(array), configuration.getString(ConfigurationKeys.ES_INDEX.toString), getOptions())
}

Le programme

Logger.getLogger("org").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)
  val sparkContext = SparkContext.getOrCreate(new SparkUtils().getSparkConf())
  val streamingContext = new StreamingContext(sparkContext, Duration.apply(10000))
  val mqttStreaming = new MQTTStreaming(streamingContext)
  val stream = mqttStreaming.extractFromMQTT()
  val esloader = new ESLoader(sparkContext)
  stream.foreachRDD(rdd => {
    val collect = rdd.collect()
    esloader.saveToES(collect)
  })
  streamingContext.start()
  streamingContext.awaitTermination()
}

J’ai volontairement crée plus de variables qu’il ne faut car j’ai pas mal mis de logs :)

Je vais essayer de mettre mon code sur github prochainement.

 

 

 

 

 

 

 

Gravatar de Littlewing
Original post of Littlewing.Votez pour ce billet sur Planet Libre.