Persiapan Library dan database

Lakukan installasi Library pendukung seperti java dan semisalnya

sudo apt install -y default-jdk postgresql python3.10-venv

Ubah konfigurasi /etc/postgresql/14/main/postgresql.conf sesuaikan setting nya menjadi seperti berikut

wal_level = logical
max_wal_senders = 10
max_replication_slots = 10

Edit file /etc/postgresql/14/main/pg_hba.conf tambahkan baris ini

host    all             debezium             127.0.0.1/32            md5

Buat user postgreql untuk digunakan oleh debezium

sudo su postgres
createuser -sP debezium

Restart Postgresql

sudo service postgresql restart

Installasi Zookeeper

Download Zookeper dengan perintah ini

wget -c https://dlcdn.apache.org/zookeeper/zookeeper-3.9.1/apache-zookeeper-3.9.1-bin.tar.gz

Extract Package Zookeper dengan perintah ini

tar -zxvf apache-zookeeper*bin.tar.gz

Pindahkan direktori hasil extract tadi ke /opt

sudo mv apache-zookeeper*bin /opt/zookeeper

Buat direktori untuk menyimpan data

sudo mkdir /data

Buat user untuk menjalankan zookeper

sudo useradd -m -d /data/zookeeper -s /bin/bash zookeeper

Buat file configurasi /opt/zookeeper/conf/zoo.cfg kemudian isi dengan konten berikut

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181

Ubah permission /opt/zookeeper

sudo chown -R zookeeper /opt/zookeeper

Buat systemd service file untuk zookeeper di /etc/systemd/system/zookeeper.service kemudian isi dengan baris-baris ini

[Unit]
Description=Zookeeper Daemon
Documentation=http://zookeeper.apache.org
Requires=network.target
After=network.target

[Service]
Type=forking
WorkingDirectory=/opt/zookeeper
User=zookeeper
Group=zookeeper
ExecStart=/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo.cfg
ExecStop=/opt/zookeeper/bin/zkServer.sh stop /opt/zookeeper/conf/zoo.cfg
ExecReload=/opt/zookeeper/bin/zkServer.sh restart /opt/zookeeper/conf/zoo.cfg
TimeoutSec=30
Restart=on-failure

[Install]
WantedBy=default.target

Reload systemd daemon dan juga service zookeeper dengan beberapa baris perintah ini

sudo systemctl daemon-reload
sudo systemctl enable zookeeper
sudo systemctl start zookeeper

Cek status service zookeper dengan perintah

sudo systemctl status zookeeper

Pastikan muncul informasi aktif seperti ini

Installasi Kafka

Komponen berikutnya adalah Kafka. Berikut ini langkah-langkah nya

Downlaod paket installasi Kafka

wget -c https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz

Ekstrak paket installasi tersebut

tar -zxvf kafka*.tgz

Pindahakan hasil extraksi tersebut ke direktori /opt/kafka

sudo mv kafka_*-*/ /opt/kafka

Buat user untuk menjalankan kafka

sudo useradd -d /opt/kafka -s /bin/bash kafka

Buat systemd service file untuk kafka di /etc/systemd/system/kafka.service, isi dengan berikut

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1'
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Edit file /opt/kafka/config/server.properties, pada bagian bawah tambahkan baris berikut ini

delete.topic.enable = true
log.dirs=/opt/kafka/logs

Ubah kepemilikan direktori kafka

sudo chown -R kafka:kafka /opt/kafka

Reload systemd daemon dan juga service kafka dengan beberapa baris perintah ini

sudo systemctl daemon-reload
sudo systemctl enable kafka
sudo systemctl start kafka

Installasi debezium Connect

Selanjutnya kita akan mulai melakukan configurasi Debezium connector untuk postgresql

Buat direktori untuk meletakan debezium connector

sudo install -o kafka -d /opt/kafka/connectors

Download Connector untuk Postgresql

wget -c https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.4.0.Final/debezium-connector-postgres-2.4.0.Final-plugin.tar.gz

Ekstak file hasil download tersebut

tar -zxvf debezium-connector-postgres*.tar.gz

Pindahkan connector ke direktori /opt/kafka/connectors/

sudo mv debezium-connector-postgres /opt/kafka/connectors/

Download Connector untuk jdbc

wget -c https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.4.0.Final/debezium-connector-jdbc-2.4.0.Final-plugin.tar.gz

Ekstak file hasil download tersebut

tar -zxvf debezium-connector-jdbc*.tar.gz

Pindahkan connector ke direktori /opt/kafka/connectors/

sudo mv debezium-connector-jdbc /opt/kafka/connectors/

Downlaod jdbc connector dari confluent

wget -c https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar

Pindahkan library tersebut ke tempat connector

sudo mv kafka-connect-jdbc*.jar /opt/kafka/connectors/debezium-connector-jdbc/

Rubah konfigurasi kafka agar dia tau dimana mencari connector nya, edit file /opt/kafka/config/connect-standalone.properties, tambahkan baris ini

plugin.path=/opt/kafka/connectors/

Buat systemd service file untuk kafka connect di /etc/systemd/system/kafka-connect.service, isi dengan berikut

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties'
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Reload systemd daemon dan juga service kafka-connect dengan beberapa baris perintah ini

sudo systemctl daemon-reload
sudo systemctl enable kafka-connect
sudo systemctl start kafka-connect

Buat file bridge.json sebagai config untuk connector nya

{
        "name": "bridge-connector",
        "config": {
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "plugin.name": "pgoutput",
                "database.hostname": "127.0.0.1",
                "database.port": "5432",
                "database.user": "debezium",
                "database.password": "debezium",
                "database.dbname": "bridge",
                "database.server.name": "kkp",
                "topic.prefix": "common_profile",
                "table.whitelist": "public.common_profile"
        }
}

Daftarkan configurasi connector itu menggunakan curl

 curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/  -d @bridge.json

Cek apakah bener connector sudah terdaftar

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/bridge-connector

Cek apakah benar topic yang kita inginkan sudah terbuat

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

Pastikan topic nya muncul

Membuat Kafka Consumer untuk mengupdate Database

Kita akan menggunakan python untuk membangun kafka consumer yang kemudian akan mengupdate database lain.

Buat Virtualenv dengan perintah ini

python3 -m venv tg

Buat Direktori untuk menyimpan kode kita

mkdir tenggiri

Buat file requirements.txt di dalam direktori yang baru kita buat dan isi dengan ini

kafka-python
psycopg2-binary

Install library yang kita butuhkan dengan perintah

~/tg/bin/pip3 install -r requirements.txt

Buat file tg.py dan isi dengan baris-baris ini

import json
import psycopg2
from kafka import KafkaConsumer

TOPIC = "common_profile.public.common_profile"
BOOTSTRAP_SERVER = "127.0.0.1:9092"

pg_conn = psycopg2.connect("dbname='[target db>]' user='[target db user]' host='localhost' password='[target db password]'")

consumer = KafkaConsumer(TOPIC, bootstrap_servers=[BOOTSTRAP_SERVER])

try:
    print("Listening on %s .." % consumer.subscription())
    while True:
        for msg in consumer:
            if msg.value:
                data = json.loads(msg.value)['payload']
                with pg_conn.cursor() as curr:
                    if data['op'] == "u":
                        payload = data['after']
                        curr.execute("update profiles_profile set name = '%s' where nik = '%s'" % (payload['nama'], payload['nip']))
                    elif data['op'] == "d":
                        print("delete record here")
                    elif data['op'] == "c":
                        print("create record here")
                pg_conn.commit()

except KeyboardInterrupt:
    print("Detecting termination, quiting. Bye")

Kita bisa menjalankan nya dengan perintah

~/tg/bin/python tg.py

Sekarang apa bila kita mengubah record di satu database maka record dengan id yang sama di database lain akan ikut terupdate. Selamat mencoba

Categories: Tutorial

Avatar photo

Bramandityo Prabowo

Suka makan dan tentu saja suka masak. Tertarik dengan Functional Programing, Distributed System, Network Security, Operating System Customization, Virtualization dan NoSQL. Language of choices nya adalah Python, Bash, Go, Erlang, Nimlang. Rust dan Ocaml.