Utilisation de la plateforme CONFLUENT pour faire du CDC

Après une pause dans mon exploration de KAFKA et KAFKA CONNECT, je me suis remis à faire du CDC.

Cette fois j’ai décidé d’utiliser les possibilités natives de la plateforme CONFLUENT. A savoir le proxy REST et les plugins JDBC et ELASTICSEARCH.

J’ai donc mis en quelques coups de cuillères à pot :

  • Une extraction de données incrémentale à partir d’une base de données
  • Un chargement dans Elasticsearch

 

Création des connecteurs

Création du connecteur d’extraction

La plateforme CONFLUENT offre un ensemble d’APIS qui permettent la configuration des connecteurs.

Ex. :

Lecture d’un état de connecteur

 curl -XGET http://127.0.0.1:8083/connectors

 

Voila comment j’ai chargé la configuration :

J’ai d’abord crée un fichier JSON

{
  "name": "jdbc-sql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 2,
    "connection.url": "jdbc:oracle:thin:URL",
    "mode": "incremental",
    "timestamp.column.name": "JOBSTART",
    "query":"select * from LOG",
    "topic.prefix": "jdbc-avro-jdbc",
    "table.types" : "VIEW"
  }
}

puis je l’ai chargé via l’API en utilisant une requête POST

Création du connecteur déversant dans Elasticsearch

Même principe avec ce fichier JSON

{
  "name": "jdbc-elk-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": 2,
    "connection.url": "http://host:9200",
    "type.name": "monitor",
    "topics": "jdbc-avro-jdbc",
    "topic.key.ignore": "jdbc-avro-jdbc",
    "topic.index.map":"jdbc-avro-jdbc:monitor",
    "key.ignore": "true"
  }
}

Avec cette configuration j’ai maintenant un cluster ELASTICSEARCH qui récupère au fil de l’eau mes données provenant de la base de données.