pip3 install --trusted-host pypi.org --trusted-host files.pythonhosted.org kafka-python
참고: https://curryyou.tistory.com/179
pip.conf 파일을 만들면 매번 옵션을 추가할 필요가 없다.
pip3 install --trusted-host pypi.org --trusted-host files.pythonhosted.org kafka-python
참고: https://curryyou.tistory.com/179
pip.conf 파일을 만들면 매번 옵션을 추가할 필요가 없다.
hue_safety_valve.ini 에 download_row_limit 을 변경한다.
Hive table partition의 조각 모음 (concatenate) (0) | 2021.07.31 |
---|---|
쉡스크립트에서 문자열을 배열로 바꾸고 n번째 항목 가져오기 (0) | 2021.07.21 |
Hive 테이블에서 각 partition의 파일 위치(location) 뽑아보기 (0) | 2021.07.21 |
sbt로 spark job 빌드 (0) | 2019.08.12 |
zeppelin 에서 spark으로 데이터 조회할 때 테이블 뷰 (0) | 2019.08.01 |
# 현재 사용 중인 버전 확인
$ /usr/libexec/java_home -V
Matching Java Virtual Machines (2):
14.0.1, x86_64: "OpenJDK 14.0.1" /Users/cjos/Library/Java/JavaVirtualMachines/openjdk-14.0.1/Contents/Home
1.8.0_172, x86_64: "Java SE 8" /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home
# 변경
$ export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
$ source ~/.bash_profile
$ java -version
java version "1.8.0_172" Java(TM) SE Runtime Environment (build 1.8.0_172-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)
# 안쓰는 버전 삭제
$ cd /Users/cjos/Library/Java/JavaVirtualMachines/
$ sudo rm -rf openjdk-14.0.1
이벤트를 Spark Stream 등으로 받아 Hive table로 저장하면 많은 조각 파일들이 생성된다. 작은 파일이 너무 많은면 이후 처리에 문제가 되므로 큰 파일로 합쳐주는 것이 좋다.
아래는 table이 2단계 partition을 가질 때 예이다.
아래 예를 파일로 저장하고, oozie workflow에서 table 이름을 parameter로 넘겨 실행한다.
#!/bin/sh
partitions=`hive -e "show partitions $1 "`
echo "partitions: " $partitions
for part in $partitions; do
IFS="/" read part1 part2 <<< $part
IFS="=" read k1 v1 <<< $part1
IFS="=" read k2 v2 <<< $part2
if [[ $v1 == "$2" ]]
then
hive -e "alter table $1 partition($k1='$v1', $k2='$v2') concatenate;"
fi
done
HUE에서 download 가능한 결과 수 제한 (0) | 2021.08.11 |
---|---|
쉡스크립트에서 문자열을 배열로 바꾸고 n번째 항목 가져오기 (0) | 2021.07.21 |
Hive 테이블에서 각 partition의 파일 위치(location) 뽑아보기 (0) | 2021.07.21 |
sbt로 spark job 빌드 (0) | 2019.08.12 |
zeppelin 에서 spark으로 데이터 조회할 때 테이블 뷰 (0) | 2019.08.01 |
string="Paris, France, Europe"
IFS=', ' read -r -a array <<< "$string"
# 0번째
echo "${array[0]}"
# iteration
for element in "${array[@]}"
do
echo "$element"
done
# loop
for index in "${!array[@]}"
do
echo "$index ${array[index]}"
done
https://stackoverflow.com/questions/10586153/how-to-split-a-string-into-an-array-in-bash
HUE에서 download 가능한 결과 수 제한 (0) | 2021.08.11 |
---|---|
Hive table partition의 조각 모음 (concatenate) (0) | 2021.07.31 |
Hive 테이블에서 각 partition의 파일 위치(location) 뽑아보기 (0) | 2021.07.21 |
sbt로 spark job 빌드 (0) | 2019.08.12 |
zeppelin 에서 spark으로 데이터 조회할 때 테이블 뷰 (0) | 2019.08.01 |
$ hive -e "explain extended select * from {DB}.{TABLE}" | grep location
Hive table partition의 조각 모음 (concatenate) (0) | 2021.07.31 |
---|---|
쉡스크립트에서 문자열을 배열로 바꾸고 n번째 항목 가져오기 (0) | 2021.07.21 |
sbt로 spark job 빌드 (0) | 2019.08.12 |
zeppelin 에서 spark으로 데이터 조회할 때 테이블 뷰 (0) | 2019.08.01 |
Spark dataframe 여러 값으로 구성된 컬럼에서 짝을 맞춰 뽑아오기 (0) | 2019.06.21 |
deduplicate: different file contents found in the following:
build.sbt
assemblyMergeStrategy in assembly := {
//case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.last
}
reference: http://queirozf.com/entries/creating-scala-fat-jars-for-spark-on-sbt-with-sbt-assembly-plugin
Hive table partition의 조각 모음 (concatenate) (0) | 2021.07.31 |
---|---|
쉡스크립트에서 문자열을 배열로 바꾸고 n번째 항목 가져오기 (0) | 2021.07.21 |
Hive 테이블에서 각 partition의 파일 위치(location) 뽑아보기 (0) | 2021.07.21 |
zeppelin 에서 spark으로 데이터 조회할 때 테이블 뷰 (0) | 2019.08.01 |
Spark dataframe 여러 값으로 구성된 컬럼에서 짝을 맞춰 뽑아오기 (0) | 2019.06.21 |
val df = spark.table("search.impression_log").
filter(s"dt='20190730' and hour='20'").
df.createTempView("abc")
%spark.sql
select * from abc
limit 100
Hive table partition의 조각 모음 (concatenate) (0) | 2021.07.31 |
---|---|
쉡스크립트에서 문자열을 배열로 바꾸고 n번째 항목 가져오기 (0) | 2021.07.21 |
Hive 테이블에서 각 partition의 파일 위치(location) 뽑아보기 (0) | 2021.07.21 |
sbt로 spark job 빌드 (0) | 2019.08.12 |
Spark dataframe 여러 값으로 구성된 컬럼에서 짝을 맞춰 뽑아오기 (0) | 2019.06.21 |
다음과 같은 테이블에서
log_id|itemid | is_valid | price
100|1,2,3,...| Y,Y,N,... | 10,20,10,...
다음과 같이 짝을 맞춰 처리하려면 변환이 필요하다.
(1,Y,10)
(2,Y,20)
(3,N,10)
...
val zip = udf((id: Seq[String], is_valid: Seq[String], price: Seq[String]) => {
id.indices.map(i=> (id(i), is_valid(i), price(i)))
}
)
var selected_df = source_df.withColumn("vars", explode(zip(split($"id", ","), split($"is_valid", ","), split($"price", ",") ))).select(
$"log_id", $"timestamp", // column from source_df
$"vars._1".alias("id"),
$"vars._2".alias("is_valid"),
$"vars._3".alias("price")
)
Hive table partition의 조각 모음 (concatenate) (0) | 2021.07.31 |
---|---|
쉡스크립트에서 문자열을 배열로 바꾸고 n번째 항목 가져오기 (0) | 2021.07.21 |
Hive 테이블에서 각 partition의 파일 위치(location) 뽑아보기 (0) | 2021.07.21 |
sbt로 spark job 빌드 (0) | 2019.08.12 |
zeppelin 에서 spark으로 데이터 조회할 때 테이블 뷰 (0) | 2019.08.01 |
데이터가 크지 않다면, pandas + jupyter의 조합으로 데이터를 분석하는 것이 편할 때가 있다.
단, 데이터 pipeline이 서버 상에서 이루어지는 것이 보통이고, AWS를 사용한다면 S3에 저장될 가능성이 높은데,
작업을 할때 마다 일일이 다운 받기는 귀찮은 작업이다.
준비
jupyter를 실행하고 있는 instance에 S3의 target bucket을 읽을 수 있도록 role을 부여한다(IAM)
import boto3
import pyarrow
import pyarrow.parquet as parquet
import pyarrow.orc as orc
import pandas as pd
def read_surprise_table(obj):
buf = obj.get()['Body'].read()
reader = pyarrow.BufferReader(buf)
data = orc.ORCFile(reader)
df = data.read().to_pandas()
print("df.columns: ", df.columns)
print("df.shape: ", df.shape)
return df
s3 = boto3.resource('s3')
bucket = 'bucket_name' # bucket name
s3bucket = s3.Bucket(bucket)
merged_df = pd.DataFrame()
path = 'folder1/folder2/dt=' + today_str + '/' # bucket name을 제외한 object path
# 데이터가 여러 파일로 나누어진 경우 loop돌면서 읽고 하나의 dataframe으로 concat한다.
for obj in s3bucket.objects.filter(Delimiter='/', Prefix=path):
print("bucket:{}, key:{}".format(obj.bucket_name, obj.key))
if obj.key.endswith("_SUCCESS") :
continue
tmp = read_surprise_table(obj)
if merged_df.shape[0] == 0 :
merged_df = tmp
else :
merged_df = pd.concat([merged_df, tmp], axis=0)
print("merged_df.columns: ", merged_df.columes)
print("merged_df.shape: ", merged_df.shape)