你好,歡迎進(jìn)入江蘇優(yōu)軟數(shù)字科技有限公司官網(wǎng)!
發(fā)布時(shí)間:2023-07-03
瀏覽次數(shù):0
點(diǎn)擊上方“中間件興趣圈”,選擇“設(shè)為明星”
做一個(gè)積極向上的人,越努力,越幸運(yùn)!
源碼解析Canal系列開始了,一個(gè)全新的系列,可以講解canal本身的實(shí)現(xiàn)原理,也是作者源碼閱讀方法的展示。
1、應(yīng)用場(chǎng)景
提到Canal,大家應(yīng)該都會(huì)想到它是一個(gè)用于解析MySQL日志、將MySQL數(shù)據(jù)庫中的數(shù)據(jù)同步到其他存儲(chǔ)介質(zhì)等的工具。
也就是Canal的一個(gè)非常常見的使用場(chǎng)景:數(shù)據(jù)異構(gòu),一種更高層次的數(shù)據(jù)讀寫分離架構(gòu)設(shè)計(jì)技術(shù)。
隨著業(yè)務(wù)的不斷發(fā)展,當(dāng)企業(yè)發(fā)展到一定階段,發(fā)現(xiàn)單一的關(guān)系型數(shù)據(jù)庫已經(jīng)無法支撐業(yè)務(wù)快速發(fā)展帶來的數(shù)據(jù)不斷積累的壓力,于是一種設(shè)計(jì)框架就誕生了:分?jǐn)?shù)據(jù)庫和子表。 分庫分表確實(shí)是一個(gè)非常好的緩解單庫數(shù)據(jù)庫壓力的方案,但是卻導(dǎo)致了另一個(gè)困境,就是對(duì)關(guān)聯(lián)查詢不友好,甚至跨庫JOIN更是如此。
舉例如下: 比如在一個(gè)訂單系統(tǒng)中,一般有兩類用戶需要查詢訂單,一類是顧客,一類是店鋪。 一個(gè)店鋪的訂單數(shù)據(jù)會(huì)分布在不同的數(shù)據(jù)庫中。 如果使用()來劃分?jǐn)?shù)據(jù)庫,那么同一個(gè)用戶訂購的所有訂單數(shù)據(jù)將會(huì)分布在不同的數(shù)據(jù)庫中。 這樣關(guān)聯(lián)查詢就必須跨數(shù)據(jù)庫Join,成本會(huì)很低。 而里面的場(chǎng)景只能滿足一方的需求,那怎么辦呢?
Canal此時(shí)首次亮相。 在電子商務(wù)設(shè)計(jì)中,雖然商店和顧客會(huì)被拆分成兩個(gè)不同的服務(wù),但我們可以為這兩個(gè)不同的服務(wù)構(gòu)建不同的數(shù)據(jù)庫集群。 我們可以使用用戶訂單庫,商戶訂單數(shù)據(jù)庫分?jǐn)?shù)據(jù)庫,用戶訂單數(shù)據(jù)庫為主數(shù)據(jù)庫。 當(dāng)用戶在訂單系統(tǒng)下訂單時(shí),數(shù)據(jù)進(jìn)入用戶訂單數(shù)據(jù)庫,然后可以通過canal挖掘數(shù)據(jù)庫的日志,然后將數(shù)據(jù)同步到商店訂單庫,同時(shí)用戶訂單庫根據(jù)用戶ID分庫intellij idea 數(shù)據(jù)庫關(guān)系圖,店鋪訂單庫根據(jù)店鋪ID分庫,完美解決了問題。
2. 架構(gòu)設(shè)計(jì)原則
了解了Canal的基本使用場(chǎng)景后,我們通過Canal官方文檔探索其核心架構(gòu)設(shè)計(jì)理念,從而打開并步入Canal的神秘世界。
首先我們簡(jiǎn)單了解一下MySQL的主從同步原理:
在此插入圖片描述
從上圖可以看出,主從復(fù)制主要分為三步:
基于MySQL的這些數(shù)據(jù)同步機(jī)制,Canal的設(shè)計(jì)目標(biāo)主要是實(shí)現(xiàn)數(shù)據(jù)同步,即數(shù)據(jù)復(fù)制。 從里面的圖來看,很自然的想到了下面的設(shè)計(jì):
原理比較簡(jiǎn)單:
我們先看一下整體的組成部分:
闡明:
模塊:
我暫時(shí)不打算深入研究這種組件,因?yàn)楝F(xiàn)階段我自己也不了解,但這是我后續(xù)學(xué)習(xí)和研究的重點(diǎn)。
3.在IDEA中運(yùn)行Demo
Linux環(huán)境下安裝canal比較簡(jiǎn)單。 您可以按照官方指南一步步進(jìn)行。 這里我就不再重復(fù)介紹了。 本節(jié)的主要目的是在開發(fā)工具中運(yùn)行Canal的demo,方便后續(xù)研究源碼。 過程中遇到困難時(shí)可以進(jìn)行調(diào)試。
溫馨提示:在學(xué)習(xí)過程中,您可以先按照官方文檔安裝Canal,這對(duì)于了解Canal的核心組件非常有幫助。
首先從canal源碼中尋找官方的Demo。 示例代碼在包中,如右圖所示:
不過有點(diǎn)遺憾的是canal提供的示例代碼只包含端相關(guān)的代碼,并沒有包含-side(),所以我們重點(diǎn)關(guān)注它的單元測(cè)試intellij idea 數(shù)據(jù)庫關(guān)系圖,如右圖所示:
然后根據(jù)官方的一些提示和我自己的理解,編寫了如下測(cè)試代碼,在IDEA開發(fā)工具中運(yùn)行Canal相關(guān)的Demo。 下面的代碼已經(jīng)測(cè)試過,可以直接使用。
1. 運(yùn)河演示
package?com.alibaba.otter.canal.server;
import?com.alibaba.otter.canal.instance.core.CanalInstance;
import?com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import?com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import?com.alibaba.otter.canal.instance.manager.model.Canal;
import?com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import?com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import?com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import?java.net.InetSocketAddress;
import?java.nio.ByteBuffer;
import?java.util.Arrays;
public?class?CanalServerTestMain?{
????protected?static?final?String?ZK_CLUSTER_ADDRESS??????=?"127.0.0.1:2181";
????protected?static?final?String?DESTINATION???=?"example";
????protected?static?final?String?DETECTING_SQL?=?"select?1";
????protected?static?final?String?MYSQL_ADDRESS?=?"127.0.0.1";
????protected?static?final?String?USERNAME??????=?"canal";
????protected?static?final?String?PASSWORD??????=?"canal";
????protected?static?final?String?FILTER????????=?".\\\\*\\\\\\\\\\\\\\\\..\\\\*";
????/**?默認(rèn)?500s?后關(guān)閉?*/
????protected?static?final?long?RUN_TIME?=?120?*?1000;
????private?final?ByteBuffer?header????????=?ByteBuffer.allocate(4);
????private?CanalServerWithNetty?nettyServer;
????public?static?void?main(String[]?args)?{
????????CanalServerTestMain?test?=?new?CanalServerTestMain();
????????try?{
????????????test.setUp();
????????????System.out.println("start");
????????}?catch?(Throwable?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????System.out.println("sleep");
????????????try?{
????????????????Thread.sleep(RUN_TIME);
????????????}?catch?(Throwable?ee)?{
????????????}
????????????test.tearDown();
????????????System.out.println("end");
????????}
????}
????public?void?setUp()?{
????????CanalServerWithEmbedded?embeddedServer?=?new?CanalServerWithEmbedded();
????????embeddedServer.setCanalInstanceGenerator(new?CanalInstanceGenerator()?{
????????????public?CanalInstance?generate(String?destination)?{
????????????????Canal?canal?=?buildCanal();
????????????????return?new?CanalInstanceWithManager(canal,?FILTER);
????????????}
????????});
????????nettyServer?=?CanalServerWithNetty.instance();
????????nettyServer.setEmbeddedServer(embeddedServer);
????????nettyServer.setPort(11111);
????????nettyServer.start();
????????//?啟動(dòng)?instance
????????embeddedServer.start("example");
????}
????public?void?tearDown()?{
????????nettyServer.stop();
????}
????private?Canal?buildCanal()?{
????????Canal?canal?=?new?Canal();
????????canal.setId(1L);
????????canal.setName(DESTINATION);
????????canal.setDesc("test");
????????CanalParameter?parameter?=?new?CanalParameter();
????????//parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
????????parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
????????parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
????????parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
????????parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
????????parameter.setMemoryStorageBufferSize(32?*?1024);
????????parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
????????parameter.setDbAddresses(Arrays.asList(new?InetSocketAddress(MYSQL_ADDRESS,?3306),
????????????????new?InetSocketAddress(MYSQL_ADDRESS,?3306)));
????????parameter.setDbUsername(USERNAME);
????????parameter.setDbPassword(PASSWORD);
????????parameter.setSlaveId(1234L);
????????parameter.setDefaultConnectionTimeoutInSeconds(30);
????????parameter.setConnectionCharset("UTF-8");
????????parameter.setConnectionCharsetNumber((byte)?33);
????????parameter.setReceiveBufferSize(8?*?1024);
????????parameter.setSendBufferSize(8?*?1024);
????????parameter.setDetectingEnable(false);
????????parameter.setDetectingIntervalInSeconds(10);
????????parameter.setDetectingRetryTimes(3);
????????parameter.setDetectingSQL(DETECTING_SQL);
????????canal.setCanalParameter(parameter);
????????return?canal;
????}
}
2. 運(yùn)河演示
package?com.alibaba.otter.canal.example;
import?java.net.InetSocketAddress;
import?java.util.List;
import?com.alibaba.otter.canal.client.CanalConnectors;
import?com.alibaba.otter.canal.client.CanalConnector;
import?com.alibaba.otter.canal.common.utils.AddressUtils;
import?com.alibaba.otter.canal.protocol.CanalEntry;
import?com.alibaba.otter.canal.protocol.Message;
import?com.alibaba.otter.canal.protocol.CanalEntry.Column;
import?com.alibaba.otter.canal.protocol.CanalEntry.EventType;
public?class?SimpleCanalClientExample?{
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建鏈接
????????CanalConnector?connector?=?CanalConnectors.newSingleConnector(new?InetSocketAddress(AddressUtils.getHostIp(),
????????????????11111),?"example",?"",?"");
????????int?batchSize?=?1000;
????????int?emptyCount?=?0;
????????try?{
????????????connector.connect();
????????????connector.subscribe(".*\\\\..*");
????????????connector.rollback();
????????????int?totalEmptyCount?=?3000;
????????????while?(emptyCount?????????????????Message?message?=?connector.getWithoutAck(batchSize);?//?獲取指定數(shù)量的數(shù)據(jù)
????????????????long?batchId?=?message.getId();
????????????????int?size?=?message.getEntries().size();
????????????????if?(batchId?==?-1?||?size?==?0)?{
????????????????????emptyCount++;
????????????????????System.out.println("empty?count?:?"?+?emptyCount);
????????????????????try?{
????????????????????????Thread.sleep(1000);
????????????????????}?catch?(InterruptedException?e)?{
????????????????????}
????????????????}?else?{
????????????????????emptyCount?=?0;
????????????????????//?System.out.printf("message[batchId=%s,size=%s]?\\n",?batchId,?size);
????????????????????printEntry(message.getEntries());
????????????????}
????????????????connector.ack(batchId);?//?提交確認(rèn)
????????????????//?connector.rollback(batchId);?//?處理失敗,?回滾數(shù)據(jù)
????????????}
????????????System.out.println("empty?too?many?times,?exit");
????????}?finally?{
????????????connector.disconnect();
????????}
????}
????private?static?void?printEntry(List?entrys) ?{
????????for?(CanalEntry.Entry?entry?:?entrys)?{
????????????if?(entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONEND)?{
????????????????continue;
????????????}
????????????CanalEntry.RowChange?rowChage?=?null;
????????????try?{
????????????????rowChage?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());
????????????}?catch?(Exception?e)?{
????????????????throw?new?RuntimeException("ERROR?##?parser?of?eromanga-event?has?an?error?,?data:"?+?entry.toString(),
????????????????????????e);
????????????}
????????????CanalEntry.EventType?eventType?=?rowChage.getEventType();
????????????System.out.println(String.format("================>?binlog[%s:%s]?,?name[%s,%s]?,?eventType?:?%s",
????????????????????entry.getHeader().getLogfileName(),?entry.getHeader().getLogfileOffset(),
????????????????????entry.getHeader().getSchemaName(),?entry.getHeader().getTableName(),
????????????????????eventType));
????????????for?(CanalEntry.RowData?rowData?:?rowChage.getRowDatasList())?{
????????????????if?(eventType?==?CanalEntry.EventType.DELETE)?{
????????????????????printColumn(rowData.getBeforeColumnsList());
????????????????}?else?if?(eventType?==?EventType.INSERT)?{
????????????????????printColumn(rowData.getAfterColumnsList());
????????????????}?else?{
????????????????????System.out.println("------->?before");
????????????????????printColumn(rowData.getBeforeColumnsList());
????????????????????System.out.println("------->?after");
????????????????????printColumn(rowData.getAfterColumnsList());
????????????????}
????????????}
????????}
????}
????private?static?void?printColumn(List?columns) ?{
????????for?(Column?column?:?columns)?{
????????????System.out.println(column.getName()?+?"?:?"?+?column.getValue()?+?"????update="?+?column.getUpdated());
????????}
????}
}
手術(shù)療效如右圖所示:
改變數(shù)據(jù)庫中的一條數(shù)據(jù),方便形成新的日志,輸出結(jié)果如下:
能夠在IDEA中構(gòu)建并運(yùn)行demo是我們進(jìn)入canal的第一步。 未來我們將按照官方文檔中的內(nèi)容為綱領(lǐng),嘗試逐步解鎖canal的實(shí)現(xiàn)原理,從而更好地指導(dǎo)實(shí)踐。
本文就先到此為止,運(yùn)河系列即將開始連載,敬請(qǐng)期待。
原創(chuàng)不易,如果對(duì)您有幫助,請(qǐng)點(diǎn)擊【尋找】這篇文章,這將是我寫出更多優(yōu)質(zhì)文章的最強(qiáng)勁動(dòng)力。
歡迎加入我的知識(shí)星球,一起交流源碼,講解結(jié)構(gòu),解密億級(jí)訂單的結(jié)構(gòu)設(shè)計(jì)和實(shí)踐經(jīng)驗(yàn),構(gòu)建優(yōu)質(zhì)的技術(shù)交流圈,為廣大用戶提供優(yōu)質(zhì)的問答服務(wù)廣大明星朋友。 長(zhǎng)按以下二維碼即可加入。
如有侵權(quán)請(qǐng)聯(lián)系刪除!
Copyright ? 2023 江蘇優(yōu)軟數(shù)字科技有限公司 All Rights Reserved.正版sublime text、Codejock、IntelliJ IDEA、sketch、Mestrenova、DNAstar服務(wù)提供商
13262879759
微信二維碼