Persiapan Library dan database
Lakukan installasi Library pendukung seperti java dan semisalnya
sudo apt install -y default-jdk postgresql python3.10-venv
Ubah konfigurasi /etc/postgresql/14/main/postgresql.conf
sesuaikan setting nya menjadi seperti berikut
wal_level = logical max_wal_senders = 10 max_replication_slots = 10
Edit file /etc/postgresql/14/main/pg_hba.conf
tambahkan baris ini
host all debezium 127.0.0.1/32 md5
Buat user postgreql untuk digunakan oleh debezium
sudo su postgres createuser -sP debezium
Restart Postgresql
sudo service postgresql restart
Installasi Zookeeper
Download Zookeper dengan perintah ini
wget -c https://dlcdn.apache.org/zookeeper/zookeeper-3.9.1/apache-zookeeper-3.9.1-bin.tar.gz
Extract Package Zookeper dengan perintah ini
tar -zxvf apache-zookeeper*bin.tar.gz
Pindahkan direktori hasil extract tadi ke /opt
sudo mv apache-zookeeper*bin /opt/zookeeper
Buat direktori untuk menyimpan data
sudo mkdir /data
Buat user untuk menjalankan zookeper
sudo useradd -m -d /data/zookeeper -s /bin/bash zookeeper
Buat file configurasi /opt/zookeeper/conf/zoo.cfg
kemudian isi dengan konten berikut
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper clientPort=2181
Ubah permission /opt/zookeeper
sudo chown -R zookeeper /opt/zookeeper
Buat systemd service file untuk zookeeper di /etc/systemd/system/zookeeper.service
kemudian isi dengan baris-baris ini
[Unit] Description=Zookeeper Daemon Documentation=http://zookeeper.apache.org Requires=network.target After=network.target [Service] Type=forking WorkingDirectory=/opt/zookeeper User=zookeeper Group=zookeeper ExecStart=/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo.cfg ExecStop=/opt/zookeeper/bin/zkServer.sh stop /opt/zookeeper/conf/zoo.cfg ExecReload=/opt/zookeeper/bin/zkServer.sh restart /opt/zookeeper/conf/zoo.cfg TimeoutSec=30 Restart=on-failure [Install] WantedBy=default.target
Reload systemd daemon dan juga service zookeeper dengan beberapa baris perintah ini
sudo systemctl daemon-reload sudo systemctl enable zookeeper sudo systemctl start zookeeper
Cek status service zookeper dengan perintah
sudo systemctl status zookeeper
Pastikan muncul informasi aktif seperti ini
Installasi Kafka
Komponen berikutnya adalah Kafka. Berikut ini langkah-langkah nya
Downlaod paket installasi Kafka
wget -c https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
Ekstrak paket installasi tersebut
tar -zxvf kafka*.tgz
Pindahakan hasil extraksi tersebut ke direktori /opt/kafka
sudo mv kafka_*-*/ /opt/kafka
Buat user untuk menjalankan kafka
sudo useradd -d /opt/kafka -s /bin/bash kafka
Buat systemd service file untuk kafka di /etc/systemd/system/kafka.service
, isi dengan berikut
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1' ExecStop=/opt/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
Edit file /opt/kafka/config/server.properties
, pada bagian bawah tambahkan baris berikut ini
delete.topic.enable = true log.dirs=/opt/kafka/logs
Ubah kepemilikan direktori kafka
sudo chown -R kafka:kafka /opt/kafka
Reload systemd daemon dan juga service kafka dengan beberapa baris perintah ini
sudo systemctl daemon-reload sudo systemctl enable kafka sudo systemctl start kafka
Installasi debezium Connect
Selanjutnya kita akan mulai melakukan configurasi Debezium connector untuk postgresql
Buat direktori untuk meletakan debezium connector
sudo install -o kafka -d /opt/kafka/connectors
Download Connector untuk Postgresql
wget -c https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.4.0.Final/debezium-connector-postgres-2.4.0.Final-plugin.tar.gz
Ekstak file hasil download tersebut
tar -zxvf debezium-connector-postgres*.tar.gz
Pindahkan connector ke direktori /opt/kafka/connectors/
sudo mv debezium-connector-postgres /opt/kafka/connectors/
Download Connector untuk jdbc
wget -c https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.4.0.Final/debezium-connector-jdbc-2.4.0.Final-plugin.tar.gz
Ekstak file hasil download tersebut
tar -zxvf debezium-connector-jdbc*.tar.gz
Pindahkan connector ke direktori /opt/kafka/connectors/
sudo mv debezium-connector-jdbc /opt/kafka/connectors/
Downlaod jdbc connector dari confluent
wget -c https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar
Pindahkan library tersebut ke tempat connector
sudo mv kafka-connect-jdbc*.jar /opt/kafka/connectors/debezium-connector-jdbc/
Rubah konfigurasi kafka agar dia tau dimana mencari connector nya, edit file /opt/kafka/config/connect-standalone.properties
, tambahkan baris ini
plugin.path=/opt/kafka/connectors/
Buat systemd service file untuk kafka connect di /etc/systemd/system/kafka-connect.service
, isi dengan berikut
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties' ExecStop=/opt/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
Reload systemd daemon dan juga service kafka-connect dengan beberapa baris perintah ini
sudo systemctl daemon-reload sudo systemctl enable kafka-connect sudo systemctl start kafka-connect
Buat file bridge.json sebagai config untuk connector nya
{ "name": "bridge-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "127.0.0.1", "database.port": "5432", "database.user": "debezium", "database.password": "debezium", "database.dbname": "bridge", "database.server.name": "kkp", "topic.prefix": "common_profile", "table.whitelist": "public.common_profile" } }
Daftarkan configurasi connector itu menggunakan curl
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @bridge.json
Cek apakah bener connector sudah terdaftar
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/bridge-connector
Cek apakah benar topic yang kita inginkan sudah terbuat
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
Pastikan topic nya muncul
Membuat Kafka Consumer untuk mengupdate Database
Kita akan menggunakan python untuk membangun kafka consumer yang kemudian akan mengupdate database lain.
Buat Virtualenv dengan perintah ini
python3 -m venv tg
Buat Direktori untuk menyimpan kode kita
mkdir tenggiri
Buat file requirements.txt di dalam direktori yang baru kita buat dan isi dengan ini
kafka-python psycopg2-binary
Install library yang kita butuhkan dengan perintah
~/tg/bin/pip3 install -r requirements.txt
Buat file tg.py dan isi dengan baris-baris ini
import json import psycopg2 from kafka import KafkaConsumer TOPIC = "common_profile.public.common_profile" BOOTSTRAP_SERVER = "127.0.0.1:9092" pg_conn = psycopg2.connect("dbname='[target db>]' user='[target db user]' host='localhost' password='[target db password]'") consumer = KafkaConsumer(TOPIC, bootstrap_servers=[BOOTSTRAP_SERVER]) try: print("Listening on %s .." % consumer.subscription()) while True: for msg in consumer: if msg.value: data = json.loads(msg.value)['payload'] with pg_conn.cursor() as curr: if data['op'] == "u": payload = data['after'] curr.execute("update profiles_profile set name = '%s' where nik = '%s'" % (payload['nama'], payload['nip'])) elif data['op'] == "d": print("delete record here") elif data['op'] == "c": print("create record here") pg_conn.commit() except KeyboardInterrupt: print("Detecting termination, quiting. Bye")
Kita bisa menjalankan nya dengan perintah
~/tg/bin/python tg.py
Sekarang apa bila kita mengubah record di satu database maka record dengan id yang sama di database lain akan ikut terupdate. Selamat mencoba