Mac OS X 上に、テスト用に Hadoop のスタック (Hadoop MapReduce + Pig + Hive) を構築します。Hadoop の関連プロジェクトの基本的なプライマリのターゲット OS は Linux ですが、そこは UNIX たる OS X ですので、Windows 上に Cygwin で無理やり構築したモガキ環境などよりも、圧倒的に普通です。 はじめにテスト用ですので、複数台のホストを要する分散モードには触れません。また、HDFS が無いとテストになりませんので、ローカルでもありません。実運用のためではないので、ユーザは、デフォルトの作業用ユーザのまま実行します(専用ユーザを宛てがいません)。 バージョンとしては、Mac OS X 10.7 (Lion) 上に構築します。Hadoop プロジェクト成果物のバージョン構成としては、現行の EMR に合わせて、hadoop-0.20.205.0, pig-0.9.2, hive-0.7.1 とします (バージョンの構成については、右記 FAQ を参照 ∥ Amazon Elastic MapReduce FAQs)。 Java の準備Java (コンパイラも) を使いますので、(たしか ver. 1.6 以上が)入っていなかったら入れておいてください。私のところではいつの間にか入っていましたが、無いようならば右記を参照して JDK を入れておいてください ∥ Oracle Technology Network for Java Developers $ java -version java version "1.6.0_31" Java(TM) SE Runtime Environment (build 1.6.0_31-b04-415-11M3646) Java HotSpot(TM) 64-Bit Server VM (build 20.6-b01-415, mixed mode) ~/.bashrc に JAVA_HOME 環境変数が未設定ならば、下記を追加します。 export JAVA_HOME=$(dirname $(readlink $(which java)))/../../CurrentJDK/Home/ Hadoop DFS (HDFS) + Hadoop MapReduce の設定右記サイトの「Download Hadoop」より、今回は 0.20.205.0 の tar ball「hadoop-0.20.205.0.tar.gz」を取得し、展開します ∥ Welcome to Apache™ Hadoop™! $ cd $HOME/hstack $ hadoop_ver=0.20.205.0 $ tar zxvf hadoop-$hadoop_ver.tar.gz 下記の環境変数を ~/.bashrc に追加します。 export HSTACK_HOME=$HOME/hstack export HADOOP_VER=0.20.205.0 export HADOOP_INSTALL=$HSTACK_HOME/hadoop-$HADOOP_VER/ export PATH=$PATH:$HADOOP_INSTALL/bin/ 現環境へも反映させておきます。 $ source ~/.bashrc 設定ファイルを、疑似分散モード用に書き換えます。共通設定: Hadoop のデフォルトファイルシステムを、ローカルの HDFS にします。 $ diff -uNr \ $HADOOP_INSTALL/conf/core-site.xml.orig \ $HADOOP_INSTALL/conf/core-site.xml --- /Users/knaka/Desktop/hadoop-0.20.205.0//conf/core-site.xml.orig +++ /Users/knaka/Desktop/hadoop-0.20.205.0//conf/core-site.xml @@ -5,4 +5,9 @@ <configuration> + <property> + <name>fs.default.name</name> + <value>hdfs://localhost/</value> + </property> + </configuration> HDFS の設定: 疑似分散ですので、ストレージの冗長度を 1(冗長性なし)に設定します。 $ diff -uNr $HADOOP_INSTALL/conf/hdfs-site.xml.orig \ $HADOOP_INSTALL/conf/hdfs-site.xml --- /Users/knaka/Desktop/hadoop-0.20.205.0//conf/hdfs-site.xml.orig +++ /Users/knaka/Desktop/hadoop-0.20.205.0//conf/hdfs-site.xml @@ -5,4 +5,9 @@ <configuration> +<property> + <name>dfs.replication</name> + <value>1</value> +</property> + </configuration> Hadoop MapReduce の設定: Jobtracker へのアクセスの口を、ローカルに開きます。 $ diff -uNr $HADOOP_INSTALL/conf/mapred-site.xml.orig \ $HADOOP_INSTALL/conf/mapred-site.xml --- /Users/knaka/Desktop/hadoop-0.20.205.0//conf/mapred-site.xml.orig +++ /Users/knaka/Desktop/hadoop-0.20.205.0//conf/mapred-site.xml @@ -5,4 +5,9 @@ <configuration> +<property> + <name>mapred.job.tracker</name> + <value>localhost:8021</value> +</property> + </configuration> jobtracker は tasktracker へ SSH 経由でアクセスしてジョブを振り分けます。「システム環境設定」→「共有」(当該ホストを、他のホストから共有)で、「リモートログイン」にチェックを入れることで、SSH サーバが起動します。 疑似分散モードでは自ホストに対して接続しますので、パスワードなしで SSH アクセスできるよう、鍵の設定をしておきます。 $ ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa (中略) $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys $ ssh localhost # 接続テスト $ exit # 抜けておく 以上で設定は終わりです。続いて、HDFS を初期化します。 $ hadoop namenode -format HDFS と MapReduce のサービスを起動します。 $ start-dfs.sh $ start-mapred.sh デフォルトで、データが /tmp/hadoop-$USER/ に格納されます。http://localhost:50070/ で DFS の状態は確認できます。調子が悪ければ消してみるのも良いかと思います。 適当なテストコードを書いて実行してみます(参考 ∥ Map/Reduce Tutorial)。50030 番ポートに HTTP で状態が出ていますので、ブラウザで見ていてください。 $ mkdir -p ~/test/ $ cd ~/test/ $ mkdir -p classes/ $ hadoop dfs -mkdir input/ # HDFS 上の入力ディレクトリ $ vi test.txt $ cat test.txt # 対象テキスト hello world hello hello world foo bar foo hoge fuga foo bar ... $ hadoop dfs -copyFromLocal test.txt input/ # ローカル FS から HDFS へコピー $ vi WordCount.java $ cat WordCount.java package org.myorg; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class WordCount { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter ) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount2"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } } $ javac -classpath $HADOOP_INSTALL/hadoop-core-$HADOOP_VER.jar \ -d classes/ WordCount.java $ jar -cvf test.jar -C classes/ . $ hadoop dfs -rmr output # 出力先ディレクトリ削除。念のため $ hadoop jar test.jar org.myorg.WordCount input/ output/ # 実行! ... $ hadoop dfs -cat output/part-00000 # 結果 bar 2 foo 3 fuga 1 hello 3 hoge 1 world 2 なお、サービスを止めるには、以下です。今回はまだ続くので、止めません。 $ stop-dfs.sh $ stop-mapred.sh Pig の設定右記サイトの「Getting Started」の先から、今回は 0.9.2 の tar ball「pig-0.9.2.tar.gz」を取得します ∥Welcome to Apache Pig! $ cd $HSTACK_HOME $ pig_ver=0.9.2 $ tar zxvf pig-$pig_ver.tar.gz 下記の環境変数を ~/.bashrc に追加します。 export PIG_VER=0.9.2 export PIG_INSTALL=$HSTACK_HOME/pig-$PIG_VER export PATH=$PATH:$PIG_INSTALL/bin 現環境へも反映させておきます。 $ source ~/.bashrc 以上で、”pig” (MapReduce モード) 、もしくは “pig -x local” (VM 直実行モード) でコンソールが動きます。 なお Pig は一連のコマンドを総合した最適化と、DUMP や STORE 等の実出力が指示されるまでの評価遅延をしますので、下記の例でも、それらコマンドの後では、長く MapReduce の待ちが入ります。 以下の例では、サンプルデータの要素間はタブで区切られています。コピー・ペーストの際にスペースに置き換わらないように注意してください。 $ cat <<EOF > employees.txt 100 Kiichiro NAKA 10 200 Geroge LUCAS 20 300 Madoka KANAME 10 EOF $ cat <<EOF > divisions.txt 10 Development Division 20 Marketing Division EOF $ hadoop dfs -copyFromLocal employees.txt divisions.txt input/ $ hadoop dfs -rmr output/ $ pig grunt> e = LOAD 'input/employees.txt'; grunt> d = LOAD 'input/divisions.txt'; grunt> x = JOIN e BY $2, d BY $0; grunt> DUMP x; (100,Kiichiro NAKA,10,10,Development Division) (300,Madoka KANAME,10,10,Development Division) (200,Geroge LUCAS,20,20,Marketing Division) grunt> y = ORDER x BY $1; grunt> STORE y INTO 'output/'; grunt> QUIT; $ hadoop dfs -cat output/part-r-00000 200 Geroge LUCAS 20 20 Marketing Division 100 Kiichiro NAKA 10 10 Development Division 300 Madoka KANAME 10 10 Development Division Hive の設定右記サイトの「Project」→「Releases」の先から、今回は 0.8.1 の tar ball「hive-0.8.1.tar.gz」を取得します ∥Welcome to Hive! $ cd $HSTACK_HOME $ hive_ver=0.8.1 $ tar zxvf hive-$hive_ver.tar.gz 下記の環境変数を ~/.bashrc に追加します。 export HIVE_VER=0.8.1 export HIVE_INSTALL=$HSTACK_HOME/hive-$HIVE_VER export PATH=$PATH:$HIVE_INSTALL/bin 現環境へも反映させておきます。 $ source ~/.bashrc Hive DB のメタ情報(RDB 的な、スキーマや統計情報等)を保持するデータベースと、今回使用する derby DB のログの出力先を指定します。デフォルトのままだと、いずれもカレントディレクトリに都度都度作られてしまいます(実行ディレクトリが違うと、「あれ? さっき作ったテーブルが無いよ?」といったことになる)。hive-default.xml.template, hive-env.sh.template を参考に、hive-site.xml, hive-env.sh を作成します。 $ vi $HIVE_INSTALL/conf/hive-site.xml $ cat $HIVE_INSTALL/conf/hive-site.xml <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:derby:;databaseName=/Users/knaka/hstack/hive-0.8.1/metastore_db;create=true</value> <description>JDBC connect string for a JDBC metastore</description> </property> </configuration> $ vi $HIVE_INSTALL/conf/hive-env.sh $ cat $HIVE_INSTALL/conf/hive-env.sh export HADOOP_OPTS="$HADOOP_OPTS -Dderby.stream.error.file=/Users/knaka/hstack/hive-0.8.1/derby.log" $ 以上で、"hive" コマンドでプロンプトが現れます。先ほどのサンプルデータで、クエリを発行してみます。 metastore_db/ について。 ちなみに以下で、結合で 1 ステージ、ソートで 1 ステージの計 2 ステージの MapReduce が実行されます。なまじ SQL ぽく書けるので危険ですので、油断しないようにします。 $ hive hive> CREATE TABLE employees (id INT, name STRING, division_id INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'; hive> LOAD DATA LOCAL INPATH 'employees.txt' OVERWRITE INTO TABLE employees; hive> CREATE TABLE divisions (id INT, name STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'; hive> LOAD DATA LOCAL INPATH 'divisions.txt' OVERWRITE INTO TABLE divisions; hive> SELECT e.name, d.name FROM employees e JOIN divisions d ON (e.division_id = d.id) ORDER by e.name; Geroge LUCAS Marketing Division Kiichiro NAKA Development Division Madoka KANAME Development Division hive> QUIT; $ |