Utilisation de la plateforme CONFLUENT pour faire du CDC

Après une pause dans mon exploration de KAFKA et KAFKA CONNECT, je me suis remis à faire du CDC.

Cette fois j’ai décidé d’utiliser les possibilités natives de la plateforme CONFLUENT. A savoir le proxy REST et les plugins JDBC et ELASTICSEARCH.

J’ai donc mis en quelques coups de cuillères à pot :

  • Une extraction de données incrémentale à partir d’une base de données
  • Un chargement dans Elasticsearch

 

Création des connecteurs

Création du connecteur d’extraction

La plateforme CONFLUENT offre un ensemble d’APIS qui permettent la configuration des connecteurs.

Ex. :

Lecture d’un état de connecteur

 curl -XGET http://127.0.0.1:8083/connectors

 

Voila comment j’ai chargé la configuration :

J’ai d’abord crée un fichier JSON

{
  "name": "jdbc-sql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 2,
    "connection.url": "jdbc:oracle:thin:URL",
    "mode": "incremental",
    "timestamp.column.name": "JOBSTART",
    "query":"select * from LOG",
    "topic.prefix": "jdbc-avro-jdbc",
    "table.types" : "VIEW"
  }
}

puis je l’ai chargé via l’API en utilisant une requête POST

Création du connecteur déversant dans Elasticsearch

Même principe avec ce fichier JSON

{
  "name": "jdbc-elk-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": 2,
    "connection.url": "http://host:9200",
    "type.name": "monitor",
    "topics": "jdbc-avro-jdbc",
    "topic.key.ignore": "jdbc-avro-jdbc",
    "topic.index.map":"jdbc-avro-jdbc:monitor",
    "key.ignore": "true"
  }
}

Avec cette configuration j’ai maintenant un cluster ELASTICSEARCH qui récupère au fil de l’eau mes données provenant de la base de données.

Créer un connecteur KAFKA CONNECT

Après avoir essayé le connecteur de confluent kafka-connect-jdbc, je me suis aperçu qu’il y avait encore quelques bugs qui le rendaient inutilisable dans mon environnement ( notamment avec les champs numériques).

J’ai donc décidé de créer mon propre connecteur. Certes celui-ci sera moins générique mais il correspondra à mon besoin ( ou pas …).

API utilisées

  • JAVA8
  • API KAFKA CONNECT

Configuration maven

Cette configuration me permet de créer un fat jar avec les dépendances nécessaires à la bonne exécution du connecteur.

Voici mon fichier pom.xml

 

Développement

Comme le dit le guide de développement, il faut créer a minima une classe héritant de la classe SourceConnector et une classe héritant de SourceTask.

MySourceConnector

Cette classe permet la récupération de la configuration du connecteur.

MySourceTask

Cette classe gère l’exécution de l’extraction et chargement dans KAFKA. Elle permet dans la méthode start() de démarrer le connecteur et de lancer les ressources (connexions JDBC).

Comme le connecteur standard, je m’appuie sur un champ de type Timestamp. celui -ci me permet de créer un offset et de faire un parcours incrémental de mes résultats .

Configuration nécessaire à l’exécution du plugin

Il faudra créer également un fichier properties contenant les informations suivantes :

Exécution

De la même manière que pour le connecteur standard…

Conclusion

Voila le squelette de mon connecteur crée. Pour l’instant les données sont sérialisées de manière un peu brutale. La prochaine étape sera de les mettre au format JSON. La suite dans un prochain numéro…

Mettre en place KAFKA CONNECT et KAFKA pour faire du change data capture (CDC)

Je suis en train de monter un moteur d’analyse permettant d’analyser les logs d’un ESB.

L’outil standard stockes les évènements dans une base de données.

Afin d’être le moins intrusif possible dans le code des médiations, j’ai choisi de mettre en place un mécanisme de change data capture (CDC) avec KAFKA CONNECT et KAFKA. Pour ce faire j’ai déployé la solution confluent. Celle-ci permet entre autres d’avoir des connecteurs JDBC permettant la connexion aux bases de données relationnelles.

KAFKA est un système de messagerie distribué crée initialement par LinkedIn. Kafka connect fournit une interface en entrée et en sortie de ce dernier.

Si vous voulez plus d’informations, vous pouvez aller du coté de la présentation du devox 2016

Installation

J’ai utilisé la procédure décrite ici

Démarrage des différents démons

Configuration du connecteur JDBC pour KAFKA CONNECT

Il faut tout d’abord copier le driver jdbc dans le répertoire share/java/kafka-connect-jdbc/

Ensuite, il faut créer un fichier de configuration (ex. esb-logs.properties )

Les contraintes de KAFKA CONNECT sont les suivantes

  • Si on veut faire des extractions incrémentales ( ce qui est mon cas) , il faut soit un élément de type TIMESTAMP qui ne puisse pas être NULL, soit une PK auto-incrémentée.
  • Les requêtes doivent être relativement assez simples. Personnellement, je préfère utiliser des vues qui me retournent l’exécution des requêtes SQL.

Exécution

Lancer la commande

Pour vérifier que tout est bien dans le topic KAFKA

Conclusion

Pour l’instant, j’ai réussi à extraire des données de ma base de données vers un broker KAFKA. La suite me permettra de les traiter en mode streaming dans SPARK.