Apache Hive に関するメモ。 Hive の表の内部表現ディレクトリ。辞書順。partition はさらにサブディレクトリ。このへん、どこかにドキュメントはありませんか? EMR で、極力 HDFS を経由しないようにするEMR は、最初の入力と最後の出力にだけは、外部のストレージ(普通は S3)を利用せざるを得ないため、HDFS のローカリティが生きません。なるべく Hive の中間データを使わないようにしたいので、以下のようにすれば良いのだと思われます。 下記で、elastic-mapreduce コマンドまでは「Amazon Elastic MapReduce のセットアップ」を、s3cmd コマンドまでは「Amazon Simple Storage Servece のセットアップ」を参照。 サンプルとして、下記の 3 つの入力ファイルを用意します。ここでは、カラムの区切りにはタブを使っています。
ローカルと EMR との双方で動作するように Hive スクリプトを書きます。${fsbase} の部分が、コマンドを呼び出す際に指定するパラメータによって substutite されます。
この時点で、以下のようなディレクトリ構成になっています。 script/ join.q input/ employees/ foo.tsv bar.tsv divisions/ buzz.tsv まずは、ローカルで実行してみます。 $ hive --define fsbase=file://$PWD -f script/join.q ... $ 結果です。 $ cat output/empdivs/000000_0 GATES William Development Division JOBS Steve Marketing Division KANAME Madoka Development Division LUCAS Geroge Marketing Division NAKA Kiichiro Development Division $ 良いようですので、次に、EMR の Hive + S3 で実行してみます。 S3 にはそもそもディレクトリの観念は無く、”/” がダブるとそのままの名前でファイルを作ろうとして妙なことになるので気をつけます。 $ fsbase=s3://emr.ayutaya.com $ s3cmd sync --delete-removed input/ $fsbase/input/ $ s3cmd del --recursive $fsbase/output/empdivs/ $ s3cmd sync --delete-removed script/ $fsbase/script/ $ elastic-mapreduce --create --hive-script --arg $fsbase/script/join.q --args -d,fsbase=$fsbase Created job flow j-2BSWG8RQ8VRGV $ elastic-mapreduce --list j-2BSWG8RQ8VRGV STARTING ec2-54-248-7-20.ap-northeast-1.compute.amazonaws.comDevelopment Job Flow PENDING Setup Hive PENDING Run Hive Script $ 監視しつつしばらく待ちます。終わったら、結果を取ってきます。 $ mkdir emr-output $ s3cmd sync --delete-removed $fsbase/output/ emr-output/ $ cat emr-output/empdivs/9b7ac2c5-a948-4f67-9787-b0faff1492b9_000000 GATES William Development Division JOBS Steve Marketing Division KANAME Madoka Development Division LUCAS Geroge Marketing Division NAKA Kiichiro Development Division あたりまえですが、ローカルと同じ結果となりました。 Hive で multi-partition をしてみるHive は 0.7.1 で試行。何となく、店舗・日付ごとの商品売上蓄積テーブル的に考えてください。pid (Product), dt (Date), sid (Shop), sm (Summary)。
外部表を定義し、dynamic partition する。マルチカラムを自動振り分けするために、nonstrict 指定。 DROP TABLE stockspin; CREATE EXTERNAL TABLE stockspin (pid INT, dt INT, sid INT, sm INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 'file:///home/knaka/input/stocksp/'; DROP TABLE stockspout; CREATE EXTERNAL TABLE stockspout (pid INT, sm INT) PARTITIONED BY(sid INT, dt INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 'file:///home/knaka/output/stocksp/'; SET hive.exec.dynamic.partition = true; SET hive.exec.dynamic.partition.mode = nonstrict; INSERT OVERWRITE TABLE stockspout PARTITION (sid, dt) SELECT pid, sm, sid, dt FROM stockspin; 出力される外部表の生表現は、以下。読む時は CRC って任意よね? CRC ファイルが存在してハッシュが異なればエラーだが、そもそも存在しなければチェックをしない、と。 $ pwd /Users/knaka/output/stocksp $ find . . ./sid=100 ./sid=100/dt=20120101 ./sid=100/dt=20120101/.000000_0.crc ./sid=100/dt=20120101/000000_0 ./sid=100/dt=20120102 ./sid=100/dt=20120102/.000000_0.crc ./sid=100/dt=20120102/000000_0 ./sid=100/dt=20120103 ./sid=100/dt=20120103/.000001_0.crc ./sid=100/dt=20120103/000001_0 ./sid=101 ./sid=101/dt=20120101 ./sid=101/dt=20120101/.000000_0.crc ./sid=101/dt=20120101/000000_0 ./sid=101/dt=20120102 ./sid=101/dt=20120102/.000001_0.crc ./sid=101/dt=20120102/000001_0 ./sid=101/dt=20120103 ./sid=101/dt=20120103/.000001_0.crc ./sid=101/dt=20120103/000001_0 Hive の partition は低レベルが顔を出すので、かえって感覚的に把握しやすいですわ。 不等号条件での self-join も、WITH 句も使えないHadoop、ましてや EMR では、起動オーバーヘッドがデカいので、少々 MapReduce の効率が悪かろうが、なるべく 1 job flow で処理を終えたいし、S3 へのアクセスは 1 度にして、中間データは HDFS に置きたい。 そんなわけなのだが、不等号条件での self-join も、WITH 句も使えない Hive において、累積的なクエリをどう実現するべか。一度 range(ここでは日付 dt)ごとに集計してから FULL OUTER JOIN か? INSERT OVERWRITE TABLE fixes SELECT diffs.dt, 0 - diffs.df + old_diffs.df FROM diffs JOIN old_diffs ON old_diffs.dt = diffs.dt; INSERT OVERWRITE TABLE fixes_map SELECT stocks.dt, sum(fixes.fx) FROM stocks FULL OUTER JOIN fixes WHERE stocks.dt >= fixes.dt GROUP BY stocks.dt, stocks.sm; これはできるようだ。本番では実行計画を見て効率を確認しておこう。 ちゃんとした CSV パーサ単純なカンマ区切りではなく、マジの CSV を送られたらどうすれば良いか? 以下のようなテキストです。
単純な DELIMITED FIELDS TERMINATED BY ',' は、単純に ”,” で切ってしまうので、結果はボロボロになります。送り元に TSV 等にしてもらう方が楽ではありますが、それができない場合には、Ruby の CSV ライブラリあたりで切ることにします。さすがに、改行が入ることは考慮しません。
そして、transform() でストリームを通すビューを作成します。EMR でも走るように書いてみます。
ローカルでの実行スクリプトです。
ローカルで実行してみます。 $ ./transcsv $ cat output/tsvtest/000000_0 <hoge,fuga,"foo","bar"> <buzz> <hello, world> <good-bye, world> $ EMR での実行スクリプトです。
EMR でも実行してみます。 $ ./transcsv-emr $ cat emr-output/tsvtest/b37f6148-d7cf-4f4e-a858-091b56f78a58_000000 <hoge,fuga,"foo","bar"> <buzz> <hello, world> <good-bye, world> $ INSERT INTO は使えるのか?Hive 0.8 から、INSERT INTO で追記(実質的には、ファイル追加)ができるようになったらしいです。 DROP TABLE IF EXISTS ids; CREATE EXTERNAL TABLE ids (id int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION '${fsbase}/output/ids/'; DROP TABLE IF EXISTS names; CREATE EXTERNAL TABLE names (name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION '${fsbase}/output/names/'; FROM employees INSERT OVERWRITE TABLE ids SELECT id INSERT INTO TABLE names SELECT name; FROM employees INSERT OVERWRITE TABLE ids SELECT id -- こちらは置き換えられる INSERT INTO TABLE names SELECT name; -- こちらは追記されて行く しかし、冪等ではないので気持ちが悪いです。何とかなるまいか? 結局、パーティションして OVERWRITE した方が、繰り返し操作をしても結果が変わらないので安心ですかね。 ∥ DynamicPartitions / LanguageManual DDL 1/1 に生成されたデータは、前年の 12/28~31 に立った売り上げのデータを含み、
1/2 に生成されたデータは、12/28~1/1 に立った売り上げのデータを含みます(各店舗から、遅れて and/or 修正として入ってくるデータがある)。
異なる生成日を持つデータが別の日に到来することはないので、生成日でのパーティションは簡単です。問題は売上日の方なのですが、INSERT INTO するよりも、売上日→生成日でパーティションして、INSERT OVERWRITE で書く方が気楽です。
ええ。 hive> SELECT sum(scount) FROM sales_by_cdate WHERE sdate = 20111231; OK 330 hive> その他サブディレクトリは読めない。 hive> SELECT * FROM employees; OK Failed with exception java.io.IOException:java.io.IOException: Not a file: file:/home/knaka/hstack/input/employees/sub Time taken: 0.06 seconds hive> GROUP BY ~ HAVING も、普通に使える。 SELECT division_id, count(*) FROM employees WHERE name IS NOT NULL GROUP BY division_id HAVING count(*) >= 3; テーブルとしてのサブクエリは使えるが、式としてのサブクエリは不可か? SELECT * -- , (SELECT c.name FROM employees c WHERE a.name = c.name) FROM ( SELECT * FROM employees ) a INNER JOIN ( SELECT * FROM employees ) b INNER JOIN ( SELECT * FROM employees ) c ON a.name = b.name AND b.division_id = c.division_id WHERE a.division_id = 100; 上記のとおり、式としてのサブクエリが使えない中で、「特別な方法を利用しなくてもできる順位付けの方法の考え方*1 - CAMUSのこくばん -クエリの板書所-」をやってみる。 |