Come integrare i sistemi esistenti con Kafka Connect
Introduzione
L'interfaccia con Apache Kafka dalle tue applicazioni avviene tramite librerie client immediatamente disponibili per tutti i principali linguaggi di programmazione. Per lo streaming di dati tra sistemi esterni e le tue applicazioni, come i database, potresti essere tentato di scrivere le tue soluzioni per spostarle in Kafka e successivamente consumarle. Tuttavia, questo approccio è soggetto a errori e non scalabile.
In questo tutorial imparerai come inserire dati negli argomenti Kafka utilizzando Kafka Connect, uno strumento utilizzato per trasferire dati in modo affidabile tra Kafka e altri sistemi (come file system e database). Imparerai anche a trasmettere i dati da Kafka a Elasticsearch per una successiva indicizzazione. Contrariamente a un approccio manuale, è dimostrato che Kafka Connect tiene traccia automaticamente dell'avanzamento delle migrazioni dei dati e accede facilmente a molti sistemi di dati diversi.
Integrazione dei sistemi esistenti con Kafka Connect
- Acquisisci dati dal file system host
- Importa dati da MySQL
- Esporta i dati in elasticsearch
Prerequisiti
Per completare questo tutorial, avrai bisogno di:
- Una macchina con almeno 4 GB di RAM e 2 CPU. In caso di un server Ubuntu, seguire la Configurazione iniziale del server Ubuntu per le istruzioni di configurazione.
- Apache Kafka installato e configurato sul tuo Droplet. È possibile seguire il tutorial Introduzione a Kafka per le istruzioni di configurazione.
- MySQL installato e configurato sul tuo computer. Per istruzioni e passaggi per Ubuntu, visitare il tutorial Come installare MySQL su Ubuntu. Devi solo completare il Passaggio 1 e il Passaggio 2.
- Java Development Kit (JDK) 8 o versione successiva installato sul tuo Droplet o sul computer locale. Per istruzioni sull'installazione di Java su Ubuntu, vedere il tutorial Come installare Java con Apt su Ubuntu.
- Familiarità con il layout di directory standard dei progetti Java. Per ulteriori informazioni, vedere l'argomento Introduzione al layout della directory standard nella documentazione ufficiale di Maven.
- Elasticsearch installato e configurato sul tuo Droplet o computer remoto. Per un server Ubuntu, consulta la guida Come installare e configurare Elasticsearch su Ubuntu. Per questo tutorial la tua istanza Elasticsearch dovrebbe avere l'autenticazione disabilitata.
Passaggio 1: acquisizione dei dati dal file system host
In questo passaggio imparerai come configurare Kafka Connect in modalità autonoma per controllare un file sul file system host e trasmettere le modifiche a Kafka.
Come parte dei prerequisiti, hai installato Kafka in ~/kafka
. Kafka Connect viene fornito in bundle con la versione Kafka predefinita, quindi non è necessario installarlo separatamente. Passare a quella directory eseguendo:
cd ~/kafka
Per facilitare il recupero dei dati da varie fonti, Kafka Connect utilizza connettori per recuperarli. I connettori sono librerie pronte all'uso che si integrano con Kafka Connect e forniscono l'accesso a sistemi esterni, come file system e database. Un certo numero di connettori comuni sono già inclusi in Kafka e molti altri sono disponibili e vengono forniti con licenze permissive.
A parte i connettori, Kafka Connect distingue tra sorgenti e sink. Un'origine inserisce i dati in Kafka tramite un connettore, mentre un sink esporta i dati da Kafka in un sistema esterno.
Memorizzerai la configurazione per l'origine in un file chiamato file-source.properties
. Crealo e aprilo per la modifica eseguendo:
nano file-source.properties
Aggiungi le seguenti righe:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
Questa origine è denominata local-file-source
e utilizza la classe FileStreamSource
con una sola istanza come da tasks.max
. I dati verranno aggiunti all'argomento connect-test
in Kafka, mentre il file che verrà monitorato è test.txt
. Salva e chiudi il file.
Ora che hai definito una fonte, creerai un sink che trasmetterà i dati in un file di testo separato. Crea e apri file-sink.properties
per la modifica:
nano file-sink.properties
Inserisci le seguenti righe:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
topics=connect-test
file=test-sink.txt
In modo simile a quello che hai fatto con file-source.properties
, definisci un sink specificando FileStreamSink
come classe utilizzata, con un'istanza. Imposta connect-test
come argomento da cui deve leggere e test-sink.txt
come file di output. Quando hai finito, salva e chiudi il file.
Successivamente, definirai la configurazione per Kafka Connect stesso. Crea un file per esso eseguendo quanto segue:
nano connect.properties
Aggiungi le seguenti righe:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=libs/
Qui devi prima specificare l'indirizzo del server Kafka (bootstrap.servers
). Quindi, imposti la classe JsonConverter
come convertitore sia per le chiavi che per i valori dei messaggi prodotti, il che significa che il JSON verrà inserito in Kafka. Tuttavia, disabiliti la verifica dello schema sia per le chiavi che per i valori, poiché non è necessario disporre di uno schema per il contenuto del file di testo.
Kafka Connect tiene traccia e gestisce autonomamente i progressi e necessita di un posto in cui archiviare gli offset interni. Fornisci un percorso per questo in offset.storage.file.filename
e imposta anche l'intervallo di commit dell'offset su 10s
. Infine, imposta il percorso in cui vengono archiviate le librerie dei connettori su libs/
, facendo riferimento alla directory in ~/kafka
.
Salva e chiudi il file quando hai finito. Ora hai definito la configurazione di origine, sink e Kafka Connect. Prima di eseguirlo, crea e apri il file test.txt
per la modifica.
nano test.txt
Aggiungi le seguenti righe:
Hello World!
Second Hello World!
Se lo desideri puoi aggiungere ulteriori righe. Salva e chiudi il file quando hai finito.
Infine, esegui Kafka Connect in modalità standalone con il seguente comando:
bin/connect-standalone.sh connect.properties file-source.properties file-sink.properties
In questa modalità, Kafka Connect accetta un'origine e un sink dai file su disco, il che è utile per i test. La modalità distribuita, al contrario, li accetta solo attraverso un'interfaccia HTTP e può quindi essere controllata da remoto.
Ci saranno molti output che indicano che Connect ha iniziato a monitorare test.txt
e si è connesso al cluster. In un terminale separato, esegui lo script kafka-console-consumer.sh
fornito per leggere tutti i messaggi dall'argomento connect-test
:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
L'output sarà:
Hello World!
Second Hello World!
Kafka Connect ha trasmesso in streaming i contenuti di test.txt
all'argomento connect-test
.
In una terza sessione del terminale, esegui il comando seguente per aggiungere una riga a test.txt
:
echo "Third Hello World!" >> test.txt
Guarda l'output di kafka-console-consumer.sh
nel secondo terminale. Vedrai il terzo messaggio ricevuto:
Hello World!
Second Hello World!
Third Hello World!
Mostra il contenuto di test-sink.txt
per verificare che gli stessi dati siano stati trasmessi in quel file tramite local-file-sink
:
cat test-sink.txt
L'output sarà lo stesso:
Hello World!
Second Hello World!
Third Hello World!
Kafka Connect controlla il file e propaga automaticamente le modifiche. Puoi uscire sia da Kafka Connect che dallo script consumer premendo CTRL + C
.
In questo passaggio, hai inserito il contenuto di un file di testo sul file system host in un argomento Kafka e verificato che sia stato ricevuto. Ora imparerai come importare dati da un database MySQL.
Passaggio 2: acquisizione dei dati da MySQL
In questo passaggio configurerai un database MySQL di esempio e imparerai come importarne i dati in Kafka. Eseguirai Kafka Connect in modalità distribuita e installerai il connettore Debezium affinché Kafka possa connettersi al database.
Impostazione di una banca dati
Accedi alla console MySQL eseguendo:
sudo mysql
Una volta entrato, crea un database chiamato employees
:
CREATE DATABASE employees;
L'output sarà:
Query OK, 1 row affected (0.00 sec)
Quindi, passa al database creato:
USE employees;
Infine, crea uno schema per archiviare i dati dei dipendenti:
CREATE TABLE `employee` (
`Id` int NOT NULL AUTO_INCREMENT,
`Name` varchar(45) NOT NULL,
`Surname` varchar(45) DEFAULT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Questo creerà la tabella employee
con le colonne ID, nome e cognome. L'output sarà simile al seguente:
Query OK, 0 rows affected (0.03 sec)
Ora puoi inserire dati di esempio eseguendo:
INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('John', 'Smith');
INSERT INTO `employees`.`employee` (`Name`, `Surname`) VALUES ('Mark', 'Twain');
Il database avviserà che due righe sono state modificate:
Query OK, 2 rows affected (0.01 sec)
Ora disponi di un database di esempio contenente i nomi dei dipendenti. Ora puoi creare un utente per accedere a questo database:
CREATE USER 'kafkaconnect'@'localhost' IDENTIFIED BY 'password';
Il suo nome utente sarà kafkaconnect
con password
come password.
Quindi, concedigli le autorizzazioni necessarie:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'kafkaconnect'@'localhost';
Infine, esci da MySQL:
exit
Ora hai un utente e un database da cui trasferirai i dati in Kafka.
Installazione e configurazione del connettore Debezium
Poiché Kafka non viene fornito con un connettore MySQL, dovrai installare un plug-in aggiuntivo. In questa sottosezione scaricherai e configurerai Debezium MySQL Connector per Kafka.
Utilizza questo comando per scaricare l'archivio delle versioni dalla pagina Download ufficiale e posizionalo in /tmp
:
curl -o /tmp/debezium.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.1.Final/debezium-connector-mysql-2.6.1.Final-plugin.tar.gz
Al momento della stesura di questo articolo, l'ultima versione disponibile era 2.6.1
. Puoi prendere il collegamento più recente dalla pagina ufficiale sotto il collegamento per il download dell'archivio plugin MySQL Connector.
Quindi, estrailo in ~/kafka/libs
eseguendo:
tar -xzf /tmp/debezium.tar.gz -C ~/kafka/libs/
Ora hai reso disponibile il connettore Debezium MySQL per Kafka Connect.
Successivamente, creerai un'origine Kafka Connect per osservare il database. Lo memorizzerai in un file chiamato mysql-source-connector.json
. Crealo e aprilo per la modifica eseguendo:
nano mysql-source-connector.json
Aggiungi le seguenti righe:
{
"name": "employees-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "password",
"database.server.id": "1",
"topic.prefix": "dbhistory",
"database.include.list": "employees",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.employees"
}
}
Qui devi prima specificare la classe Debezium MySqlConnector
come classe connettore e fornire i dati di connessione per il database. Quindi, specifichi l'indirizzo del server Kafka (database.history.kafka.bootstrap.servers
) e imposti il prefisso dell'argomento su dbhistory
. Debezium creerà un argomento separato con questo prefisso per ogni tabella elencata in database.include.list
.
Salva e chiudi il file quando hai finito. Ora eseguirai Kafka Connect ed eseguirai il connettore.
Esecuzione di Kafka Connect in modalità distribuita
Contrariamente alla modalità autonoma, in modalità distribuita Kafka Connect accetta carichi di lavoro tramite richieste HTTP. Ciò gli consente di funzionare in background come servizio di sistema e di essere configurabile da remoto.
Kafka viene fornito in bundle con un file di configurazione per questa modalità, denominato connect-distributed.properties
, che è archiviato in config/.
Lo utilizzerai per avviare Kafka Connect, ma dovrai prima aggiornarlo per utilizzare Debezium Connector. Aprilo per la modifica eseguendo:
nano config/connect-distributed.properties
Alla fine del file, trova l'impostazione plugin.path
:
...
#plugin.path=
Modificalo in questo modo:
...
plugin.path=libs/
Quando hai finito, salva e chiudi il file.
Ora puoi eseguire Kafka Connect in modalità distribuita:
bin/connect-distributed.sh config/connect-distributed.properties
Ora è possibile accedervi all'indirizzo http://localhost:8083
.
In un terminale secondario, esegui il comando seguente per inviare il employees-connector
che hai definito:
curl -d @"mysql-source-connector.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
Con questo comando invii il contenuto di mysql-source-connector.json
a Kafka Connect. L'output sarà:
{"name":"employees-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"localhost","database.port":"3306","database.user":"kafkaconnect","database.password":"password","database.server.id":"1","topic.prefix":"dbhistory","database.include.list":"employees","schema.history.internal.kafka.bootstrap.servers":"localhost:9092","schema.history.internal.kafka.topic":"schema-changes.employees","name":"employees-connector"},"tasks":[],"type":"source"}
Kafka Connect eseguirà immediatamente il connettore e inizierà a importare i dati. Per ciascuna tabella in tutti i database elencati verrà creato un argomento separato. Puoi eseguire lo streaming dell'avanzamento del database dipendenti
in tempo reale utilizzando kafka-console-consumer.sh
:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbhistory.employees.employee --from-beginning
Il nome dell'argomento evidenziato è costituito dal prefisso dell'argomento specificato (dbhistory
), dal nome del database (employees
) e dal nome della tabella (employee
). L'output dettaglierà le due righe simili a queste:
{
"schema": {
...
"payload": {
"before": null,
"after": {
"Id": 3,
"Name": "John",
"Surname": "Smith"
},
"source": {
"version": "2.6.1.Final",
"connector": "mysql",
"name": "dbhistory",
"ts_ms": 1713183316000,
"snapshot": "last",
"db": "employees",
"sequence": null,
"ts_us": 1713183316000000,
"ts_ns": 1713183316000000000,
"table": "employee",
"server_id": 0,
"gtid": null,
"file": "binlog.000004",
"pos": 3344,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1713183316537,
"ts_us": 1713183316537164,
"ts_ns": 1713183316537164000,
"transaction": null
}
}
},
{
"schema": {
...
"payload": {
"before": null,
"after": {
"Id": 4,
"Name": "Mark",
"Surname": "Twain"
},
"source": {
"version": "2.6.1.Final",
"connector": "mysql",
"name": "dbhistory",
"ts_ms": 1713183316000,
"snapshot": "last",
"db": "employees",
"sequence": null,
"ts_us": 1713183316000000,
"ts_ns": 1713183316000000000,
"table": "employee",
"server_id": 0,
"gtid": null,
"file": "binlog.000004",
"pos": 3344,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1713183316537,
"ts_us": 1713183316537164,
"ts_ns": 1713183316537164000,
"transaction": null
}
}
}
Ora inserirai una riga aggiuntiva nella tabella employee
. In una terza sessione terminale, accedi alla console MySQL eseguendo:
sudo mysql
Passa al database dipendenti
:
USE employees;
Quindi, inserisci una nuova riga eseguendo:
INSERT INTO `employees`.`employees` (`Name`, `Surname`) VALUES ('George', 'Martin');
Verrà trasmesso in streaming fino alla fine dell'output:
{
"schema": {
...
"payload": {
"before": null,
"after": {
"Id": 5,
"Name": "George",
"Surname": "Martin"
},
"source": {
"version": "2.6.1.Final",
"connector": "mysql",
"name": "dbhistory2",
"ts_ms": 1713183573000,
"snapshot": "false",
"db": "employees",
"sequence": null,
"ts_us": 1713183573000000,
"ts_ns": 1713183573000000000,
"table": "employee",
"server_id": 1,
"gtid": null,
"file": "binlog.000004",
"pos": 3573,
"row": 0,
"thread": 64,
"query": null
},
"op": "c",
"ts_ms": 1713183573029,
"ts_us": 1713183573029781,
"ts_ns": 1713183573029781000,
"transaction": null
}
}
}
Quando hai finito, premi CTRL + C
sui rispettivi terminali per chiudere sia Kafka Connect che il consumer della console.
In questo passaggio hai configurato il connettore Debezium MySQL per Kafka Connect. Hai anche configurato ed eseguito Kafka Connect in modalità distribuita e aggiunto un connettore di origine MySQL per sincronizzare il database e Kafka. Ora imparerai come esportare i dati da Kafka in Elasticsearch.
Passaggio 3: esportazione dei dati in Elasticsearch
In questo passaggio scaricherai e compilerai il connettore Confluent Elasticsearch per Kafka Connect. Quindi, creerai un sink per esportare i dati da Kafka a Elasticsearch, che li utilizza.
Confluent fornisce un connettore per Elasticsearch nel repository GitHub ufficiale. Innanzitutto, clonalo eseguendo:
git clone https://github.com/confluentinc/kafka-connect-elasticsearch.git
Naviga verso di esso:
cd kafka-connect-elasticsearch
Quindi, chiedi a Maven di comprimerlo per la distribuzione eseguendo il comando seguente, facendo attenzione a saltare l'esecuzione di test non necessari:
mvn package -Dmaven.test.skip=true
Una volta terminato, la fine dell'output sarà simile a questa:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10:35 min (Wall Clock)
[INFO] Finished at: 2024-04-14T14:41:12Z
[INFO] ------------------------------------------------------------------------
La libreria compilata e altre dipendenze necessarie sono ora disponibili in target/components/packages
. Come nel passaggio precedente, per rendere disponibile il plugin, dovrai aggiungere questo percorso alla configurazione di Kafka Connect. Uscire dalla directory:
cd ..
Quindi, apri connect-distributed.properties
per modificare:
nano config/connect-distributed.properties
Vai alla fine del file e trova la riga plugin.path
:
...
plugin.path=libs/
Aggiungi il nuovo percorso alla riga:
...
plugin.path=libs,kafka-connect-elasticsearch/target/components/packages
Salva e chiudi il file.
Ora definirai il sink Elasticsearch e lo memorizzerai in un file denominato elasticsearch-sink-connector.json
. Crealo e aprilo per la modifica:
nano elasticsearch-sink-connector.json
Aggiungi le seguenti righe:
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "dbhistory.employees.employee",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink"
}
}
Qui definisci un connettore che utilizzerà la classe ElasticsearchSinkConnector
con un'attività. Quindi, imposti gli argomenti
che dovrebbero essere monitorati, ignori le loro chiavi poiché sei interessato solo ai valori e imposti l'URL di connessione per Elasticsearch.
Salva e chiudi il file quando hai finito.
Quindi, avvia Kafka Connect in modalità distribuita:
bin/connect-distributed.sh config/connect-distributed.properties
Nel secondo terminale, aggiungilo a Kafka Connect eseguendo:
curl -d @"elasticsearch-sink-connector.json" -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
Nota che Kafka Connect sta iniziando ad importare i dati:
...
[2024-04-15 10:43:24,518] INFO [elasticsearch-sink|task-0] Creating index dbhistory.employees.employee. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:227)
...
Quindi, interroga Elasticsearch per vederlo:
curl -X GET 'http://localhost:9200/dbhistory.employees.employee/_search?pretty'
Vedrai che le tre righe del database sono presenti nell'indice come eventi:
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+1",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 4,
"Name": "Mark",
"Surname": "Twain"
},
...
}
},
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+0",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 3,
"Name": "John",
"Surname": "Smith"
},
...
}
},
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+2",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 5,
"Name": "George",
"Surname": "Martin"
},
...
}
}
]
}
}
Per verificare che il flusso tra origini e sink funzioni correttamente, aggiungerai un'altra riga alla tabella employee
. Nella terza sessione del terminale, accedi alla console MySQL eseguendo:
sudo mysql
Passa al database dipendenti
:
USE employees;
Quindi, inserisci la quarta riga eseguendo:
INSERT INTO `employees`.`employees` (`Name`, `Surname`) VALUES ('Robert', 'Jordan');
Interroga nuovamente Elasticsearch e vedrai che è stato raccolto:
...
{
"_index": "dbhistory.employees.employee",
"_id": "dbhistory.employees.employee+0+3",
"_score": 1,
"_source": {
"before": null,
"after": {
"Id": 6,
"Name": "Robert",
"Surname": "Jordan"
},
...
}
}
...
In questo passaggio, hai scaricato e compilato il connettore Confluent Elasticsearch. Lo hai reso disponibile a Kafka Connect e hai creato un sink per esportare i dati da Kafka agli indici Elasticsearch. Quindi, hai verificato che il flusso tra il database, Kafka ed Elasticsearch funzioni correttamente con una latenza minima.
Conclusione
In questo articolo, hai utilizzato Kafka Connect per importare dati dal file system e dal tuo database MySQL in Kafka. Hai anche imparato come compilare e importare plugin personalizzati ed esportare dati da Kafka a Elasticsearch per una successiva indicizzazione.
L'autore ha selezionato Apache Software Foundation per ricevere una donazione come parte del programma Write for DOnations.