Pyspark: 여러 배열 열을 행으로 나눕니다.
저는 하나의 행과 여러 개의 열을 가진 데이터 프레임을 가지고 있습니다.일부 열은 단일 값이고 다른 열은 목록입니다.모든 목록 열의 길이가 동일합니다.목록이 아닌 열은 그대로 유지하면서 각 목록 열을 별도의 행으로 분할하고자 합니다.
샘플 DF:
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
sqlc = SQLContext(sc)
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
원하는 것:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
만약 내가 목록열을 하나만 가지고 있다면, 이것은 단지 하기만 하면 쉬울 것입니다.explode
:
df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+
하지만 저도 그러려고 하면.explode
그c
열, 결국 데이터 프레임은 제가 원하는 것의 제곱 길이입니다.
df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+
제가 원하는 것은 각 열에 대해 해당 열에 있는 배열의 n번째 요소를 가져와 새 행에 추가하는 것입니다.데이터 프레임의 모든 열에 폭발물을 매핑해 보았지만 그것도 작동하지 않는 것 같습니다.
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
스파크 >= 2.4
대체가능합니다zip_
udf
와 함께arrays_zip
기능.
from pyspark.sql.functions import arrays_zip, col, explode
(df
.withColumn("tmp", arrays_zip("b", "c"))
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.b"), col("tmp.c"), "d"))
스파크 < 2.4
와 함께DataFrames
UDF:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode
zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)
(df
.withColumn("tmp", zip_("b", "c"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
와 함께RDDs
:
(df
.rdd
.flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
.toDF(["a", "b", "c", "d"]))
두 솔루션 모두 Python 통신 오버헤드로 인해 비효율적입니다.데이터 크기가 고정되어 있으면 다음과 같은 작업을 수행할 수 있습니다.
from functools import reduce
from pyspark.sql import DataFrame
# Length of array
n = 3
# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
DataFrame.unionAll,
(df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
for i in range(n))
).toDF("a", "b", "c", "d")
또는 다음과 같은 경우:
from pyspark.sql.functions import array, struct
# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
for i in range(n)
]))
(df
.withColumn("tmp", tmp)
.select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
이는 UDF나 RDD에 비해 상당히 빠릅니다. 임의 수의 열을 지원하기 위해 일반화되었습니다.
# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
return explode(array(*[
struct(*[col(c).getItem(i).alias(c) for c in colnames])
for i in range(n)
]))
df.withColumn("tmp", zip_and_explode("b", "c", n=3))
당신은 사용해야 합니다.flatMap
,것은 아니다.map
각 입력 행에서 출력 행을 여러 개 만들 때 사용합니다.
from pyspark.sql import Row
def dualExplode(r):
rowDict = r.asDict()
bList = rowDict.pop('b')
cList = rowDict.pop('c')
for b,c in zip(bList, cList):
newDict = dict(rowDict)
newDict['b'] = b
newDict['c'] = c
yield Row(**newDict)
df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
라이너 1개(Spark>=2.4.0용):
df.withColumn("bc", arrays_zip("b","c"))
.select("a", explode("bc").alias("tbc"))
.select("a", col"tbc.b", "tbc.c").show()
가져오기 필수:
from pyspark.sql.functions import arrays_zip
단계 -
- 다음인 열 bc를 만듭니다.
array_zip
기둥의b
그리고.c
- 폭발
bc
구조물을 보다tbc
- 필요한 열을 선택합니다.
a
,b
그리고.c
(필요에 따라 모두 폭발).
출력:
> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| 7|
| 1| 2| 8|
| 1| 3| 9|
+---+---+---+
ps.DataFrame(df[['b','c']].pandas_api().iloc[0].to_dict()).to_spark()\
.join(df[['a','d']],how='cross').show()
출력:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
언급URL : https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows
'programing' 카테고리의 다른 글
HANDLES에 fdopen과 동등한 윈도우가 있습니까? (0) | 2023.10.01 |
---|---|
조건부 실행 (&& 및 ||) (0) | 2023.10.01 |
폭 100% 테이블 넘침 디브용기 (0) | 2023.09.26 |
전송된 요청이 Ajax 요청인 경우 Managed Bean에서 리디렉션하는 방법은 무엇입니까? (0) | 2023.09.26 |
기존 MariaDB 타임스탬프 열을 로컬 표준시에서 UTC로 마이그레이션하는 방법 (0) | 2023.09.26 |