
Batch Writing Data to Elasticsearch

August 2, 2024
Batch writing to Elasticsearch improves write throughput and reduces Elasticsearch I/O pressure. To execute a bulk document request, first initialize an Elasticsearch client, then create a BulkProcessor and configure its parameters, and finally close the processor.


Preface

Batch writing to Elasticsearch improves write throughput and reduces Elasticsearch I/O pressure.


1. What is Elasticsearch?

Elasticsearch is a real-time, distributed, open-source full-text search and analytics engine. It is accessible through a RESTful web interface and stores data as schema-free JSON (JavaScript Object Notation) documents. It is built in Java, so Elasticsearch runs on many platforms, and it lets users explore very large volumes of data at high speed.

2. Implementation Steps

2.1 Create a BulkProcessor

BulkProcessor is a thread-safe bulk-processing class that makes it easy to set the maximum number of requests per write to ES as well as a flush interval. The flush interval means that if no new requests arrive within the configured time, the requests accumulated so far are written to ES immediately, rather than waiting until the request count reaches the configured maximum.
Example code:


import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.rest.RestStatus;

public class BulkProListener implements BulkProcessor.Listener {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        System.out.println("before bulk");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        System.out.println("after bulk");
        if (response.hasFailures()) {
            BulkItemResponse itemResponse;
            Throwable failure;
            RestStatus restStatus;
            DocWriteRequest actionRequest;

            try {
                for (int i = 0; i < response.getItems().length; i++) {
                    itemResponse = response.getItems()[i];
                    if (itemResponse.isFailed()) {
                        failure = itemResponse.getFailure().getCause();
                        if (failure != null) {
                            restStatus = itemResponse.getFailure().getStatus();
                            actionRequest = request.requests().get(i);
                            if (restStatus == null) {
                                if (actionRequest instanceof ActionRequest) {
                                    System.out.println("failed Elasticsearch item request: " + failure.getMessage());
                                } else {
                                    throw new UnsupportedOperationException(
                                            "the sink currently only supports ActionRequests");
                                }
                            } else {
                                if (actionRequest instanceof ActionRequest) {
                                    System.out.println("failed sink item request: " + failure.getMessage()
                                            + " status: " + restStatus.getStatus());
                                    failure.printStackTrace();
                                } else {
                                    throw new UnsupportedOperationException(
                                            "the sink currently only supports ActionRequests");
                                }
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        System.out.println("bulk request failed");
        try {
            for (DocWriteRequest writeRequest : request.requests()) {
                if (writeRequest instanceof ActionRequest) {
                    System.out.println("failed Elasticsearch item request: " + failure.getMessage());
                    failure.printStackTrace();
                } else {
                    throw new UnsupportedOperationException(
                            "the sink currently only supports ActionRequests");
                }
            }
        } catch (Throwable t) {
            // fail the sink and skip the rest of the items
            // if the failure handler decides to throw an exception
            t.printStackTrace();
        }
    }
}
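The count-or-timeout flush rule described above can be sketched in plain Java with no Elasticsearch dependency. This is only a simulation of the semantics; `MiniBatcher` and all of its members are hypothetical names chosen for illustration, not part of the Elasticsearch API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature batcher mimicking BulkProcessor's flush rules:
// flush when the buffered count reaches maxActions, or when the oldest
// buffered item has waited longer than flushIntervalMillis.
class MiniBatcher {
    private final int maxActions;
    private final long flushIntervalMillis;
    private final List<String> buffer = new ArrayList<>();
    private long firstAddMillis = -1;
    int flushes = 0; // number of flushes performed, exposed for demonstration

    MiniBatcher(int maxActions, long flushIntervalMillis) {
        this.maxActions = maxActions;
        this.flushIntervalMillis = flushIntervalMillis;
    }

    void add(String doc, long nowMillis) {
        if (buffer.isEmpty()) {
            firstAddMillis = nowMillis;
        }
        buffer.add(doc);
        if (buffer.size() >= maxActions) {
            flush(); // count-based flush, like setBulkActions(n)
        }
    }

    // Called periodically; flushes if the interval has elapsed,
    // like setFlushInterval(TimeValue.timeValueSeconds(n)).
    void tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstAddMillis >= flushIntervalMillis) {
            flush();
        }
    }

    void close() {
        if (!buffer.isEmpty()) {
            flush(); // like awaitClose/close, writes any remainder
        }
    }

    private void flush() {
        System.out.println("flushing " + buffer.size() + " docs");
        buffer.clear();
        firstAddMillis = -1;
        flushes++;
    }
}

public class MiniBatcherDemo {
    public static void main(String[] args) {
        MiniBatcher b = new MiniBatcher(3, 10_000);
        b.add("doc1", 0);
        b.add("doc2", 0);
        b.add("doc3", 0);  // third doc reaches maxActions -> flush #1
        b.add("doc4", 5_000);
        b.tick(9_000);     // interval not yet elapsed -> no flush
        b.tick(15_001);    // 10s elapsed since doc4 was buffered -> flush #2
        b.close();         // nothing buffered -> no extra flush
        System.out.println("total flushes: " + b.flushes); // prints 2
    }
}
```

The real BulkProcessor runs the timer on a scheduler thread instead of an explicit `tick`, but the decision logic is the same: whichever of the two thresholds is hit first triggers a bulk write.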

2.2 Batch write data

Create an index named batch in ES with type my_type and the fields "user_name", "user_id", "age", and "user_note". The processor is configured to commit every 5,000 requests or every 10 seconds, whichever comes first:

bulkProcessor.setBulkActions(5000).setFlushInterval(TimeValue.timeValueSeconds(10)).build();

Example code:


import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.Netty3Plugin;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class EsBatchWriterTest {
    public static void main(String[] args) throws Exception {
        Settings settings = Settings.builder()
                .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
                .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
                .build();
//        Settings settings = Settings.EMPTY;
        // create the client
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.68.8.60"), 9300));
        batch2(client);
//        update(client);
        client.close();
    }

    public static void batch2(TransportClient client) throws InterruptedException {
        BulkProcessor.Builder builder = BulkProcessor.builder(client, new BulkProListener());
        // commit every 5,000 requests or every 10 seconds, whichever comes first
        BulkProcessor processor = builder.setBulkActions(5000)
                .setFlushInterval(TimeValue.timeValueSeconds(10)).build();

        List<JSONObject> list = getData();
        System.out.println(list.size());
        for (JSONObject obj : list) {
            System.out.println(obj.toJSONString());
            IndexRequestBuilder request = client.prepareIndex("batch", "my_type")
                    .setId(obj.getString("user_id")).setSource(obj);
            processor.add(request.request());
        }
        // wait up to two minutes for in-flight bulk requests to finish
        processor.awaitClose(2, TimeUnit.MINUTES);
//        processor.close();
    }
    private static List<JSONObject> getData() {
        List<JSONObject> list = new ArrayList<>();
        JSONObject j = new JSONObject();
        j.put("user_name", "name7");
        j.put("user_id", "7");
        j.put("age", "34");
        j.put("user_note", "note");
        list.add(j);

        j = new JSONObject();
        j.put("user_name", "name8");
        j.put("user_id", "8");
        j.put("age", "24");
        j.put("user_note", "note");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name9");
        j.put("user_id", "9");
        j.put("age", "24");
        j.put("user_note", "note");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name10");
        j.put("user_id", "10");
        j.put("age", "14");
        j.put("user_note", "note");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name11");
        j.put("user_id", "11");
        j.put("age", "54b");
        j.put("user_note", "note");
        list.add(j);

        j = new JSONObject();
        j.put("user_name", "name20");
        j.put("user_id", "20");
        j.put("age", "34a");
        j.put("user_note", "note");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name30");
        j.put("user_id", "30");
        j.put("age", "30");
        j.put("user_note", "note");
        list.add(j);
        return list;
    }
    public static void batch(TransportClient client) {
        int count = 1;
        // start a bulk request
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        List<JSONObject> list = new ArrayList<>();

        JSONObject j = new JSONObject();
        j.put("user_name", "name1");
        j.put("user_id", "1");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name3");
        j.put("user_id", "3");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name2");
        j.put("user_id", "2");
        list.add(j);
        for (JSONObject obj : list) {
            IndexRequestBuilder builder = client.prepareIndex("batch", "my_type")
                    .setId(obj.getString("user_id")).setSource(obj);
            bulkRequest.add(builder);
            // commit every 1,000 requests
            if (count % 1000 == 0) {
                bulkRequest.execute().actionGet();
                System.out.println("committed: " + count);
                // start a fresh builder so already-committed requests are not resent
                bulkRequest = client.prepareBulk();
            }
            count++;
        }
        // commit whatever is left over
        if (bulkRequest.numberOfActions() > 0) {
            bulkRequest.execute().actionGet();
        }
    }
}
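The manual chunking in batch() (commit every N requests, then commit the remainder) is a general pattern that can be checked in isolation. Below is a dependency-free sketch of the same control flow; `ChunkedCommitDemo` and `commitInChunks` are illustrative names, and the commit counter stands in for `bulkRequest.execute().actionGet()`:

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkedCommitDemo {
    // Splits items into chunks of at most chunkSize, committing each full
    // chunk and then the remainder - the same shape as the batch() method.
    static int commitInChunks(List<String> items, int chunkSize) {
        int commits = 0;
        List<String> pending = new ArrayList<>();
        for (String item : items) {
            pending.add(item);
            if (pending.size() == chunkSize) {
                commits++;       // stands in for bulkRequest.execute().actionGet()
                pending.clear(); // start a fresh bulk request for the next chunk
            }
        }
        if (!pending.isEmpty()) {
            commits++;           // final commit for the leftover items
        }
        return commits;
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            items.add("doc" + i);
        }
        // 2 full chunks of 1000 plus 1 remainder commit of 500
        System.out.println(commitInChunks(items, 1000)); // prints 3
    }
}
```

Note that the remainder commit is what guarantees the last partial chunk is written; forgetting it silently drops up to chunkSize - 1 documents.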


Summary

To execute a bulk document request: first initialize the Elasticsearch client, then create a BulkProcessor, configure the BulkProcessor's parameters, and finally close the processor. The examples in this article use the TransportClient API with the Netty3 transport, which belongs to Elasticsearch 5.x; for related ES5 API usage, see the companion Elasticsearch 5 study notes and the Java CRUD-on-ES examples.
