
Flink local mode with RocksDB checkpoints fails: RocksDBException: Failed to create a NewWriteableFile


Environment

Development machine: Windows 10

Java: 8

Flink: 1.16.2

Hudi: 0.14.1
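
The failure happens when running the job from the IDE in local mode (an embedded MiniCluster) with checkpointing on the RocksDB state backend. For context, a minimal sketch of that kind of setup — the real Flink + Hudi pipeline is not shown in this article, and the checkpoint interval and path below are made up:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRocksDbCheckpointJob {
    public static void main(String[] args) throws Exception {
        // in the IDE this starts an embedded MiniCluster (local mode)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10s with the embedded RocksDB state backend
        env.enableCheckpointing(10_000L);
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/workspace/checkpoints");

        // ... the actual Flink + Hudi pipeline goes here ...

        env.execute("local-rocksdb-checkpoint-test");
    }
}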

Symptom

While upgrading Flink + Hudi, local testing threw an exception; the error is as follows:

2024-06-27 17:13:16.659 [bucket_assigner (1/2)#0] WARN  org.apache.flink.runtime.taskmanager.Task  - bucket_assigner (1/2)#0 (be7ca740282c17e0d01630e88b5f5905_eae0775fbc9b695162a640f9f20e7bb9_0_0) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 1 for operator bucket_assigner (1/2)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1243)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator bucket_assigner (1/2)#0. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1286)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1274)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1231)
	... 22 more
Caused by: org.rocksdb.RocksDBException: Failed to create a NewWriteableFile: C:\Users\Administrator\AppData\Local\Temp\minicluster_76d0223f7cd18f01de3a67a01eee7fa2\tm_0\tmp\job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_10af1fed-db05-4b12-bd81-457078114132\chk-1.tmp/MANIFEST-000004: ϵͳõҳ»µ½ָ¶
	at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
	at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:172)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:158)
	at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:78)
	at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246)
	... 33 more

 

When the job first starts, the logs look fine.

After a while it fails, clearly during a checkpoint. Looking closer, it is a RocksDB exception; the key part is the "Failed to create a NewWriteableFile" line at the bottom of the stack trace.

RocksDB failed to create a file under the temp directory. The error message itself is mojibake: ϵͳõҳ»µ½ָ¶ (most likely the Windows message 系统找不到指定的路径 / "The system cannot find the path specified" rendered in the wrong code page). My first guess was a permission problem, so I restarted IDEA as administrator and ran the Flink job locally again — same error.

 

Solution

The path on the C: drive still felt like a permission issue, so the plan was to point RocksDB at a different directory instead.

First, follow the code from the stack trace into org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend#snapshot:

// takes the checkpoint snapshot of the keyed state
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {

        // flush everything into db before taking a snapshot
        writeBatchWrapper.flush();
/*
checkpointSnapshotStrategy is the checkpoint strategy; its instanceBasePath field holds exactly the
C:\Users\Administrator\AppData\Local\Temp\minicluster... path from the error.
So all we need is a way to change instanceBasePath to some other directory.
*/
        return new SnapshotStrategyRunner<>(
                        checkpointSnapshotStrategy.getDescription(),
                        checkpointSnapshotStrategy,
                        cancelStreamRegistry,
                        ASYNCHRONOUS)
                .snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

Following snapshot() up the call chain leads to EmbeddedRocksDBStateBackend#createKeyedStateBackend:

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry,
            double managedMemoryFraction)
            throws IOException {
        // first, make sure that the RocksDB JNI library is loaded
        // we do this explicitly here to have better error handling
        String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath();
        ensureRocksDBIsLoaded(tempDir);

        // replace all characters that are not legal for filenames with underscore
        String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
// initialization
        lazyInitializeForJob(env, fileCompatibleIdentifier);
// this is where the path of the RocksDB instance gets built
        File instanceBasePath =
                new File(
                        getNextStoragePath(),
                        "job_"
                                + jobID
                                + "_op_"
                                + fileCompatibleIdentifier
                                + "_uuid_"
                                + UUID.randomUUID());
.......
    }

    private void lazyInitializeForJob(
            Environment env, @SuppressWarnings("unused") String operatorIdentifier)
            throws IOException {
.......
        // this shows the path comes straight from Flink's Environment, so look for a config option
        // initialize the paths where the local RocksDB files should be stored
        if (localRocksDbDirectories == null) {
            initializedDbBasePaths = new File[] {env.getTaskManagerInfo().getTmpWorkingDirectory()};
        } else {
        ......
        }
.......
    }
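
As an aside, the null check above also hints at a workaround that bypasses the working directory entirely: if the RocksDB local directories are set explicitly (the state.backend.rocksdb.localdir option, or setDbStoragePath in code), localRocksDbDirectories is non-null and the tmp working directory is never used for RocksDB files. A minimal sketch under that assumption; the path is illustrative, not from the article:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitRocksDbLocalDir {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // with an explicit storage path, localRocksDbDirectories is non-null,
        // so RocksDB working files no longer land in the TaskManager tmp working directory
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        backend.setDbStoragePath("d:/workspace/rocksdb-local"); // illustrative path
        env.setStateBackend(backend);

        // ... rest of the job ...
    }
}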

Now we just need to find where this working directory is set.

Following the tmpWorkingDirectory property further up the chain leads to org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager; since local mode runs a MiniCluster, the working directory is set up in MiniCluster#start.

It is clearly read from the configuration option process.working-dir; if process.working-dir is not set, it falls back to the java.io.tmpdir system property.

Fix: in the Flink program, set configuration.set(process.working-dir, "d://workspace/temp"), or pass -D process.working-dir=xxxxx when launching from the command line; see the sketch after the MiniCluster#start snippet below.

public void start() throws Exception {
        synchronized (lock) {
.......
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final boolean useSingleRpcService =
                    miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

            try {
// the workingDirectory is created here; generateWorkingDirectoryFile derives it from PROCESS_WORKING_DIR_BASE (process.working-dir)
                workingDirectory =
                        WorkingDirectory.create(
                                ClusterEntrypointUtils.generateWorkingDirectoryFile(
                                        configuration,
                                        Optional.of(PROCESS_WORKING_DIR_BASE),
                                        "minicluster_" + ResourceID.generate()));
......
}
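
Putting the fix into code, a minimal sketch of setting process.working-dir for a local run (the directory and class name are examples, not from the article):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalJobWithCustomWorkingDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the option read by generateWorkingDirectoryFile above;
        // point it at a directory the current user can definitely write to
        conf.setString("process.working-dir", "d:/workspace/temp");

        // hand the configuration to the local (MiniCluster) environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // ... build and execute the pipeline as before ...
    }
}

When launching from the command line instead, the same option can be passed as -D process.working-dir=..., as noted above.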

 
