Utilisation de la plateforme CONFLUENT pour faire du CDC

Après une pause dans mon exploration de KAFKA et KAFKA CONNECT, je me suis remis à faire du CDC.

Cette fois j’ai décidé d’utiliser les possibilités natives de la plateforme CONFLUENT. A savoir le proxy REST et les plugins JDBC et ELASTICSEARCH.

J’ai donc mis en quelques coups de cuillères à pot :

  • Une extraction de données incrémentale à partir d’une base de données
  • Un chargement dans Elasticsearch

 

Création des connecteurs

Création du connecteur d’extraction

La plateforme CONFLUENT offre un ensemble d’APIS qui permettent la configuration des connecteurs.

Ex. :

Lecture d’un état de connecteur

 curl -XGET http://127.0.0.1:8083/connectors

 

Voila comment j’ai chargé la configuration :

J’ai d’abord crée un fichier JSON

{
  "name": "jdbc-sql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 2,
    "connection.url": "jdbc:oracle:thin:URL",
    "mode": "incremental",
    "timestamp.column.name": "JOBSTART",
    "query":"select * from LOG",
    "topic.prefix": "jdbc-avro-jdbc",
    "table.types" : "VIEW"
  }
}

puis je l’ai chargé via l’API en utilisant une requête POST

Création du connecteur déversant dans Elasticsearch

Même principe avec ce fichier JSON

{
  "name": "jdbc-elk-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": 2,
    "connection.url": "http://host:9200",
    "type.name": "monitor",
    "topics": "jdbc-avro-jdbc",
    "topic.key.ignore": "jdbc-avro-jdbc",
    "topic.index.map":"jdbc-avro-jdbc:monitor",
    "key.ignore": "true"
  }
}

Avec cette configuration j’ai maintenant un cluster ELASTICSEARCH qui récupère au fil de l’eau mes données provenant de la base de données.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *