ElasticSearch 数据同步在项目中的实践经验 作者: admin 时间: 2020-06-17 分类: 默认分类,elasticsearch #ElasticSearch 数据同步在项目中的实践经验 接触ES也有两年的时间了,在两家公司、不同的项目上接触到了不同的ES使用方法,走了不少岔路,也积累了不少的经验,接下来分析一下我接触过的项目中使用ES的方法。 ## 离线数据同步--Logstash 使用 Logstash 可以 通过简单的配置文件实现秒级的数据增量同步,但是缺点也很明显,在 MySQL->ES 千万数据量级同步时,Logstash 宿主机很容易因内存不足而杀死 Logstash 进程。 配置文件例子: ```shell input { jdbc { jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://xxxxxx:3306/datebase?useUnicode=true&characterEncoding=utf-8&useSSL=false" jdbc_user => "username" jdbc_password => "password" schedule => "0 * * * *" # 时区设置为上海 jdbc_default_timezone => "Asia/Shanghai" jdbc_page_size => "50000" jdbc_paging_enabled => true lowercase_column_names => "false" # 为true表示重启logstash重新读取数据库所有内容,false会从上次读取的内容开始往后读取 clean_run => false use_column_value => true tracking_column => "id" record_last_run => true tracking_column_type => "numeric" last_run_metadata_path => "./last_id" # 数据库文档的查询sql statement => "select * from table where id >= :sql_last_value;" tags => ["xxxxx"] } } filter { if "xxxx" in [tags] { ruby { code => "ruby代码" } } } output { if "xxxx" in [tags] { elasticsearch { hosts => ["ip:9200"] user => "user" password => "paassword" index => "index" document_id => "%{id}" } } } ``` 启动命令: ```sh nohup logstash -f xxxx.conf --config.reload.automatic >> log.out & ``` ## 通过队列异步实时同步数据 在 PHP 中可以通过 Trait 方便的为 Model 赋予可以操作 ES 的特性。 通过进一步封装`elasticsearch/elasticsearch`包,ElasticSearchTrait 可以拥有如下能力: - 别名管理 - 对数据的管理 - 对索引的管理 通过队列可以高效的将数据写入到 ES 中,并且可自定义数据组装的方法,对数据进行改写。 标签: elasticsearch, logstash, 队列, 数据同步