Mettre en place KAFKA CONNECT et KAFKA pour faire du change data capture (CDC)

Je suis en train de monter un moteur d’analyse permettant d’analyser les logs d’un ESB.

L’outil standard stockes les évènements dans une base de données.

Afin d’être le moins intrusif possible dans le code des médiations, j’ai choisi de mettre en place un mécanisme de change data capture (CDC) avec KAFKA CONNECT et KAFKA. Pour ce faire j’ai déployé la solution confluent. Celle-ci permet entre autres d’avoir des connecteurs JDBC permettant la connexion aux bases de données relationnelles.

KAFKA est un système de messagerie distribué crée initialement par LinkedIn. Kafka connect fournit une interface en entrée et en sortie de ce dernier.

Si vous voulez plus d’informations, vous pouvez aller du coté de la présentation du devox 2016

Installation

J’ai utilisé la procédure décrite ici

Démarrage des différents démons

Configuration du connecteur JDBC pour KAFKA CONNECT

Il faut tout d’abord copier le driver jdbc dans le répertoire share/java/kafka-connect-jdbc/

Ensuite, il faut créer un fichier de configuration (ex. esb-logs.properties )

Les contraintes de KAFKA CONNECT sont les suivantes

  • Si on veut faire des extractions incrémentales ( ce qui est mon cas) , il faut soit un élément de type TIMESTAMP qui ne puisse pas être NULL, soit une PK auto-incrémentée.
  • Les requêtes doivent être relativement assez simples. Personnellement, je préfère utiliser des vues qui me retournent l’exécution des requêtes SQL.

Exécution

Lancer la commande

Pour vérifier que tout est bien dans le topic KAFKA

Conclusion

Pour l’instant, j’ai réussi à extraire des données de ma base de données vers un broker KAFKA. La suite me permettra de les traiter en mode streaming dans SPARK.

Normaliser une adresse avec ElasticSearch et la base adresse nationale

Il y a fort fort longtemps, j’intégrais la base adresse nationale dans elasticsearch via logstash. Je ne suis pas allé plus loin faute de temps et peut-être d’envie.

Que peut-on faire avec ces quelques gigas de données me direz vous?

  • Faire des recherches pour avoir les coordonnées géographiques d’une adresse donnée
  • Faire une recherche pour avoir l’adresse normalisée
  • Faire un rapprochement avec d’autres données (avec Spark par exemple)
  • Sans doute plein d’autres use cases

Je vais m’attarder sur le deuxième point. A quoi ça sert ? et bien à avoir une adresse « propre » et utilisable par un système d’information (ex. ce que font les impôts, ou amazon). Il existe quelques solutions propriétaires qui réalisent ceci et sont assez chers. Je me suis donc mis dans la tête de le faire via elasticsearch.

Rappels des épisodes précédents

Voici le schéma de l’architecture

Présentation1

Pré-requis

Configuration Logstash

Voici ma configuration Logstash

J’ai configuré LOGSTASH pour réaliser les actions suivantes :

  • extraction des données d’un fichier
  • suppression de la ligne d’en-tête
  • gestion de la localisation avec un champ de type ‘geo_point’.

C’est assez simple (pour l’instant)

Mapping ELASTICSEARCH

Analyzers

J’ai configuré les analyzers de la manière suivante:

J’ analyse tous les mots (whitespace tokenizer) en minuscule en appliquant un filtre (ngram) permet de rechercher par caractère (ex. imp au lieu de impasse).

La c’est super couteux en espace disque et impose un plus gros traitement lors de l’insertion mais permet d’ alléger le temps de traitement des requêtes ( le gros du travail est fait lors du chargement).

Mapping

La je n’ai pas fait grand chose de particulier si ce n’est le typage des différents champs (ex. la localisation )

Création de l’index

Dans sense ou via cUrl, lancer la commande suivante :

Pour le mapping complet, voir sur mon compte github

Chargement

voir mon article précédent

Interrogation

Maintenant je peux interroger mon index

Imaginons que je fasse saisir le code postal, la ville et l’adresse et que j’interroge elasticsearch pour obtenir une adresse normalisée

Conclusion

Ce n’est qu’un début. Ce n’est sans doute pas encore très performant, si vous avez des remarques, n’hésitez pas. En tout cas, les briques et outils décrits ci-dessus permettent de normaliser des adresses a minima avec des logiciels libres, ce qui n’est pas rien.

Les sources sont disponibles sur GITHUB. Je ne pense pas que je vais en faire un projet à proprement parler mais plutôt d’une boîte à outils pour manipuler ce genre de données. J’y ajouterai sans doute quelques recherches géospatiales et peut être l’ intégration de graph.

Quoi de neuf du coté du front-end

Depuis le début de l’année je suis en train de faire une spécialisation Coursera : Full Stack Developer.

Pourquoi me direz-vous ? Tout d’abord, j’ai principalement travaillé sur du back-office ou de l’outillage. Ça m’a servi de rattrapage sur ces technologies :

  • HTML/CSS ( ne rigolez pas j’ai appris des choses…)
  • Bootstrap
  • Angular
  • Applications mobiles hybrides
  • NodeJS

Sachant que j’ai principalement travaillé sur JAVA, ça m’a fait un peu bizarre

Qu’est-ce que j’en ai retenu:

Javascript et les langages interprétés, ce n’est pas fait pour moi. Un compilateur c’est quand même bien pour découvrir des erreurs de syntaxes (je suis trop vieux pour ces co%%%£$$) …. Les puristes du javascript me diront qu’il y a les tests unitaires. Pour moi ça ne remplace pas un compilateur. Pas étonnant que des initiatives comme typescript ou babeljs arrivent massivement sur le marché.

Je comprends pourquoi AngularJS a inondé le marché ces dernières années. C’est assez bien foutu.

Le combo NodeJS/express/mongoose permet de réaliser des APIS et des requêtes facilement .

Un exemple de route pour une requête GET

Un exemple de requête qui m’a bluffé

Pour conclure, je ne  pense pas que je ferai des sites web de sitôt, mais ça m’a donné une bonne vision d’ensemble des technologies.

Devoxx 2016

Ben voila, j’écris ce billet entre deux confs lors de l’édition 2016 du devoxx.index
QUOI, vous ne connaissez pas devoxx ? c’est à mon avis LA conférence française du développement JAVA, SCALA,BIG DATA.

C’est très très technique et c’est tant mieux. Commerciaux et chefs de projets, vous pouvez passer votre chemin 🙂

J’ai pu notamment faire des hands on sur apache kafka, vert.x et sur la création d’architectures.

Petite nouveauté, les conférences seront disponibles sur youtube ( je ne connais pas encore la chaîne ).

Si vous ne connaissez pas, voici quelques conférences de l’année dernière

Intégrer dans docker une application ANGULAR

dockerangularjs

Dans le cadre d’un POC j’ai eu à « dockeriser » une application ANGULARJS. Cette dernière utilise les briques logicielles suivantes :

J’ai donc utilisé docker-compose pour orchestrer le tout. Voici la configuration

Définition du front

Dans le répertoire front, j’ai ajouté les sources (JAVASCRIPT, HTML, CSS,…). Ca donne

Définition du back

Pour le back, vu que c’est du mock, je ne me suis pas trop embêté. J’ai donc utilisé json-server et ajouté dans le répertoire config le fichier json correspondant aux bouchons.

Exécution

Construction du projet

A la racine

Exécution

Conclusion

Et voila vous avez un site basé sur angular avec utilisation d’une API REST utilisable . Il manque encore certaines briques indispensables telles que ha_proxy par ex, mais ça me suffit pour une démo.

 

 

 

 

Paramétrage de ma Debian effectué post-installation

après avoir réinstallé mon PC et optimisé les accès au disque SSD, me voici dans le téléchargement des packages

Sources logicielles

J’ai copié mes anciennes sources de packages dans le répertoire /etc/apt/sources.list.d.

J’ai exclu la source deb-multimedia.org.

Packages installés

Attention, certains packages sont spécifiques à ma plateforme! A vous de voir si vous voulez les utiliser

Configuration nvidia

Il faut créer le fichier /etc/X11/xorg.conf.d/20-nvidia.conf avec le contenu suivant :

 

Installation d’un disque SSD sur mon PC équipé de Debian Jessie

Après 6 ans de bons et loyaux services, mon PC commençait à fatiguer un petit peu. J’ai donc décidé de m’équipe d’un disque SSD. J’ai souhaité conserver mon ancien disque et répartir selon les besoins les données entre les deux.

Voici les étapes de l’installation , de la configuration et des optimisations effectuées.3319_ssd-2

Références

Je me suis appuyé principalement sur les documentations Debian et Ubuntu. Elles sont vraiment bien faites. Je vous les conseille 🙂

Montage du disque SSD

Afin d’optimiser le montage et de limiter les I/O sur le disque SSD, il faut ajouter l’option noatime au montage du disque.

Voici le contenu de mon fichier /etc/fstab

Alignement du disque

On peut paramétrer le montage du disque avec l’option discard ou comme ici, utiliser la commande fstrim et la lancer via un cron. J’ai choisi la deuxième option ( cron hebdomadaire )

Paramétrage de certains répertoires

Répertoire TMP en mémoire

Une des bonnes pratiques est de ne pas mettre les répertoires qui ont des contenus qui ne cessent d’être modifiés (ex. /tmp, /var ) . Pour ça il y a deux stratégies, l’une de mettre en mémoire les espaces souhaités (dans mon cas le /tmp ) ou sur un disque classique (le /var ).

Tout d’abord, il faut activer la fonctionnalité via le fichier /etc/default/tmpfs

puis monter le répertoire /tmp en mode tmpfs

Répertoire /var

J’ai décidé d’externaliser le répertoire /var dans mon ancien disque

Pour monter le répertoire au démarrage, il faut utiliser la fonctionnalité « bind » dans le fichier /etc/fstab.

Montage des différents disques au démarrage

Par défaut, mon ancien disque n’est pas accessible. J’ai donc utiliser gnome-disks pour configurer les partitions et les rendre accessibles au boot.

Répertoires des utilisateurs

Pour tous les utilisateurs, j’ai externalisé les éléments suivants :

  • répertoire .cache
  • répertoires Downloads, Documents, Desktop, Musique,Vidéos

Ils sont liés par des liens symboliques.

Conclusion

Il n’y a rien de vraiment compliqué. Je ne sais pas si ces étapes sont obligatoires, mais je me suis dit que quitte à réinstaller le système, autant bien le faire 🙂

Si vous voyez des choses à améliorer et/ou corriger, n’hésitez pas à le mettre en commentaire 🙂

Utilisation de spark en mode streaming

Après avoir intégré toutes les briques dans docker, il ne me « restait » plus qu’à coder la brique permettant de me connecter à une queue MQTT et d’insérer des données dans Elasticsearch.

 

Spark-logo-192x100px

J’ai décidé d’utiliser Apache Spark. Ce composant remplace Hadoop pour les traitements map reduce et permet d’exécuter ce genre de traitement dans plusieurs typologies d’environnement ( batch, cluster spark, cluster hadoop,…).

J’aime bien ce framework car il offre différentes possibilités et permet via un simple batch ( commande java -jar ) de lancer des traitements map reduce performants avec différents langages (scala, java,python, R).

Pour ce faire j’ai aussi décidé d’utiliser le langage SCALA. Spark est développé sur ce langage et fournit plus de fonctionnalités en scala que sur les autres.

Connexion à une queue MQTT

Sauvegarde dans Elasticsearch

Elasticsearch fournit une API permettant à des dérivés d’Hadoop de se connecter à un cluster Elasticsearch. L’utilisation est des plus simples.

Le programme

J’ai volontairement crée plus de variables qu’il ne faut car j’ai pas mal mis de logs 🙂

Je vais essayer de mettre mon code sur github prochainement.

 

 

 

 

 

 

 

Docker compose

Me voilà avec mes images docker réalisées. Maintenant, il faut les lier entre elles et packager le tout.

J’ai choisi d’utiliser docker-compose. c’est l’outil standard fourni par docker. Sa principale lacune est que cet outil ne s’exécute que localement. En gros, on ne pourra pas utiliser docker-compose pour exécuter de manière distribuée les différents containers (ex. la base de données sur un nœud, le serveur web sur un autre,…). Pour mon POC je n’ai pas trop besoin de ça et, il faut le dire, j’ai un peu la flemme d’installer des softs comme kubernetes.

Je ne décrirai pas l’installation. C’est beaucoup mieux fait ici.

Je mettrai prochainement le code sur github. Je pense que le code sera mis à jour ultérieurement. Il se peut donc que certains exemples de cet articles soient obsolètes.

Définition des différents composants

J’ai trois composants :

  • une instance rabbitmq qui réceptionne des messages via le protocole MQTT
  • elasticsearch qui stocke les différents évènements et les mets à disposition
  • un programme s’appuyant sur spark-streaming qui se connecte à rabbitmq , transforme les évènements réceptionnés et les envoient dans elasticsearch.

La spécification des différents composants se fait par un fichier docker-compose.yml qui est dans le répertoire racine.

Je spécifie les images que je souhaite construire, les ports exposés et les variables d’environnement

Le fichier docker-compose.yml

elasticsearch

J’ai surchargé l’image officielle d’elasticsearch en ajoutant un fichier de configuration personnalisé et en installant le plugin shield et marvel. Voici le dockerfile:

Rabbitmq

Ici j’ai souhaité ajouter deux utilisateurs au démarrage du container

La configuration se fait dans le fichier init.sh

Spark

Enfin mon code scala qui exécute SPARK est packagé sous qui la forme d’un JAR qui contient toutes les dépendances.

Le dockerfile correspondant

Je ne décrirai pas le code ici. Peut-être dans un futur article.

Construction

Exécution