Environment
Development machine: Windows 10
Java: 8
Flink: 1.16.2
Hudi: 0.14.1
Symptom
While upgrading Flink + Hudi, a local test run threw an exception. The error is as follows:
2024-06-27 17:13:16.659 [bucket_assigner (1/2)#0] WARN org.apache.flink.runtime.taskmanager.Task - bucket_assigner (1/2)#0 (be7ca740282c17e0d01630e88b5f5905_eae0775fbc9b695162a640f9f20e7bb9_0_0) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 1 for operator bucket_assigner (1/2)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1243)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator bucket_assigner (1/2)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1286)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1274)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1231)
... 22 more
Caused by: org.rocksdb.RocksDBException: Failed to create a NewWriteableFile: C:\Users\Administrator\AppData\Local\Temp\minicluster_76d0223f7cd18f01de3a67a01eee7fa2\tm_0\tmp\job_e2784be18f17fbba96d46813105438ef_op_KeyedProcessOperator_eae0775fbc9b695162a640f9f20e7bb9__1_2__uuid_10af1fed-db05-4b12-bd81-457078114132\chk-1.tmp/MANIFEST-000004: ϵͳõҳ»µ½ָ¶
at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:172)
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:158)
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:78)
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246)
... 33 more
Right after the job started, the logs looked fine. A short while later the error appeared, so the failure clearly happens at checkpoint time. Looking closer, it is a RocksDB exception, and the key part is the last cause above: creating a file under the Windows temp directory failed. The error message itself is mojibake (ϵͳõҳ»µ½ָ¶), apparently a GBK-encoded Windows message rendered in the wrong charset (if any Windows expert can decode it, please share). My first guess was a permission problem, so I restarted IDEA as Administrator and ran the Flink job locally again, but no luck: the same error came back.
Solution
The path on the C: drive still felt like a permission issue, so the idea was to swap in a different path.
First, follow the stack trace into the code, starting at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend#snapshot:
// takes a checkpoint snapshot of the keyed state
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions)
        throws Exception {

    // flush everything into db before taking a snapshot
    writeBatchWrapper.flush();

    /*
     * checkpointSnapshotStrategy is the checkpointing strategy; its instanceBasePath field
     * holds exactly the C:\Users\Administrator\AppData\Local\Temp\minicluster... path from the error.
     * So all we need is a way to point instanceBasePath at a different location.
     */
    return new SnapshotStrategyRunner<>(
                    checkpointSnapshotStrategy.getDescription(),
                    checkpointSnapshotStrategy,
                    cancelStreamRegistry,
                    ASYNCHRONOUS)
            .snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
}
Tracking upwards from snapshot(), we reach EmbeddedRocksDBStateBackend#createKeyedStateBackend:
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobId,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry,
        double managedMemoryFraction)
        throws IOException {

    // first, make sure that the RocksDB JNI library is loaded
    // we do this explicitly here to have better error handling
    String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath();
    ensureRocksDBIsLoaded(tempDir);

    // replace all characters that are not legal for filenames with underscore
    String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");

    // per-job initialization
    lazyInitializeForJob(env, fileCompatibleIdentifier);

    // this is where the RocksDB instance path is built
    File instanceBasePath =
            new File(
                    getNextStoragePath(),
                    "job_"
                            + jobId
                            + "_op_"
                            + fileCompatibleIdentifier
                            + "_uuid_"
                            + UUID.randomUUID());
    .......
}
private void lazyInitializeForJob(
        Environment env, @SuppressWarnings("unused") String operatorIdentifier)
        throws IOException {
    .......
    // the base path is taken from Flink's Environment here,
    // so the next step is to look for a configuration option that controls it
    // initialize the paths where the local RocksDB files should be stored
    if (localRocksDbDirectories == null) {
        initializedDbBasePaths = new File[] {env.getTaskManagerInfo().getTmpWorkingDirectory()};
    } else {
        ......
    }
    .......
}
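As an aside, the branch above also shows that when local RocksDB directories are configured explicitly, the tmp working directory is not used at all. Below is a minimal, untested sketch of that alternative, using EmbeddedRocksDBStateBackend#setDbStoragePath (backed by the state.backend.rocksdb.localdir option); the D:/ path is just an example, and this is not the route taken in this post:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // point RocksDB's working files at an explicit local directory
        // instead of the TaskManager's tmp working directory
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        backend.setDbStoragePath("D:/workspace/rocksdb"); // example path, adjust as needed
        env.setStateBackend(backend);

        // ... build and execute the job as usual
    }
}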
Now we just need to find where this working directory is set.
Following the tmpWorkingDirectory property upwards leads to org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager; since this is a local run it uses the MiniCluster, and the working directory is created in its start() method.
It is clearly read from the process.working-dir configuration option; if process.working-dir is not set, the default falls back to the java.io.tmpdir system property.
Fix: in the Flink program call configuration.set(process.working-dir, "D://workspace/temp"), or pass -D process.working-dir=... when submitting from the command line (a concrete sketch follows the code below).
public void start() throws Exception {
    synchronized (lock) {
        .......
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService =
                miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            // the working directory is generated here;
            // generateWorkingDirectoryFile derives it from PROCESS_WORKING_DIR_BASE
            workingDirectory =
                    WorkingDirectory.create(
                            ClusterEntrypointUtils.generateWorkingDirectoryFile(
                                    configuration,
                                    Optional.of(PROCESS_WORKING_DIR_BASE),
                                    "minicluster_" + ResourceID.generate()));
            ......
}
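Putting it together, here is a minimal sketch of the fix for a local IDE run: pass process.working-dir through the Configuration used to build the execution environment. The key string matches the option above; the D:/workspace/temp value is just what was used in this setup, so adjust it to your machine:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WorkingDirFixSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // move the process working directory (and with it the MiniCluster / RocksDB tmp files)
        // off C:\Users\...\AppData\Local\Temp
        conf.setString("process.working-dir", "D:/workspace/temp");

        // a local run inside the IDE hands this configuration to the MiniCluster
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // ... build the Flink + Hudi pipeline as usual, then call env.execute(...)
    }
}

On a real cluster the same option can go into flink-conf.yaml (process.working-dir: /some/path) or be passed at submission time with -Dprocess.working-dir=/some/path.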