前言
批量写入到Elasticsearch会提高写入性能，减少Elasticsearch的IO压力。
一、Elasticsearch是什么？
Elasticsearch是一个实时的分布式开源全文搜索和分析引擎。可通过RESTful Web服务接口访问它，并使用无模式的JSON（JavaScript对象表示法）文档存储数据。它基于Java编程语言构建，因此Elasticsearch可以在不同平台上运行。它使用户能够以很高的速度检索大量的数据。
二、实现步骤
1.创建BulkProcessor
BulkProcessor是一个线程安全的批量处理类，允许方便地设置每次写入ES的最大数量，以及超时时间。所谓超时时间，就是在规定的时间内，如果没有请求进来，就把之前累积的请求直接写到ES，不必等待请求数量累积到你规定的最大数量。
代码如下(示例):
import org.elasticsearch.action.actionrequest;
import org.elasticsearch.action.docwriterequest;
import org.elasticsearch.action.bulk.bulkitemresponse;
import org.elasticsearch.action.bulk.bulkprocessor;
import org.elasticsearch.action.bulk.bulkrequest;
import org.elasticsearch.action.bulk.bulkresponse;
import org.elasticsearch.rest.reststatus;
public class bulkprolistener implements bulkprocessor.listener{
@override
public void beforebulk(long l, bulkrequest bulkrequest) {
system.out.println("执行前");
}
@override
public void afterbulk(long l, bulkrequest request, bulkresponse response) {
system.out.println("执行后");
if (response.hasfailures()) {
bulkitemresponse itemresponse;
throwable failure;
reststatus reststatus;
docwriterequest actionrequest;
try {
for (int i = 0; i < response.getitems().length; i++) {
itemresponse = response.getitems()[i];
if (itemresponse.isfailed()) {
failure = itemresponse.getfailure().getcause();
if (failure != null) {
reststatus = itemresponse.getfailure().getstatus();
actionrequest = request.requests().get(i);
if (reststatus == null) {
if (actionrequest instanceof actionrequest) {
system.out.println("failed elasticsearch item request: " + failure.getcause().getmessage());
} else {
throw new unsupportedoperationexception(
"the sink currently only supports actionrequests");
}
}else{
if (actionrequest instanceof actionrequest) {
system.out.println("failed sink item request: " + failure.getcause().getmessage()+" status: "+reststatus.getstatus());
failure.printstacktrace();
} else {
throw new unsupportedoperationexception(
"the sink currently only supports actionrequests");
}
}
}
}
}
}catch (throwable t){
t.printstacktrace();
}
}
}
@override
public void afterbulk(long l, bulkrequest request, throwable failure) {
system.out.println("有错误");
try {
for (docwriterequest writerequest : request.requests()) {
if (writerequest instanceof actionrequest) {
system.out.println("failed elasticsearch item request: " + failure.getmessage());
failure.printstacktrace();
} else {
throw new unsupportedoperationexception(
"the sink currently only supports actionrequests");
}
}
} catch (throwable t) {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
t.printstacktrace();
}
}
}
2.批量写入数据
在ES中建立索引batch，类型my_type，结构为"user_name"、"user_id"、"age"、"user_note"
//设置满5000条提交，时间间隔10秒
bulkProcessor.setBulkActions(5000).setFlushInterval(TimeValue.timeValueSeconds(10)).build();
代码如下(示例):
import com.alibaba.fastjson.jsonobject;
import org.elasticsearch.action.bulk.bulkprocessor;
import org.elasticsearch.action.bulk.bulkrequestbuilder;
import org.elasticsearch.action.index.indexrequestbuilder;
import org.elasticsearch.client.transport.transportclient;
import org.elasticsearch.common.network.networkmodule;
import org.elasticsearch.common.settings.settings;
import org.elasticsearch.common.transport.inetsockettransportaddress;
import org.elasticsearch.common.unit.timevalue;
import org.elasticsearch.transport.netty3plugin;
import org.elasticsearch.transport.client.prebuilttransportclient;
import java.net.inetaddress;
import java.util.arraylist;
import java.util.list;
import java.util.concurrent.timeunit;
public class esbatchwritertest {
public static void main(string[] args) throws exception {
settings settings = settings.builder().put(networkmodule.http_type_key, netty3plugin.netty_http_transport_name)
.put(networkmodule.transport_type_key, netty3plugin.netty_transport_name).build();
// settings settings = settings.empty;
//创建client
transportclient client = new prebuilttransportclient(settings)
.addtransportaddress(new inetsockettransportaddress(inetaddress.getbyname("10.68.8.60"), 9300));
batch2(client);
// update(client);
client.close();
}
public static void batch2(transportclient client) throws interruptedexception {
bulkprocessor.builder bulkprocessor = bulkprocessor.builder(
client,new bulkprolistener());
bulkprocessor processor = bulkprocessor.setbulkactions(5000).setflushinterval(timevalue.timevalueseconds(10)).build();
int count = 1;
list<jsonobject> list = getdata();
system.out.println(list.size());
for(jsonobject obj:list) {
system.out.println(obj.tojsonstring());
indexrequestbuilder builder = client.prepareindex("batch", "my_type").setid(obj.getstring("user_id")).setsource(obj);
processor.add(builder.request());
}
processor.awaitclose(2, timeunit.minutes);
// processor.close();
}
private static list<jsonobject> getdata(){
list<jsonobject> list =new arraylist<>();
jsonobject j=new jsonobject();
j.put("user_name","name7");
j.put("user_id","7");
j.put("age","34");
j.put("user_note","note");
list.add(j);
j=new jsonobject();
j.put("user_name","name8");
j.put("user_id","8");
j.put("age","24");
j.put("user_note","note");
list.add(j);
j=new jsonobject();
j.put("user_name","name9");
j.put("user_id","9");
j.put("age","24");
j.put("user_note","note");
list.add(j);
j=new jsonobject();
j.put("user_name","name10");
j.put("user_id","10");
j.put("age","14");
j.put("user_note","note");
list.add(j);
j=new jsonobject();
j.put("user_name","name11");
j.put("user_id","11");
j.put("age","54b");
j.put("user_note","note");
list.add(j);
j=new jsonobject();
j.put("user_name","name20");
j.put("user_id","20");
j.put("age","34a");
j.put("user_note","note");
list.add(j);
j=new jsonobject();
j.put("user_name","name30");
j.put("user_id","30");
j.put("age","30");
j.put("user_note","note");
list.add(j);
return list;
}
public static void batch(transportclient client){
int count = 1;
//开启批量插入
bulkrequestbuilder bulkrequest = client.preparebulk();
list<jsonobject> list =new arraylist<>();
jsonobject j=new jsonobject();
j.put("user_name","name1");
j.put("user_id","1");
list.add(j);
j=new jsonobject();
j.put("user_name","name3");
j.put("user_id","3");
list.add(j);
j=new jsonobject();
j.put("user_name","name2");
j.put("user_id","2");
list.add(j);
for(jsonobject obj:list){
indexrequestbuilder builder = client.prepareindex("batch", "my_type").setid(obj.getstring("user_id")).setsource(obj);
bulkrequest.add(builder);
//每一千条提交一次
if (count% 1000==0) {
bulkrequest.execute().actionget();
system.out.println("提交了:" + count);
}
count++;
}
bulkrequest.execute().actionget();
}
}
总结
执行文档批量请求时，首先需要初始化Elasticsearch Client，其次创建BulkProcessor，
设置BulkProcessor参数，最后关闭Processor。本文示例使用的是TransportClient与Netty3Plugin，属于Elasticsearch 5.x系列的API（ES7已移除TransportClient，改用高级REST客户端）。有关ES5相关API调用示例请下载Elasticsearch5学习笔记和Java对ES进行增删改查示例
发表评论