deduplicate: different file contents found in the following:

Fix: add a merge strategy to build.sbt:

assemblyMergeStrategy in assembly := {
  // discard duplicated metadata files under META-INF
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // for any other conflicting path, keep the file encountered last
  case x => MergeStrategy.last
}
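On sbt 1.x the slash syntax is the preferred way to write the same setting; a minimal sketch, assuming a recent sbt-assembly:

// equivalent setting using the sbt 1.x slash syntax
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.last
}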

reference: http://queirozf.com/entries/creating-scala-fat-jars-for-spark-on-sbt-with-sbt-assembly-plugin

 


Posted by poterius

val df = spark.table("search.impression_log").
    filter("dt='20190730' and hour='20'")

df.createTempView("abc")

%spark.sql
select * from abc
limit 100
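Note that createTempView throws an exception if the view "abc" already exists, for example when the paragraph is re-run. A minimal re-runnable sketch using createOrReplaceTempView, with the same table and filter as above and assuming an active SparkSession named spark (%spark.sql is the Zeppelin SQL paragraph syntax):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// re-runnable: replaces the temp view if it already exists
val df = spark.table("search.impression_log").
    filter("dt='20190730' and hour='20'")
df.createOrReplaceTempView("abc")

// the same query as the %spark.sql paragraph, run from Scala
spark.sql("select * from abc limit 100").show()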

Posted by poterius

Given a table like the following, where each column holds comma-separated values:

log_id | itemid    | is_valid  | price
100    | 1,2,3,... | Y,Y,N,... | 10,20,10,...

a transformation is needed so the values can be processed as matched-up tuples:

(1,Y,10)

(2,Y,20)

(3,N,10)

...

import org.apache.spark.sql.functions.{explode, split, udf}
import spark.implicits._ // for the $"..." column syntax

// zip the three arrays into an array of (id, is_valid, price) tuples
val zip = udf((id: Seq[String], is_valid: Seq[String], price: Seq[String]) =>
  id.indices.map(i => (id(i), is_valid(i), price(i)))
)

// split each comma-separated column, zip the arrays, and explode into one row per tuple
val selected_df = source_df.withColumn("vars",
    explode(zip(split($"id", ","), split($"is_valid", ","), split($"price", ",")))
  ).select(
    $"log_id", $"timestamp", // columns carried over from source_df
    $"vars._1".alias("id"),
    $"vars._2".alias("is_valid"),
    $"vars._3".alias("price")
  )
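On Spark 2.4 or later the same pairing can be done without a UDF using the built-in arrays_zip. A minimal sketch, assuming the same source_df and column names as above; the split arrays are given explicit names first so they become the struct field names:

import org.apache.spark.sql.functions.{arrays_zip, explode, split}

val alt_df = source_df
  .withColumn("id_arr", split($"id", ","))
  .withColumn("valid_arr", split($"is_valid", ","))
  .withColumn("price_arr", split($"price", ","))
  // one struct per position: (id_arr, valid_arr, price_arr)
  .withColumn("vars", explode(arrays_zip($"id_arr", $"valid_arr", $"price_arr")))
  .select(
    $"log_id", $"timestamp",
    $"vars.id_arr".alias("id"),
    $"vars.valid_arr".alias("is_valid"),
    $"vars.price_arr".alias("price"))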

 

 

 

Posted by poterius