開発‎ > ‎Hadoop‎ > ‎

Apache Hive による開発 (Frozen)

Apache Hive に関するメモ。

Hive の表の内部表現

ディレクトリ。辞書順。partition はさらにサブディレクトリ。このへん、どこかにドキュメントはありませんか?

Hadoop Hive - Hadoop Hive- Data Manipulation Statements

EMR で、極力 HDFS を経由しないようにする

EMR は、最初の入力と最後の出力にだけは、外部のストレージ(普通は S3)を利用せざるを得ないため、HDFS のローカリティが生きません。なるべく Hive の中間データを使わないようにしたいので、以下のようにすれば良いのだと思われます。

下記で、elastic-mapreduce コマンドまでは「Amazon Elastic MapReduce のセットアップ」を、s3cmd コマンドまでは「Amazon Simple Storage Servece のセットアップ」を参照。

サンプルとして、下記の 3 つの入力ファイルを用意します。ここでは、カラムの区切りにはタブを使っています。

input/employees/foo.tsv
10	NAKA Kiichiro	100
20	LUCAS Geroge	200
30	KANAME Madoka	100
input/employees/bar.tsv
40	JOBS Steve	200
50	GATES William	100
input/divisions/buzz.tsv
100	Development Division
200	Marketing Division

ローカルと EMR との双方で動作するように Hive スクリプトを書きます。${fsbase} の部分が、コマンドを呼び出す際に指定するパラメータによって substutite されます。

script/join.q
DROP TABLE IF EXISTS employees;
 
CREATE EXTERNAL TABLE employees (id int, name string, division_id int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/input/employees/';
 
DROP TABLE IF EXISTS divisions;
 
CREATE EXTERNAL TABLE divisions (id int, name string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/input/divisions/';
 
DROP TABLE IF EXISTS empdivs;
 
CREATE EXTERNAL TABLE empdivs (ename string, dname string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/output/empdivs/';
 
INSERT OVERWRITE TABLE empdivs
SELECT e.name, d.name
FROM
  employees e INNER JOIN
  divisions d ON e.division_id = d.id
ORDER BY e.name;

この時点で、以下のようなディレクトリ構成になっています。

script/
  join.q
input/
  employees/
    foo.tsv
    bar.tsv
  divisions/
    buzz.tsv

まずは、ローカルで実行してみます。

$ hive --define fsbase=file://$PWD -f script/join.q
...
$

結果です。

$ cat output/empdivs/000000_0 
GATES William	Development Division
JOBS Steve	Marketing Division
KANAME Madoka	Development Division
LUCAS Geroge	Marketing Division
NAKA Kiichiro	Development Division
$ 

良いようですので、次に、EMR の Hive + S3 で実行してみます。

S3 にはそもそもディレクトリの観念は無く、”/” がダブるとそのままの名前でファイルを作ろうとして妙なことになるので気をつけます。
$ fsbase=s3://emr.ayutaya.com
$ s3cmd sync --delete-removed input/ $fsbase/input/
$ s3cmd del --recursive $fsbase/output/empdivs/
$ s3cmd sync --delete-removed script/ $fsbase/script/
$ elastic-mapreduce --create --hive-script --arg $fsbase/script/join.q --args -d,fsbase=$fsbase
Created job flow j-2BSWG8RQ8VRGV
$ elastic-mapreduce --list
j-2BSWG8RQ8VRGV     STARTING       ec2-54-248-7-20.ap-northeast-1.compute.amazonaws.comDevelopment Job Flow
   PENDING        Setup Hive               
   PENDING        Run Hive Script
$

監視しつつしばらく待ちます。終わったら、結果を取ってきます。

$ mkdir emr-output
$ s3cmd sync --delete-removed $fsbase/output/ emr-output/
$ cat emr-output/empdivs/9b7ac2c5-a948-4f67-9787-b0faff1492b9_000000 
GATES William	Development Division
JOBS Steve	Marketing Division
KANAME Madoka	Development Division
LUCAS Geroge	Marketing Division
NAKA Kiichiro	Development Division

あたりまえですが、ローカルと同じ結果となりました。

Hive で multi-partition をしてみる

Hive は 0.7.1 で試行。何となく、店舗・日付ごとの商品売上蓄積テーブル的に考えてください。pid (Product), dt (Date), sid (Shop), sm (Summary)。

/home/knaka/input/stocksp/foo.txt
10,20120101,100,10
10,20120101,101,20
11,20120102,100,30
11,20120102,101,40
12,20120103,100,50
12,20120103,101,60

外部表を定義し、dynamic partition する。マルチカラムを自動振り分けするために、nonstrict 指定。

DROP TABLE stockspin;
 
CREATE EXTERNAL TABLE stockspin (pid INT, dt INT, sid INT, sm INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'file:///home/knaka/input/stocksp/';
 
DROP TABLE stockspout;
 
CREATE EXTERNAL TABLE stockspout (pid INT, sm INT)
PARTITIONED BY(sid INT, dt INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'file:///home/knaka/output/stocksp/';
 
SET hive.exec.dynamic.partition = true;
 
SET hive.exec.dynamic.partition.mode = nonstrict;
 
INSERT OVERWRITE TABLE stockspout PARTITION (sid, dt)
SELECT pid, sm, sid, dt
FROM stockspin;

出力される外部表の生表現は、以下。読む時は CRC って任意よね? CRC ファイルが存在してハッシュが異なればエラーだが、そもそも存在しなければチェックをしない、と。

$ pwd
/Users/knaka/output/stocksp
$ find .
.
./sid=100
./sid=100/dt=20120101
./sid=100/dt=20120101/.000000_0.crc
./sid=100/dt=20120101/000000_0
./sid=100/dt=20120102
./sid=100/dt=20120102/.000000_0.crc
./sid=100/dt=20120102/000000_0
./sid=100/dt=20120103
./sid=100/dt=20120103/.000001_0.crc
./sid=100/dt=20120103/000001_0
./sid=101
./sid=101/dt=20120101
./sid=101/dt=20120101/.000000_0.crc
./sid=101/dt=20120101/000000_0
./sid=101/dt=20120102
./sid=101/dt=20120102/.000001_0.crc
./sid=101/dt=20120102/000001_0
./sid=101/dt=20120103
./sid=101/dt=20120103/.000001_0.crc
./sid=101/dt=20120103/000001_0

Hive の partition は低レベルが顔を出すので、かえって感覚的に把握しやすいですわ。

不等号条件での self-join も、WITH 句も使えない

Hadoop、ましてや EMR では、起動オーバーヘッドがデカいので、少々 MapReduce の効率が悪かろうが、なるべく 1 job flow で処理を終えたいし、S3 へのアクセスは 1 度にして、中間データは HDFS に置きたい。

そんなわけなのだが、不等号条件での self-join も、WITH 句も使えない Hive において、累積的なクエリをどう実現するべか。一度 range(ここでは日付 dt)ごとに集計してから FULL OUTER JOIN か?

INSERT OVERWRITE TABLE fixes
SELECT diffs.dt, 0 - diffs.df + old_diffs.df
FROM diffs JOIN old_diffs ON old_diffs.dt = diffs.dt;
 
INSERT OVERWRITE TABLE fixes_map
SELECT stocks.dt, sum(fixes.fx)
FROM stocks FULL OUTER JOIN fixes
WHERE stocks.dt >= fixes.dt
GROUP BY stocks.dt, stocks.sm;

これはできるようだ。本番では実行計画を見て効率を確認しておこう。

ちゃんとした CSV パーサ

単純なカンマ区切りではなく、マジの CSV を送られたらどうすれば良いか? 以下のようなテキストです。

input/csvtest/foo.csv
"hoge,fuga,""foo"",""bar""",buzz
"hello, world","good-bye, world"

単純な DELIMITED FIELDS TERMINATED BY ',' は、単純に ”,” で切ってしまうので、結果はボロボロになります。送り元に TSV 等にしてもらう方が楽ではありますが、それができない場合には、Ruby の CSV ライブラリあたりで切ることにします。さすがに、改行が入ることは考慮しません。

script/csv2tsv.rb
require "csv";
STDIN.each_line do |line|
  STDOUT.puts CSV.parse_line(line).join("\t")
end

そして、transform() でストリームを通すビューを作成します。EMR でも走るように書いてみます。

script/transcsv.q
DROP TABLE IF EXISTS csvtest_raw;
 
CREATE EXTERNAL TABLE csvtest_raw (line string)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
LOCATION '${fsbase}/input/csvtest/';
 
ADD FILE ${fsbase}/script/csv2tsv.rb;
 
DROP VIEW IF EXISTS csvtest;
 
CREATE VIEW csvtest AS
SELECT transform(line) USING 'ruby csv2tsv.rb' AS (a string, b string)
FROM csvtest_raw;
 
DROP TABLE IF EXISTS tsvtest;
 
CREATE EXTERNAL TABLE tsvtest (a string, b string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/output/tsvtest/';
 
INSERT OVERWRITE TABLE tsvtest
SELECT concat('<', a, '>'), concat('<', b, '>') FROM csvtest;

ローカルでの実行スクリプトです。

./transcsv
#!/bin/sh
hive --define fsbase=file://$PWD -f script/transcsv.q

ローカルで実行してみます。

$ ./transcsv
$ cat output/tsvtest/000000_0 
<hoge,fuga,"foo","bar">	<buzz>
<hello, world>	<good-bye, world>
$ 

EMR での実行スクリプトです。

./transcsv-emr
#!/bin/bash
fsbase=s3://emr.ayutaya.com
s3cmd sync --delete-removed input/ $fsbase/input/
s3cmd sync --delete-removed script/ $fsbase/script/
elastic-mapreduce --create --wait-for-steps --hive-script \
 --arg $fsbase/script/transcsv.q --args -d,fsbase=$fsbase
s3cmd sync --delete-removed $fsbase/output/ emr-output/

EMR でも実行してみます。

$ ./transcsv-emr
$ cat emr-output/tsvtest/b37f6148-d7cf-4f4e-a858-091b56f78a58_000000 
<hoge,fuga,"foo","bar">	<buzz>
<hello, world>	<good-bye, world>
$ 

INSERT INTO は使えるのか?

Hive 0.8 から、INSERT INTO で追記(実質的には、ファイル追加)ができるようになったらしいです。

DROP TABLE IF EXISTS ids;
 
CREATE EXTERNAL TABLE ids (id int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/output/ids/';
 
DROP TABLE IF EXISTS names;
 
CREATE EXTERNAL TABLE names (name string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/output/names/';
 
FROM employees
INSERT OVERWRITE TABLE ids SELECT id
INSERT INTO TABLE names SELECT name;
 
FROM employees
INSERT OVERWRITE TABLE ids SELECT id -- こちらは置き換えられる
INSERT INTO TABLE names SELECT name; -- こちらは追記されて行く

しかし、冪等ではないので気持ちが悪いです。何とかなるまいか? 結局、パーティションして OVERWRITE した方が、繰り返し操作をしても結果が変わらないので安心ですかね。 ∥ DynamicPartitions / LanguageManual DDL

1/1 に生成されたデータは、前年の 12/28~31 に立った売り上げのデータを含み、

input/sales20120101/foo.tsv
20111228	80	20120101
20111230	100	20120101
20111231	120	20120101

1/2 に生成されたデータは、12/28~1/1 に立った売り上げのデータを含みます(各店舗から、遅れて and/or 修正として入ってくるデータがある)。

input/sales20120102/bar.tsv
20111228	135	20120102
20111231	210	20120102
20120101	130	20120102

異なる生成日を持つデータが別の日に到来することはないので、生成日でのパーティションは簡単です。問題は売上日の方なのですが、INSERT INTO するよりも、売上日→生成日でパーティションして、INSERT OVERWRITE で書く方が気楽です。

script/partsales.q
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
 
-- データ生成日でパーティションした、売り上げ実績表
 
DROP TABLE IF EXISTS sales_by_cdate;
 
CREATE EXTERNAL TABLE sales_by_cdate (sdate int, scount int)
PARTITIONED BY (cdate int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/output/sales_by_cdate/';
 
-- 売り上げ日でパーティションした、売り上げ実績表
 
DROP TABLE IF EXISTS sales_by_sdate;
 
CREATE EXTERNAL TABLE sales_by_sdate (scount int)
PARTITIONED BY (sdate int, cdate int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/output/sales_by_sdate/';
 
-- 2012/01/01 に生成された入力データ
 
DROP TABLE IF EXISTS sales20120101;
 
CREATE EXTERNAL TABLE sales20120101 (sdate int, scount int, cdate int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/input/sales20120101/';
 
FROM sales20120101
INSERT OVERWRITE TABLE sales_by_cdate PARTITION (cdate)
SELECT
  sdate, scount,
  cdate -- Dynamic Partition (DP) column(s)
INSERT OVERWRITE TABLE sales_by_sdate PARTITION (sdate, cdate)
SELECT
  scount,
  sdate, cdate -- Dynamic Partition (DP) column(s)
;
 
-- 2012/01/02 に生成された入力データ
 
DROP TABLE IF EXISTS sales20120102;
 
CREATE EXTERNAL TABLE sales20120102 (sdate int, scount int, cdate int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${fsbase}/input/sales20120102/';
 
FROM sales20120102
INSERT OVERWRITE TABLE sales_by_cdate PARTITION (cdate)
SELECT
  sdate, scount,
  cdate -- Dynamic Partition (DP) column(s)
INSERT OVERWRITE TABLE sales_by_sdate PARTITION (sdate, cdate)
SELECT
  scount,
  sdate, cdate -- Dynamic Partition (DP) column(s)
;

ええ。

hive> SELECT sum(scount) FROM sales_by_cdate WHERE sdate = 20111231;
OK
330
hive> 

その他

サブディレクトリは読めない。

hive> SELECT * FROM employees;
OK
Failed with exception java.io.IOException:java.io.IOException:
 Not a file: file:/home/knaka/hstack/input/employees/sub
Time taken: 0.06 seconds
hive> 

GROUP BY ~ HAVING も、普通に使える。

SELECT
  division_id, count(*)
FROM employees
WHERE name IS NOT NULL
GROUP BY division_id
HAVING count(*) >= 3;

テーブルとしてのサブクエリは使えるが、式としてのサブクエリは不可か?

SELECT * -- , (SELECT c.name FROM employees c WHERE a.name = c.name)
FROM
  (
    SELECT *
    FROM employees
  ) a INNER JOIN
  (
    SELECT *
    FROM employees
  ) b INNER JOIN
  (
    SELECT *
    FROM employees
  ) c ON a.name = b.name AND b.division_id = c.division_id
WHERE a.division_id = 100;

上記のとおり、式としてのサブクエリが使えない中で、「特別な方法を利用しなくてもできる順位付けの方法の考え方*1 - CAMUSのこくばん -クエリの板書所-」をやってみる。

Comments