Problem Description
I have the following data.
+----------+----+-------+-----------------------+
| date|item|avg_val|conditions |
+----------+----+-------+-----------------------+
|01-10-2020| x| 10| 0|
|02-10-2020| x| 10| 0|
|03-10-2020| x| 15| 1|
|04-10-2020| x| 15| 1|
|05-10-2020| x| 5| 0|
|06-10-2020| x| 13| 1|
|07-10-2020| x| 10| 1|
|08-10-2020| x| 10| 0|
|09-10-2020| x| 15| 1|
|01-10-2020| y| 10| 0|
|02-10-2020| y| 18| 0|
|03-10-2020| y| 6| 1|
|04-10-2020| y| 10| 0|
|05-10-2020| y| 20| 0|
+----------+----+-------+-----------------------+
I am trying to create a new column based on the following rules:
- If the flag value is 0, then the new column value will be 0.
- If the flag is 1, then the new column will be 1 and the next N rows will be zero, i.e. there is no need to check the next N values. This process is applied per item, i.e. partitioning by item will work.
I have used N = 4 here.
I have used the code below, but the windowing function is not efficient. Is there an optimized way?
DROP TEMPORARY TABLE t2;
CREATE TEMPORARY TABLE t2
SELECT *,
MAX(conditions) OVER (PARTITION BY item ORDER BY item,`date` ROWS 4 PRECEDING ) AS new_row
FROM record
ORDER BY item,`date`;
DROP TEMPORARY TABLE t3;
CREATE TEMPORARY TABLE t3
SELECT *,ROW_NUMBER() OVER (PARTITION BY item,new_row ORDER BY item,`date`) AS e FROM t2;
SELECT *,CASE WHEN new_row=1 AND e%5>1 THEN 0
WHEN new_row=1 AND e%5=1 THEN 1 ELSE 0 END AS flag FROM t3;
The output should look like this:
+----------+----+-------+-----------------------+-----+
| date|item|avg_val|conditions |flag |
+----------+----+-------+-----------------------+-----+
|01-10-2020| x| 10| 0| 0|
|02-10-2020| x| 10| 0| 0|
|03-10-2020| x| 15| 1| 1|
|04-10-2020| x| 15| 1| 0|
|05-10-2020| x| 5| 0| 0|
|06-10-2020| x| 13| 1| 0|
|07-10-2020| x| 10| 1| 0|
|08-10-2020| x| 10| 0| 0|
|09-10-2020| x| 15| 1| 1|
|01-10-2020| y| 10| 0| 0|
|02-10-2020| y| 18| 0| 0|
|03-10-2020| y| 6| 1| 1|
|04-10-2020| y| 10| 0| 0|
|05-10-2020| y| 20| 0| 0|
+----------+----+-------+-----------------------+-----+
But I am unable to get this output, even after many attempts.
Answer
As suggested in the comments (by @nbk and @Akina), you will need some sort of iterator to implement the logic. With SparkSQL and Spark 2.4+, we can use the built-in function aggregate and set an array of structs plus a counter as the accumulator. Below is an example dataframe and table named record (assume values in the conditions column are either 0 or 1):
val df = Seq(
("01-10-2020", "x", 10, 0), ("02-10-2020", "x", 10, 0), ("03-10-2020", "x", 15, 1),
("04-10-2020", "x", 15, 1), ("05-10-2020", "x", 5, 0), ("06-10-2020", "x", 13, 1),
("07-10-2020", "x", 10, 1), ("08-10-2020", "x", 10, 0), ("09-10-2020", "x", 15, 1),
("01-10-2020", "y", 10, 0), ("02-10-2020", "y", 18, 0), ("03-10-2020", "y", 6, 1),
("04-10-2020", "y", 10, 0), ("05-10-2020", "y", 20, 0)
).toDF("date", "item", "avg_val", "conditions")
df.createOrReplaceTempView("record")
SQL:
spark.sql("""
SELECT t1.item, m.*
FROM (
SELECT item,
sort_array(collect_list(struct(date,avg_val,int(conditions) as conditions,conditions as flag))) as dta
FROM record
GROUP BY item
) as t1 LATERAL VIEW OUTER inline(
aggregate(
/* expr: set up array `dta` from the 2nd element to the last
* notice that indices for slice function is 1-based, dta[i] is 0-based
*/
slice(dta,2,size(dta)),
/* start: set up and initialize `acc` to a struct containing two fields:
* - dta: an array of structs with a single element dta[0]
* - counter: number of rows after flag=1, can be from `0` to `N+1`
*/
(array(dta[0]) as dta, dta[0].conditions as counter),
/* merge: iterate through the `expr` using x and update two fields of `acc`
* - dta: append values from x to acc.dta array using concat + array functions
* update flag using `IF(acc.counter IN (0,5) and x.conditions = 1, 1, 0)`
* - counter: increment by 1 if acc.counter is between 1 and 4
* , otherwise set value to x.conditions
*/
(acc, x) -> named_struct(
'dta', concat(acc.dta, array(named_struct(
'date', x.date,
'avg_val', x.avg_val,
'conditions', x.conditions,
'flag', IF(acc.counter IN (0,5) and x.conditions = 1, 1, 0)
))),
'counter', IF(acc.counter > 0 and acc.counter < 5, acc.counter+1, x.conditions)
),
/* finish: retrieve acc.dta only and discard acc.counter */
acc -> acc.dta
)
) m
""").show(50)
Result:
+----+----------+-------+----------+----+
|item| date|avg_val|conditions|flag|
+----+----------+-------+----------+----+
| x|01-10-2020| 10| 0| 0|
| x|02-10-2020| 10| 0| 0|
| x|03-10-2020| 15| 1| 1|
| x|04-10-2020| 15| 1| 0|
| x|05-10-2020| 5| 0| 0|
| x|06-10-2020| 13| 1| 0|
| x|07-10-2020| 10| 1| 0|
| x|08-10-2020| 10| 0| 0|
| x|09-10-2020| 15| 1| 1|
| y|01-10-2020| 10| 0| 0|
| y|02-10-2020| 18| 0| 0|
| y|03-10-2020| 6| 1| 1|
| y|04-10-2020| 10| 0| 0|
| y|05-10-2020| 20| 0| 0|
+----+----------+-------+----------+----+
Where:
- use groupby to collect rows of the same item into an array of structs named dta with 4 fields: date, avg_val, conditions, and flag, sorted by date
- use the aggregate function to iterate through the above array of structs, updating the flag field based on counter and conditions (see the comments in the SQL code above for details)
- use LATERAL VIEW and the inline function to explode the resulting array of structs from the aggregate function
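To see what the merge step is doing, the accumulator semantics of aggregate can be mirrored in plain Python with functools.reduce. This is a sketch of the logic only, not Spark API; the dict rows and the N constant are invented for illustration:

```python
from functools import reduce

N = 4  # cooldown length, matching acc.counter IN (0, N+1) in the SQL

def merge(acc, x):
    # acc is (rows, counter), mirroring the struct<dta, counter> accumulator
    rows, counter = acc
    flag = 1 if counter in (0, N + 1) and x["conditions"] == 1 else 0
    counter = counter + 1 if 0 < counter < N + 1 else x["conditions"]
    return rows + [dict(x, flag=flag)], counter

# rows for item x, already sorted by date (other fields elided for brevity)
dta = [{"conditions": c} for c in [0, 0, 1, 1, 0, 1, 1, 0, 1]]

# start: dta[0] seeds both the row list and the counter
start = ([dict(dta[0], flag=dta[0]["conditions"])], dta[0]["conditions"])
rows, _ = reduce(merge, dta[1:], start)  # finish: keep rows, drop the counter
print([r["flag"] for r in rows])  # [0, 0, 1, 0, 0, 0, 0, 0, 1]
```

The fold shape is the same as the SQL: slice(dta, 2, size(dta)) plays the role of dta[1:], and the finish lambda discards the counter.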
Notes:
(1) The proposed SQL is for N=4, where we have acc.counter IN (0,5) and acc.counter < 5 in the SQL. For any N, adjust these to acc.counter IN (0,N+1) and acc.counter < N+1. The table below shows the result for N=2 with the same sample data:
+----+----------+-------+----------+----+
|item| date|avg_val|conditions|flag|
+----+----------+-------+----------+----+
| x|01-10-2020| 10| 0| 0|
| x|02-10-2020| 10| 0| 0|
| x|03-10-2020| 15| 1| 1|
| x|04-10-2020| 15| 1| 0|
| x|05-10-2020| 5| 0| 0|
| x|06-10-2020| 13| 1| 1|
| x|07-10-2020| 10| 1| 0|
| x|08-10-2020| 10| 0| 0|
| x|09-10-2020| 15| 1| 1|
| y|01-10-2020| 10| 0| 0|
| y|02-10-2020| 18| 0| 0|
| y|03-10-2020| 6| 1| 1|
| y|04-10-2020| 10| 0| 0|
| y|05-10-2020| 20| 0| 0|
+----+----------+-------+----------+----+
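Both the N=4 and N=2 tables can be cross-checked with a small plain-Python sketch of the rule itself (the make_flags helper name is invented for illustration and is not part of the Spark solution):

```python
def make_flags(conditions, n=4):
    """Emit 1 when conditions == 1 and no flag was emitted in the
    previous n rows; otherwise emit 0. Run once per item partition."""
    flags = []
    cooldown = 0  # rows left during which no new flag may be emitted
    for c in conditions:
        if cooldown > 0:
            flags.append(0)
            cooldown -= 1
        elif c == 1:
            flags.append(1)
            cooldown = n
        else:
            flags.append(0)
    return flags

x = [0, 0, 1, 1, 0, 1, 1, 0, 1]  # conditions for item x
print(make_flags(x, n=4))  # [0, 0, 1, 0, 0, 0, 0, 0, 1]
print(make_flags(x, n=2))  # [0, 0, 1, 0, 0, 1, 0, 0, 1]
```

Only the parameter n changes between the two result tables above; the per-row logic is identical.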
(2) We use dta[0] to initialize acc, which sets both the values and the data types of its fields. Ideally, we should make sure the data types of these fields are correct so that all calculations are conducted properly. For example, when calculating acc.counter, if conditions is a StringType, acc.counter+1 will return a StringType holding a DoubleType value:
spark.sql("select '2'+1").show()
+---------------------------------------+
|(CAST(2 AS DOUBLE) + CAST(1 AS DOUBLE))|
+---------------------------------------+
| 3.0|
+---------------------------------------+
This could yield floating-point errors when comparing the value with integers using acc.counter IN (0,5) or acc.counter < 5. Based on the OP's feedback, this produced an incorrect result without any WARNING/ERROR message.
One workaround is to specify the exact field types using CAST when setting up the 2nd argument of the aggregate function, so that an ERROR is reported on any type mismatch; see below:
CAST((array(dta[0]), dta[0].conditions) as struct<dta:array<struct<date:string,avg_val:string,conditions:int,flag:int>>,counter:int>),
Another solution is to force the types when creating the dta column; in this example, see int(conditions) as conditions in the code below:
SELECT item,
sort_array(collect_list(struct(date,avg_val,int(conditions) as conditions,conditions as flag))) as dta
FROM record
GROUP BY item
We can also force data types inside the calculations, for example see int(acc.counter+1) below:
IF(acc.counter > 0 and acc.counter < 5, int(acc.counter+1), x.conditions)