A New Approach to JSON Processing: A Deep Dive into Spark SQL Complex Types

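In day-to-day data processing, handling deeply nested JSON with an irregular structure is a common challenge.
The traditional approach chains string-parsing functions (such as get_json_object) with row-expanding operations (explode). The resulting SQL is long, convoluted, and hard to maintain. With deep nesting or complex arrays it also adds noticeable performance overhead and makes logic errors easy to introduce.
Spark SQL offers a more elegant and efficient alternative: complex data types (STRUCT, ARRAY, MAP). They let us load an entire JSON structure into a single table column and then query and manipulate it precisely with extended SQL syntax. There is no need to flatten the data first: we can access nested fields directly, iterate over array elements, and look up map entries by key, which greatly simplifies the code and improves processing efficiency.
This article shows not only how to parse JSON of any structure, however complex, but also how to build JSON output.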

1. Complex Data Types and Their Companion Functions

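In traditional relational databases and SQL, the data model is usually constrained to first normal form: every value is atomic, and each column holds an indivisible primitive type. Real-world data, however, especially JSON from modern application logs, APIs, and sensors, is often deeply nested and structurally complex.
To handle such semi-structured data natively while keeping the querying power of the relational model, Spark SQL provides three core complex data types: STRUCT, ARRAY, and MAP. Each lets a block of internally structured data be treated as a single column, which gives great flexibility in data modeling and querying.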

1.1 Data Types

Struct

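  • Concept: a STRUCT can be thought of as a "miniature table" inside a table. It combines several named fields into a single column; each field has its own name and data type, and the types may differ.
  • Analogy and purpose: it corresponds to a JSON object, or an "object" in a programming language. With STRUCT you can group logically related attributes (for example, a user's address: province, city, street) and query them as one unit instead of splitting them into separate columns.
  • Representation in SQL: in DDL it is usually declared with STRUCT<field1: type1, field2: type2, ...>. In query results it typically appears as {"city": "Beijing", "street": "Chang'an Avenue"}.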

Array

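  • Concept: an ARRAY is an ordered sequence of elements that all share the same data type.
  • Analogy and purpose: it corresponds to a JSON array. A user's phone numbers, the SKUs in one order, or the page-view events in one session are all good fits for ARRAY. Spark SQL provides a rich set of built-in array functions: access by index, length, exploding into rows, filtering elements, and more.
  • Representation in SQL: declared in DDL as ARRAY<element_type>. In query results it appears as a list such as ["13800138000", "13900139000"].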

Map

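  • Concept: a MAP stores a collection of key-value pairs. All keys must share one data type (in practice usually a primitive type such as STRING or INT), and all values must share one data type; the key type and value type may differ.
  • Analogy and purpose: it corresponds to the parts of a JSON object whose keys change dynamically, such as a product's per-channel attribute tags or an event's dynamic extra properties. MAP lets you look up a value directly by key and is well suited to sparse or dynamically shaped attribute sets.
  • Representation in SQL: declared in DDL as MAP<key_type, value_type>. In query results it appears as key-value pairs such as {"age": 25, "score": 98}.
  • Note: conceptually, a JSON object resembles both STRUCT and MAP. However, when Spark SQL infers a schema from JSON, it always turns JSON objects into STRUCT, never MAP. Typical JSON objects have stable, predictable field names (keys), which fits STRUCT's fixed-schema semantics and gives better query performance and type safety. Declare a MAP explicitly in your schema only when you need a genuine key-value collection whose key names are not fixed.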

These complex types can also be nested: a STRUCT field can be an ARRAY, an ARRAY element can be a MAP, and a MAP value can be a STRUCT. This nesting matches the tree structure of real-world JSON data.

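1.2 Companion Functions

Common array functions

The Built-in Functions page of the Spark SQL documentation lists them all. Search for functions whose names start with array_ (for example array_distinct, array_sort, array_union) to find the ones for merging, deduplicating, sorting, and inserting elements, and for reading array properties. The two higher-order functions below take a lambda and are especially handy for nested JSON.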

filter

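Filters array elements. The lambda receives each element x, or (x, i) where i is the element's 0-based index: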

> select filter(array(1, 2, 3, 4), x -> x % 2 == 1);
[1, 3]
> select filter(array(0, 2, 3), (x, i) -> x > i);
[2, 3]
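Spark evaluates these lambdas itself, but the semantics are easy to model. The following is a minimal pure-Python sketch, not Spark code; the name spark_filter is hypothetical. It reproduces the two examples above, including the optional index argument:

```python
import inspect
from typing import Callable, List


def spark_filter(arr: List, pred: Callable) -> List:
    """Pure-Python model of Spark SQL's filter(array, lambda).

    A one-parameter lambda receives the element; a two-parameter lambda
    receives (element, index), where the index is 0-based, as in Spark.
    """
    if len(inspect.signature(pred).parameters) == 2:
        return [x for i, x in enumerate(arr) if pred(x, i)]
    return [x for x in arr if pred(x)]


# filter(array(1, 2, 3, 4), x -> x % 2 == 1)
print(spark_filter([1, 2, 3, 4], lambda x: x % 2 == 1))  # [1, 3]
# filter(array(0, 2, 3), (x, i) -> x > i)
print(spark_filter([0, 2, 3], lambda x, i: x > i))  # [2, 3]
```

In the second case, 0 > 0 is false, 2 > 1 and 3 > 2 are true, which is why only [2, 3] survives.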

transform

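Applies a function to every element of an array and returns the results as a new array. As with filter, the lambda may also take the 0-based index as a second argument: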

> select transform(array(1, 2, 3), x -> x + 1); 
[2, 3, 4]
> select transform(array(1, 2, 3), (x, i) -> x + i);
[1, 3, 5]
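A similar pure-Python sketch of transform, again only a model of the semantics (spark_transform is a hypothetical name): the output always has the same length and order as the input, and a two-parameter lambda also receives the 0-based index.

```python
import inspect
from typing import Callable, List


def spark_transform(arr: List, fn: Callable) -> List:
    """Pure-Python model of Spark SQL's transform(array, lambda).

    Applies fn to every element, preserving order and length.
    A two-parameter fn receives (element, 0-based index).
    """
    if len(inspect.signature(fn).parameters) == 2:
        return [fn(x, i) for i, x in enumerate(arr)]
    return [fn(x) for x in arr]


# transform(array(1, 2, 3), x -> x + 1)
print(spark_transform([1, 2, 3], lambda x: x + 1))  # [2, 3, 4]
# transform(array(1, 2, 3), (x, i) -> x + i)
print(spark_transform([1, 2, 3], lambda x, i: x + i))  # [1, 3, 5]
```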

from_json

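Converts a JSON string into a Spark SQL value described by a schema, most commonly a STRUCT. The schema may contain primitive types as well as nested complex types.
For example:

> select from_json('{"name": "Alice", "age": 30, "address": {"city": "NYC"}}', 'struct<name:string, age:int, address: struct<city:string>>');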

schema_of_json

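Used together with from_json: given a JSON string, it returns the schema of the corresponding struct, which can then be passed to from_json as its schema argument. The argument must be a string literal (a constant), not a column: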

> select schema_of_json('{"name": "Alice", "age": 30, "address": {"city": "NYC"}}');
STRUCT<address: STRUCT<city: STRING>, age: BIGINT, name: STRING>

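With schema_of_json you no longer need to write long schema definitions by hand: run it on a representative sample and pass the result to from_json. Keep in mind that the inferred types follow the sample: whole numbers become BIGINT, and JSON objects always become STRUCT.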
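To get a feel for what schema_of_json does, here is a rough pure-Python approximation of its type inference. It is a hypothetical helper (infer_type), not Spark's implementation, and its rules are simplifying assumptions listed in the docstring; it does reproduce the output shown above for the sample string. Running it on a year-keyed object such as performance_metrics also shows why inference alone is not always what you want: the years come back as fixed STRUCT field names.

```python
import json


def infer_type(value) -> str:
    """Rough, hypothetical approximation of Spark's JSON schema inference.

    Simplifying assumptions (Spark's real rules are more detailed):
    objects -> STRUCT with fields sorted by name, whole numbers -> BIGINT,
    other numbers -> DOUBLE, arrays typed by their first element,
    empty arrays and nulls -> STRING.
    """
    if isinstance(value, bool):  # test bool first: in Python, True is also an int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if value is None or isinstance(value, str):
        return "STRING"
    if isinstance(value, list):
        element = infer_type(value[0]) if value else "STRING"
        return f"ARRAY<{element}>"
    fields = ", ".join(f"{k}: {infer_type(v)}" for k, v in sorted(value.items()))
    return f"STRUCT<{fields}>"


sample = '{"name": "Alice", "age": 30, "address": {"city": "NYC"}}'
print(infer_type(json.loads(sample)))
# STRUCT<address: STRUCT<city: STRING>, age: BIGINT, name: STRING>
```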

2. Hands-on: Parsing JSON

Below is an AI-generated mock record describing a company's basic information and its projects:

{  
    "enterprise_id": 1001,
    "enterprise_name": "TechFuture Inc.",
    "is_public": true,
    "market_cap": 125.78,
    "ipo_date": "2020-05-15",
    "last_updated": "2024-12-01T14:30:45.123Z",
    "departments": [
        {
            "dept_id": "D001",
            "name": "研发部",
            "head_count": 45,
            "is_core": true,
            "budget": 5000000.5,
            "location": {
                "city": "北京",
                "building": "科技大厦A座",
                "floor": [
                    15,
                    16,
                    17
                ]
            },
            "projects": [
                {
                    "project_id": "P2024-001",
                    "name": "星海计划",
                    "status": "进行中",
                    "priority": 1,
                    "start_date": "2024-01-01",
                    "milestones": [
                        {
                            "phase": "设计",
                            "completion": 100,
                            "approved": true
                        },
                        {
                            "phase": "开发",
                            "completion": 75,
                            "approved": true
                        },
                        {
                            "phase": "测试",
                            "completion": 30,
                            "approved": false
                        }
                    ],
                    "tech_stack": [
                        "Java",
                        "Spark",
                        "Kafka",
                        "Kubernetes"
                    ],
                    "team_lead": {
                        "emp_id": "E00123",
                        "name": "张明",
                        "level": "P8"
                    },
                    "cost_breakdown": {
                        "人力": 1200000,
                        "硬件": 800000,
                        "软件": 450000,
                        "其他": 150000
                    }
                },
                {
                    "project_id": "P2024-002",
                    "name": "破晓行动",
                    "status": "已立项",
                    "priority": 2,
                    "start_date": "2024-03-15",
                    "milestones": [],
                    "tech_stack": [
                        "Python",
                        "TensorFlow",
                        "Docker"
                    ],
                    "team_lead": null,
                    "cost_breakdown": null
                }
            ],
            "performance_metrics": {
                "2023": {
                    "revenue": 45000000,
                    "profit": 12000000,
                    "kpi_score": 92.5
                },
                "2024": {
                    "revenue": 52000000,
                    "profit": 15000000,
                    "kpi_score": 88.3
                }
            }
        },
        {
            "dept_id": "D002",
            "name": "市场部",
            "head_count": 28,
            "is_core": false,
            "budget": 2000000,
            "location": {
                "city": "上海",
                "building": "中心大厦",
                "floor": [
                    10
                ]
            },
            "projects": [],
            "performance_metrics": {
                "2023": {
                    "revenue": 30000000,
                    "profit": 8000000,
                    "kpi_score": 85
                },
                "2024": {
                    "revenue": 35000000,
                    "profit": 9500000,
                    "kpi_score": 91.2
                }
            }
        }
    ],
    "company_tags": {
        "industry": "科技",
        "scale": "大型",
        "certifications": [
            "ISO9001",
            "CMMI5",
            "国家高新企业"
        ],
        "risk_level": "低"
    },
    "contact_matrix": [
        [
            {
                "type": "紧急",
                "person": "CEO",
                "phone": "13800138001"
            },
            {
                "type": "常规",
                "person": "CTO",
                "phone": "13800138002"
            }
        ],
        [
            {
                "type": "商务",
                "person": "CFO",
                "phone": "13800138003"
            },
            {
                "type": "法务",
                "person": "COO",
                "phone": "13800138004"
            }
        ]
    ],
    "dynamic_attributes": {
        "stock_code": "TFI001",
        "employee_count": 500,
        "avg_age": 32.5,
        "has_remote_policy": true,
        "vacation_days": [
            "2024-01-01",
            "2024-05-01",
            "2024-10-01"
        ]
    }
}

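Requirement: for each department, compute its total revenue across all years, that is, the sum of the revenue values in performance_metrics.
Following the approach above, you could pass the JSON string to schema_of_json and feed the resulting schema to from_json. This requirement, however, only needs the revenue values, and a schema covering every field would be huge and hard to maintain. Fortunately, from_json accepts a partial schema: fields that the schema does not declare are simply ignored. Note that performance_metrics is declared as map<string, ...> rather than the STRUCT that schema inference would produce, because its keys are years and change over time (see the MAP note in 1.1). The minimal schema for this requirement is: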

struct<
  departments:array<
    struct<
      dept_id:string,
      name:string,
      performance_metrics:map<string, struct<revenue:bigint>>
    >
  >
>
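The effect of a partial schema can be modeled in a few lines of Python. The sketch below uses a hypothetical tuple notation for the schema (not a Spark API) and a hypothetical project() helper: fields the schema does not mention are dropped, and declared fields missing from the data become None, similar to how from_json returns null for them. It assumes the data already has the shape the schema describes and performs no type casting.

```python
import json

# Hypothetical schema notation used only in this sketch (not a Spark API):
#   ("struct", {field_name: schema}), ("array", element_schema),
#   ("map", value_schema), or a leaf type name such as "string".
MIN_SCHEMA = ("struct", {
    "departments": ("array", ("struct", {
        "dept_id": "string",
        "name": "string",
        "performance_metrics": ("map", ("struct", {"revenue": "bigint"})),
    })),
})


def project(value, schema):
    """Keep only what the schema declares.

    Undeclared fields are dropped and declared-but-missing fields become None.
    Leaf values are passed through unchanged (no type casting), and the input
    is assumed to have the shape the schema describes.
    """
    if value is None or isinstance(schema, str):
        return value
    kind, inner = schema
    if kind == "struct":
        return {name: project(value.get(name), sub) for name, sub in inner.items()}
    if kind == "array":
        return [project(item, inner) for item in value]
    if kind == "map":
        return {key: project(item, inner) for key, item in value.items()}
    raise ValueError(f"unknown schema kind: {kind}")


# An abbreviated department from the sample record above.
doc = json.loads(
    '{"enterprise_id": 1001, "departments": [{"dept_id": "D002", "name": "市场部",'
    ' "head_count": 28, "projects": [], "performance_metrics":'
    ' {"2023": {"revenue": 30000000, "profit": 8000000},'
    ' "2024": {"revenue": 35000000, "profit": 9500000}}}]}'
)
print(project(doc, MIN_SCHEMA))
```

Everything outside departments, and every department field except the three declared ones, disappears; only the revenue survives inside each year.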

The complete SQL:

with basic_data as (
select '{"enterprise_id":1001,"enterprise_name":"TechFuture Inc.","is_public":true,"market_cap":125.78,"ipo_date":"2020-05-15","last_updated":"2024-12-01T14:30:45.123Z","departments":[{"dept_id":"D001","name":"研发部","head_count":45,"is_core":true,"budget":5000000.50,"location":{"city":"北京","building":"科技大厦A座","floor":[15,16,17]},"projects":[{"project_id":"P2024-001","name":"星海计划","status":"进行中","priority":1,"start_date":"2024-01-01","milestones":[{"phase":"设计","completion":100,"approved":true},{"phase":"开发","completion":75,"approved":true},{"phase":"测试","completion":30,"approved":false}],"tech_stack":["Java","Spark","Kafka","Kubernetes"],"team_lead":{"emp_id":"E00123","name":"张明","level":"P8"},"cost_breakdown":{"人力":1200000,"硬件":800000,"软件":450000,"其他":150000}},{"project_id":"P2024-002","name":"破晓行动","status":"已立项","priority":2,"start_date":"2024-03-15","milestones":[],"tech_stack":["Python","TensorFlow","Docker"],"team_lead":null,"cost_breakdown":null}],"performance_metrics":{"2023":{"revenue":45000000,"profit":12000000,"kpi_score":92.5},"2024":{"revenue":52000000,"profit":15000000,"kpi_score":88.3}}},{"dept_id":"D002","name":"市场部","head_count":28,"is_core":false,"budget":2000000.00,"location":{"city":"上海","building":"中心大厦","floor":[10]},"projects":[],"performance_metrics":{"2023":{"revenue":30000000,"profit":8000000,"kpi_score":85.0},"2024":{"revenue":35000000,"profit":9500000,"kpi_score":91.2}}}],"company_tags":{"industry":"科技","scale":"大型","certifications":["ISO9001","CMMI5","国家高新企业"],"risk_level":"低"},"contact_matrix":[[{"type":"紧急","person":"CEO","phone":"13800138001"},{"type":"常规","person":"CTO","phone":"13800138002"}],[{"type":"商务","person":"CFO","phone":"13800138003"},{"type":"法务","person":"COO","phone":"13800138004"}]],"dynamic_attributes":{"stock_code":"TFI001","employee_count":500,"avg_age":32.5,"has_remote_policy":true,"vacation_days":["2024-01-01","2024-05-01","2024-10-01"]}}' as json_str
)
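select
  tbl.department.dept_id,
  tbl.department.name,
  -- transform_values: replace each year's struct with its revenue (map<string, bigint>)
  -- map_values: drop the year keys, leaving array<bigint>
  -- reduce: sum the array starting from a bigint 0
  --   (reduce is a newer alias of aggregate; use aggregate if your Spark version lacks it)
  reduce(
    map_values(transform_values(tbl.department.performance_metrics, (k, v) -> v.revenue)),
    cast(0 as bigint),
    (acc, x) -> acc + x
  ) as revenue
from (
  select
    -- parse only the declared fields; everything else in the JSON is ignored
    from_json(
      json_str,
      'struct<
        departments:array<
          struct<
            dept_id:string,
            name:string,
            performance_metrics:map<string, struct<revenue:bigint>>
          >
        >
      >'
    ) as parsed
  from basic_data
) t
-- one output row per element of the departments array
lateral view explode(parsed.departments) tbl as department;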

The result:

D001	研发部	97000000
D002	市场部	65000000
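To see where 97000000 and 65000000 come from, here is the same pipeline replayed in plain Python on just the fields the query reads. Each step is commented with the Spark SQL expression it stands in for; this is a model for checking the arithmetic (45000000 + 52000000 and 30000000 + 35000000), not how Spark executes the query.

```python
import json
from functools import reduce

# Only the fields the query reads, copied from the sample record above.
raw = """{"departments": [
  {"dept_id": "D001", "name": "研发部",
   "performance_metrics": {"2023": {"revenue": 45000000}, "2024": {"revenue": 52000000}}},
  {"dept_id": "D002", "name": "市场部",
   "performance_metrics": {"2023": {"revenue": 30000000}, "2024": {"revenue": 35000000}}}
]}"""

rows = []
for dept in json.loads(raw)["departments"]:  # lateral view explode: one row per department
    metrics = dept["performance_metrics"]
    # transform_values(metrics, (k, v) -> v.revenue): year -> revenue
    revenue_by_year = {year: m["revenue"] for year, m in metrics.items()}
    # map_values(...): keep only the revenue numbers
    revenues = list(revenue_by_year.values())
    # reduce(..., cast(0 as bigint), (acc, x) -> acc + x): fold into a running sum
    total = reduce(lambda acc, x: acc + x, revenues, 0)
    rows.append((dept["dept_id"], dept["name"], total))

for row in rows:
    print(*row, sep="\t")
```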

3. Hands-on: Generating JSON

Requirement: package the results from Section 2 into the following JSON:

[
  {
    "id": "D001",
    "name": "研发部",
    "revenue": 97000000
  },
  {
    "id": "D002",
    "name": "市场部",
    "revenue": 65000000
  }
]

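Note: don't build JSON by concatenating strings with concat. Besides being verbose, a hand-built string breaks as soon as a value contains a double quote or another character that must be escaped. Spark SQL has dedicated functions that handle this for you: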

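with basic_data as (
  select 'D001' as id, '研发部' as name, 97000000 as revenue
  union all
  select 'D002' as id, '市场部' as name, 65000000 as revenue
)
select
  -- to_json serializes the whole array of structs into a single JSON string
  to_json(
    -- collect_list gathers one struct per row into an array
    -- (element order is not guaranteed; wrap it in sort_array(...) if order matters)
    collect_list(
      -- named_struct builds one object per row; the names become the JSON keys
      named_struct(
        'id', id,
        'name', name,
        'revenue', revenue
      )
    )
  ) as result
from basic_data;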
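The query should return a single row holding one compact JSON array string. As a cross-check, the sketch below builds the same array with Python's json module and also shows the failure mode the note above warns about: a string assembled by hand becomes invalid JSON as soon as a value contains a double quote. The department name used for that demonstration is hypothetical.

```python
import json

rows = [("D001", "研发部", 97000000), ("D002", "市场部", 65000000)]

# Analogue of to_json(collect_list(named_struct('id', id, 'name', name, 'revenue', revenue))):
# build one dict per row, collect the dicts into a list, then serialize once.
payload = json.dumps(
    [{"id": i, "name": n, "revenue": r} for i, n, r in rows],
    ensure_ascii=False,     # keep Chinese characters as-is instead of \uXXXX escapes
    separators=(",", ":"),  # compact output without extra spaces
)
print(payload)

# Why not concat: a value containing a double quote breaks a hand-built string.
name = 'R&D "Lab"'  # hypothetical department name
by_hand = '{"name": "' + name + '"}'
try:
    json.loads(by_hand)
    hand_built_ok = True
except json.JSONDecodeError:
    hand_built_ok = False
print("hand-built JSON valid?", hand_built_ok)  # False
print(json.dumps({"name": name}))  # the serializer escapes the quotes
```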