In this article, we’ll see how to download the input text file for our WordCount job, and put the file into HDFS.

Create a directory on the VM to download the data

Using the following commands, create a directory on the VM:

[raj_ops@sandbox-hdp ~]$ mkdir TP

[raj_ops@sandbox-hdp ~]$ cd TP

Download the data and the JAR file

Download the .txt file we’ll be using for our WordCount.

From the TP directory, you can use wget directly:

wget https://norvig.com/big.txt

You should have something like this:

[raj_ops@sandbox-hdp TP]$ wget https://norvig.com/big.txt

If everything went well, typing ls should now show the file big.txt.
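
As a quick sanity check, you can look at the size of the file and its first few lines (big.txt is a plain-text file of a few megabytes):

ls -lh big.txt

head -n 3 big.txt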

Now also download the JAR file, which contains the compiled Java code for the MapReduce job:

wget https://github.com/maelfabien/maelfabien.github.io/blob/master/assets/files/wc.jar

Note that this is a GitHub page URL: append ?raw=true to it (or use the matching raw.githubusercontent.com URL) so that wget fetches the actual JAR rather than the HTML page around it.

This JAR is a pre-compiled version of the code detailed below.
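
If you want to verify what the JAR contains, you can list its entries (assuming the JDK’s jar tool is available on the VM):

jar tf wc.jar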

Let’s go through the Java code in a little more detail:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into tokens and emit a (word, 1) pair for each token
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all the counts emitted for this word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
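
Concretely, for an input line such as "the quick brown fox and the dog", the mapper emits (the, 1), (quick, 1), (brown, 1), (fox, 1), (and, 1), (the, 1), (dog, 1), and the reducer (also used as a combiner in this job) sums the values for each key, producing (the, 2), (quick, 1), and so on.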

And in the main method:

public static void main(String[] args) throws Exception {

    /* Provide a configuration of the cluster */
    Configuration conf = new Configuration();

    /* Call the constructor with the configuration object and a name for the job */
    Job job = Job.getInstance(conf, "word count");

    /* Provide an implementation for the Map Class */
    job.setMapperClass(TokenizerMapper.class);

    /* Provide an implementation for the Combiner Class */
    job.setCombinerClass(IntSumReducer.class);

    /* Provide an implementation for the Reduce Class */
    job.setReducerClass(IntSumReducer.class);

    /* Specify the type of the output key/value */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    /* Give the location of the input/output of the application */
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    /* Specify how the input/output will be formatted */
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    /* Start the job and wait for its completion! */
    job.waitForCompletion(true);
}
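
Once big.txt is in HDFS (next section), the job can be launched from the JAR by passing the HDFS input and output paths, which end up in args[0] and args[1]. Assuming the main class inside wc.jar is called WordCount (this depends on how the JAR was packaged), the command would look like this; note that the output directory must not already exist:

hadoop jar wc.jar WordCount TP/input TP/output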

Move the file to HDFS

By default, Hadoop commands work relative to a home directory in HDFS named after the current user: /user/<user_name>
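
For example, for the raj_ops user, listing the working directory without giving a path:

hadoop fs -ls

is equivalent to:

hadoop fs -ls /user/raj_ops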

  • We need to create the directory /user/raj_ops/TP/input from our SSH connection, and then upload our file to HDFS.

hadoop fs -mkdir -p TP/input

  • We downloaded the data as big.txt. We now upload that file into the folder:

hadoop fs -put big.txt TP/input

  • In this command, big.txt refers to the local file system on the Linux VM, whereas TP/input refers to a directory in HDFS. We can display the last 5 lines of big.txt stored in HDFS:

hadoop fs -cat TP/input/big.txt | tail -n 5

The file ends with a snippet of Python 2 code, so you should see something like this:

if ord(c) > 127 and c not in s:
    print i, c, ord(c), big[max(0, i-10):min(N, i+10)]
    s.add(c)
print s
print [ord(c) for c in s]
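
You can also check that the file landed in the right place, along with its size and permissions in HDFS:

hadoop fs -ls TP/input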

Additional commands

To add files, instead of using hadoop fs -put filename, we can simply drag and drop them and create folders through the file browser offered by the Sandbox (the Ambari Files View).


To delete a file, use hadoop fs -rm filename (or move it to the Trash from the Files View). Strictly speaking, this does not delete the file but moves it to the trash, so you need to purge the trash from time to time:

hadoop fs -expunge

Note that raj_ops does not have the rights to purge the trash.
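
If you do have the rights and want to bypass the trash entirely, the -skipTrash option of hadoop fs -rm deletes the file immediately:

hadoop fs -rm -skipTrash filename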

Conclusion: I hope this tutorial was clear and helpful. I’d be happy to answer any question you might have in the comments section.