当前位置: 代码网 > it编程>编程语言>Java > 大数据Hive中的UDF:自定义数据处理的利器(上)

大数据Hive中的UDF:自定义数据处理的利器(上)

2024年08月04日 Java 我要评论
在大数据技术栈中,扮演着数据仓库的关键角色,它提供了丰富的数据操作功能,并通过类似于SQL的HiveQL语言简化了对Hadoop数据的处理。然而,内置函数库虽强大,却未必能满足所有特定的业务逻辑需求。此时,用户定义函数(UDF)的重要性便凸显出来。Hive UDF)是Hive中的一种扩展机制,它允许用户通过编写自定义的Java代码来扩展Hive的功能,实现Hive内置函数无法提供的一些特定数据处理逻辑。在深入探讨Hive UDF的实现之前,让我们首先确保开发环境的准备妥当。对于UDF的编写,推荐使用。


1. 前言

在大数据技术栈中,apache hive 扮演着数据仓库的关键角色,它提供了丰富的数据操作功能,并通过类似于 sqlhiveql 语言简化了对 hadoop 数据的处理。然而,内置函数库虽强大,却未必能满足所有特定的业务逻辑需求。此时,用户定义函数(user-defined functionsudf)的重要性便凸显出来。

hive udfuser-defined function)是hive中的一种扩展机制,它允许用户通过编写自定义的java代码来扩展hive的功能,实现hive内置函数无法提供的一些特定数据处理逻辑。

2. udf与宏及静态表的对比

除了udf可以自定义输入和输出还有例如静态表,宏定义的方式也可以实现类似的操作,举个例子:在数据中筛选出已达到退休年龄的员工。

import org.apache.hadoop.hive.ql.exec.udf;
import org.apache.hadoop.io.text;

public class retirementstatusudf extends udf {
    public text evaluate(int age) {
        return new text(age >= 60 ? "已达到退休年龄" : "未达到退休年龄");
    }
}

-- 添加udf的jar包到hive
add jar /path/to/udf.jar;

-- 创建临时函数
create temporary function retirement_status as 'com.example.retirementstatusudf';

-- 使用udf进行查询
select name, age, retirement_status(age) as status
from employee_static;


create table employee_static (
 
  age int,
  flag string -- 'retired' 或 'active'
);



-- 查询已达到退休年龄的员工
select a.*
from employee_data a
join employee_static b
on a.age = b.age and b.flag = 'retired';

drop temporary macro if exists get_retired;

create temporary macro get_retired(age bigint) 
if (
       age is not null,
       case
              when age >= 60 then '退休'
              when age <= 60 then '未退休'
              else null
       end,
       null
);

使用udf(用户定义函数)的原因与宏和静态表的功能有所不同,它们各自适用于不同的场景和需求。以下是使用udf的几个关键原因:

特性/方法udf(用户定义函数)宏(macro)静态表(static table)
定义允许用户通过编写自定义的java代码来扩展hive的功能,实现特定的数据处理逻辑。在hive中,宏是一种用户定义的快捷方式,用于封装一系列hiveql语句,以便在查询中重复使用。预先定义和填充的数据集,其结构和内容在创建后通常保持不变。
使用场景适用于执行hive内置函数不支持的特定数据处理逻辑,如复杂的业务规则或算法。主要用于简化和重用hiveql查询语句,提高代码的可读性和易维护性。适用于存储已知的、不变的数据集,供多次查询使用,无需每次重新计算。
灵活性高,可以根据需求定制数据处理流程。中等,主要用于简化复杂的查询,但不具备动态处理能力。低,结构和内容一旦定义,通常不发生变化。
性能可优化,hive执行udf时会进行优化,性能接近内置函数。取决于宏定义的查询的复杂性,可能提高或降低性能。预先计算,查询时性能较高,适合重复查询相同数据集。
重用性高,一旦创建和注册,可以在不同的hive会话中重复使用。高,宏可以定义一次并在多个查询中重复使用。中等,表结构和数据不变,适用于重复查询相同数据集的场景。
实时性支持实时数据处理,每次调用udf时根据输入动态执行计算。不直接支持实时数据处理,主要用于查询语句的封装。不支持实时数据处理,通常是预先计算和存储的。
适应性强,可以快速适应新的数据处理需求。中等,需要修改宏定义以适应新的需求。弱,结构和数据固定,不适合频繁变化的数据需求。
示例应用用于实现如复杂数学计算、自定义字符串处理、数据清洗等。用于封装复杂的查询模板,如多步骤的数据转换过程。用于存储配置数据、参考数据或不需要频繁更新的数据。

选择使用udf、宏还是静态表应基于具体的业务需求、数据特性和性能考虑。每种方法都有其独特的优势和适用场景。

3. 深入理解udf

hive udf可以分为三种主要类型:udf、udaf和udtf。

  1. udf (user-defined function)
    • 标量函数,用于一对一(one-to-one)的映射,即对单个数据项进行操作并返回单个结果。
    • 例如,字符串处理(upper, substr)、数学计算(sqrt)、日期时间转换等。
  2. udaf (user-defined aggregate function)
    • 聚合函数,用于多对一(many-to-one)的映射,即对多行数据进行聚合操作并返回单个结果。
    • 例如,自定义的求和(sum)、平均值(avg)、最大值(max)、最小值(min)等。
  3. udtf (user-defined table-generating function)
    • 表生成函数,用于一对多(one-to-many)的映射,即对单个数据项进行操作并返回多行结果。
    • 例如,explode函数可以将数组或map类型的列拆分成多行。
类别简称全称描述示例
udfuser-defined function用于实现一对一的映射,即一个输入对应一个输出。将字符串转换为大写。
udafuser-defined aggregate function用于实现一对多的映射,即多个输入对应一个输出。计算某个字段的总和或平均值。
udtfuser-defined table-generating function用于实现一对多的行生成,即一个输入可以产生多行输出。将数组或映射类型的字段展开成多行数据。

这些udf类型允许开发者根据特定的数据处理需求,编写和实现自定义的函数逻辑,从而扩展hive的数据处理能力。通过使用udf、udaf和udtf,用户可以在hive中实现更加复杂和定制化的数据处理任务。

实现一个udf通常涉及以下步骤:

  1. 编写udf类:在java中创建一个类,实现hive udf接口的相应方法。对于标量udf,这通常是evaluate方法。
  2. 编译与打包:将udf类编译成java字节码,并打包成jar文件。
  3. 上传jar包:将jar文件上传到hdfs或其他hive可以访问的文件系统中。
  4. 注册udf:在hive会话中使用add jarcreate temporary function命令注册udf。
  5. 使用udf:在hive查询中调用注册的udf,就像调用内置函数一样。

4. 实现自定义udf

在深入探讨hive udf的实现之前,让我们首先确保开发环境的准备妥当。对于udf的编写,推荐使用maven来配置java项目,这样可以方便地管理依赖和构建过程。以下是配置java开发环境的一个示例,包括使用的版本信息和maven设置:


<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>

    <groupid>org.example</groupid>
    <artifactid>project202401</artifactid>
    <version>1.0-snapshot</version>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceencoding>utf-8</project.build.sourceencoding>
        <hadoop.version>3.1.1.7.1.7.2000-305</hadoop.version>
        <hive.version>3.1.3000.7.1.7.2000-305</hive.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.apache.hive</groupid>
            <artifactid>hive-exec</artifactid>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupid>org.apache.hadoop</groupid>
            <artifactid>hadoop-common</artifactid>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupid>org.junit.jupiter</groupid>
            <artifactid>junit-jupiter</artifactid>
            <version>5.10.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <name>maven central</name>
            <url>https://repo1.maven.org/maven2/</url>
        </repository>

        <repository>
            <id>cloudera</id>
            <name>cloudera repository</name>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

</project>

下面是一个hive udf(用户定义函数)的示例,该udf的作用是将传入的字符串转换为大写形式。我将对代码进行注释,并解释其工作流程:


import org.apache.hadoop.io.text;  // 引入hadoop的text类,用于处理字符串
import org.apache.hadoop.hive.ql.exec.udf;  // 引入hive的udf类

@suppresswarnings({"deprecation", "unused"})  // 忽略警告,例如未使用的警告或过时api的警告
public class uppercaseudf extends udf {  // 定义一个名为uppercaseudf的类,继承自udf

    /**
     * 该方法重写了udf类中的evaluate方法,是udf的核心。
     * 它接收一个text类型的数据,然后返回转换为大写的text类型数据。
     *
     * @param line text类型的输入数据
     * @return 转换为大写的text类型的数据
     */
    public text evaluate(final text line) {
        // 检查传入的text是否为非空且内容不为空字符串
        if (null != line && !line.tostring().equals("")) {
            // 将text转换为string,并使用string的touppercase方法转换为大写
            string str = line.tostring().touppercase();
            // 将大写字符串重新设置回text对象,并返回
            line.set(str);
            return line;
        } else {
            // 如果传入的text为null或空字符串,则返回一个新的空text对象
            return new text();
        }
    }
}

在hive的较新版本中,推荐使用genericudf而不是直接继承udf。以下是使用genericudf实现的uppercaseudf2的示例代码,以及对代码的详细解释和工作流程分析:


import org.apache.hadoop.hive.ql.exec.udfargumentexception;
import org.apache.hadoop.hive.ql.metadata.hiveexception;
import org.apache.hadoop.hive.ql.udf.generic.genericudf;
import org.apache.hadoop.hive.serde2.objectinspector.objectinspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.primitiveobjectinspectorfactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.stringobjectinspector;
import org.apache.hadoop.io.text;
import org.apache.hadoop.hive.ql.udf.udftype;

@udftype(deterministic = true, stateful = false) // 标注udf的特性,确定性且无状态
public class uppercaseudf2 extends genericudf { // 继承自genericudf

    private stringobjectinspector inputoi; // 输入对象检查员,用于检查输入类型
    private stringobjectinspector outputoi; // 输出对象检查员,用于定义输出类型

    /**
     * initialize方法在udf首次执行时被调用,用于初始化udf。
     * @param arguments 传入的参数对象检查员数组
     * @return 输出对象检查员
     * @throws udfargumentexception 如果输入参数不符合预期,抛出异常
     */
    @override
    public objectinspector initialize(objectinspector[] arguments) throws udfargumentexception {
        // 确保传入的参数是字符串类型
        if (arguments == null || arguments.length == 0) {
            throw new udfargumentexception("arguments array is null or empty.");
        }
        if (!(arguments[0] instanceof stringobjectinspector)) {
            // 如果不是字符串类型,抛出hive异常
            throw new udfargumentexception("the input to uppercaseudf2 must be a string");
        }
        // 将输入参数的对象检查员赋值给局部变量
        inputoi = (stringobjectinspector) arguments[0];
        // 定义输出对象检查员为可写的字符串对象检查员
        outputoi = primitiveobjectinspectorfactory.writablestringobjectinspector;
        // 返回输出对象检查员
        return outputoi;
    }

    /**
     * evaluate方法定义了udf的实际逻辑,即如何将输入转换为输出。
     * @param arguments 包含延迟计算的输入对象的数组
     * @return 转换后的大写文本
     * @throws hiveexception 如果在执行过程中遇到hive异常
     */
    @override
    public object evaluate(deferredobject[] arguments) throws hiveexception {
        // 从延迟对象中获取输入字符串
        text line = (text) arguments[0].get();
        // 如果输入不为空,则转换为大写
        if (line != null && !line.tostring().isempty()) {
            return new text(line.tostring().touppercase());
        }
        // 如果输入为空,返回空字符串
        return new text();
    }

    /**
     * getdisplaystring方法返回udf的可读字符串表示,用于hive日志和解释计划。
     * @param strings 输入参数的字符串表示,通常由hive自动生成
     * @return udf的可读字符串表示
     */
    @override
    public string getdisplaystring(string[] strings) {
        // 返回udf的名称,用于解释计划和日志
        return "uppercaseudf2()";
    }
}
add jar url/project202401-1.0-snapshot.jar;
create temporary function uppercaseudf as 'com.xx.hive.udf.uppercaseudf';
select uppercaseudf('hive is fun') a ;

通过以上步骤,我们能够创建出高效、可靠的hive udf,以满足特定的数据处理需求。udf的开发不仅需要关注功能的实现,还要重视性能优化和代码的可维护性。正确地使用udf可以显著提升数据处理的效率,为用户提供强大的数据操作能力。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com