13.5.15

Apache YARN Scheduler


  • The FIFO Scheduler
    • Places applications in a queue and runs them in the order of submission.
    • Requests for the first application in the queue are allocated first, then once its requests have been satisfied the next application in the queue is served, and so on.
    • The good part:
      • simple to understand
      • not needing any configuration
    • The bad side:
      • not suitable for shared clusters.
        • Large applications will use all the resources in a cluster
          • So each application has to wait its turn.
      • On a shared cluster it is better to use the Capacity Scheduler or the Fair Scheduler.
        • Both of these allow long-running jobs to complete in a timely manner,
          • while still allowing users who are running concurrent smaller ad hoc queries to get results back in a reasonable time.
  • The Capacity Scheduler
    • A separate dedicated queue allows the small job to start as soon as it is submitted,
      • although this is at the cost of
        • overall cluster utilization
          • since the queue capacity is reserved for jobs in that queue.
            • This means that the large job finishes later than when using the FIFO Scheduler.
  • The Fair Scheduler
    • There is no need to reserve a set amount of capacity since it will dynamically balance resources between all running jobs.
    • When the second (small) job starts, it is allocated half of the cluster resources so that each job is using its fair share of resources.
  • Delay Scheduling
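The three policies are easier to compare side by side in a toy model. The sketch below is plain Java, not YARN's scheduler code. It makes several simplifying assumptions: one resource measured in abstract units, a 100-unit cluster, a large job wanting all 100 units and a small job (submitted second) wanting 50, each job in its own queue, a hypothetical 70/30 queue split for the Capacity case, and no preemption, queue elasticity, or delay scheduling. The fair policy is modeled as max-min fairness: an equal split, where any share a job cannot use is handed to the others.

```java
import java.util.Arrays;

/** Toy model (not YARN code): how a cluster is shared by jobs under three policies. */
public class SchedulerToyModel {

    /** FIFO: serve jobs strictly in submission order until the cluster is full. */
    public static int[] fifo(int clusterSize, int[] demands) {
        int[] alloc = new int[demands.length];
        int free = clusterSize;
        for (int i = 0; i < demands.length; i++) {
            alloc[i] = Math.min(demands[i], free);
            free -= alloc[i];
        }
        return alloc;
    }

    /** Capacity: each job sits in its own queue and never exceeds that queue's
     *  reserved percentage (queue elasticity is deliberately ignored here). */
    public static int[] capacity(int clusterSize, int[] demands, int[] queuePercent) {
        int[] alloc = new int[demands.length];
        for (int i = 0; i < demands.length; i++) {
            alloc[i] = Math.min(demands[i], clusterSize * queuePercent[i] / 100);
        }
        return alloc;
    }

    /** Fair: max-min fair shares; whatever a satisfied job does not need goes to the rest. */
    public static int[] fair(int clusterSize, int[] demands) {
        int n = demands.length;
        int[] alloc = new int[n];
        boolean[] done = new boolean[n];
        int free = clusterSize;
        int active = n;
        while (free > 0 && active > 0) {
            int share = free / active;          // equal split of what is still free
            if (share == 0) break;              // leftover is less than one unit per job
            for (int i = 0; i < n; i++) {
                if (done[i]) continue;
                int give = Math.min(share, demands[i] - alloc[i]);
                alloc[i] += give;
                free -= give;
                if (alloc[i] == demands[i]) { done[i] = true; active--; }
            }
        }
        return alloc;
    }

    public static void main(String[] args) {
        int[] demands = {100, 50};              // job 0: large, job 1: small
        System.out.println("FIFO:     " + Arrays.toString(fifo(100, demands)));                        // [100, 0]
        System.out.println("Capacity: " + Arrays.toString(capacity(100, demands, new int[] {70, 30}))); // [70, 30]
        System.out.println("Fair:     " + Arrays.toString(fair(100, demands)));                        // [50, 50]
    }
}
```

With these numbers, FIFO leaves the small job waiting, the Capacity model starts it at once but caps the large job at its queue's 70 units, and the fair model gives each job half of the cluster, as described in the notes.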

How was YARN designed to address the limitations of MRv1?

All from the book Hadoop: The Definitive Guide.
  • Scalability
    • MRv1:
      • because the jobtracker has to manage both jobs and tasks, MRv1 hits scalability bottlenecks in the region of 4,000 nodes, and 40,000 tasks.
    • YARN/MRv2 overcomes these limitations by virtue of its split resource manager/application master architecture, which means it is designed to scale up to 10,000 nodes, and 100,000 tasks.
  • Availability
    • With the jobtracker's responsibilities split between the resource manager and the application master in YARN, making the service highly available became a divide-and-conquer problem: provide HA for the resource manager, then for YARN applications (on a per-application basis). Indeed, Hadoop 2 supports HA both for the resource manager and for the application master of MapReduce jobs, which is similar to my own product.
  • Utilization
    • In MRv1:
      • each tasktracker is configured with a static allocation of fixed size "slots", which are divided into map slots and reduce slots at configuration time.
      • A map slot can only be used to run a map task, and a reduce slot can only be used for a reduce task.
    • In YARN/MRv2:
      • a node manager manages a pool of resources, rather than a fixed number of designated slots. 
      • MapReduce running on YARN will not hit the situation where a reduce task has to wait because only map slots are available on the cluster.
      • If the resources to run the task are available, then the application will be eligible for them.
      • Furthermore, resources in YARN are fine-grained, so an application can make a request for what it needs, rather than for an indivisible slot.
  • Multitenancy
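The utilization point can be shown with a toy calculation. This is plain Java, not Hadoop code, and the node sizes are hypothetical: in MRv1 the node has 4 map slots and 2 reduce slots; under YARN the same node offers a 6144 MB memory pool. The map phase is over, so the map slots sit idle while 6 reduce tasks wait.

```java
/** Toy model (not Hadoop code): how many pending reduce tasks can start on one node. */
public class SlotsVersusPool {

    /** MRv1: reduce tasks may only use reduce slots; idle map slots cannot be borrowed. */
    public static int runnableWithSlots(int reduceSlots, int pendingReduces) {
        return Math.min(reduceSlots, pendingReduces);
    }

    /** YARN: any request fits while the node's memory pool still has room for it. */
    public static int runnableWithPool(int poolMb, int requestMb, int pendingReduces) {
        return Math.min(poolMb / requestMb, pendingReduces);
    }

    public static void main(String[] args) {
        // Hypothetical node: 4 map slots (all idle) + 2 reduce slots in MRv1,
        // or a 6144 MB pool in YARN; 6 reduce tasks are waiting.
        System.out.println(runnableWithSlots(2, 6));          // 2
        System.out.println(runnableWithPool(6144, 1024, 6));  // 6
        System.out.println(runnableWithPool(6144, 1536, 6));  // 4, with a fine-grained 1.5 GB request
    }
}
```

The last line reflects the fine-grained point: the application asks for exactly the memory it needs rather than for an indivisible slot.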

Apache YARN


  • YARN:
    • Yet Another Resource Negotiator
  • YARN 
    • is Hadoop's cluster resource management system.
    • was introduced in Hadoop 2
    • to improve the MapReduce implementation,
    • but is general enough to support other distributed computing paradigms as well.
  • YARN
    • provides APIs for requesting and working with cluster resources.
      • but these APIs are not typically used directly by user code.
      • Instead, users write to higher-level APIs provided by distributed computing frameworks,
        • which themselves are built on YARN and hide the resource management details from the user.

(There is a further layer of applications that build on the frameworks shown in Figure 4-1 of the book, for example Pig, Hive, and Crunch.)



Figures in Hadoop: The Definitive Guide

Figure 1-1. Structure of the book: there are various pathways through the book





Figure 2-1. MapReduce logical data flow


  • MapReduce works by breaking the processing into two phases:
    • the map phase and
    • the reduce phase
  • Each phase has key-value pairs as input and output,
    • the types of which may be chosen by the programmer
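The two phases and the grouping between them can be simulated in plain Java, without any Hadoop classes. In this sketch the programmer-chosen types are String keys (years) and Integer values (temperatures), and each input record is assumed to be a simple "year,temperature" string; the TreeMap stands in for the shuffle-and-sort step that groups map output by key in sorted order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Plain-Java simulation of MapReduce's logical data flow (no Hadoop classes involved). */
public class MaxTemperatureFlow {

    // Map phase: one input record -> one (year, temperature) pair.
    // The "year,temperature" record format is an assumption made for this sketch.
    static Map.Entry<String, Integer> map(String record) {
        String[] fields = record.split(",");
        return Map.entry(fields[0].trim(), Integer.parseInt(fields[1].trim()));
    }

    // Reduce phase: (year, all temperatures for that year) -> maximum temperature.
    static int reduce(String year, List<Integer> temperatures) {
        return Collections.max(temperatures);
    }

    public static SortedMap<String, Integer> run(List<String> records) {
        // Shuffle and sort: group the map output by key; TreeMap keeps the keys sorted.
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (String record : records) {
            Map.Entry<String, Integer> kv = map(record);
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        SortedMap<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            output.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = List.of("1950,0", "1950,22", "1950,-11", "1949,111", "1949,78");
        System.out.println(run(input)); // {1949=111, 1950=22}
    }
}
```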

Figure 2-2. Data-local (a), rack-local (b), and off-rack (c) map tasks


Figure 2-3. MapReduce data flow with a single reduce task



Figure 2-4. MapReduce data flow with multiple reduce tasks


Figure 2-5. MapReduce data flow with no reduce tasks


Figure 3-1. Accessing HDFS over HTTP directly and via a bank of HDFS proxies



Figure 3-2. A client reading data from HDFS


Figure 3-3. Network distance in Hadoop

Figure 3-4. A client writing data to HDFS

Figure 3-5. A typical replica pipeline


Figure 4-1. YARN applications



Figure 4-2. How YARN runs an application



Figure 4-3. Cluster utilization over time when running a large job and a small job …

Figure 4-4. Fair sharing between user queues

Figure 5-1. Writable class hierarchy

Figure 5-2. The internal structure of a sequence file with no compression and with …

Figure 5-3. The internal structure of a sequence file with block compression

Figure 5-4. Row-oriented versus column-oriented storage

Figure 6-1. Screenshot of the resource manager page

Figure 6-2. Screenshot of the job page

Figure 6-3. Screenshot of the tasks page

Figure 6-4. Transition diagram of an Oozie workflow

Figure 7-1. How Hadoop runs a MapReduce job

Figure 7-2. The relationship of the Streaming executable to the node manager and the …

Figure 7-3. How status updates are propagated through the MapReduce system

Figure 7-4. Shuffle and sort in MapReduce

Figure 7-5. Efficiently merging 40 file segments with a merge factor of 10

Figure 8-1. Where separators are used in a Streaming MapReduce job

Figure 8-2. InputFormat class hierarchy

Figure 8-3. Logical records and HDFS blocks for TextInputFormat

Figure 8-4. OutputFormat class hierarchy

Figure 9-1. Temperature distribution for the weather dataset

Figure 9-2. Inner join of two datasets

Figure 10-1. Typical two-level network architecture for a Hadoop cluster

Figure 10-2. The three-step Kerberos ticket exchange protocol

Figure 11-1. The checkpointing process

Figure 13-1. The internal structure of a Parquet file

Figure 14-1. Flume agent with a spooling directory source and a logger sink connected …

Figure 14-2. Flume agent with a spooling directory source and fanning out to an HDFS …

Figure 14-3. Using a second agent tier to aggregate Flume events from the first tier

Figure 14-4. Two Flume agents connected by an Avro sink-source pair

Figure 14-5. Using multiple sinks for load balancing or failover

Figure 14-6. Load balancing between two agents

Figure 15-1. Sqoop’s import process

Figure 15-2. Database tables are typically physically represented as an array of rows, …

Figure 15-3. Large objects are usually held in a separate area of storage; the main row …

Figure 15-4. Exports are performed in parallel using MapReduce

Figure 17-1. Hive architecture

Figure 17-2. Metastore configurations

Figure 17-3. Data flow with partial results for a UDAF

Figure 18-1. Plan diagram for a Crunch pipeline for calculating a histogram of word …

Figure 19-1. How Spark runs a job

Figure 19-2. The stages and RDDs in a Spark job for calculating a histogram of word …

Figure 19-3. How Spark executors are started in YARN client mode

Figure 19-4. How Spark executors are started in YARN cluster mode

Figure 20-1. The HBase data model, illustrated for a table storing photos

Figure 20-2. HBase cluster members

Figure 21-1. ZooKeeper znodes

Figure 21-2. Reads are satisfied by followers, whereas writes are committed by the leader

Figure 21-3. ZooKeeper state transitions

Figure 22-1. Operational data flow

Figure 22-2. Composable datasets and functions

Figure 22-3. Timeline of Big Data Technology and Cost of Sequencing a Genome

Figure 22-4. DNA Double Helix Structure

Figure 22-5. Aligning Reads to a Reference Genome

Figure 22-6. Counting and sorting in MapReduce

Figure 22-7. Pipes linked by fields and tuples

Figure 22-8. Pipe types

Figure 22-9. A simple PipeAssembly

Figure 22-10. Operation types

Figure 22-11. An assembly of operations

Figure 22-12. A Flow

Figure 22-13. How a Flow translates to chained MapReduce jobs

Figure 22-14. The ShareThis log processing pipeline

Figure 22-15. The ShareThis log processing Flow

Interfaces

FSDataInputStream, Seekable, PositionedReadable


  • The open() method of FileSystem returns an FSDataInputStream
    • rather than a standard java.io class.
  • FSDataInputStream is a specialization of java.io.DataInputStream
    • with support for random access,
    • so you can read from any part of the stream.
      • It implements the Seekable and PositionedReadable interfaces for this purpose.
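The two kinds of random access can be tried without a cluster. This sketch swaps in JDK classes rather than Hadoop's: RandomAccessFile.seek() plays the role of Seekable.seek(), and FileChannel's absolute read, which does not change the channel's position, plays the role of PositionedReadable.read(position, buffer, offset, length). The class name and the temp file contents are illustrative only.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** JDK-only analogue of Seekable and PositionedReadable (not Hadoop's FSDataInputStream). */
public class RandomAccessSketch {

    /** Reads the whole file, seeks back to offset 0 and reads it again, like seek(0). */
    public static String readTwice(Path file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            byte[] first = new byte[(int) raf.length()];
            raf.readFully(first);                // file pointer is now at the end
            raf.seek(0);                         // jump back to the start
            byte[] second = new byte[first.length];
            raf.readFully(second);
            return new String(first, StandardCharsets.UTF_8)
                    + new String(second, StandardCharsets.UTF_8);
        }
    }

    /** Reads up to length bytes starting at offset without moving the channel's position. */
    public static String readAt(Path file, long offset, int length) throws IOException {
        try (FileChannel ch = FileChannel.open(file)) {
            ByteBuffer buf = ByteBuffer.allocate(length);
            while (buf.hasRemaining()) {
                int n = ch.read(buf, offset + buf.position()); // absolute (positioned) read
                if (n < 0) break;                              // reached end of file
            }
            return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("sketch", ".txt");
        Files.write(file, "hello, hadoop".getBytes(StandardCharsets.UTF_8));
        System.out.println(readTwice(file));     // hello, hadoophello, hadoop
        System.out.println(readAt(file, 7, 6));  // hadoop
        Files.delete(file);
    }
}
```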

FSDataOutputStream, Progressable

Glob patterns and PathFilter

  • Hadoop supports the same set of glob characters as Unix bash
  • When glob patterns are not powerful enough to describe a set of files you want to access, you can use PathFilter.
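The globStatus() methods of FileSystem return the FileStatus objects whose paths match a glob pattern, optionally refined by a PathFilter:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
Here is the PathFilter interface: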
  package org.apache.hadoop.fs;

  public interface PathFilter {
    boolean accept(Path path);
  }
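The glob-then-filter idea can be sketched with the JDK alone. This is an assumption-laden stand-in, not Hadoop's API: java.nio.file.PathMatcher with Java NIO's glob syntax (broadly similar to Hadoop's, but not identical) replaces globStatus(), a Predicate&lt;Path&gt; replaces PathFilter, and the regex-exclusion filter and the sample date-partitioned paths are illustrative. Matching assumes a Unix-style file system.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** JDK-only sketch of "glob first, then a filter" (not Hadoop's globStatus/PathFilter). */
public class GlobFilterSketch {

    /** A filter that rejects any path whose string form matches the regex. */
    public static Predicate<Path> regexExclude(String regex) {
        return path -> !path.toString().matches(regex);
    }

    /** Keeps the paths that match the glob and that the filter accepts. */
    public static List<String> globThenFilter(List<String> paths, String glob, Predicate<Path> filter) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return paths.stream()
                .map(Path::of)
                .filter(matcher::matches)   // the glob narrows the set...
                .filter(filter)             // ...and the filter removes what globs cannot express
                .map(Path::toString)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> paths = List.of("/2007/12/30", "/2007/12/31", "/2008/01/01", "/2008/01/02");
        System.out.println(globThenFilter(paths, "/2007/*/*", regexExclude("^.*/2007/12/31$")));
        // [/2007/12/30]
    }
}
```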


Writable


package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

The Writable interface defines two methods:

  • One for writing its state to a DataOutput binary stream and
  • One for reading its state from a DataInput binary stream

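import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.StringUtils;

public class WritableRoundTrip {

    public static void main(String[] args) throws Exception {
        IntWritable iw = new IntWritable(1024);

        // Serialize: write() puts the int's 4 bytes (big-endian) on the DataOutput.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        iw.write(dos);
        dos.flush();
        byte[] data = baos.toByteArray();

        System.out.println(StringUtils.byteToHexString(data)); // 00000400

        // Deserialize: readFields() restores the state into a fresh IntWritable.
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        DataInputStream dis = new DataInputStream(bais);
        IntWritable iw2 = new IntWritable();
        iw2.readFields(dis);
        System.out.println(iw2.get()); // 1024
    }
}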
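Writing your own type follows the same two-method contract. The sketch below is a hypothetical pair of ints that uses only java.io, so it runs without Hadoop on the classpath; to make it a real Hadoop Writable you would additionally declare `implements org.apache.hadoop.io.Writable`. It keeps a no-arg constructor because the framework creates an empty instance and then calls readFields() on it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical pair type following the Writable contract, using only java.io. */
public class IntPairSketch {
    private int first;
    private int second;

    public IntPairSketch() { }  // empty instance, to be filled in by readFields()

    public IntPairSketch(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(first);    // fields go out in a fixed order...
        out.writeInt(second);
    }

    public void readFields(DataInput in) throws IOException {
        first = in.readInt();   // ...and must be read back in exactly the same order
        second = in.readInt();
    }

    public int getFirst() { return first; }
    public int getSecond() { return second; }

    public static byte[] serialize(IntPairSketch pair) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            pair.write(dos);
        }
        return baos.toByteArray();
    }

    public static IntPairSketch deserialize(byte[] bytes) throws IOException {
        IntPairSketch pair = new IntPairSketch();
        pair.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        return pair;
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(new IntPairSketch(3, -4));
        IntPairSketch copy = deserialize(bytes);
        System.out.println(bytes.length + " bytes -> (" + copy.getFirst() + ", " + copy.getSecond() + ")");
        // 8 bytes -> (3, -4)
    }
}
```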

WritableComparable and comparators


  • IntWritable implements the WritableComparable interface, which is just a subinterface of the Writable and java.lang.Comparable interfaces:
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}


  • Comparison of types is crucial for MapReduce
    • where there is a sorting phase during which keys are compared with one another.
  • RawComparator is an optimization that Hadoop provides.
    • It extends Java's Comparator
    • and allows implementors to compare records read from a stream without deserializing them into objects, avoiding the overhead of object creation.
      • Serializing integers in big-endian order (most significant byte first, as DataOutput.writeInt() does) can also help here.
  • WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes.
    • It provides two main functions:
      • A default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method.
      • It acts as a factory for RawComparator instances (that Writable implementations have registered).
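import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0)); // 163 > 67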
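To see what "comparing without deserializing" means, here is a JDK-only sketch of a raw comparator for 4-byte big-endian ints, the layout DataOutput.writeInt() produces. It mirrors the shape of RawComparator's byte-level method, compare(b1, s1, l1, b2, s2, l2), but is not Hadoop's implementation. It decodes each int straight from the buffer instead of building objects; note that a naive unsigned byte-by-byte comparison would misorder negative two's-complement values, which is why the sketch decodes first.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch of the RawComparator idea for 4-byte big-endian ints, using only the JDK. */
public class RawIntComparatorSketch {

    /** Decodes a big-endian int at offset s without creating any objects. */
    static int readInt(byte[] b, int s) {
        return ((b[s] & 0xff) << 24) | ((b[s + 1] & 0xff) << 16)
             | ((b[s + 2] & 0xff) << 8) | (b[s + 3] & 0xff);
    }

    /** Same parameter shape as the byte-level compare; lengths are unused for fixed-width ints. */
    public static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    /** Helper that produces the serialized form, as DataOutput.writeInt() writes it. */
    public static byte[] serialize(int v) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(v);
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] a = serialize(163);
        byte[] b = serialize(67);
        System.out.println(compare(a, 0, a.length, b, 0, b.length) > 0); // true
    }
}
```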

GenericOptionsParser, Tool interface and ToolRunner

  • GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired.
  • You don't usually use GenericOptionsParser directly, as it's more convenient to implement the Tool interface and run your application with the ToolRunner,
    • which uses GenericOptionsParser internally.
  • You can have your App class derive from Configured, which is an implementation of the Configurable interface.
    • All implementations of Tool need to implement Configurable (Tool extends it),
    • and subclassing Configured is often the easiest way to achieve this.
  • The ToolRunner.run() method takes care of creating a Configuration object for the Tool before calling its run() method.
  • ToolRunner also uses a GenericOptionsParser to pick up any standard options specified on the command line and to set them on the Configuration instance,
    • such as -conf <conf file>.
  • GenericOptionsParser also allows you to set individual properties.
    • hadoop ConfigurationPrinter -D color=yellow | grep color
    • The -D option is used to set the configuration property with key color to the value yellow.
    • Options specified with -D take priority over properties from the configuration files.
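The precedence rule for -D can be modeled in a few lines. This is a toy written with java.util.Properties, not GenericOptionsParser itself, and it only handles the "-D key=value" form with a space after -D: properties already loaded (standing in for a configuration file) are overwritten by -D values, and the remaining arguments are handed back to the tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Toy model of -D handling (not GenericOptionsParser): -D values override file values. */
public class DashDOverrideSketch {

    /** Applies "-D key=value" pairs on top of conf; returns the remaining (tool-specific) args. */
    public static List<String> applyDashD(String[] args, Properties conf) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                String[] kv = args[++i].split("=", 2);          // split on the first '=' only
                conf.setProperty(kv[0], kv.length > 1 ? kv[1] : "");
            } else {
                remaining.add(args[i]);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("color", "red");                   // as if loaded from a configuration file
        List<String> rest = applyDashD(new String[] {"-D", "color=yellow", "input.txt"}, conf);
        System.out.println(conf.getProperty("color") + " " + rest); // yellow [input.txt]
    }
}
```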

InputSampler, Sampler



  • The InputSampler class defines a nested Sampler interface whose implementations return a sample of keys given an InputFormat and Job
  • This interface is usually not called directly by clients. Instead, the static writePartitionFile() method on InputSampler is used, which creates a sequence file to store the keys that define the partitions.
  • The sequence file is used by TotalOrderPartitioner to create partitions for the sort job.
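The sampling-then-partitioning idea can be sketched in plain Java. This is not InputSampler's or TotalOrderPartitioner's code: the sampler here simply shuffles the keys and takes the first sampleSize of them (a swapped-in, simplified random sample), sorts the sample, and picks numPartitions - 1 evenly spaced split points; the partition function then binary-searches those split points, so partition i receives keys in [splits[i-1], splits[i]). Integer keys and distinct split points are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Sketch of sampling-based total-order partitioning (plain Java, not Hadoop's classes). */
public class TotalOrderSketch {

    /** Picks numPartitions - 1 split points from a sorted random sample of the keys. */
    public static int[] splitPoints(List<Integer> keys, int sampleSize, int numPartitions, long seed) {
        List<Integer> shuffled = new ArrayList<>(keys);
        Collections.shuffle(shuffled, new Random(seed));
        List<Integer> sample = new ArrayList<>(shuffled.subList(0, Math.min(sampleSize, shuffled.size())));
        Collections.sort(sample);
        int[] splits = new int[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            splits[i - 1] = sample.get(i * sample.size() / numPartitions); // evenly spaced quantiles
        }
        return splits;
    }

    /** Partition index = number of split points <= key (assumes distinct split points). */
    public static int partition(int key, int[] splits) {
        int pos = Arrays.binarySearch(splits, key);
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int k = 0; k < 1000; k++) keys.add(k);
        int[] splits = splitPoints(keys, 50, 4, 42L);
        int[] counts = new int[4];
        for (int k : keys) counts[partition(k, splits)]++;
        System.out.println(Arrays.toString(splits) + " -> " + Arrays.toString(counts));
    }
}
```

Because the partitions are ordered by key range, concatenating the sorted output of each reducer in partition order yields a globally sorted result; the quality of the balance depends on how representative the sample is.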