女人自慰AV免费观看内涵网,日韩国产剧情在线观看网址,神马电影网特片网,最新一级电影欧美,在线观看亚洲欧美日韩,黄色视频在线播放免费观看,ABO涨奶期羡澄,第一导航fulione,美女主播操b

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

跨機房ES同步實戰(zhàn)

OSC開源社區(qū) ? 來源:OSCHINA 社區(qū) ? 作者:京東云開發(fā)者-謝澤 ? 2022-12-13 15:10 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

背景眾所周知單個機房在出現(xiàn)不可抗拒的問題(如斷電、斷網(wǎng)等因素)時,會導致無法正常提供服務,會對業(yè)務造成潛在的損失。所以在協(xié)同辦公領(lǐng)域,一種可以基于同城或異地多活機制的高可用設(shè)計,在保障數(shù)據(jù)一致性的同時,能夠最大程度降低由于機房的僅單點可用所導致的潛在高可用問題,最大程度上保障業(yè)務的用戶體驗,降低單點問題對業(yè)務造成的潛在損失顯得尤為重要。同城雙活,對于生產(chǎn)的高可用保障,重大的意義和價值是不可言喻的。表面上同城雙活只是簡單的部署了一套生產(chǎn)環(huán)境而已,但是在架構(gòu)上,這個改變的影響是巨大的,無狀態(tài)應用的高可用管理、請求流量的管理、版本發(fā)布的管理、網(wǎng)絡(luò)架構(gòu)的管理等,其提升的架構(gòu)復雜度巨大。結(jié)合真實的協(xié)同辦公產(chǎn)品:京辦(為北京市政府提供協(xié)同辦公服務的綜合性平臺)生產(chǎn)環(huán)境面對的復雜的政務網(wǎng)絡(luò)以及京辦同城雙活架構(gòu)演進的案例,給大家介紹下京辦持續(xù)改進、分階段演進過程中的一些思考和實踐經(jīng)驗的總結(jié)。本文僅針對 ES 集群在跨機房同步過程中的方案和經(jīng)驗進行介紹和總結(jié)。

架構(gòu)

1.部署 Logstash 在金山云機房上,Logstash 啟動多個實例(按不同的類型分類,提高同步效率),并且和金山云機房的 ES 集群在相同的 VPC2.Logstash 需要配置大網(wǎng)訪問權(quán)限,保證 Logstash 和 ES 原集群和目標集群互通。3.數(shù)據(jù)遷移可以全量遷移和增量遷移,首次遷移都是全量遷移后續(xù)的增加數(shù)據(jù)選擇增量遷移。4.增量遷移需要改造增加識別的增量數(shù)據(jù)的標識,具體方法后續(xù)進行介紹。ab134d5c-7a83-11ed-8abf-dac502259ad0.png?原理

Logstash 工作原理

ab315e3c-7a83-11ed-8abf-dac502259ad0.png??Logstash 分為三個部分 input 、filter、ouput:1.input 處理接收數(shù)據(jù),數(shù)據(jù)可以來源 ES,日志文件,kafka 等通道.2.filter 對數(shù)據(jù)進行過濾,清洗。3.ouput 輸出數(shù)據(jù)到目標設(shè)備,可以輸出到 ES,kafka,文件等。

增量同步原理

1. 對于 T 時刻的數(shù)據(jù),先使用 Logstash 將 T 以前的所有數(shù)據(jù)遷移到有孚機房京東云 ES,假設(shè)用時?T2. 對于 T 到 T+?T 的增量數(shù)據(jù),再次使用 logstash 將數(shù)據(jù)導入到有孚機房京東云的 ES 集群3. 重復上述步驟 2,直到?T 足夠小,此時將業(yè)務切換到華為云,最后完成新增數(shù)據(jù)的遷移適用范圍:ES 的數(shù)據(jù)中帶有時間戳或者其他能夠區(qū)分新舊數(shù)據(jù)的標簽

流程

?ab61eb2e-7a83-11ed-8abf-dac502259ad0.png?

準備工作

1.創(chuàng)建 ECS 和安裝 JDK 忽略,自行安裝即可2.下載對應版本的 Logstash,盡量選擇與 Elasticsearch 版本一致,或接近的版本安裝即可https://www.elastic.co/cn/downloads/logstash

1)源碼下載直接解壓安裝包,開箱即用

2)修改對內(nèi)存使用,logstash 默認的堆內(nèi)存是 1G,根據(jù) ECS 集群選擇合適的內(nèi)存,可以加快集群數(shù)據(jù)的遷移效率。

ab81d11e-7a83-11ed-8abf-dac502259ad0.png

?3. 遷移索引

Logstash 會幫助用戶自動創(chuàng)建索引,但是自動創(chuàng)建的索引和用戶本身的索引會有些許差異,導致最終數(shù)據(jù)的搜索格式不一致,一般索引需要手動創(chuàng)建,保證索引的數(shù)據(jù)完全一致。

以下提供創(chuàng)建索引的 python 腳本,用戶可以使用該腳本創(chuàng)建需要的索引。

create_mapping.py 文件是同步索引的 python 腳本,config.yaml 是集群地址配置文件。

注:使用該腳本需要安裝相關(guān)依賴

yum install -y PyYAML
yum install -y python-requests

拷貝以下代碼保存為 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys

defhelp():
    print
    """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    
    example:  
    python create_mapping.py -c config.yaml 
    """
defprocess_mapping(index_mapping, dest_index):
    print(index_mapping)
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]

    # check alias
    aliases = index_mapping["aliases"]
    for alias inlist(aliases.keys()):
        if alias == dest_index:
            print(
                "source index "+ dest_index +" alias "+ alias +" is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro ={"opendistro":{"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    print(index_mapping)
    return index_mapping
defput_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers ={'Content-Type':'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code !=200:
        print(
            "create index "+ url +" failed with response: "+str(create_resp)+", source index is "+ source_index)
        print(create_resp.text)
        withopen(source_index +".json","w")as f:
            json.dump(mapping, f)
defmain():
    config_yaml ="config.yaml"
    opts, args = getopt.getopt(sys.argv[1:],'-h-c:',['help','config='])
    for opt_name, opt_value in opts:
        if opt_name in('-h','--help'):
            help()
            exit()
        if opt_name in('-c','--config'):
            config_yaml = opt_value

    config_file =open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth =None
    if source_user !="":
        source_auth =(source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth =None
    if dest_user !="":
        dest_auth =(dest_user, dest_passwd)
    print(source_auth)
    print(dest_auth)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index"+ source_index +", target index: "+ dest_index)
            source_url = source +"/"+ source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    response.status_code)+" response is "+ response.text)
                continue
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index "+ source_index +" to target index "+ dest_index +" successed.")
    else:
        # get all indices
        response = requests.get(source +"/_alias", auth=source_auth)
        if response.status_code !=200:
            print("*** get all index failed. resp statusCode:"+str(
                response.status_code)+" response is "+ response.text)
            exit()
        all_index = response.json()
        for index inlist(all_index.keys()):
            if"."in index:
                continue
            print("start to process source index"+ index)
            source_url = source +"/"+ index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    index_response.status_code)+" response is "+ index_response.text)
                continue
            mapping = index_response.json()

            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index "+ index +" to target index "+ dest_index +" successed.")

if __name__ =='__main__':
    main()

配置文件保存為 config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只處理這個文件中mapping地址的索引
# 如果設(shè)置成true,則只會將下面的mapping中的索引獲取到并在目的端創(chuàng)建
# 如果設(shè)置成false,則會取源端集群的所有索引,除去(.kibana)
# 并且將索引名稱與下面的mapping匹配,如果匹配到使用mapping的value作為目的端的索引名稱
# 如果匹配不到,則使用源端原始的索引名稱
only_mapping: true

# 要遷移的索引,key為源端的索引名字,value為目的端的索引名字
mapping:
    source_index: dest_index

以上代碼和配置文件準備完成,直接執(zhí)行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目標集群的 kibana 上查看或者執(zhí)行 curl 查看索引遷移情況:

GET _cat/indices?v

?ab95c520-7a83-11ed-8abf-dac502259ad0.png

??全量遷移Logstash 配置位于 config 目錄下。用戶可以參考配置修改 Logstash 配置文件,為了保證遷移數(shù)據(jù)的準確性,一般建議建立多組 Logstash,分批次遷移數(shù)據(jù),每個 Logstash 遷移部分數(shù)據(jù)。配置集群間遷移配置參考:abc4a7aa-7a83-11ed-8abf-dac502259ad0.png

?

input{
    elasticsearch{
        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
        # 需要遷移的索引列表,以逗號分隔,支持通配符
        index => "a_*,b_*"
        # 以下三項保持默認即可,包含線程數(shù)和遷移數(shù)據(jù)大小和logstash jvm配置相關(guān)
        docinfo=>true
        slices => 10
        size => 2000
        scroll => "60m"
    }
}

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
 # 目的端索引名稱,以下配置為和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置為和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目標端數(shù)據(jù)的_id,如果不需要保留原_id,可以刪除以下這行,刪除后性能會更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

    # 調(diào)試信息,正式遷移去掉
    stdout { codec => rubydebug { metadata => true }}
}

增量遷移

預處理:

1.@timestamp在 elasticsearch2.0.0beta 版本后棄用

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html

2. 本次對于京辦從金山云機房遷移到京東有孚機房,所涉及到的業(yè)務領(lǐng)域多,各個業(yè)務線中所代表新增記錄的時間戳字段不統(tǒng)一,所涉及到的兼容工作量大,于是考慮通過 elasticsearch 中預處理功能 pipeline 進行預處理添加統(tǒng)一增量標記字段:gmt_created_at,以減少遷移工作的復雜度(各自業(yè)務線可自行評估是否需要此步驟)。

PUT _ingest/pipeline/gmt_created_at
{
  "description":"Adds gmt_created_at timestamp to documents",
  "processors":[
    {
      "set":{
        "field":"_source.gmt_created_at",
        "value":"{{_ingest.timestamp}}"
      }
    }
  ]
}

3. 檢查 pipeline 是否生效

GET _ingest/pipeline/*

4. 各個 index 設(shè)置對應 settings 增加 pipeline 為默認預處理

PUT index_xxxx/_settings
{
  "settings": {
    "index.default_pipeline": "gmt_created_at"
  }
}

5. 檢查新增 settings 是否生效

GET index_xxxx/_settings

?ac0e3f46-7a83-11ed-8abf-dac502259ad0.png

??增量遷移腳本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的 DSL,統(tǒng)一 gmt_create_at 為增量同步的特殊標記

schedule: 每分鐘同步一把,"* * * * *"

input {
elasticsearch {
        hosts =>["ip:port"]
        # 安全集群配置登錄用戶名密碼
        user =>"username"
        password =>"password"
        index =>"index_*"
        query =>'{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size =>5000
        scroll =>"5m"
        docinfo =>true
        schedule =>"* * * * *"
      }
}
filter {
     mutate {
      remove_field =>["source", "@version"]
   }
}
output {
    elasticsearch {
        # 目的端es地址
        hosts =>["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user =>"username"
        password =>"password"
        index =>"%{[@metadata][_index]}"
        document_type =>"%{[@metadata][_type]}"
        document_id =>"%{[@metadata][_id]}"
        ilm_enabled =>false
        manage_template =>false
    }

# 調(diào)試信息,正式遷移去掉
stdout { codec => rubydebug { metadata =>true}}
}

問題:

mapping 中存在 join 父子類型的字段,直接遷移報 400 異常?ac4a1f70-7a83-11ed-8abf-dac502259ad0.png ?
[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解決方法:

https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140https://github.com/elastic/elasticsearch/issues/26183

結(jié)合業(yè)務特征,通過在 filter 中加入小量的 ruby 代碼,將_routing 的值取出來,放回 logstah event 中,由此問題得以解決。

示例:

ac725e7c-7a83-11ed-8abf-dac502259ad0.png


審核編輯 :李倩


聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 數(shù)據(jù)
    +關(guān)注

    關(guān)注

    8

    文章

    7256

    瀏覽量

    91826
  • 數(shù)據(jù)遷移
    +關(guān)注

    關(guān)注

    0

    文章

    84

    瀏覽量

    7113

原文標題:跨機房 ES 同步實戰(zhàn)

文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關(guān)推薦
    熱點推薦

    機房托管費詳細分析

    機房托管費是一個復雜而多變的話題,它受到多種因素的影響,以下是對機房托管費用的詳細分析,主機推薦小編為您整理發(fā)布機房托管費詳細分析。
    的頭像 發(fā)表于 02-28 09:48 ?457次閱讀

    機房精密空調(diào)故障?排查步驟看這!

    機房精密空調(diào)作為維持機房環(huán)境穩(wěn)定的關(guān)鍵設(shè)備,其故障排查工作至關(guān)重要。下面聊一下排查機房精密空調(diào)故障的詳細步驟。
    的頭像 發(fā)表于 02-17 15:48 ?551次閱讀
    <b class='flag-5'>機房</b>精密空調(diào)故障?排查步驟看這!

    機房施工—機房吊頂與靜電地板怎樣安裝?

    為了滿足機房的高效、穩(wěn)定運行,吊頂和靜電地板的安裝成為機房建設(shè)的關(guān)鍵環(huán)節(jié)。下面聊一下機房吊頂與靜電地板的安裝施工方案。 吊頂安裝: 1、材料選擇:選用輕鋼龍骨、石膏板等符合國家標準的材料,確保
    的頭像 發(fā)表于 02-07 19:00 ?328次閱讀
    <b class='flag-5'>機房</b>施工—<b class='flag-5'>機房</b>吊頂與靜電地板怎樣安裝?

    機房空調(diào)—機房送風與回風設(shè)計常見問題和解決方法

    機房送風與回風設(shè)計是確保機房穩(wěn)定運行的重要環(huán)節(jié)。然而,在實際設(shè)計和應用中,常常會遇到一些問題。下面聊一下機房送風與回風設(shè)計常見問題。 一、送風系統(tǒng)設(shè)計常見問題 1、送風口布局不合理
    的頭像 發(fā)表于 02-07 10:37 ?605次閱讀
    <b class='flag-5'>機房</b>空調(diào)—<b class='flag-5'>機房</b>送風與回風設(shè)計常見問題和解決方法

    ES-DEV-ES7W8020DB用戶指南

    電子發(fā)燒友網(wǎng)站提供《ES-DEV-ES7W8020DB用戶指南.pdf》資料免費下載
    發(fā)表于 01-16 15:31 ?0次下載
    <b class='flag-5'>ES-DEV-ES</b>7W8020DB用戶指南

    ES-DEV-ES32W0030DB用戶指南

    電子發(fā)燒友網(wǎng)站提供《ES-DEV-ES32W0030DB用戶指南.pdf》資料免費下載
    發(fā)表于 01-16 15:30 ?0次下載
    <b class='flag-5'>ES-DEV-ES</b>32W0030DB用戶指南

    ES-PDS-ES32F3696LX-V1.1原理圖

    電子發(fā)燒友網(wǎng)站提供《ES-PDS-ES32F3696LX-V1.1原理圖.pdf》資料免費下載
    發(fā)表于 01-16 15:20 ?0次下載
    <b class='flag-5'>ES-PDS-ES</b>32F3696LX-V1.1原理圖

    ES-PDS-ES32F0100DB1原理圖

    電子發(fā)燒友網(wǎng)站提供《ES-PDS-ES32F0100DB1原理圖.pdf》資料免費下載
    發(fā)表于 01-16 15:15 ?0次下載
    <b class='flag-5'>ES-PDS-ES</b>32F0100DB1原理圖

    機房精密空調(diào)安裝指南

    1、確認精密空調(diào)型號和規(guī)格是否符合機房需求,確保精密空調(diào)能夠滿足機房的制冷、除濕、加濕等需求。
    的頭像 發(fā)表于 10-25 17:44 ?796次閱讀
    <b class='flag-5'>機房</b>精密空調(diào)安裝指南

    動環(huán)監(jiān)控主機應用機房

    信息化高速發(fā)展的時代,數(shù)據(jù)中心作為信息社會的“心臟”,其穩(wěn)定運行與高效管理直接關(guān)系到各行各業(yè)的發(fā)展。隨著云計算、大數(shù)據(jù)、物聯(lián)網(wǎng)等技術(shù)的不斷融合與創(chuàng)新,智慧機房的概念應運而生,動環(huán)監(jiān)控主機的引入,正是
    的頭像 發(fā)表于 09-14 15:59 ?631次閱讀

    機房監(jiān)控,機房監(jiān)控系統(tǒng)百科

    一、引言 隨著信息技術(shù)的飛速發(fā)展,數(shù)據(jù)中心和機房作為支撐企業(yè)運營和存儲關(guān)鍵數(shù)據(jù)的基礎(chǔ)設(shè)施,其重要性日益凸顯。機房環(huán)境的穩(wěn)定性、安全性及設(shè)備的運行狀態(tài)直接影響到企業(yè)的業(yè)務連續(xù)性和數(shù)據(jù)安全性。因此,建立
    的頭像 發(fā)表于 09-02 14:32 ?821次閱讀

    機房監(jiān)控,機房監(jiān)控百科

    機房監(jiān)控是現(xiàn)代數(shù)據(jù)中心管理不可或缺的一部分,它直接關(guān)系到系統(tǒng)的穩(wěn)定運行、數(shù)據(jù)的安全保護以及故障的快速響應。一個完善的機房監(jiān)控系統(tǒng)能夠?qū)崟r監(jiān)測機房內(nèi)的環(huán)境參數(shù)、設(shè)備狀態(tài)及安全情況,確保數(shù)據(jù)中心高效、可靠地運行。以下是一篇關(guān)于
    的頭像 發(fā)表于 08-22 17:34 ?646次閱讀

    機房托管費用貴嗎?機房托管要考慮哪些因素?

     機房托管費用受多種因素影響,包括地理位置、設(shè)備規(guī)模、服務水平、安全性要求等。不同配置和服務質(zhì)量的托管價格差異較大,一般1U服務器托管費用一年在2000到5000元之間。Rak部落為您整理發(fā)布機房托管費用的差異,希望對您選擇機房
    的頭像 發(fā)表于 08-16 11:34 ?976次閱讀

    機房托管的好處

    機房托管提供的專業(yè)級服務和便利性,是其受到企業(yè)青睞的重要原因。下面將詳細探討機房托管的多重優(yōu)勢,并了解這種服務模式如何幫助企業(yè)提升運營效率和安全保障。Rak部落為您整理發(fā)布機房托管的好處。
    的頭像 發(fā)表于 08-08 10:08 ?540次閱讀

    機房配線架是干嘛的

    機房配線架是機房內(nèi)用于管理和連接網(wǎng)絡(luò)線路的重要設(shè)備。它的主要作用和功能包括以下幾個方面: 一、定義與作用 定義:配線架是用于終端用戶線或中繼線,并能對它們進行調(diào)配連接的設(shè)備。它是管理子系統(tǒng)中最
    的頭像 發(fā)表于 07-17 10:06 ?817次閱讀