CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3)

news2025/1/15 12:56:29

        上回说到在我们搭好的YAF3环境上使用yaf处理pcap文件得到silk flow,再使用super mediator工具转为ipfix,继而在spark中导入mothra,就可以开始数据分析了。然而在我们粗粗一用之下,却发现DPI信息在ipfix文件中找不到,到底是提取的时候就没提取出来(ipfix不支持dpi信息存储)?还是说我们的分析方式不对?——换句话说yaf、super-mediator、methora这三个环节,问题到底出在哪里了?我们卖了个关子,为本篇开启留了个伏笔。实则是我也不知道……。

        所以,本篇从记录排查过程的角度,更深入地探索以下这3个组件——主要是methora的使用。

        一、YAF肯定没有问题

        因为我们在执行 super_mediator -o result.txt -m text test.yaf的时候已经从yaf的结果文件中导出过txt版本的dpi记录,其中是存在dns和tls的,可以证明相关的信息在test.yaf中确实存在。

        使用json格式导出一次,也可以证明结果还比较“完备”:

[root@12c4bd60ff5f testdata]# rm tmp.ipfix -f
[root@12c4bd60ff5f testdata]# super_mediator -o test2.json -m json test2.yaf
Initialization Successful, starting...
[2023-06-30 05:59:34] Running as root in --live mode, but not dropping privilege

         cat输出的json文件,可以看到tls和dns的相关记录也都在里面:

         二、super-mediator也没问题

        既然super-mediator的txt模式和json模式都没有问题,那是不是ipfix模式不支持dpi信息呢(之前我们是这么猜的)。这个猜测,我们通过观察super-mediator的执行log记录否定了:   

[root@12c4bd60ff5f testdata]# super_mediator -o tmp.ipfix --verbose test2.yaf
[2023-06-30 06:16:23] super_mediator starting
[2023-06-30 06:16:23] E1: Opening File tmp.ipfix
[2023-06-30 06:16:23] E1: Exporter Active.
Initialization Successful, starting...
[2023-06-30 06:16:23] Running as root in --live mode, but not dropping privilege
[2023-06-30 06:16:23] C1: Opening file: test2.yaf
[2023-06-30 06:16:23] COL C1(1) TID 0xd006: received. Labeled as IE_SPEC - TMD OR IE - EXACT_DEF
[2023-06-30 06:16:23] COL C1(1) TID 0xd007: received. Labeled as TMD - TMD OR IE - EXACT_DEF
[2023-06-30 06:16:23] COL C1(1) TID 0xd008: received. Labeled as TMD - TMD OR IE - EXACT_DEF
[2023-06-30 06:16:23] COL C1(1) TID yaf_dns_a (0xce01): received. Labeled as NESTED DATA - DPI - DPI UNKNOWN - EXACT_DEF
[2023-06-30 06:16:23] EXP E1(1) TID yaf_dns_a (0xce01): Adding new template
[2023-06-30 06:16:23] COL C1(1) TID yaf_dns_aaaa (0xce02): received. Labeled as NESTED DATA - DPI - DPI UNKNOWN - EXACT_DEF
…… …… ……
[2023-06-30 06:16:23] EXP E1(1) TID yaf_ssl_cert (0xca0b): Adding new template
[2023-06-30 06:16:23] COL C1(1) TID yaf_ssh (0xcc00): received. Labeled as NESTED DATA - DPI - DPI UNKNOWN - EXACT_DEF
[2023-06-30 06:16:23] EXP E1(1) TID yaf_ssh (0xcc00): Adding new template
…… …… ……
[2023-06-30 06:16:23] COL C1(1) TID yaf_flow_rle_tcp_ip4_total_biflow (0xb022): Added observationDomainId
[2023-06-30 06:16:23] COL C1(1) TID yaf_flow_rle_tcp_ip4_total_biflow (0xb022): Added yafFlowKeyHash
…… …… ……
[2023-06-30 06:16:23] Collector Stats: C1-INACTIVE-SINGLE FILE: Total Records Read: 3418, Files Read: 1, Restarts: 0, UNKNOWN Records: 0, FLOW Records: 3416, YAF STATS Records: 1, TOMBSTONE Records: 1, DNS DEDUP Records: 0, SSL DEDUP Records: 0, GENERAL DEDUP Records: 0, DNS RR Records: 0, TMD OR IE Records: 0, DPI Records: 0, UNKNOWN DATA Records: 0, UNKNOWN OPTIONS Records: 0, No filters used
[2023-06-30 06:16:23] Core Stats: Records Processed: 3418, UNKNOWN Records: 0, FLOW Records: 3416, YAF STATS Records: 1, TOMBSTONE Records: 1, DNS DEDUP Records: 0, SSL DEDUP Records: 0, GENERAL DEDUP Records: 0, DNS RR Records: 0, TMD OR IE Records: 0, DPI Records: 0, UNKNOWN DATA Records: 0, UNKNOWN OPTIONS Records: 0, AppLabel 0 Records: 1401, AppLabel 53 Records: 601, AppLabel 80 Records: 790, AppLabel 123 Records: 1, AppLabel 137 Records: 42, AppLabel 138 Records: 28, AppLabel 161 Records: 4, AppLabel 443 Records: 537, AppLabel 51443 Records: 12, Tombstone Records Generated: 0
[2023-06-30 06:16:23] Exporter Stats: E1-ACTIVE-SINGLE FILE: Total Records Written: 3418, Files Written: 1, Bytes Written: 614700, Restarts: 0, No filters used, UNKNOWN Ignored: 0, FLOW Ignored: 0, YAF STATS Ignored: 0, TOMBSTONE Ignored: 0, DNS DEDUP Ignored: 0, SSL DEDUP Ignored: 0, GENERAL DEDUP Ignored: 0, DNS RR Ignored: 0, TMD OR IE Ignored: 0, DPI Ignored: 0, UNKNOWN DATA Ignored: 0, UNKNOWN OPTIONS Ignored: 0, UNKNOWN Generated: 0, FLOW Generated: 0, YAF STATS Generated: 0, TOMBSTONE Generated: 0, DNS DEDUP Generated: 0, SSL DEDUP Generated: 0, GENERAL DEDUP Generated: 0, DNS RR Generated: 0, TMD OR IE Generated: 0, DPI Generated: 0, UNKNOWN DATA Generated: 0, UNKNOWN OPTIONS Generated: 0, UNKNOWN Forwarded: 0, FLOW Forwarded: 3416, YAF STATS Forwarded: 1, TOMBSTONE Forwarded: 1, DNS DEDUP Forwarded: 0, SSL DEDUP Forwarded: 0, GENERAL DEDUP Forwarded: 0, DNS RR Forwarded: 0, TMD OR IE Forwarded: 0, DPI Forwarded: 0, UNKNOWN DATA Forwarded: 0, UNKNOWN OPTIONS Forwarded: 0, AppLabel 0 Records: 1401, AppLabel 53 Records: 601, AppLabel 80 Records: 790, AppLabel 123 Records: 1, AppLabel 137 Records: 42, AppLabel 138 Records: 28, AppLabel 161 Records: 4, AppLabel 443 Records: 537, AppLabel 51443 Records: 12, 
[2023-06-30 06:16:23] Exporter Stats: E1-ACTIVE-SINGLE FILE: Total Records Written: 3418, Files Written: 1, Bytes Written: 614700, Restarts: 0, No filters used, UNKNOWN Ignored: 0, FLOW Ignored: 0, YAF STATS Ignored: 0, TOMBSTONE Ignored: 0, DNS DEDUP Ignored: 0, SSL DEDUP Ignored: 0, GENERAL DEDUP Ignored: 0, DNS RR Ignored: 0, TMD OR IE Ignored: 0, DPI Ignored: 0, UNKNOWN DATA Ignored: 0, UNKNOWN OPTIONS Ignored: 0, UNKNOWN Generated: 0, FLOW Generated: 0, YAF STATS Generated: 0, TOMBSTONE Generated: 0, DNS DEDUP Generated: 0, SSL DEDUP Generated: 0, GENERAL DEDUP Generated: 0, DNS RR Generated: 0, TMD OR IE Generated: 0, DPI Generated: 0, UNKNOWN DATA Generated: 0, UNKNOWN OPTIONS Generated: 0, UNKNOWN Forwarded: 0, FLOW Forwarded: 3416, YAF STATS Forwarded: 1, TOMBSTONE Forwarded: 1, DNS DEDUP Forwarded: 0, SSL DEDUP Forwarded: 0, GENERAL DEDUP Forwarded: 0, DNS RR Forwarded: 0, TMD OR IE Forwarded: 0, DPI Forwarded: 0, UNKNOWN DATA Forwarded: 0, UNKNOWN OPTIONS Forwarded: 0, AppLabel 0 Records: 1401, AppLabel 53 Records: 601, AppLabel 80 Records: 790, AppLabel 123 Records: 1, AppLabel 137 Records: 42, AppLabel 138 Records: 28, AppLabel 161 Records: 4, AppLabel 443 Records: 537, AppLabel 51443 Records: 12, 
[2023-06-30 06:16:23] E1: Closing File tmp.ipfix
[2023-06-30 06:16:23] super_mediator Terminating
[root@12c4bd60ff5f testdata]# 

        通过--verbose参数打印处理过程,可以看到即使是在ipfix模式下,super-mediator也处理了 dns、ssl之类的dpi信息,并且在最后统计信息的处理中,也看到了多种类型的AppLabel。所以,super-mediator大概率应该是没有问题的——也是就说数据是没有问题的。那么,问题只能是出在我们对mothra的使用方式上。

        三、Mothra的使用

        1. 载入字段选择

        通过学习mothra的API文档Mothra 1.6.0 - org.cert.netsa.mothra.datasources.ipfix,发现丢失DPI数据的原因可能是由于我们使用了默认的数据载入方式:        

         也就是说,采用spark.read.ipfix(your_file_path)载入的数据,由于没有使用fields方法指定载入数据的范围,可能只是Default Fields。实际上,spark.read是sparksession类的read方法,其调用了mothra提供的数据源CERTDataFrameReader提供的ipfix方法。该数据源还提供了fields方法用于指定读取数据的范围。

         fields方法使用FieldsSpec类型来指定读取数据范围,而IPFIXFields.default正是这样一个类型:

         同理,IPFIXFields也定义了诸如IPFIXFields:everything这样的fieldsSpec:

         所以,只要以IPFIXFields.everything指定fields,应该可以将所有信息导入到spark中:

scala> import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources._

scala> import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields
import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields

scala> val all = spark.read.fields(IPFIXFields.everything).ipfix("f:/tmp/test2.ipfix")
all: org.apache.spark.sql.DataFrame = [startTime: timestamp, endTime: timestamp ... 184 more fields]

scala> all.count
res45: Long = 3418

scala> all.show(1,0,true)
-RECORD 0-----------------------------------------------------------
 startTime                                | 2022-11-15 10:34:36.433
 endTime                                  | 2022-11-15 10:34:36.485
 sourceIPAddress                          | 47.110.20.149
 sourcePort                               | 443
 destinationIPAddress                     | 192.168.182.76
 destinationPort                          | 62116
 protocolIdentifier                       | 6
 observationDomainId                      | 0
 vlanId                                   | 0
 reverseVlanId                            | 0
 silkAppLabel                             | 0
 packetCount                              | 3
 reversePacketCount                       | 4
 octetCount                               | 674
 reverseOctetCount                        | 253
 initialTCPFlags                          | 24
 reverseInitialTCPFlags                   | 24
 unionTCPFlags                            | 25
 reverseUnionTCPFlags                     | 21
 dnp3RecordList                           | []
 dnsRecordList                            | []
………………
 reverseFlowAttributes                    | 0
 flowEndReason                            | 3
 reverseFlowDeltaMilliseconds             | 1
 ipClassOfService                         | 20
………………
 reverseMaxPacketSize                     | null
 reverseStandardDeviationPayloadLength    | null
 tcpSequenceNumber                        | 3558855353
 reverseTcpSequenceNumber                 | 341543645
 ndpiL7Protocol                           | null
 ndpiL7SubProtocol                        | null
 mplsTopLabelStackSection                 | null
 mplsLabelStackSection2                   | null
 mplsLabelStackSection3                   | null
 yafFlowKeyHash                           | 4001181757
only showing top 1 row

        可以看到,明显比使用default时的数据是要多了好多的。

        2. 提取特定的DPI数据

        下一步的问题,是证明mothra是否真的将数据读入了进来,还是说虽然字段多了但其实是张空表。

        (1)统计AppLabel

scala> all.groupBy('silkAppLabel).count.show
+------------+-----+
|silkAppLabel|count|
+------------+-----+
|         137|   42|
|          53|  601|
|        null|    2|
|       51443|   12|
|         161|    4|
|         443|  537|
|          80|  790|
|         123|    1|
|           0| 1401|
|         138|   28|
+------------+-----+

        通过统计AppLabel,我们能够直到,至少在ipfix中存在601条dns信息、537条tls信息和790条http信息,这个是我们在执行yaf时设置过需要提取DPI信息的,接下来需要搞清楚的是这些信息有没有存下来。

        (2)提取DNS记录

        根据everthing的schema,我们知道存储dns信息的列名是dnsRecordList,因此可以使用filter appLabel的方式进行过滤后提取。

scala> all.printSchema
root
 |-- startTime: timestamp (nullable = true)
 |-- endTime: timestamp (nullable = true)
 |-- sourceIPAddress: string (nullable = true)
 |-- sourcePort: integer (nullable = true)
…………………………
 |-- dnsRecordList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dnsName: string (nullable = true)
 |    |    |-- dnsTTL: long (nullable = true)
 |    |    |-- dnsRRType: integer (nullable = true)
 |    |    |-- dnsQueryResponse: integer (nullable = true)
 |    |    |-- dnsAuthoritative: integer (nullable = true)
 |    |    |-- dnsResponseCode: integer (nullable = true)
 |    |    |-- dnsSection: integer (nullable = true)
 |    |    |-- dnsId: integer (nullable = true)
 |    |    |-- dnsA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsAAAA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsCNAME: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsMX: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- exchange: string (nullable = true)
 |    |    |    |    |-- preference: integer (nullable = true)
 |    |    |-- dnsNS: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsPTR: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsTXT: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsSOA: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- mname: string (nullable = true)
 |    |    |    |    |-- rname: string (nullable = true)
 |    |    |    |    |-- serial: long (nullable = true)
 |    |    |    |    |-- refresh: long (nullable = true)
 |    |    |    |    |-- retry: long (nullable = true)
 |    |    |    |    |-- expire: long (nullable = true)
 |    |    |    |    |-- minimum: long (nullable = true)
 |    |    |-- dnsSRV: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- target: string (nullable = true)
 |    |    |    |    |-- priority: integer (nullable = true)
 |    |    |    |    |-- weight: integer (nullable = true)
 |    |    |    |    |-- port: integer (nullable = true)
 |-- enipDataList: array (nullable = true)
…………………………


scala> val dns = all.filter("silkAppLabel='53'").select('dnsRecordList)
dns: org.apache.spark.sql.DataFrame = [dnsRecordList: array<struct<dnsName:string,dnsTTL:bigint,dnsRRType:int,dnsQueryResponse:int,dnsAuthoritative:int,dnsResponseCode:int,dnsSection:int,dnsId:int,dnsA:array<string>,dnsAAAA:array<string>,dnsCNAME:array<string>,dnsMX:array<struct<exchange:string,preference:int>>,dnsNS:array<string>,dnsPTR:array<string>,dnsTXT:array<string>,dnsSOA:array<struct<mname:string,rname:string,serial:bigint,refresh:bigint,retry:bigint,expire:bigint,minimum:bigint>>,dnsSRV:array<struct<target:string,priority:int,weight:int,port:int>>>>]

scala> dns.count
res49: Long = 601

        打印一下试试

scala> dns.show
+--------------------+
|       dnsRecordList|
+--------------------+
|[{pagead2.googles...|
|[{SMS_SLP., 0, 1,...|
|[{SMS_SLP., 0, 1,...|
…… …… ……
+--------------------+
only showing top 20 rows


scala> dns.show(1,0,true)
-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 dnsRecordList | [{pagead2.googlesyndication.com., 0, 1, 0, 0, 0, 0, 37404, [], [], [], [], [], [], [], [], []}, {pagead2.googlesyndication.com., 254, 1, 1, 0, 0, 1, 37404, [180.163.150.166], [], [], [], [], [], [], [], []}]
only showing top 1 row

        可见Dns数据确实是读进来了,下一步就是怎么把它展开以方便分析了。

       四、操作复杂的spark类型

        上文中可以看到,经由ipfix读进来的数据其结构是比较复杂的,包括了Array和Struct等scala类型的嵌套,这些类型难以被简单地转成rdd进行处理,需要掌握一些展平数据的方法才能快乐的使用。

        例如,上文我们已经打印了dnsRecordList的schema结构,这里还可以看看http的主要IE的schema结构:

scala> val http = all.filter("silkAppLabel='80'").select('httpHostList,
     | 'httpGetList,
     | 'httpResponseList,
     | 'httpViaList,
     | 'httpContentTypeList,
     | 'httpContentLengthList)
http: org.apache.spark.sql.DataFrame = [httpHostList: array<string>, httpGetList: array<string> ... 4 more fields]

scala> http.count
res2: Long = 790

scala> http.show(1)
+--------------------+-----------+----------------+-----------+--------------------+---------------------+
|        httpHostList|httpGetList|httpResponseList|httpViaList| httpContentTypeList|httpContentLengthList|
+--------------------+-----------+----------------+-----------+--------------------+---------------------+
|[pr.x.hub.sandai....|   [POST /]|        [200 OK]|         []|[application/octe...|            [192, 36]|
+--------------------+-----------+----------------+-----------+--------------------+---------------------+
only showing top 1 row

scala> http.printSchema
root
 |-- httpHostList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpGetList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpResponseList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpViaList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpContentTypeList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpContentLengthList: array (nullable = true)
 |    |-- element: string (containsNull = true)

        1.纵向打开

        纵向打开是比较常见的打开方式,就是当一行数据中存在嵌套时,将数据展开成多行。这在dns记录中比较常见,比如在一条dns应答中得到了多个options结果,为了分析方便起见,需要将options结果展开为多行,一行一个,而且每行都需要保持dns应答中的相关信息。所以纵向打开也会显著增加数据的冗余量。

        (1)explode方法

        explode函数是在Spark 3.4.1 ScalaDoc - org.apache.spark.sql.functions中定义的,官方的说法,是从给定的array/map列创建新的数据行。

       使用explode,如果被炸对象是空值,会导致整行数据被删掉。所以,如果还希望保留被炸对象为空值的数据行,可以使用explode_outer方法。

        (2)单个array打开

        以dnsRecordList为例。 打印出第一行,大概能够猜出来,dnsRecordList是一个包含struct类型对象的array数组,这个array实际就是以这个struct为条目包装的dns查询应答,一条一个struct。

scala> dns.show(1,0,true)
-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 dnsRecordList | [{pagead2.googlesyndication.com., 0, 1, 0, 0, 0, 0, 37404, [], [], [], [], [], [], [], [], []}, {pagead2.googlesyndication.com., 254, 1, 1, 0, 0, 1, 37404, [180.163.150.166], [], [], [], [], [], [], [], []}]
only showing top 1 row

        由上面给出的官方说明,explode输入一个列,输出也是列,所以可以用select来选择explode炸出的列。 炸成多行后是这个样子:

scala> dns.select(explode('dnsRecordList).as("dnsRecord")).show
+--------------------+
|           dnsRecord|
+--------------------+
|{pagead2.googlesy...|
|{pagead2.googlesy...|
|{SMS_SLP., 0, 1, ...|
|{SMS_SLP., 0, 1, ...|
…… …… …… ……
+--------------------+
only showing top 20 rows

        可以看到,那个代表array的中括号没了,中括号中如果有多个struct的话,就会被炸成多行的形式。

        (3)多个array打开

        多个array打开和单个是一样的,但要注意空值的处理,所以比较合适的是使用explode_outer;另外一个麻烦的问题,是在一个select语句中不能使用多次explode,所以需要使用withColumn来配合,代码上感觉冗余,不太舒服。  

scala> http.withColumn("Host",explode_outer('httpHostList)).
     |      | withColumn("Get",explode_outer('httpGetList)).
     |      | withColumn("Response",explode_outer('httpResponseList)).
     |      | withColumn("ContentType",explode_outer('httpContentTypeList)).
     |      | withColumn("ContentLen",explode_outer('httpContentLengthList)).
     |      | select('Host,'Get,'Response,'ContentType,'ContentLen).show
+--------------------+--------------------+----------------+--------------------+----------+
|                Host|                 Get|        Response|         ContentType|ContentLen|
+--------------------+--------------------+----------------+--------------------+----------+
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
…………………………
+--------------------+--------------------+----------------+--------------------+----------+
only showing top 20 rows

        炸开的结果,可以明显感觉到数据的冗余量上来了。对比一下原始的数据: 

scala> http.select('httpHostList,'httpGetList,'httpResponseList,'httpContentTypeList,'httpContentLengthList).show
+--------------------+--------------------+--------------------+--------------------+---------------------+
|        httpHostList|         httpGetList|    httpResponseList| httpContentTypeList|httpContentLengthList|
+--------------------+--------------------+--------------------+--------------------+---------------------+
|[pr.x.hub.sandai....|            [POST /]|            [200 OK]|[application/octe...|            [192, 36]|
|[pr.x.hub.sandai....|            [POST /]|            [200 OK]|[application/octe...|            [192, 36]|

        对比一下原始的数据,可以看到这主要是因为Contentlength被炸开造成的;所以,观察数据的时候,谨慎选择炸开的列还是很有必要的。 

        2. 横向打开

        横向打开,就是将复杂数据类型打开成多列。需要注意到对于表格来说,行数是可以随意增长的,这也是纵向打开相对容易的原因,只要接受数据冗余就好了;但是列数,就不是能够随意打开的了,毕竟表头不能随着数据动态增长——那就太自由了。所以,根据数据类型的不同,横向展开的方式也会不同。

        (1)Struct类型横向打开

        如果数据类型是Struct,也就暗示着实际复杂类型包含的子要素的个数是恒定的,相对来说,展成多列也就容易接受。

        比如dnsRecordList里面的这个struct,通过printSchema我们已经知道其数据结构,包含dnsName、dnsTTL等下级元素。只需要使用select(“structname.*”)就可以解决问题。

scala> dns.select(explode('dnsRecordList).as("RecordList")).select("RecordList.*").show
+--------------------+------+---------+----------------+----------------+---------------+----------+-----+-----------------+--------------------+--------+-----+-----+------+------+--------------------+------+
|             dnsName|dnsTTL|dnsRRType|dnsQueryResponse|dnsAuthoritative|dnsResponseCode|dnsSection|dnsId|             dnsA|             dnsAAAA|dnsCNAME|dnsMX|dnsNS|dnsPTR|dnsTXT|              dnsSOA|dnsSRV|
+--------------------+------+---------+----------------+----------------+---------------+----------+-----+-----------------+--------------------+--------+-----+-----+------+------+--------------------+------+
|pagead2.googlesyn...|     0|        1|               0|               0|              0|         0|37404|               []|                  []|      []|   []|   []|    []|    []|                  []|    []|
|pagead2.googlesyn...|   254|        1|               1|               0|              0|         1|37404|[180.163.150.166]|                  []|      []|   []|   []|    []|    []|                  []|    []|
|            SMS_SLP.|     0|        1|               0|               0|              0|         0|14041|               []|                  []|      []|   []|   []|    []|    []|                  []|    []|
|            SMS_SLP.|     0|        1|               0|               0|              0|         0|14041|               []|                  []|      []|   []|   []|    []|    []|                  []|    []|

        如果只需要查看有限几个下级元素,也可以直接指定元素进行选择:

scala> dns.select(explode('dnsRecordList).as("RecordList")).select("RecordList.dnsName","RecordList.dnsA").show
+--------------------+-----------------+
|             dnsName|             dnsA|
+--------------------+-----------------+
|pagead2.googlesyn...|               []|
|pagead2.googlesyn...|[180.163.150.166]|
|            SMS_SLP.|               []|

        (2)Array类型横向展开

        如果数据类型是Array,就不携带任何下级元素的表头信息了,而且还可能因为数据带来不同array大小,这种展开就比较麻烦了:

scala> import org.apache.spark.sql.functions.size
import org.apache.spark.sql.functions.size

scala> http.select('httpHostList).withColumn("listSize",size('httpHostList)).groupBy('listSize).count.sort('listSize.desc).show
+--------+-----+
|listSize|count|
+--------+-----+
|       3|    2|
|       2|    1|
|       1|  106|
|       0|  681|
+--------+-----+

        过滤一下看看,确实有host为3的情况:

scala> val tmp = http.select('httpHostList).withColumn("size",size('httpHostList)).filter("size='3'")
tmp: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [httpHostList: array<string>, size: int]

scala> tmp.show
+--------------------+----+
|        httpHostList|size|
+--------------------+----+
|[ctldl.windowsupd...|   3|
|[1d.tlu.dl.delive...|   3|
+--------------------+----+

        对于array类型,需要在dataframe中选择的话,直接使用“(索引)”操作符就行。所以,比较笨的方法,就是手动展开:

scala> http.select('httpHostList(0).as("host0"),'httpHostList(1).as("host1"),'httpHostList(2).as("host2")).show
+--------------------+--------------------+--------------------+
|               host0|               host1|               host2|
+--------------------+--------------------+--------------------+
|pr.x.hub.sandai.n...|                null|                null|
|pr.x.hub.sandai.n...|                null|                null|
|extshort.weixin.q...|                null|                null|
|      42.187.182.123|                null|                null|
|ctldl.windowsupda...|ctldl.windowsupda...|ctldl.windowsupda...|
|      x1.c.lencr.org|                null|                null|
……………………

        自动一点,可以考虑采取如下的写法,其中“:_*”表示将对应的参数序列化后使用:

scala> http.select( $"httpGetList" +: (0 until 3).map(i => 'httpHostList(i).alias(s"host$i")):_* ).show
+--------------------+--------------------+--------------------+--------------------+
|         httpGetList|               host0|               host1|               host2|
+--------------------+--------------------+--------------------+--------------------+
|            [POST /]|pr.x.hub.sandai.n...|                null|                null|
|            [POST /]|pr.x.hub.sandai.n...|                null|                null|
|[POST /mmtls/0000...|extshort.weixin.q...|                null|                null|
|[POST /mmtls/0000...|      42.187.182.123|                null|                null|
|[GET /msdownload/...|ctldl.windowsupda...|ctldl.windowsupda...|ctldl.windowsupda...|
………………
+--------------------+--------------------+--------------------+--------------------+
only showing top 20 rows

PS

        最后,推荐几篇不错的文章,帮助我解决了scala小白过程中遭遇的问题:

        1. 数组列转多列:

Spark DataFrame数组列转多列 - 知乎 (zhihu.com)

        2. 操作复杂的spark类型:

 Spark之处理复杂数据类型(Struct、Array、Map、JSON字符串等)_spark string 转成json array_大数据翻身的博客-CSDN博客

        3. explode的本质是什么: 

 Spark Sql中的Map和flatMap_spark flatmap_数仓白菜白的博客-CSDN博客

        4. 带表达式的select: 

 Spark---DataFrame学习(二)——select、selectExpr函数_stan1111的博客-CSDN博客

        5. 奇怪的Scala操作符

scala中:_*的使用和scala中的:: , +:, :+, :::, +++ 等操作_scala中*_爱学习的孙同学的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/728328.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Excel】csv乱码

原因 CSV用UTF-8编码 Excel用ANSI编码 解决 1 创建一个新的Excel 2 数据 > 从文本/CSV 3 选择文件 4 选择 文件原始格式 和 分隔符 &#xff08;根据自己文件进行选择&#xff0c;如果不知道编码&#xff0c;可以一个一个的试&#xff0c;直到不出现乱码&#xff09;

【Go|第5期】Lorca无法正常运行的解决方案

日期&#xff1a;2023年7月5日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xff…

奇怪的SQL问题+1

我的 VIP 用户又抛给我一个 SQL 问题&#xff0c;我很激动&#xff0c;因为素材又来了&#xff1a; 我一看&#xff0c;这个表没什么花头&#xff0c;不就是没设置主键吗&#xff0c;MySQL 会默认生成一个主键&#xff0c;这跟 delete 不掉数据好像也没啥关系。 然后他说&…

事件监听及DOM操作

1.页面内容实现 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>常见事件案例</title> </head> <body><img id"light" src"img/off.gif"> <br><…

红黑树的介绍

红黑树 1.红黑树的概念2. 红黑树的性质3. 红黑树的结点定义4. 红黑树的插入操作情况一: cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u存在且为红情况二: cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u不存在/u存在且为黑情况三: cur为红&#xff0c;p为…

Distributional Graphormer:从分子结构预测到平衡分布预测

编者按&#xff1a;近年来&#xff0c;深度学习技术在分子微观结构预测中取得了巨大的进展。然而&#xff0c;分子的宏观属性和功能往往取决于分子结构在平衡态下的分布&#xff0c;仅了解分子的微观结构还远远不够。在传统的统计力学中&#xff0c;分子动力学模拟或增强采样等…

【计算机视觉 | 目标检测】arxiv 计算机视觉关于目标检测的学术速递(7 月 6 日论文合集)

文章目录 一、检测相关(16篇)1.1 Large-scale Detection of Marine Debris in Coastal Areas with Sentinel-21.2 Unbalanced Optimal Transport: A Unified Framework for Object Detection1.3 Detecting Images Generated by Deep Diffusion Models using their Local Intrin…

Oracle单行函数(字符,数值,日期,转换)

Oracle单行函数&#xff08;字符&#xff0c;数值&#xff0c;日期&#xff0c;转换&#xff09; 前言 1、字符函数 1.1大小写转换函数 1.2连接字符串X和concat(X,Y) 1.3ASCII码与字符转换 1.4返回字符串索引位置&#xff1a;instr(x,str) 1.5返回字符串长度&#xff1a;length…

使用Plotly创建自定义指标图表

大家好&#xff0c;使用Plotly可以创建和自定义指标图表&#xff0c;本文中将介绍如何使用Plotly库创建指标图表的具体操作步骤。 Plotly简介 Plotly是一个强大的数据可视化工具&#xff0c;允许我们使用Python创建各种交互式绘图和图表。在Plotly提供的无数类型的图表中&…

【MySQL】MySQL里程碑

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️MySQL】 文章目录 时间表从产品特性的角度梳理其发展过程中了解MySQL里程碑事件 时间表 从产品特性的角度梳理其发展过程中了解MySQL里程碑事件 1995年&#xff0c;MySQL 1.0发布&#xff0c;仅供内…

【LeetCode周赛】2022上半年题目精选集——贪心

文章目录 2136. 全部开花的最早一天&#xff08;贪心&#xff09;⭐⭐⭐⭐⭐思路代码语法解析&#xff1a;Integer[] id IntStream.range(0, plantTime.length).boxed().toArray(Integer[]::new); 2141. 同时运行 N 台电脑的最长时间&#xff08;贪心&#xff09;⭐⭐⭐⭐⭐解…

一分钟带你创建百万测试数据,玩转软件测试

准备测试数据是软件测试中非常重要的一个环节&#xff0c;无论是手工测试、动化测试还是性能测试&#xff0c;生成大量测试数据以评估性能是一项重要任务。 然而&#xff0c;寻找合适的测试数据并确保其质量常常是一项繁琐且耗时的工作。 先来看一下准备测试数据常见的四类方法…

Vue 实时显示时间

Vue 实时显示时间 getNowTime() {setInterval(() > {const date new Date();var year date.getFullYear();var month (date.getMonth() 1).toString().padStart(2, "0");var day date.getDate().toString().padStart(2, "0");var hours date.ge…

人工智能商业变现途径,并介绍详细公司案列

目录 1. 推荐系统&#xff1a;2. 智能广告和营销&#xff1a;3. 聊天机器人和虚拟助手&#xff1a;4. 自动化和机器人化&#xff1a;5. 数据分析和预测&#xff1a;6. 机器视觉和图像识别&#xff1a;7. 金融科技&#xff08;FinTech&#xff09;&#xff1a;8. 医疗诊断和健康…

【成长之路】nginx配置https遇到的一系列问题

问题一&#xff1a;拿到手的文件并没有网上说的crt和key&#xff0c;而是一个cer、key和csr 按照网上说的&#xff0c;将cer转成pem文件&#xff0c;配置之后确实好使了 server {listen 443 ssl;ssl_certificate /opt/nginx/conf/域名.cer;ssl_certificate_key /opt/nginx/co…

诚迈科技董事长、统信软件董事长王继平出席全球数字经济大会

7月5日&#xff0c;2023全球数字经济大会“数字未来新一代软件产业高质量发展论坛”在北京大兴隆重举行。论坛以“数字新高地&#xff0c;数创兴未来”为主题&#xff0c;共同探讨产业升级新路径&#xff0c;凝聚数字经济合作新共识&#xff0c;构建数字产业集聚发展新高地。诚…

python接口自动化之DDT数据驱动测试

一、简单介绍 DDT&#xff08;Date Driver Test&#xff09;&#xff0c;所谓数据驱动测试&#xff0c;简单来说就是由数据的改变从而驱动自动化测试的执行&#xff0c;最终引起测试结果的改变。通过使用数据驱动测试的方法&#xff0c;可以在需要验证多组数据测试场景中&…

YApi-高效、易用、功能强大的可视化接口管理平台——(二)YApi 分组权限

YApi 分组权限 认识 YApi角色划分项目权限分组权限分组操作创建分组项目列表添加成员分组删除 认识 YApi YApi 是一个开源的接口管理平台&#xff0c;可以方便地管理和测试 API 接口&#xff0c;支持接口文档自动生成、Mock 数据生成、接口测试和接口监控等功能。YApi 支持多人…

TCP 协议报文

TCP 提供面向连接的通信传输&#xff0c;面向连接是指在传送数据之前必须先建立连接&#xff0c;数据传送完成后要释放连接。无论哪一方向另一方发送数据之前&#xff0c;都必须先在双方之间建立一条连接。在TCP/IP协议中&#xff0c;TCP协议提供可靠的连接服务&#xff0c;连接…

ATFX国际:大非农数据来袭,美国劳动力市场需求或空前旺盛

ATFX国际&#xff1a;昨日晚间公布的ADP数据震惊市场&#xff0c;新增就业人口高达49.7万人&#xff0c;而预期值仅为22.8万人&#xff0c;前值也只有26.7万人。公布值约为预期值和前值的总和。 ▲ATFX图 ADP数据是非农就业报告的前瞻指标&#xff0c;前者表现亮眼&#xff0c…