# MySQL 数据导入 ES
# 将 MySQL 中的数据取出并写入 Elasticsearch
from datetime import datetime
from elasticsearch import Elasticsearch
import pymysql
import time
import json
from elasticsearch.helpers import bulk, streaming_bulk
import sys
from multiprocessing import Pool
# Name of the MySQL table the rows are read from.
tb_name = "test"
# Elasticsearch index and mapping type, both derived from the table name.
es_index = "index_{}".format(tb_name)
es_type = "type_{}".format(tb_name)
def db2es(x, batch_size=100000):
    """Copy one slice of the MySQL table ``tb_name`` into Elasticsearch.

    Rows are read page by page with ``LIMIT offset, count`` and every page is
    sent to Elasticsearch with a single ``bulk`` call.  Each row's ``id``
    becomes the document ``_id``; ``name`` and ``age`` form the document body.

    Args:
        x: ``(start, end)`` tuple of row offsets; rows at offsets in
            ``[start, end)`` are copied.  It is a single tuple so that the
            function can be handed directly to ``Pool.map``.
        batch_size: Maximum number of rows fetched and indexed per round trip.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    offset, end = x
    # Keyword arguments are required: PyMySQL 1.0 no longer accepts
    # positional connect() parameters.
    db = pymysql.connect(host="127.0.0.1", user="root", password="123456",
                         database="User", charset='utf8')
    try:
        es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}], timeout=60,
                           max_retries=3, retry_on_timeout=True)
        # ORDER BY makes LIMIT/offset paging deterministic, so the pages read
        # by different worker processes neither overlap nor skip rows.
        # The offset and count are bound as query parameters.
        sql = ("SELECT id, name, age FROM " + tb_name
               + " ORDER BY id LIMIT %s, %s")
        with db.cursor() as cursor:
            while offset < end:
                print(time.strftime('%Y-%m-%d %H:%M:%S --> ') + str(offset))
                # Never read past the end of the range assigned to this call.
                limit = min(batch_size, end - offset)
                cursor.execute(sql, (offset, limit))
                rows = cursor.fetchall()
                if not rows:
                    # The table ran out of rows before the range was exhausted.
                    break
                # NOTE(review): "_type" (mapping types) is deprecated in
                # Elasticsearch 7 and removed in 8 - confirm the cluster version.
                bulk(es, (
                    {
                        "_index": es_index,
                        "_type": es_type,
                        "_id": row_id,
                        "_source": {
                            "name": name,
                            "age": age,
                        },
                    }
                    for row_id, name, age in rows
                ))
                offset += limit
                if offset >= end:
                    print(offset)
        print(time.strftime('%Y-%m-%d %H:%M:%S --> '), offset, end)
        print(tb_name + " done")
    finally:
        # Release the connection even when a query or the bulk request fails.
        db.close()
def chunk_ranges(total, size):
    """Split ``[0, total)`` into consecutive ``(start, end)`` offset ranges.

    Every range spans ``size`` offsets except possibly the last one, which
    ends at ``total``.  Returns an empty list when ``total`` is 0.
    """
    return [(start, min(start + size, total))
            for start in range(0, total, size)]


if __name__ == '__main__':
    # Count the rows first so the table can be divided between the workers.
    # Keyword arguments are required: PyMySQL 1.0 no longer accepts
    # positional connect() parameters.
    db = pymysql.connect(host="127.0.0.1", user="root", password="123456",
                         database="User", charset='utf8')
    try:
        with db.cursor() as cursor:
            # Use tb_name so the count always matches the table db2es reads.
            cursor.execute("SELECT COUNT(*) FROM " + tb_name)
            rows = cursor.fetchone()[0]
    finally:
        db.close()

    # Each task handles a slice of up to 5,000,000 rows.
    args = chunk_ranges(rows, 5000000)

    # Run the import in 8 worker processes; the context manager makes sure the
    # pool is torn down even if a worker raises.
    with Pool(processes=8) as pool:
        pool.map(db2es, args)
        pool.close()
        pool.join()