Spark学习(7)-SparkSQL函数定义
创始人
2024-02-29 21:23:35
0

1 SparkSQL 定义UDF函数

在这里插入图片描述

目前在SparkSQL中,仅仅支持UDF和UDAF函数,python仅支持UDF

1.1 定义方式

定义方式有两种:

  1. sparksession.udf.register()
    注册的UDF可以用于DSL和SQL,返回值用于DSL风格,传参内的名字用于SQL风格。

    udf对象 = sparksession.udf.register(参数1,参数2,参数3)
    

    参数1:UDF名称,可用于SQL风格
    参数2:被注册成UDF的方法名
    参数3:声明UDF的返回值类型
    udf对象: 返回值对象,是一个UDF对象,可用于DSL风格

  2. pyspark.sql.functions.udf
    仅能用于DSL风格

    udf对象 = F.udf(参数1, 参数2)
    

    参数1:被注册成UDF的方法名
    参数2:声明UDF的返回值类型
    udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
    其中F是:

    from pyspark.sql import functions as F
    

    其中,被注册成UDF的方法名是指具体的计算方法,如:

    #add就是将要被注册成UDF的方法名
    def add(x, y): x + y
    

1.2 构建一个Interger返回值类型的UDF

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :01_create_integer_udf.py
@Date      :2022/11/29 16:51
@Author    :wuk
@Description  : 构建一个Integer返回值类型的的UDF
"""
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import IntegerTypeif __name__ == '__main__':spark = SparkSession.builder.master("local[*]")\.appName("test")\.config("spark.sql.shuffle.partitions", 2)\.getOrCreate()sc = spark.sparkContext# 构建一个rddrdd_map = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])df = rdd_map.toDF(["num"])# TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用# UDF的处理函数def num_ride_10(num):return num * 10# 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格# 参数2: UDF的处理逻辑, 是一个单独的方法# 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致# 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法# 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格udf1 = spark.udf.register("udf1",num_ride_10,IntegerType())# SQL风格中使用# selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)# select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算df.selectExpr("udf1(num)").show()# DSL 风格中使用# 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象df.select(udf1(df['num'])).show()# TODO 2: 方式2注册, 仅能用于DSL风格udf = functions.udf(num_ride_10, IntegerType())df.select(udf(df['num'])).show()

1.3 注册一个ArrayType(数字\list)类型的返回值UDF

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :02_create_array_udf.py
@Date      :2022/11/29 17:21
@Author    :wuk
@Description  : 注册一个ArrayType(数字\list)类型的返回值UDF
"""from pyspark.sql import SparkSession, functions
from pyspark.sql.types import StringType, ArrayTypeif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])df = rdd.toDF(["line"])# 注册UDF, UDF的执行函数定义def split_line(data):return data.split(" ")  # 返回值是一个Array对象# TODO1 方式1 构建UDFudf2 = spark.udf.register("udf1",split_line,ArrayType(StringType()))# DLS风格df.select(udf2(df['line'])).show(truncate=False)# SQL风格df.createTempView("lines")spark.sql("select udf1(line) from lines").show(truncate=False)# TODO 2 方式2的形式构建UDFudf3 = functions.udf(split_line, ArrayType(StringType()))df.select(udf3(df['line'])).show(truncate=False)

注意:
数组或者list类型,可以使用ArrayType来描述,同时需要传入数组内类型。

1.4 注册一个字典类型的返回值UDF

# _*_ coding:utf-8 _*_
"""
@Software  :pyspark
@FileName  :03_create_dict_udf.py
@Date      :2022/11/29 18:15
@Author    :wuk
@Description  : 注册一个字典类型的返回值UDF
"""
import stringfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringTypeif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder. \appName("test"). \master("local[*]"). \config("spark.sql.shuffle.partitions", 2). \getOrCreate()sc = spark.sparkContext# 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回# 比如传入1 我们返回 {"num":1, "letters": "a"}rdd = sc.parallelize([[1], [2], [3]])df = rdd.toDF(["num"])# 注册UDFdef process(data):return {"num": data, "letters": string.ascii_letters[data]}"""UDF的返回值是字典的话, 需要用StructType来接收"""udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True). \add("letters", StringType(), nullable=True))df.selectExpr("udf1(num)").show(truncate=False)df.select(udf1(df['num'])).show(truncate=False)

注意: 字典类型返回值, 可以用StructType来进行描述,StructType是一个普通的Spark支持的结构化类型
只是可以用在:

  • DF中用于描述Schema
  • UDF中用于描述返回值是字典的数据。

2 SparkSQL 使用窗口函数

2.1 介绍

2.2 语法

在这里插入图片描述

2.3 开窗函数的使用

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder. \appName("create df"). \master("local[*]"). \config("spark.sql.shuffle.partitions", "2"). \getOrCreate()sc = spark.sparkContextrdd = sc.parallelize([('张三', 'class_1', 99),('王五', 'class_2', 35),('王三', 'class_3', 57),('王久', 'class_4', 12),('王丽', 'class_5', 99),('王娟', 'class_1', 90),('王军', 'class_2', 91),('王俊', 'class_3', 33),('王君', 'class_4', 55),('王珺', 'class_5', 66),('郑颖', 'class_1', 11),('郑辉', 'class_2', 33),('张丽', 'class_3', 36),('张张', 'class_4', 79),('黄凯', 'class_5', 90),('黄开', 'class_1', 90),('黄恺', 'class_2', 90),('王凯', 'class_3', 11),('王凯杰', 'class_1', 11),('王开杰', 'class_2', 3),('王景亮', 'class_3', 99)])schema = StructType().add("name", StringType()). \add("class", StringType()). \add("score", IntegerType())df = rdd.toDF(schema)# 窗口函数只用于SQL风格, 所以注册表先df.createTempView("stu")# TODO 聚合窗口spark.sql("""SELECT *, AVG(score) OVER() AS avg_score FROM stu""").show()# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于# SELECT * FROM stu# SELECT AVG(score) FROM stu# 两个SQL的结果集进行整合而来spark.sql("""SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu""").show()# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于# SELECT * FROM stu# SELECT AVG(score) FROM stu GROUP BY class# 两个SQL的结果集进行整合而来# TODO 排序窗口spark.sql("""SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, RANK() OVER(ORDER BY score) AS rankFROM stu""").show()# TODO NTILEspark.sql("""SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu""").show()

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...