国产精品高清一区二区三区不卡-国产精品一区二区三区免费视频-日韩免费高清一级毛片-亚洲欧美一区二区三区国产精品-日韩欧美一区二区三区不卡视频-亚欧免费视频一区二区三区-亚洲欧美日韩一区成人-欧美日韩视频综合一区无弹窗-精品日韩在线视频一区二区三区-国内精品视频一区二区三区

你好,歡迎進(jìn)入江蘇優(yōu)軟數(shù)字科技有限公司官網(wǎng)!

誠(chéng)信、勤奮、創(chuàng)新、卓越

友好定價(jià)、專業(yè)客服支持、正版軟件一站式服務(wù)提供

13262879759

工作日:9:00-22:00

數(shù)據(jù)異構(gòu)之 Canal 初探(技巧篇)

發(fā)布時(shí)間:2023-07-25

瀏覽次數(shù):0

源碼解析Canal系列開(kāi)始了,一個(gè)全新的系列,可以講解canal本身的實(shí)現(xiàn)原理,也是作者源碼閱讀方法的展示。

1、應(yīng)用場(chǎng)景

提到Canal,大家應(yīng)該都會(huì)想到它是一個(gè)用于解析MySQL日志、將MySQL數(shù)據(jù)庫(kù)中的數(shù)據(jù)同步到其他存儲(chǔ)介質(zhì)等的工具。

也就是Canal的一個(gè)非常常見(jiàn)的使用場(chǎng)景:數(shù)據(jù)異構(gòu),一種更高層次的數(shù)據(jù)讀寫(xiě)分離架構(gòu)設(shè)計(jì)技術(shù)。

隨著業(yè)務(wù)的不斷發(fā)展,當(dāng)企業(yè)發(fā)展到一定階段,發(fā)現(xiàn)單一的關(guān)系型數(shù)據(jù)庫(kù)已經(jīng)無(wú)法支撐業(yè)務(wù)快速發(fā)展帶來(lái)的數(shù)據(jù)不斷積累的壓力,于是一種設(shè)計(jì)框架就誕生了:分庫(kù)分表。 分庫(kù)分表確實(shí)是一個(gè)非常好的緩解單庫(kù)數(shù)據(jù)庫(kù)壓力的方案,但是卻導(dǎo)致了另一個(gè)困境,就是對(duì)關(guān)聯(lián)查詢不友好,甚至跨庫(kù)JOIN更是如此。

舉例如下:比如在一個(gè)訂單系統(tǒng)中,一般有兩類用戶需要查詢訂單,一類是顧客,一類是店鋪。 當(dāng)數(shù)據(jù)庫(kù)分庫(kù)分庫(kù)時(shí),如果使用()來(lái)分庫(kù),那么同一個(gè)店鋪的訂單數(shù)據(jù)就會(huì)分布在不同的數(shù)據(jù)庫(kù)中。 而里面的場(chǎng)景只能滿足一方的需求,那怎么辦呢?

Canal此時(shí)已首次亮相。 在電子商務(wù)設(shè)計(jì)中,雖然商店和顧客會(huì)被拆分成兩個(gè)不同的服務(wù),但我們可以為這兩個(gè)不同的服務(wù)構(gòu)建不同的數(shù)據(jù)庫(kù)集群。 我們可以分為用戶訂單數(shù)據(jù)庫(kù)和商戶訂單數(shù)據(jù)庫(kù),其中用戶訂單數(shù)據(jù)庫(kù)為主數(shù)據(jù)庫(kù)。 使用店鋪ID作為分庫(kù),完美解決問(wèn)題。

2. 架構(gòu)設(shè)計(jì)原則

了解了Canal的基本使用場(chǎng)景后,我們通過(guò)Canal官方文檔探索其核心架構(gòu)設(shè)計(jì)理念,從而打開(kāi)并步入Canal的神秘世界。

首先我們簡(jiǎn)單了解一下MySQL的主從同步原理:

關(guān)系庫(kù)的標(biāo)準(zhǔn)語(yǔ)言_intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖_關(guān)系庫(kù)中的數(shù)據(jù)表的關(guān)系

從上圖可以看出,主從復(fù)制主要分為三步:

基于MySQL的這些數(shù)據(jù)同步機(jī)制intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖,Canal的設(shè)計(jì)目標(biāo)主要是實(shí)現(xiàn)數(shù)據(jù)同步,即數(shù)據(jù)復(fù)制。 從里面的圖來(lái)看,很自然的想到了下面的設(shè)計(jì):

關(guān)系庫(kù)中的數(shù)據(jù)表的關(guān)系_intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖_關(guān)系庫(kù)的標(biāo)準(zhǔn)語(yǔ)言

原理比較簡(jiǎn)單:

我們先看一下整體的組成部分:

intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖_關(guān)系庫(kù)中的數(shù)據(jù)表的關(guān)系_關(guān)系庫(kù)的標(biāo)準(zhǔn)語(yǔ)言

闡明:

模塊:

我暫時(shí)不打算深入研究這種組件,因?yàn)楝F(xiàn)階段我自己也不了解,但這是我后續(xù)學(xué)習(xí)和研究的重點(diǎn)。

3.在IDEA中運(yùn)行

Linux環(huán)境下安裝canal比較簡(jiǎn)單。 可以按照官方指導(dǎo)一步步安裝。 這里我就不再重復(fù)介紹了。 本節(jié)的主要目的是在開(kāi)發(fā)工具中運(yùn)行Canal的demo,以便大家在研究源碼的過(guò)程中遇到困難時(shí)可以進(jìn)行調(diào)試。

溫馨提示:在學(xué)習(xí)過(guò)程中,您可以先按照官方文檔安裝Canalintellij idea 數(shù)據(jù)庫(kù)關(guān)系圖,這對(duì)于了解Canal的核心組件非常有幫助。

首先從canal源碼中尋找官方的Demo。 示例代碼在包中,如右圖所示:

關(guān)系庫(kù)的標(biāo)準(zhǔn)語(yǔ)言_關(guān)系庫(kù)中的數(shù)據(jù)表的關(guān)系_intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖

不過(guò)有點(diǎn)遺憾的是canal提供的示例代碼只包含端相關(guān)的代碼,并沒(méi)有包含端(),所以我們重點(diǎn)關(guān)注它的單元測(cè)試,如右圖所示:

關(guān)系庫(kù)中的數(shù)據(jù)表的關(guān)系_關(guān)系庫(kù)的標(biāo)準(zhǔn)語(yǔ)言_intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖

然后根據(jù)官方的一些提示和我自己的理解,編寫(xiě)了如下測(cè)試代碼,在IDEA開(kāi)發(fā)工具中運(yùn)行Canal相關(guān)的Demo。 下面的代碼已經(jīng)測(cè)試過(guò),可以直接使用。

1. 運(yùn)河演示

package com.alibaba.otter.canal.server;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class CanalServerTestMain {
    protected static final String ZK_CLUSTER_ADDRESS      = "127.0.0.1:2181";
    protected static final String DESTINATION   = "example";
    protected static final String DETECTING_SQL = "select 1";
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME      = "canal";
    protected static final String PASSWORD      = "canal";
    protected static final String FILTER        = ".\\\\*\\\\\\\\\\\\\\\\..\\\\*";
    /** 默認(rèn) 500s 后關(guān)閉 */
    protected static final long RUN_TIME = 120 * 1000;
    private final ByteBuffer header        = ByteBuffer.allocate(4);
    private CanalServerWithNetty nettyServer;
    public static void main(String[] args) {
        CanalServerTestMain test = new CanalServerTestMain();
        try {
            test.setUp();
            System.out.println("start");
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("sleep");
            try {
                Thread.sleep(RUN_TIME);
            } catch (Throwable ee) {
            }
            test.tearDown();
            System.out.println("end");
        }
    }
    public void setUp() {
        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
            public CanalInstance generate(String destination) {
                Canal canal = buildCanal();
                return new CanalInstanceWithManager(canal, FILTER);
            }
        });
        nettyServer = CanalServerWithNetty.instance();
        nettyServer.setEmbeddedServer(embeddedServer);
        nettyServer.setPort(11111);
        nettyServer.start();
        // 啟動(dòng) instance
        embeddedServer.start("example");
    }
    public void tearDown() {
        nettyServer.stop();
    }
    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");
        CanalParameter parameter = new CanalParameter();
        //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
        parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
        parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
        parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);
        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
                new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);
        parameter.setSlaveId(1234L);
        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        parameter.setConnectionCharsetNumber((byte) 33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);
        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);
        canal.setCanalParameter(parameter);
        return canal;
    }
}

2. 運(yùn)河演示

package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
public class SimpleCanalClientExample {
    public static void main(String[] args) {
        // 創(chuàng)建鏈接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\\\..*");
            connector.rollback();
            int totalEmptyCount = 3000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \\n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交確認(rèn)
                // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

手術(shù)療效如右圖所示:

關(guān)系庫(kù)中的數(shù)據(jù)表的關(guān)系_關(guān)系庫(kù)的標(biāo)準(zhǔn)語(yǔ)言_intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖

改變數(shù)據(jù)庫(kù)中的一條數(shù)據(jù),方便形成新的日志,輸出結(jié)果如下:

intellij idea 數(shù)據(jù)庫(kù)關(guān)系圖_關(guān)系庫(kù)的標(biāo)準(zhǔn)語(yǔ)言_關(guān)系庫(kù)中的數(shù)據(jù)表的關(guān)系

能夠在IDEA中構(gòu)建并運(yùn)行demo是我們進(jìn)入canal的第一步。 未來(lái)我們將按照官方文檔中的內(nèi)容為綱領(lǐng),嘗試逐步解鎖canal的實(shí)現(xiàn)原理,從而更好地指導(dǎo)實(shí)踐。

本文到此結(jié)束。 歡迎您留言與作者分享。 三聯(lián)是對(duì)我最大的鼓勵(lì)和支持(點(diǎn)贊、評(píng)論、轉(zhuǎn)發(fā))。

如有侵權(quán)請(qǐng)聯(lián)系刪除!

13262879759

微信二維碼