Utilisation de spark en mode streaming

Après avoir intégré toutes les briques dans docker, il ne me « restait » plus qu’à coder la brique permettant de me connecter à une queue MQTT et d’insérer des données dans Elasticsearch.

 

Spark-logo-192x100px

J’ai décidé d’utiliser Apache Spark. Ce composant remplace Hadoop pour les traitements map reduce et permet d’exécuter ce genre de traitement dans plusieurs typologies d’environnement ( batch, cluster spark, cluster hadoop,…).

J’aime bien ce framework car il offre différentes possibilités et permet via un simple batch ( commande java -jar ) de lancer des traitements map reduce performants avec différents langages (scala, java,python, R).

Pour ce faire j’ai aussi décidé d’utiliser le langage SCALA. Spark est développé sur ce langage et fournit plus de fonctionnalités en scala que sur les autres.

Connexion à une queue MQTT

Sauvegarde dans Elasticsearch

Elasticsearch fournit une API permettant à des dérivés d’Hadoop de se connecter à un cluster Elasticsearch. L’utilisation est des plus simples.

Le programme

J’ai volontairement crée plus de variables qu’il ne faut car j’ai pas mal mis de logs 🙂

Je vais essayer de mettre mon code sur github prochainement.

 

 

 

 

 

 

 

Extraire les données d’une base de données relationnelle avec Spark

Me voila en train de tester Apache Spark.AAEAAQAAAAAAAAImAAAAJDcyMTQ0N2JkLWRjYzMtNDZjMy05OWQ4LTljNzFiM2M0NTg0Mw

Pour info, Spark est un projet libre du consortium Apache qui a le vent en poupe depuis quelques temps. Il permet entre autres de réaliser des opérations de type Map Reduce en mémoire

Je souhaite automatiser le chargement de données dans ELASTIC SEARCH et faire au passage un petit map reduce sur les données en entrée.

Voici ce que j’ai fait pour extraire les données d’une base relationnelle ORACLE via JDBC. Bien évidemment, ça fonctionne avec toutes les autres SGBDR accessibles via JDBC.

Configuration de Spark

J’utilise le mode « autonome » de spark, c.-à.d sans cluster hadoop.

Pour l’instant, je n’utilise que les modules streaming et sql.

Au préalable , il faut configurer la variable d’environnement SPARK_CLASSPATH et ajouter le chemin vers le driver JDBC

Je n’ai pas téléchargé la distribution car j’utilise spark au travers des librairies utilisées dans mon programme.

Développement

J’ai décidé d’utiliser le langage SCALA pour développer mon module. Pourquoi SCALA me direz vous ? Parce que ça fait quelques années que je me dis que je dois m’y mettre ( peut être est-ce trop tard ….) et que selon plusieurs commentaires que j’ai pu avoir au DEVOXX, SCALA est clairement plus adapté aux traitements réalisés par SPARK

Dépendances

Voici la configuration de mon pom.xml build.sbt :

Pour l’exemple, voici une classe « fourre tout » qui montre le chargement de Spark ainsi que l’extraction des données

Problèmes rencontrés

Gestion des champs numériques

Comme évoqué ci-dessus, j’ai eu un bug sur la gestion des NUMERIC ,

Comme moyen de contournement, j’ai (après quelques recherches sur le net) opté pour surcharger le dialecte JDBC:

et enregistré le dialecte à l’exécution

Présence du binaire winutils.exe sur windows 7 64bits

Si comme moi vous avez le bug référencé sur le JIRA du projet, il suffit de télécharger le binaire winutils.exe, puis de positionner la variable d’environnement au démarrage en mettant le bon chemin ( attention, il faut un sous répertoire « bin’ .

And now, something completely different

Outre les difficultés liées à l’apprentissage simultané de plusieurs technologies, on y arrive. Le projet est bien documenté.

Maintenant, je vais créer plusieurs indicateurs et faire un chargement dans Elasticsearch. Je décrirai ça dans un prochain article