Elasticsearch

2 minute read

Published:

This is my personal notes for elasticsearch.

Introduction

Elasticsearch是一个分布式的 RESTful 风格的搜索和数据分析引擎, Kibana是一个方便的ES插件
python操作ES也十分方便, 导入es包,实例化es之后便可以进行各种操作

Elasticsearch建立在Lucene基础之上,底层采用Lucene来实现文件的读写操作, Elasticsearch基于lucene,又不简单地只是lucene,它完美地将lucene与分布式系统结合,既利用了lucene的检索能力,又具有了分布式系统的众多优点。

Elasticsearch通过引入分片概念,成功地将lucene部署到分布式系统中,增强了系统的可靠性和扩展性。

Elasticsearch通过定期refresh lucene in-momory-buffer中的数据,使得ES具有了近实时的写入和查询能力。

Elasticsearch通过引入translog,多副本,以及定期执行flush,merge等操作保证了数据可靠性和较高的存储性能。

Elasticsearch通过存储_source字段结合verison字段实现了文档的局部更新,使得ES的使用方式更加灵活多样。


ES存储方式

avatar

不同于MySQL和HBase, ES的存储方式是索引存储

关系数据库 ⇒ 数据库 ⇒ 表 ⇒ 行 ⇒ 列(Columns)
Elasticsearch ⇒ 索引 ⇒ 类型 ⇒ 文档 ⇒ 字段(Fields)
一个 Elasticsearch 集群可以包含多个索引(数据库),其中包含了很多类型(表)。这些类型中包含了很多的文档(行),然后每个文档中又包含了很多的字段(列)。每条记录都是一个文档, 用JSON序列化。


ES索引

Elasticsearch最关键的就是提供强大的索引能力了, Elasticsearch索引的精髓:一切设计都是为了提高搜索的性能, 另一层意思:为了提高搜索的性能,难免会牺牲某些其他方面,比如插入/更新。

往Elasticsearch里插入一条记录,其实就是直接PUT一个json的对象,这个对象有多个fields,比如name, sex, age, interests,那么在插入这些数据到Elasticsearch的同时,Elasticsearch还自动为这些字段建立索引–倒排索引,因为Elasticsearch最核心功能是搜索。

  • 倒排索引:

    传统的我们的检索是通过文章,逐个遍历找到对应关键词的位置。
    而倒排索引,是通过分词策略,形成了词和文章的映射关系表,这种词典+映射表, 即为倒排索引。
    有了倒排索引,就能实现 o(1)时间复杂度的效率检索文章了,极大的提高了检索效率。

    avatar


ES写入方案

  • 分布式设计

    为了支持对海量数据的存储和查询,Elasticsearch引入分片的概念,一个索引被分成多个分片,每个分片可以有一个主分片和多个副本分片,每个分片副本都是一个具有完整功能的lucene实例。分片可以分配在不同的服务器上,同一个分片的不同副本不能分配在相同的服务器上。

    在进行写操作时,ES会根据传入的_routing参数(或mapping中设置的_routing, 如果参数和设置中都没有则默认使用_id), 按照公式 shard_num=hash(\routing)%num_primary_shards,计算出文档要分配到的分片,在从集群元数据中找出对应主分片的位置,将请求路由到该分片进行文档写操作。

  • 近实时性

    avatar

    如果每次一条数据写入内存后立即写到硬盘文件上,会严重降低es的性能。因此es在设计时在memory buffer和硬盘间加入了Linux的页面高速缓存(File system cache)来提高es的写效率。

    当写请求发送到es后,es将数据暂时写入memory buffer中,此时写入的数据还不能被查询到。默认设置下,es每1秒钟将memory buffer中的数据refresh到Linux的File system cache,并清空memory buffer,此时写入的数据就可以被查询到了。

    对于需要写入后实时查询的数据,可以通过手动refresh操作将memory buffer的数据立即写入到File system cache。

  • 数据存储可靠性

    • 1 translog

      File system cache依然是内存数据,一旦断电,则数据全部丢失。因此需要通过translog来保证即使因为断电File system cache数据丢失,es重启后也能通过日志回放找回丢失的数据。

      translog默认设置下,每一个index、delete、update或bulk请求都会直接fsync写入硬盘。为了保证translog不丢失数据,在每一次请求之后执行fsync确实会带来一些性能问题。对于一些允许丢失几秒钟数据的场景下,可以通过设置index.translog.durability和index.translog.sync_interval参数让translog每隔一段时间才调用fsync将事务日志数据写入硬盘。

    • 2 flush

      每30分钟ES会触发一次flush操作,File system cache中的数据会持久化到disk中 translog中的数据会被清空。

  • 写入流程

    ES的任意节点都可以作为协调节点(coordinating node)接受请求,当协调节点接受到请求后进行一系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发给primary shard, primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。大致流程如下图:

    avatar


ES索引数据过多如何调优,部署

  • (1) 动态索引层面

    基于模板+时间+rollover api 滚动创建索引,举例:设计阶段定义:blog 索引的模板格式为:blog_index_时间戳的形式,每天递增数据。

    这样做的好处:不至于数据量激增导致单个索引数据量非常大,接近于上线 2 的32 次幂-1,索引存储达到了 TB+甚至更大。一旦单个索引很大,存储等各种风险也随之而来,所以要提前考虑+及早避免。

  • (2) 存储层面

    冷热数据分离存储,热数据(比如最近 3 天或者一周的数据),其余为冷数据。

    对于冷数据不会再写入新数据,可以考虑定期 force_merge 加 shrink 压缩操作,节省存储空间和检索效率。

  • (3) 部署层面

    一旦之前没有规划,这里就属于应急策略。

    结合 ES 自身的支持动态扩展的特点,动态新增机器的方式可以缓解集群压力

    注意:如果之前主节点等规划合理,不需要重启集群也能完成动态新增的。


在并发情况下,Elasticsearch如果保证读写一致

  • 可以通过版本号使用乐观并发控制(OCC),以确保新版本不会被旧版本覆盖,由应用层来处理具体的冲突;

  • 对于写操作,一致性级别支持quorum/one/all,默认为quorum,即只有当大多数分片可用时才允许写操作。但即使大多数可用,也可能存在因为网络等原因导致写入副本失败,这样该副本被认为故障,分片将会在一个不同的节点上重建。

  • 对于读操作,可以设置replication为sync(默认),这使得操作在主分片和副本分片都完成后才会返回;如果设置replication为async时,也可以通过设置搜索请求参数_preference为primary来查询主分片,确保文档是最新版本。

  • Note:

    • OCC是用来解决写-写冲突的无锁并发控制,认为事务间争用没有那么多,所以先进行修改,在提交事务前,检查一下事务开始后,有没有新提交改变,如果没有就提交,如果有就放弃并重试。乐观并发控制类似自选锁。乐观并发控制适用于低数据争用,写冲突比较少的环境。

    • MVCC是用来解决读-写冲突的无锁并发控制,也就是为事务分配单向增长的时间戳,为每个修改保存一个版本,版本与事务时间戳关联,读操作只读该事务开始前的数据库的快照。 这样在读操作不用阻塞写操作,写操作不用阻塞读操作的同时,避免了脏读和不可重复读。


搜索数据的底层原理

查询过程大体上分为查询和取回这两个阶段,广播查询请求到所有相关分片,并将它们的响应整合成全局排序后的结果集合,这个结果集合会返回给客户端。

  • 1 查询阶段

    • a 当一个节点接收到一个搜索请求,这这个节点就会变成协调节点,第一步就是将广播请求到搜索的每一个节点的分片拷贝,查询请求可以被某一个主分片或某一个副分片处理,协调节点将在之后的请求中轮训所有的分片拷贝来分摊负载。
    • b 每一个分片将会在本地构建一个优先级队列,如果客户端要求返回结果排序中从from 名开始的数量为size的结果集,每一个节点都会产生一个from+size大小的结果集,因此优先级队列的大小也就是from+size,分片仅仅是返回一个轻量级的结果给协调节点,包括结果级中的每一个文档的ID和进行排序所需要的信息。
    • c 协调节点将会将所有的结果进行汇总,并进行全局排序,最总得到排序结果。
  • 2 取值阶段

    • a 查询过程得到的排序结果,标记处哪些文档是符合要求的,此时仍然需要获取这些文档返回给客户端
    • b 协调节点会确定实际需要的返回的文档,并向含有该文档的分片发送get请求,分片获取的文档返回给协调节点,协调节点将结果返回给客户端。

实例展示

from elasticsearch import Elasticsearch
es = Elasticsearch(['mbd-rec-elasticsearch-online006-bdwg.xxx.xxx:xxxx'], port=9200) #集群的一个节点

  • ES查询

    example 1

    “must”==and, ‘should’==or, ‘range’==between, ‘exists’==非空, ‘match_phrase’==’==’

          body2={
          "query": {
              "bool": {
              "must": [
                          {"range": { "tag_cnt": {"gte": "1","lt": None}}},
                          {"range": {"duration": {"gte": 61, "lt": 900}}},
    
                          {"exists": {"field": "father_id"}},
    
                          {"match_phrase": {
                              "access": {
                                  "query": True
                                  }
                              }
                          },
    
    
                          {"match_phrase": {
                              "soft_porn": {
                                  "query": False
                              }
                          }
                          },
    
                          {
                              "bool": {
                                  "should": [
    
                                      {"match_phrase": {
                                          "lchannel": {
                                              "query": 1
                                          }
                                      }
                                      },
    
                                      {"match_phrase": {
                                          "lchannel": {
                                              "query": 2
                                          }
                                      }
                                      },
    
                                      {"match_phrase": {
                                          "lchannel": {
                                              "query": 3
                                          }
                                      }
                                      },
    
                                      {"match_phrase": {
                                          "lchannel": {
                                              "query": 4
                                          }
                                      }
                                      },
    
                                      {"match_phrase": {
                                          "lchannel": {
                                              "query": 6
                                          }
                                      }
                                      },
    
                                      {"match_phrase": {
                                          "lchannel": {
                                              "query": 15
                                          }
                                      }
                                      },
    
                                      {"match_phrase": {
                                          "lchannel": {
                                              "query": 31
                                          }
                                      }
                                      }
    
                                  ],
                                  "minimum_should_match": 1
    
                              }
                          }
    
              ]
              }
          }
          }
    
          page2 = es.search(index='stv', doc_type='stv', body=body2)
          DS_short=page2['hits']['total']
            
    

    example 2

    这里增加了聚合操作’aggs’,相当于distinct count(‘father_id’), 统计符合条件的 father_id 数量

    
          body4={
              "aggs": {
                  "by_fatherId": {
                      "cardinality": {
                          "field": "father_id"
                      }
                  }
              },
    
              "query": {
              "bool": {
              "must": [
                          {"range": { "home_cps_time": {"gte": "now-7d/d","lt": "now"}}},
                          {"range": {"album": {"gte": 0, "lt": 1}}},
                          {"exists": {"field": "father_id"}},
              ]
              }
          }
          }
    
          page4 = es.search(index='stv', doc_type='stv', body=body4)
          corpus_long2=page4['aggregations']['by_fatherId']['value']
    
    
  • ES写入

    1 es更新(update)操作, 这里需要id已经在es中存在,会覆盖原来的数据, body可传入各种形式的json数据

    
          body = {"doc": {
              'lidCount_online': lidCount_online,
              'sidCount_online': sidCount_online,
              'lidCount_rate': lidCount_rate,
              'sidCount_rate': sidCount_rate
          }
      }
    
          es.update(index='corpus_daily_statistics', doc_type='st', id=date, body=body)
    
    

    2 es插入(index)操作, 这里新建一个id为Today的es记录,body可传入各种形式的json数据
    如 {‘A’:{‘B’:[{‘C’:’D’},{‘E’:’F’}]}}

    
          es.index(index='core_indicator_daily_analysis', doc_type='st', id=Today, body=data)