pip3 install --trusted-host pypi.org --trusted-host files.pythonhosted.org kafka-python

 

참고: https://curryyou.tistory.com/179

 

[파이썬] pip 설치 SSLError 오류 해결 방법: SSLCertVerificationError [SSL: CERTIFICATE_VERIFY_FAILED]

회사 컴퓨터나 사내망 등의 환경에서 pip로 파이썬 라이브러리를 설치하면, 아래와 같이 SSL관련 에러가 뜰 때가 있다. (방화벽/프록시 등의 이슈로, 해결 방법은 간단하다) pip install requests <터미

curryyou.tistory.com

 

pip.conf 파일을 만들면 매번 옵션을 추가할 필요가 없다.

https://pip.pypa.io/en/stable/topics/configuration/

Posted by poterius
,

hue_safety_valve.ini 에 download_row_limit 을 변경한다.

 

Posted by poterius
,
# 현재 사용 중인 버전 확인
$ /usr/libexec/java_home -V 
Matching Java Virtual Machines (2): 
14.0.1, x86_64: "OpenJDK 14.0.1" /Users/cjos/Library/Java/JavaVirtualMachines/openjdk-14.0.1/Contents/Home 
1.8.0_172, x86_64: "Java SE 8" /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home

# 변경
$ export JAVA_HOME=$(/usr/libexec/java_home -v 1.8) 
$ source ~/.bash_profile

$ java -version 
java version "1.8.0_172" Java(TM) SE Runtime Environment (build 1.8.0_172-b11) 
Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)

# 안쓰는 버전 삭제
$ cd /Users/cjos/Library/Java/JavaVirtualMachines/ 
$ sudo rm -rf openjdk-14.0.1

 

원본: https://ifuwanna.tistory.com/247

Posted by poterius
,

이벤트를 Spark Stream 등으로 받아 Hive table로 저장하면 많은 조각 파일들이 생성된다. 작은 파일이 너무 많은면 이후 처리에 문제가 되므로 큰 파일로 합쳐주는 것이 좋다. 

아래는 table이 2단계 partition을 가질 때 예이다. 

아래 예를 파일로 저장하고, oozie workflow에서 table 이름을 parameter로 넘겨 실행한다. 

#!/bin/sh

partitions=`hive -e "show partitions $1 "`
echo "partitions: " $partitions
for part in $partitions; do
    IFS="/" read part1 part2 <<< $part
    IFS="=" read k1 v1 <<< $part1

    IFS="=" read k2 v2 <<< $part2

    if [[ $v1 == "$2" ]]
    then
          hive -e "alter table $1  partition($k1='$v1', $k2='$v2') concatenate;"
    fi
done
Posted by poterius
,
string="Paris, France, Europe"
IFS=', ' read -r -a array <<< "$string"

# 0번째
echo "${array[0]}"

# iteration
for element in "${array[@]}"
do
    echo "$element"
done

# loop
for index in "${!array[@]}"
do
    echo "$index ${array[index]}"
done

 

https://stackoverflow.com/questions/10586153/how-to-split-a-string-into-an-array-in-bash

Posted by poterius
,

 

 

$ hive -e "explain extended select * from {DB}.{TABLE}" | grep location
Posted by poterius
,

deduplicate: different file contents found in the following:

build.sbt

assemblyMergeStrategy in assembly := {
  //case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.last
}

reference: http://queirozf.com/entries/creating-scala-fat-jars-for-spark-on-sbt-with-sbt-assembly-plugin

 

Creating Scala Fat Jars for Spark on SBT with sbt-assembly Plugin

Technology reference and information archive.

queirozf.com

 

Posted by poterius
,

val df = spark.table("search.impression_log").
    filter(s"dt='20190730' and hour='20'").

df.createTempView("abc")

%spark.sql
select * from abc
limit 100

Posted by poterius
,

다음과 같은 테이블에서

log_id|itemid | is_valid | price

100|1,2,3,...| Y,Y,N,... | 10,20,10,...

 

다음과 같이 짝을 맞춰 처리하려면 변환이 필요하다. 

(1,Y,10)

(2,Y,20)

(3,N,10)

...

val zip = udf((id: Seq[String], is_valid: Seq[String], price: Seq[String])  => {
        id.indices.map(i=> (id(i), is_valid(i), price(i)))
    }
)

var selected_df = source_df.withColumn("vars", explode(zip(split($"id", ","), split($"is_valid", ","), split($"price", ",")  ))).select(
        $"log_id", $"timestamp", // column from source_df
        $"vars._1".alias("id"), 
        $"vars._2".alias("is_valid"), 
        $"vars._3".alias("price")       
        )

 

 

 

Posted by poterius
,

데이터가 크지 않다면, pandas + jupyter의 조합으로 데이터를 분석하는 것이 편할 때가 있다. 

단, 데이터 pipeline이 서버 상에서 이루어지는 것이 보통이고, AWS를 사용한다면 S3에 저장될 가능성이 높은데, 

작업을 할때 마다 일일이 다운 받기는 귀찮은 작업이다. 

 

준비

jupyter를 실행하고 있는 instance에 S3의 target bucket을 읽을 수 있도록 role을 부여한다(IAM)

 

import boto3
import pyarrow
import pyarrow.parquet as parquet
import pyarrow.orc as orc
import pandas as pd

def read_surprise_table(obj):
    buf = obj.get()['Body'].read()
    reader = pyarrow.BufferReader(buf)
    data = orc.ORCFile(reader)
    df  = data.read().to_pandas()

    print("df.columns: ", df.columns)
    print("df.shape: ", df.shape)

    return df

s3 = boto3.resource('s3')
bucket = 'bucket_name'   # bucket name
s3bucket = s3.Bucket(bucket)
merged_df = pd.DataFrame()
path = 'folder1/folder2/dt=' + today_str + '/'  # bucket name을 제외한 object path

# 데이터가 여러 파일로 나누어진 경우 loop돌면서 읽고 하나의 dataframe으로 concat한다.
for obj in s3bucket.objects.filter(Delimiter='/', Prefix=path):
    print("bucket:{}, key:{}".format(obj.bucket_name, obj.key))
    if obj.key.endswith("_SUCCESS") :
        continue
    tmp = read_surprise_table(obj)
    if merged_df.shape[0] == 0 :
        merged_df = tmp
    else :
        merged_df = pd.concat([merged_df, tmp], axis=0)  

print("merged_df.columns: ", merged_df.columes)
print("merged_df.shape: ", merged_df.shape)
Posted by poterius
,