
How to read different types of files in Hadoop MapReduce

In Hadoop, we can read different types of files using MapReduce. Because different files come in different formats, we can’t read them all in the same way, so here we will see how each type of file is read. Let’s first make a list of the file types we will cover one by one.

The file types are listed below:

  • Text Files
  • CSV Files
  • Parquet Files
  • Avro Data Files
  • RC (Record Columnar) data files
  • ORC (Optimized Row Columnar) data files

Let’s go through them one by one.

How to read text files

This is the most basic case: reading a raw text file stored in Hadoop.

To read a file, it must be available on HDFS, so first put the file from its local path onto HDFS. Hadoop provides commands for this. There are two options:

  • put
  • copyFromLocal

Both commands are run in the same way, as shown below:

bin/hadoop dfs -put ./input.txt /

Here, ./input.txt is the input file at the local path and / is the root directory on HDFS where the input file will be copied. put is the command used to copy data from the local file system to HDFS.

bin/hadoop dfs -copyFromLocal ./input.txt /

Here again, ./input.txt is the local input file and / is the HDFS root directory it will be copied to. copyFromLocal does the same job as put.
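
The same copy can also be done programmatically through the FileSystem API. The short sketch below is only an illustration of that option; the paths match the command-line example above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // copy ./input.txt from the local file system to the HDFS root directory
        fs.copyFromLocalFile(new Path("./input.txt"), new Path("/"));
        fs.close();
    }
}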

Java code to read a text file from HDFS.

// here we mention path where file is available on HDFS.
Path pt = new Path("hdfs://pathTofile"); 

// create a FileSystem object; this snippet assumes it runs inside a
// Mapper or Reducer, where the task context is available
FileSystem fs = FileSystem.get(context.getConfiguration());

// create buffered reader object to read a text file content
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); 

try {
    String line;
    // read file line by line
    line = br.readLine(); 
    while (line != null) {
        System.out.println(line); // be sure to read the next line otherwise you'll get an infinite loop 
        line = br.readLine();
    }
} finally {
    // you should close out the BufferedReader 
    br.close();
}
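
In a MapReduce job, this snippet is typically placed inside a Mapper, for example in setup() to load a small lookup file from HDFS before the map() calls begin. The sketch below is only an illustration: the lookup path /lookup/keywords.txt, the class name, and the filtering logic are all assumptions.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TextFileMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Set<String> lookup = new HashSet<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // hypothetical lookup file on HDFS, read once per task before map() runs
        Path pt = new Path("/lookup/keywords.txt");
        FileSystem fs = FileSystem.get(context.getConfiguration());
        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)))) {
            String line;
            while ((line = br.readLine()) != null) {
                lookup.add(line.trim());
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // emit only those input lines whose first token appears in the lookup file
        String[] tokens = value.toString().split("\\s+");
        if (tokens.length > 0 && lookup.contains(tokens[0])) {
            context.write(new Text(tokens[0]), key);
        }
    }
}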

How to read the CSV files

First, we create a Configuration object and add core-site.xml and hdfs-site.xml as resources; these files supply the HDFS connection settings. A CSV file is just a text file whose fields are separated by commas, so we open it through the FileSystem API, read it line by line, and split each line on commas. Then we point the reader at the path of the CSV file.

To read the CSV file, it must first be on HDFS; we can put it there using the commands mentioned above.

public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/hadoop/projects/hadoop-1.0.4/conf/core-site.xml"));
    conf.addResource(new Path("/hadoop/projects/hadoop-1.0.4/conf/hdfs-site.xml"));
    FileSystem fs = FileSystem.get(conf);
    // open the CSV file and read it line by line, splitting each line on commas
    try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path("/path/to/input/file"))))) {
        String line;
        while ((line = br.readLine()) != null) {
            String[] fields = line.split(",");
            System.out.println(String.join(" | ", fields));
        }
    }
}
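
In an actual MapReduce job, the same idea is applied per record: with the default TextInputFormat, each line of the CSV file arrives at the mapper as a Text value, and the mapper splits it on commas. The sketch below is illustrative only; the column positions and output types are assumptions.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CsvLineMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // each value is one line of the CSV file; split it on commas
        String[] fields = value.toString().split(",");
        if (fields.length >= 2) {
            // emit the first column as the key and the second as the value
            // (the column positions are illustrative; adjust them for your file)
            context.write(new Text(fields[0].trim()), new Text(fields[1].trim()));
        }
    }
}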

How to read the Parquet files

Apache Parquet is an open-source, column-oriented data file format designed for efficient data storage and retrieval. It offers efficient data encoding and compression schemes with improved performance for handling complex data in bulk.

These are the steps to follow in order to read a Parquet file:

  • The file to be read should have the .parquet extension.
  • We need to copy this file to HDFS using the put or copyFromLocal command.
  • To read a Parquet file we use the ParquetReader class.
  • The ParquetReader returns multiple records of type GenericData.Record.
  • Iterate through the reader to read each record one by one and print it to the console.
  • Close the reader after reading all records.

Importing all the necessary libraries

import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

Now let’s see how to read a Parquet file using Java code. In the main method we call a helper method that reads the Parquet file with a ParquetReader, record by record, and writes each record to the console (or to any other file).

public class ParquetFileRead {
    public static void main(String[] args) {
        readParquetFile();
    }
    private static void readParquetFile() {
        ParquetReader<GenericData.Record> reader = null;
        Path path = new Path("/test/EmpRecord.parquet");
        try {
            reader = AvroParquetReader
                .<GenericData.Record>builder(path)
                .withConf(new Configuration())
                .build();
            GenericData.Record record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
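
Each GenericData.Record exposes its fields by name, so instead of printing the whole record you can pull out individual columns. In the variation of the loop below, "empName" is a hypothetical field name; use a field from your own schema.

// read individual columns by name instead of printing the whole record
// ("empName" is a hypothetical field; substitute one from your schema)
GenericData.Record record;
while ((record = reader.read()) != null) {
    Object empName = record.get("empName");
    System.out.println("empName = " + empName);
}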

How to read the Avro data files

Avro is a data serialization and row-oriented remote procedure call framework developed within Apache’s Hadoop project. It serializes data in a compact binary format and uses JSON to define data types and protocols.

There are the following steps you have to follow in order to read Avro data files

  • The file to be read will have the .avro extension.
  • We need to copy this file to HDFS using the put or copyFromLocal command.
  • Now, we will read the file from HDFS.
  • Next, we will set the configuration details.
  • Open the file’s HDFS path using a FileSystem object.
  • Wrap the stream in a BufferedInputStream object.
  • Use DataFileStream with a GenericDatumReader to read the buffered stream content against a generic schema.
  • Read the schema and write it to the console.

Here, the Avro file is already available on HDFS. We read it through a buffered input stream; GenericDatumReader resolves the schema shared by the writer and reader, and we can then read and print that schema.

BufferedInputStream inStream = null;
String inputF = "hdfs://CustomerData-20160128-1501807.avro";
org.apache.hadoop.fs.Path inPath = new org.apache.hadoop.fs.Path(inputF);
try {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:8030");
    FileSystem fs = FileSystem.get(URI.create(inputF), conf);
    inStream = new BufferedInputStream(fs.open(inPath));
} catch (IOException e) {
    e.printStackTrace();
}
DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>());
Schema schema = reader.getSchema();
System.out.println(schema.toString());
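
The snippet above only prints the schema. DataFileStream also iterates over the records themselves, so a continuation along the following lines (a sketch reusing the same reader) would print every record and then close the stream.

// iterate over every record in the Avro data file and print it
while (reader.hasNext()) {
    GenericRecord record = reader.next();
    System.out.println(record);
}
reader.close();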

How to read the RC (Record Columnar) data files

RCFile (Record Columnar File) is a data placement structure for storing relational tables on computer clusters.

There are the following steps that you have to follow in order to read the RC data files.

  • Provide the path of a directory on HDFS and list all the files in it.
  • Iterate through the files available in the directory.
  • Wrap each file in a FileSplit object.
  • Use RCFileRecordReader to read each record one by one.

Here, we read the files inside a folder: the code lists the files in the directory, reads the records in each file using RCFileRecordReader, and prints every record. Note that job is an existing JobConf for the cluster, and AllCountriesRow is an application-specific class that decodes a row’s columns.

try {
    FileSystem fs = FileSystem.get(job);
    FileStatus[] fileStatuses = fs.listStatus(new Path("/path/to/dir/"));
    LongWritable key = new LongWritable();
    BytesRefArrayWritable value = new BytesRefArrayWritable();
    int counter = 1;
    for (int i = 0; i < fileStatuses.length; i++) {
        FileStatus fileStatus = fileStatuses[i];
        if (!fileStatus.isDir()) {
            System.out.println("File: " + fileStatus);
            FileSplit split = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), job);
            RCFileRecordReader reader = new RCFileRecordReader(job, split);
            while (reader.next(key, value)) {
                System.out.println("Getting row " + counter);
                AllCountriesRow acr = AllCountriesRow.valueOf(value);
                System.out.println("ROW: " + acr);
                counter++;
            }
            reader.close();
        }
    }
} catch (IOException e) {
    throw new Error(e);
}
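
AllCountriesRow in the listing above is application-specific. If you only want the raw column values, BytesRefArrayWritable exposes them directly: each entry is a byte range that can be turned into a string. The helper below is a rough sketch (it assumes UTF-8 encoded text columns and uses java.nio.charset.StandardCharsets and the BytesRefWritable class from org.apache.hadoop.hive.serde2.columnar).

// decode every column of one RCFile row into a comma-separated string
private static String rowToString(BytesRefArrayWritable row) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < row.size(); i++) {
        BytesRefWritable column = row.get(i);
        // each column is a byte range inside a shared buffer
        String field = new String(column.getData(), column.getStart(),
                column.getLength(), StandardCharsets.UTF_8);
        if (i > 0) {
            sb.append(", ");
        }
        sb.append(field);
    }
    return sb.toString();
}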

How to read the ORC (Optimized Row columnar) data files

The Optimized Row Columnar (ORC) file format stores Hive data in a highly efficient way. It was created to overcome limitations of the other Hive file formats, and Hive performs better when reading, writing, and processing data stored in ORC files.

There are the following steps to follow in order to read the ORC files

  • The file to be read will have the .orc extension.
  • Put the ORC file on an HDFS location.
  • We use a read method to read records from the .orc file.
  • Create a list to collect the rows.
  • Create an ORC reader using the Hadoop FileSystem and path.
  • Retrieve the schema.
  • Create a batch of rows.
  • Read records batch by batch to read the data in an optimized manner.
  • Write the data to the console.

Here, in the main method, we read the contents of the orders.orc file. We read the data in batches so the read is more optimized, and we add each row to a HashMap.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class OrcFileReader {
    private static final int BATCH_SIZE = 2048;

    public static void main(String[] args) throws IOException {
        List<Map<String, Object>> rows = read(new Configuration(), "orders.orc");
        for (Map<String, Object> row : rows) {
            System.out.println(row);
        }
    }

    public static List<Map<String, Object>> read(Configuration configuration, String path)
            throws IOException {
        // Create a list to collect rows
        List<Map<String, Object>> rows = new LinkedList<>();

        // Create an ORC reader using the Hadoop fileSystem and path
        try (Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(configuration))) {
            // Extract the schema
            TypeDescription schema = reader.getSchema();

            try (RecordReader records = reader.rows(reader.options())) {
                // Read rows in batch for better performance.
                VectorizedRowBatch batch = reader.getSchema().createRowBatch(BATCH_SIZE);
                LongColumnVector orderIdColumnVector = (LongColumnVector) batch.cols[0];
                BytesColumnVector itemNameColumnVector = (BytesColumnVector) batch.cols[1];
                DoubleColumnVector priceColumnVector = (DoubleColumnVector) batch.cols[2];

                while (records.nextBatch(batch)) {
                    for (int rowNum = 0; rowNum < batch.size; rowNum++) {
                        // Read rows from the batch
                        Map<String, Object> map = new HashMap<>();
                        map.put("order_id", orderIdColumnVector.vector[rowNum]);
                        map.put("item_name", itemNameColumnVector.toString(rowNum));
                        map.put("price", priceColumnVector.vector[rowNum]);
                        rows.add(map);
                    }
                }
            }
        }
        return rows;
    }
}
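
Note that this reader assumes the orders.orc file has three columns matching the column vectors above: a long order_id, a string item_name, and a double price. Each call to records.nextBatch(batch) refills the batch and updates batch.size, so the inner loop processes up to BATCH_SIZE (2048) rows per iteration of the outer while loop.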

Conclusion

As we have seen, different types of input files call for different approaches. Hadoop provides different techniques for reading and writing data depending on the format, and the examples above give you a starting point for working with these input formats. We hope this helps you explore the different file formats available on the Hadoop file system.
