【ElasticSearch】全量或增量构建索引
来源:SegmentFault
时间:2023-01-25 20:31:17 168浏览 收藏
本篇文章给大家分享《【ElasticSearch】全量或增量构建索引》,覆盖了数据库的常见基础知识,其实一个语言的全部知识点一篇文章是不可能说完的,但希望通过这些问题,让读者对自己的掌握程度有一定的认识(B 数),从而弥补自己的不足,更好的掌握它。
一、全量构建索引
ElasticSearch集群环境搭建好以后,首次需要全量地从关系型数据库中将目标待索引数据写入到ElasticSearch搜索引擎中,以下我们将用到logstash的插件logstash-input-jdbc来全量同步数据。
1、Logstash管道构建全量索引
Logstash好比一个数据管道,通常一端连接着关系型数据库来往管道中输入数据,一端连接着ElasticSearch将管道中的数据输出。
下载Logstash
- 前往官网下载Logstash,需要与ElasticSearch版本一致,兼容性更好。
- 将下载的Logstash压缩包上传到服务器上,示例将Logstash安装在了与ElasticSearch同一台服务器上。
安装Logstash
#进入logstash压缩包所在目录 cd /usr/local #加压logstash压缩包 unzip logstash-7.8.0.zip
- 安装logstash的插件logstash-input-jdbc
#进入logstash安装目录的bin目录下 cd /usr/local/logstash-7.8.0/bin #安装插件,注意logstash依赖jdk,安装插件前请确保安装了jdk #当前logstash-7.8.0版本已经默认安装了logstash-integration-jdbc,而此插件已经包含logstash-input-jdbc ./logstash-plugin install logstash-input-jdbc #查看logstash所有已安装的插件 ./logstash-plugin list
配置logstash
logstash需要与mysql通信并获取数据,于是需要准备与mysql通信的驱动程序,需要准备拉取数据所需要执行的sql语句,还需要logstash的输入、输出端点。
- 创建相关目录及文件
cd /usr/local/logstash-7.8.0 mkdir mysql cd mysql touch jdbc.conf touch jdbc.sql
- mysql驱动程序,前往maven中央仓库获取
- 获取数据的sql脚本(jdbc.sql)
SELECT a.id, a.NAME, a.tags, concat( a.latitude, ',', a.longitude ) AS location, a.remark_score, a.price_per_man, a.category_id, b.NAME AS category_name, a.seller_id, c.remark_score AS seller_remark_score, c.disabled_flag AS seller_disabled_flag FROM shop a INNER JOIN category b ON a.category_id = b.id INNER JOIN seller c ON c.id = a.seller_id
- logstash输入、输出配置
input { jdbc { # mysql 数据库链接,dianping为数据库名 jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping" # 用户名和密码 jdbc_user => "root" jdbc_password => "123456" # 驱动 jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 执行的sql 文件路径+名称 statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["192.168.15.151:9200"] # 索引名称 index => "shop" document_type => "_doc" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格式输出 codec => json_lines } }
启动logstash
#进入logstash安装目录 cd /usr/local/logstash-7.8.0 #后台进程方式启动logstash nohup ./bin/logstash -f mysql/jdbc.conf & #查看logstash日志 tail -f -n 100 ./nohup.out
说明:从以上日志中看出logstash正常启动,并且每分钟会进行全量同步数据,日志中也打印出了执行的sql,首先会查个总数,为了后续的分页获取数据。
二、增量构建索引
1、Canal管道准实时构建增量索引
Canal简介
canal将自己伪装成mysql主备的slaver,通过模拟mysql主备之间的通信协议,利用mysql主备同步原理,获取binary log并解析,准实时的获取数据的变更。canal管道中数据可以流出到关系型数据库、消息队列、es等等的其它中间件或数据库存储。
- mysql主备复制原理
Canal下载
说明:当前最新的稳定版是1.1.4,我们需要下载如上图中标红的三个压缩包,之所以需要下载源码包是因为这个版本的canal.adapter仅支持6.x的es,而我用的是7.8.0的es,需要稍微修改下源码,重新编译打包canal.adapter才能使用。
Canal.deployer部署
Canal.deployer的作用是伪装成mysql主备的一个slaver,发送dump指令获取binary log。依赖于jdk,需要先安装配置jdk。
开启mysql的二进制日志
默认mysql没有开启binary log,我们需要确认mysql是否开启了binary log,如果没有开启,那么Canal的能力将无法发挥。
说明:以上标红的三行是需要配置的,配置后重启mysql服务,使配置生效。
- 验证mysql开启了binary log
mysql> show variables like 'log_bin';
为canal创建mysql用户
canal作为一个伪slaver应该单独创建一个专用用户,用root用户不合适。权限太大,过于危险。
- 创建canal用户并授权
#仅仅授予查询权限及其复制所需权限,注意密码过短会违反mysql默认密码策略 #%表示远端所有主机都能访问 grant select,replication slave,replication client on *.* to 'canal'@'%' identified by '123456'; #localhost表示仅本地当前主机能访问 grant select,replication slave,replication client on *.* to 'canal'@'localhost' identified by '123456'; #刷新权限,持久化 flush privileges;
Canal.deployer配置启动
- 修改instance.properties文件
#进入canal.deployer解压目录 cd /usr/local/canal.deployer-1.1.4 #修改配置文件 vim ./conf/example/instance.properties ########修改如下四个配置项即可,其它默认######## #设置伪slaver id,与mysql master 设置的要不同 canal.instance.mysql.slaveId=3 #mysql master地址及端口 canal.instance.master.address=192.168.15.150:3306 #mysql连接用户及密码 canal.instance.dbUsername=canal canal.instance.dbPassword=910625 #############################################
- 修改后的instance.properties文件
################################################# ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=3 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=192.168.15.150:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=910625 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
- Canal.deployer启动
#进入canal.deployer解压目录 cd /usr/local/canal.deployer-1.1.4 #启动canal.deployer ./bin/startup.sh #canal.deployer默认会暴露11111端口,与其消费者通信 netstat -an | grep 11111
Canal.adapter部署
Canal.adapter扮演着Canal.deployer的消费端的角色,作为一个适配者的身份存在,它从Canal.deployer管道中获取到数据后,通过自身的适配转存到其它数据库或者中间件做进一步处理。
canal.adapter源码小调整
由于canal.adapter在目前的1.1.4版本中不支持es7,所以需要对其源码做个小调整,让其支持es7。
下载canal源码并解压到本地
用idea打开client-adapter子模块
修改elasticsearch子模块下pom.xml
org.elasticsearch elasticsearch 7.8.0 org.elasticsearch.client transport 7.8.0 org.elasticsearch.client elasticsearch-rest-client 7.8.0 org.elasticsearch.client elasticsearch-rest-high-level-client 7.8.0
修改elasticsearch子模块下几个编译失败的点
es新版客户端api做了细微调整,基本仅需要修改如下几个文件,ESConnection.java、ESAdapter.java、ESTemplate.java就能被正确编译了。
- MappingMetaData修改成了MappingMetadata
- com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESBulkRequest#bulk修改
- com.alibaba.otter.canal.client.adapter.es.ESAdapter#count修改
说明:修改如下几个文件里的几个编译通不过的点后,可以使用maven重新编译打包。
maven重新编译打包
#回到源码根目录下用maven编译打包,跳过测试 D:\DeveloperPackage\canal\canal-canal-1.1.4>mvn clean package -Dmaven.test.skip=true
- 找到修改后重新被编译打包好的canal-adapter
#此目录即是我们可以正常使用的canal-adapter #我们仅需要将canal-adapter此目录文件打包上传到服务器后,修改些许配置文件后就能正常运行了 D:\DeveloperPackage\canal\canal-canal-1.1.4\client-adapter\launcher\target\canal-adapter
canal.adapter配置启动
将我们上一步重新编译打包好的canal-adapter上传到服务器后,解压目录如下图所示。
修改application.yml
进入canal-adapter解压目录 cd /usr/local/canal-adapter vim conf/application.yml
- 修改application.yml中数据源配置
说明:告诉canal-deployer需要关注的具体的数据库。
- 修改application.yml中适配器配置
说明:配置数据输出目的地,此处配置为es,采用transport模式与es服务通信。
- 修改后的application.yml
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp # kafka rocketMQ canalServerHost: 127.0.0.1:11111 # zookeeperHosts: slave1:2181 # mqServers: 127.0.0.1:9092 #or rocketmq # flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://192.168.15.150:3306/dianping?useUnicode=true username: canal password: 123456 canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger # - name: rdb # key: mysql1 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # - name: rdb # key: oracle1 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase - name: es hosts: 192.168.15.151:9300 properties: #mode: rest # security.auth: test:123456 # only used for rest mode cluster.name: test-app
创建与es映射的配置文件
cd /usr/local/canal-adapter/conf/es touch shop.yml
- shop.yml内容如下
#配置在client-adapter的application.yml配置文件中,表示数据源 dataSourceKey: defaultDS #配置在client-adapter的application.yml配置文件中,表示管道中数据的目的地,如es destination: example groupId: esMapping: #es索引名称 _index: shop _type: _doc _id: id #表示文档不存在就插入,存在则更新 upsert: true sql: "select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c.id = a.seller_id" commitBatch: 3000
启动canal.adapter
cd /usr/local/canal-adapter #执行启动脚本 ./bin/startup.sh #关闭脚本 ./bin/stop.sh
- 查看canal.adapter启动日志
tail -f -n 100 logs/adapter/adapter.log
说明:此时canal.adapter成功启动,可以尝试修改mysql库中的记录,验证是否准实时地同步到了es中。
Canal自定义客户端
canal.adapter不够灵活,处理多表连接还可能会有问题,如字段冲突等。我们可以自定义canal客户端接入canal服务端,从canal服务端批量获取变更记录,然后利用es客户端更新索引。
相关依赖
org.elasticsearch.client elasticsearch-rest-client 7.8.0 org.elasticsearch elasticsearch 7.8.0 org.elasticsearch.client elasticsearch-rest-high-level-client 7.8.0 com.alibaba.otter canal.client 1.1.4 com.alibaba.otter canal.common 1.1.4 com.alibaba.otter canal.protocol 1.1.4
canal客户端连接器
package com.imooc.dianping.canal; import com.alibaba.google.common.collect.Lists; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import org.springframework.beans.factory.DisposableBean; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; @Component public class CanalClient implements DisposableBean{ private CanalConnector canalConnector; @Bean public CanalConnector getCanalConnector(){ canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList( new InetSocketAddress("192.168.15.157", 11111)), "example","canal","910625" ); canalConnector.connect(); //订阅,可指定filter,格式{database}.{table} canalConnector.subscribe(); //回滚寻找上次中断的为止 canalConnector.rollback(); return canalConnector; } @Override public void destroy() throws Exception { if(canalConnector != null){ canalConnector.disconnect(); } } }
es客户端
package com.imooc.dianping.config; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ElasticsearchRestClient { @Value("${elasticsearch.ip}") String ipAddress; /** * 高级别的ES客户端,与es server通信 * @return */ @Bean(name="highLevelClient") public RestHighLevelClient highLevelClient(){ String[] address = ipAddress.split(":"); String ip = address[0]; int port = Integer.valueOf(address[1]); HttpHost httpHost = new HttpHost(ip,port,"http"); return new RestHighLevelClient(RestClient.builder(new HttpHost[]{httpHost})); } }
通过定时任务定时从canal服务拉取变更记录
package com.imooc.dianping.canal; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import com.imooc.dianping.dal.ShopModelMapper; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @Component public class CanalScheduling implements Runnable,ApplicationContextAware { private ApplicationContext applicationContext; @Autowired private ShopModelMapper shopModelMapper; @Resource private CanalConnector canalConnector; @Autowired private RestHighLevelClient restHighLevelClient; @Override @Scheduled(fixedDelay = 100) public void run() { long batchId = -1; try{ int batchSize = 1000; //从canal服务端批量拉取数据(未确认) Message message = canalConnector.getWithoutAck(batchSize); batchId = message.getId(); Listentries = message.getEntries(); //判断正确返回了数据,并处理 if(batchId != -1 && entries.size() > 0){ entries.forEach(entry -> { if(entry.getEntryType() == CanalEntry.EntryType.ROWDATA){ //解析处理 publishCanalEvent(entry); } }); } //向canal服务端确认,客户端已成功消费 canalConnector.ack(batchId); }catch(Exception e){ e.printStackTrace(); //出现异常,根据消息的编号进行回滚,下次继续消费 canalConnector.rollback(batchId); } } /** * 处理canal服务端返回的数据 * @param entry */ private void publishCanalEvent(CanalEntry.Entry entry){ CanalEntry.EventType eventType = entry.getHeader().getEventType(); //当前数据库名称 String database = entry.getHeader().getSchemaName(); //当前表的名称 String table = entry.getHeader().getTableName(); //改变的行数据 CanalEntry.RowChange change = null; try { change = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); return; } change.getRowDatasList().forEach(rowData -> { List columns = rowData.getAfterColumnsList(); String primaryKey = "id"; CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null); Map dataMap = parseColumnsToMap(columns); try{ indexES(dataMap,database,table); } catch (IOException e) { e.printStackTrace(); } }); } /** * 解析行数据 * @param columns * @return */ Map parseColumnsToMap(List columns){ Map jsonMap = new HashMap(); columns.forEach(column -> { if(column == null){ return; } jsonMap.put(column.getName(),column.getValue()); }); return jsonMap; } /** * 更新索引 * @param dataMap 行数数据 * @param database 数据库索引 * @param table 表 * @throws IOException */ private void indexES(Map dataMap,String database,String table) throws IOException { if(!StringUtils.equals("dianping",database)){ return; } List
说明:通过以上的定时任务定时从canal服务端拉取数据,然后通过es客户端将变更更新到对应的索引中。
2、Logstash管道构建增量索引
其实在以上Logstash管道构建全量索引的基础上,略微修改即可实现基于时间轴的增量构建索引。仅仅需要修改jdbc.conf、新增一个记录时间的文件,修改jdbc.sql文件加入时间条件即可。
获取数据的sql脚本(jdbc.sql)
SELECT a.id, a.NAME, a.tags, concat( a.latitude, ',', a.longitude ) AS location, a.remark_score, a.price_per_man, a.category_id, b.NAME AS category_name, a.seller_id, c.remark_score AS seller_remark_score, c.disabled_flag AS seller_disabled_flag FROM shop a INNER JOIN category b ON a.category_id = b.id INNER JOIN seller c ON c.id = a.seller_id WHERE a.update_at > : sql_last_value OR b.update_at > : sql_last_value OR c.update_at > : sql_last_value
说明:加入了时间条件,控制抓取时间窗口内的数据,增量更新。
logstash输入、输出配置
input { jdbc { #设置时区timezone jdbc_default_timezone => "Asia/Shanghai" # mysql 数据库链接,dianping为数据库名 jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping" # 用户名和密码 jdbc_user => "root" jdbc_password => "910625" # 驱动 jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #记录上次运行元数据文件路径,如记录增量同步的时间窗口 last_run_metadata_path => "/usr/local/logstash-7.8.0/mysql/last_value_meta" # 执行的sql 文件路径+名称 statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["192.168.15.151:9200"] # 索引名称 index => "shop" document_type => "_doc" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格式输出 codec => json_lines } }
说明:增加了时区的配置,以及指定了记录上次运行元数据文件路径,如记录增量同步的时间窗口。
新增last_value_meta文件,并写入一个初始时间
2020-07-12 00:00:00
说明:记录上次执行同步任务时的时间,下次任务会继续更新这个时间。
Logstash管道构建增量索引的缺点
这种以时间窗口的方式进行数据同步,始终避免不了同步延迟的问题,但是有时候业务上也是能够允许的,增量同步的时间窗口需要更具业务来定。时间窗口太短,数据更新太多,会出现一次增量同步任务还没完成,下一次增量任务又开始了。
好了,本文到此结束,带大家了解了《【ElasticSearch】全量或增量构建索引》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多数据库知识!
-
499 收藏
-
244 收藏
-
235 收藏
-
157 收藏
-
101 收藏
-
176 收藏
-
368 收藏
-
475 收藏
-
266 收藏
-
273 收藏
-
283 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习