14.5.15
13.5.15
Apache YARN Scheduler
- The FIFO Scheduler
- Places applications in a queue and runs them in the order of submission.
- Requests for the first application in the queue are allocated first, then once its requests have been satisfied the next application in the queue is served, and so on.
- The good part:
- simple to understand
- not needing any configuration
- The bad side:
- not suitable for shared clusters.
- Large applications will use all the resources in a cluster
- So each application has to wait its turn.
- On a shared cluster it is better to use the Capacity Scheduler or the Fair Scheduler.
- Both of these allow long-running jobs to complete in a timely manner,
- while still allowing users who are running concurrent smaller ad hoc queries to get results back in a reasonable time.
- The Capacity Scheduler
- A separate dedicated queue allows the small job to start as soon as it is submitted,
- although this is at the cost of
- overall cluster utilization
- since the queue capacity is reserved for jobs in that queue.
- This means that the large job finishes later than when using the FIFO Scheduler.
- The Fair Scheduler
- There is no need to reserve a set amount of capacity since it will dynamically balance resources between all running jobs.
- When the second (small) job starts it is allocated half of the cluster resources so that each job is using its fair share of resources.
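The difference between FIFO and fair sharing can be sketched with a small tick-based simulation. This is only an illustration under simplifying assumptions, not how YARN's schedulers are implemented: `SchedulerSketch`, `Job`, and the work units are hypothetical names, the cluster hands out a fixed capacity per tick, and fair mode simply splits that capacity equally between the jobs that are running.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: compares FIFO and equal-share ("fair") allocation.
class SchedulerSketch {

    /** A job with a submission time and an amount of work left (arbitrary units). */
    static final class Job {
        final String name;
        final int submitTime;
        double remaining;
        int finishTime = -1; // -1 means "not finished yet"

        Job(String name, int submitTime, double work) {
            this.name = name;
            this.submitTime = submitTime;
            this.remaining = work;
        }
    }

    /**
     * Advances time one tick at a time until every job has finished.
     * The list must be in submission order so that FIFO serves the oldest job first.
     */
    static void simulate(List<Job> jobs, double capacityPerTick, boolean fair) {
        int now = 0;
        while (jobs.stream().anyMatch(j -> j.finishTime < 0)) {
            // Jobs that have been submitted and still have work to do.
            List<Job> active = new ArrayList<>();
            for (Job j : jobs) {
                if (j.submitTime <= now && j.finishTime < 0) {
                    active.add(j);
                }
            }
            if (!active.isEmpty()) {
                if (fair) {
                    // Fair: every running job gets an equal share of this tick.
                    double share = capacityPerTick / active.size();
                    for (Job j : active) {
                        j.remaining -= share;
                    }
                } else {
                    // FIFO: the first job in the queue gets everything.
                    active.get(0).remaining -= capacityPerTick;
                }
                for (Job j : active) {
                    if (j.remaining <= 1e-9) {
                        j.finishTime = now + 1;
                    }
                }
            }
            now++;
        }
    }
}
```

With a large job (100 units, submitted at tick 0), a small job (10 units, submitted at tick 2), and a capacity of 10 units per tick, FIFO finishes the large job at tick 10 and the small one at tick 11. Fair sharing finishes the small job at tick 4 and the large one at tick 11: the small job gets its answer much sooner, and the large job finishes a little later.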
- Delay Scheduling
How was YARN designed to address the limitations of MRv1?
All from the Book: Hadoop: The Definitive Guide.
- Scalability
- MRv1:
- because the jobtracker has to manage both jobs and tasks, MRv1 hits scalability bottlenecks in the region of 4,000 nodes, and 40,000 tasks.
- YARN/MRv2 overcomes these limitations by virtue of its split resource manager/application master architecture, which means it is designed to scale up to 10,000 nodes, and 100,000 tasks.
- Availability
- With the jobtracker's responsibilities split between the resource manager and application master in YARN, making the service highly available became a divide-and-conquer problem: provide HA for the resource manager, then for YARN applications (on a per-application basis). Indeed, Hadoop 2 supports HA both for the resource manager and for the application master for MapReduce jobs, which is similar to my own product.
- Utilization
- In MRv1:
- each tasktracker is configured with a static allocation of fixed size "slots", which are divided into map slots and reduce slots at configuration time.
- A map slot can only be used to run a map task, and a reduce slot can only be used for a reduce task.
- In YARN/MRv2:
- a node manager manages a pool of resources, rather than a fixed number of designated slots.
- MapReduce running on YARN will not hit the situation where a reduce task has to wait because only map slots are available on the cluster.
- If the resources to run the task are available, then the application will be eligible for them.
- Furthermore, resources in YARN are fine-grained, so an application can make a request for what it needs, rather than for an indivisible slot.
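The contrast between typed slots and a resource pool can be sketched as follows. This is a toy model, not Hadoop code: `SlotNode` and `PoolNode` are hypothetical classes, and memory and vcores are the only resources tracked.

```java
// Hypothetical sketch: fixed typed slots (MRv1 style) versus a resource pool (YARN style).
class UtilizationSketch {

    /** MRv1-style node: the number of map and reduce slots is fixed at configuration time. */
    static final class SlotNode {
        private int freeMapSlots;
        private int freeReduceSlots;

        SlotNode(int mapSlots, int reduceSlots) {
            this.freeMapSlots = mapSlots;
            this.freeReduceSlots = reduceSlots;
        }

        /** A task can only take a slot of its own type. */
        boolean tryRun(boolean isMapTask) {
            if (isMapTask) {
                if (freeMapSlots == 0) return false;
                freeMapSlots--;
            } else {
                if (freeReduceSlots == 0) return false;
                freeReduceSlots--;
            }
            return true;
        }
    }

    /** YARN-style node: one untyped pool; each request states what it needs. */
    static final class PoolNode {
        private int freeMemoryMb;
        private int freeVcores;

        PoolNode(int memoryMb, int vcores) {
            this.freeMemoryMb = memoryMb;
            this.freeVcores = vcores;
        }

        /** Succeeds whenever enough memory and cores are left, whatever the task type. */
        boolean tryAllocate(int memoryMb, int vcores) {
            if (memoryMb > freeMemoryMb || vcores > freeVcores) return false;
            freeMemoryMb -= memoryMb;
            freeVcores -= vcores;
            return true;
        }
    }
}
```

On a `SlotNode` with two map slots and two reduce slots, a third reduce task is refused even while both map slots sit idle. A `PoolNode` has no such split: any request is granted as long as the remaining memory and cores cover it.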
- Multitenancy
Apache YARN
- YARN:
- Yet Another Resource Negotiator
- YARN
- is Hadoop's cluster resource management system.
- was introduced in Hadoop 2
- to improve the MapReduce implementation.
- but is general enough to support other distributed computing paradigms as well.
- YARN
- provides APIs for requesting and working with cluster resources.
- but these APIs are not typically used directly by user code.
- (Instead,) users write to higher-level APIs provided by distributed computing frameworks,
- which themselves are built on YARN and hide the resource management details from the user.
(There is a further layer of applications that build on the frameworks shown in the figure above: Pig, Hive, and Crunch, for example.)
Figures in Hadoop: The Definitive Guide
Figure 1-1. Structure of the book: there are various pathways through the book
Figure 2-1. MapReduce logical data flow
- MapReduce works by breaking the processing into two phases:
- the map phase and
- the reduce phase
- Each phase has key-value pairs as input and output,
- the types of which may be chosen by the programmer
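The two phases can be mimicked in plain Java to make the key-value flow concrete. This is a single-process sketch, not the Hadoop MapReduce API: `MapReduceFlowSketch` is a hypothetical class, and the `year,temperature` input format is an assumption made for the example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the logical data flow: map -> group by key -> reduce.
class MapReduceFlowSketch {

    /** Map phase: one input line "year,temperature" becomes one (year, temperature) pair. */
    static Map.Entry<String, Integer> map(String line) {
        String[] parts = line.split(",");
        return new AbstractMap.SimpleEntry<>(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    /** Reduce phase: all values for one key are folded into a single output value. */
    static int reduce(String year, List<Integer> temperatures) {
        return Collections.max(temperatures);
    }

    static SortedMap<String, Integer> run(List<String> lines) {
        // Between the phases, pairs are grouped by key and the keys are sorted.
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            Map.Entry<String, Integer> pair = map(line);
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        SortedMap<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            output.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
        }
        return output;
    }
}
```

Here the programmer's choice of types is `String` keys and `Integer` values for both phases; a real job would pick Writable types instead.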
Figure 2-2. Data-local (a), rack-local (b), and off-rack (c) map tasks
Figure 2-3. MapReduce data flow with a single reduce task
Figure 2-4. MapReduce data flow with multiple reduce tasks
Figure 2-5. MapReduce data flow with no reduce tasks
Figure 3-1. Accessing HDFS over HTTP directly and via a bank of HDFS proxies
Figure 3-2. A client reading data from HDFS
Figure 3-3. Network distance in Hadoop
Figure 3-4. A client writing data to HDFS
Figure 3-5. A typical replica pipeline
Figure 4-1. YARN applications
Figure 4-2. How YARN runs an application
Notice from Figure 4-2 that YARN itself does not provide any way for the parts of the
The difference between schedulers is illustrated in Figure 4-3, which shows that under
With the Capacity Scheduler (ii. in Figure 4-3), a separate dedicated queue allows the
With the Fair Scheduler (iii. in Figure 4-3) there is no need to reserve a set amount of
Figure 4-3. Cluster utilization over time when running a large job and a small job un‐
Figure 4-3 contrasts the basic operation of the Capacity Scheduler and the Fair Sched‐
As we saw in Figure 4-3, a single job does not use more resources than its queue’s
the same share of resources. We saw in Figure 4-3 how fair sharing works for applications
each with their own queue, see Figure 4-4. A starts a job and it is allocated all the re‐
Figure 4-4. Fair sharing between user queues
doop.io package. They form the class hierarchy shown in Figure 5-1.
Figure 5-1. Writable class hierarchy
A sequence file consists of a header followed by one or more records (see Figure 5-2).
Figure 5-2. The internal structure of a sequence file with no compression and with re‐
opportunity to take advantage of similarities between records. (See Figure 5-3.) Records
Figure 5-3. The internal structure of a sequence file with block compression
so on. This is shown diagrammatically in Figure 5-4.
Figure 5-4. Row-oriented versus column-oriented storage
ped. Consider a query of the table in Figure 5-4 that processes only column 2. With
A screenshot of the home page is shown in Figure 6-1. The “Cluster Metrics” section
Figure 6-1. Screenshot of the resource manager page
takes us to the job page, illustrated in Figure 6-2.
Figure 6-2. Screenshot of the job page
for all of the map tasks on one page. The screenshot in Figure 6-3 shows this page for
Figure 6-3. Screenshot of the tasks page
and allowed transitions between them are shown in Figure 6-4.
Figure 6-4. Transition diagram of an Oozie workflow
The whole process is illustrated in Figure 7-1. At the highest level, there are five inde‐
Figure 7-1. How Hadoop runs a MapReduce job
submitJobInternal() on it (step 1 in Figure 7-1). Having submitted the job, waitFor
Streaming runs special map and reduce tasks for the purpose of launching the user-supplied executable and communicating with it (Figure 7-2).
Figure 7-2. The relationship of the Streaming executable to the node manager and the
The process is illustrated in Figure 7-3.
Figure 7-3. How status updates are propagated through the MapReduce system
some presorting for efficiency reasons. Figure 7-4 shows what happens.
Figure 7-4. Shuffle and sort in MapReduce
for the final round. The process is illustrated in Figure 7-5.
Figure 7-5. Efficiently merging 40 file segments with a merge factor of 10
listed in Table 8-3 and shown in a diagram of the data flow path in Figure 8-1.
Figure 8-1. Where separators are used in a Streaming MapReduce job
as their data source (see Figure 8-2). It provides two things: a place to define which files
Figure 8-2. InputFormat class hierarchy
Figure 8-3 shows an example. A single file is broken into lines, and the line boundaries
Figure 8-3. Logical records and HDFS blocks for TextInputFormat
previous section. The OutputFormat class hierarchy appears in Figure 8-4.
Figure 8-4. OutputFormat class hierarchy
Figure 2-2.
of temperature buckets. For example, Figure 9-1 shows the distribution for buckets of
Figure 9-1. Temperature distribution for the weather dataset
inlined in each output row. This is illustrated in Figure 9-2.
Figure 9-2. Inner join of two datasets
illustrated in Figure 10-1. Typically there are 30 to 40 servers per rack (only three are
Figure 10-1. Typical two-level network architecture for a Hadoop cluster
For the network in Figure 10-1, the rack topology is described by two network locations,
tribution Center (KDC). The process is shown graphically in Figure 10-2.
Figure 10-2. The three-step Kerberos ticket exchange protocol
proceeds as follows (and is shown schematically in Figure 11-1 for the edit log and image
Figure 11-1. The checkpointing process
is illustrated in Figure 5-4.) This chapter looks at Parquet in more depth, but there are
chunk is written in pages; this is illustrated in Figure 13-1.
Figure 13-1. The internal structure of a Parquet file
system is illustrated in Figure 14-1.
Figure 14-1. Flume agent with a spooling directory source and a logger sink connected
just like in the previous examples. The flow is illustrated in Figure 14-2.
Figure 14-2. Flume agent with a spooling directory source and fanning out to an HDFS
writing them to HDFS, see Figure 14-3. Further tiers may be warranted for very large
Figure 14-3. Using a second agent tier to aggregate Flume events from the first tier
The system is illustrated in Figure 14-4.
Figure 14-4. Two Flume agents connected by an Avro sink-source pair
purposes, see Figure 14-5. If a second tier agent is unavailable, then events will be de‐
Figure 14-5. Using multiple sinks for load balancing or failover
A diagram of the whole system is shown in Figure 14-6.
Figure 14-6. Load balancing between two agents
At a high level, Figure 15-1 demonstrates how Sqoop interacts with both the database
Figure 15-1. Sqoop’s import process
Figure 15-2. When scanning through rows to determine which rows match the criteria
rows, as in Figure 15-3. Accessing a large object often requires “opening” it through the
Figure 15-2. Database tables are typically physically represented as an array of rows,
Figure 15-3. Large objects are usually held in a separate area of storage; the main row
performs imports. (See Figure 15-4.) Before performing the export, Sqoop picks a strat‐
Figure 15-4. Exports are performed in parallel using MapReduce
Hive clients and Hive services is illustrated in Figure 17-1.
Figure 17-1. Hive architecture
(see Figure 17-2).
Figure 17-2. Metastore configurations
trated in Figure 17-3):
Figure 17-3. Data flow with partial results for a UDAF
The plan diagram generated from this pipeline is shown in Figure 18-1.
Figure 18-1. Plan diagram for a Crunch pipeline for calculating a histogram of word
in Figure 19-1), which passes the call onto the scheduler that runs as a part of the driver
Figure 19-1. How Spark runs a job
forces a shuffle stage. The resulting DAG is illustrated in Figure 19-2.
Figure 19-2. The stages and RDDs in a Spark job for calculating a histogram of word
Going back to Figure 19-1, once the DAG scheduler has constructed the complete DAG
Assigned tasks are launched through a scheduler backend (step 4 in Figure 19-1), which
instance is constructed by the driver program (step 1 in Figure 19-3). The context sub‐
Figure 19-3. How Spark executors are started in YARN client mode
The spark-submit client will launch the YARN application (step 1 in Figure 19-4), but
Figure 19-4. How Spark executors are started in YARN cluster mode
is shown in Figure 20-1.
Figure 20-1. The HBase data model, illustrated for a table storing photos
regionserver workers (see Figure 20-2). The HBase master is responsible for bootstrap‐
Figure 20-2. HBase cluster members
with the name of the group members (servers). This is shown in Figure 21-1.
Figure 21-1. ZooKeeper znodes
but it has no control over this and cannot even know if this is the case. See Figure 21-2.
Figure 21-2. Reads are satisfied by followers, whereas writes are committed by the lead‐
Figure 21-3). You can query its state at any time by using the getState() method:
Figure 21-3. ZooKeeper state transitions
never reconnect (refer to Figure 21-3). We simply rethrow the exception and let the
Figure 22-1.
Figure 22-1. Operational data flow
Figure 22-2 shows how components can be wired into one another in novel ways, with
Figure 22-2. Composable datasets and functions
eration”) sequencing technology; see Figure 22-3. Sequencing machines are scientific
Figure 22-3. Timeline of Big Data Technology and Cost of Sequencing a Genome
see Figure 22-4. This deterministic pairing enables a “copying mechanism”: the DNA
Figure 22-4. DNA Double Helix Structure
the reference genome; see Figure 22-5. A complete human genome is about 3 billion
Figure 22-5. Aligning Reads to a Reference Genome
map, and then to the next reduce, and so on (Figure 22-6). That is, key-value pairs are
Figure 22-6. Counting and sorting in MapReduce
pipes (Figure 22-7).
Figure 22-7. Pipes linked by fields and tuples
(Figure 22-8):
Figure 22-8. Pipe types
streams (sinks). See Figure 22-9.
Figure 22-9. A simple PipeAssembly
operations that are applied either to individual tuples or groups of tuples (Figure 22-10):
Figure 22-10. Operation types
Operations are bound to pipes when the pipe assembly is created (Figure 22-11).
Figure 22-11. An assembly of operations
(Figure 22-12). That is, the same pipe assembly can be “instantiated” many times into
Figure 22-12. A Flow
between them (Figure 22-13).
Figure 22-13. How a Flow translates to chained MapReduce jobs
In this overview, we will focus on the “log processing pipeline” (Figure 22-14). This
Figure 22-14. The ShareThis log processing pipeline
just source and sink Taps, trap Taps were planned in (Figure 22-15). Normally, when
Figure 22-15. The ShareThis log processing Flow
Interfaces
FSDataInputStream, Seekable, PositionedReadable
- The open() method of FileSystem returns an FSDataInputStream
- rather than a standard java.io class
- FSDataInputStream is a specialization of java.io.DataInputStream
- with support for random access
- so you can read from any part of the stream
- The Seekable and PositionedReadable interfaces, which it implements, provide this capability.
FSDataOutputStream, Progressable
Glob patterns and PathFilter
- Hadoop supports the same set of glob characters as Unix bash
- When glob patterns are not powerful enough to describe a set of files you want to access, you can use PathFilter.
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
package org.apache.hadoop.fs;
public interface PathFilter {
  boolean accept(Path path);
}
Writable
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
The Writable interface defines two methods:
- One for writing its state to a DataOutput binary stream and
- One for reading its state from a DataInput binary stream
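import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.StringUtils;
// The wrapper class name is illustrative only.
public class IntWritableRoundTrip {
  public static void main(String[] args) throws Exception {
    // Serialize: IntWritable writes its value as 4 big-endian bytes.
    IntWritable iw = new IntWritable(1024);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    iw.write(dos);
    dos.flush();
    byte[] data = baos.toByteArray();
    System.out.println(StringUtils.byteToHexString(data)); // 00000400
    // Deserialize: read the bytes back into a fresh IntWritable.
    ByteArrayInputStream bais = new ByteArrayInputStream(data);
    DataInputStream dis = new DataInputStream(bais);
    IntWritable iw2 = new IntWritable();
    iw2.readFields(dis);
    System.out.println(iw2.get()); // 1024
  }
}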
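The same two methods are all a custom type needs. The sketch below is an assumption-laden illustration rather than Hadoop code: it declares a local stand-in for the Writable interface (copied from the definition above) so that it compiles without Hadoop on the classpath, and `IntPairSketch` with its `serialize`/`deserialize` helpers is a hypothetical class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in that mirrors org.apache.hadoop.io.Writable (assumption: Hadoop is not on the classpath).
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical custom type holding two ints.
class IntPairSketch implements Writable {
    private int first;
    private int second;

    IntPairSketch() { }  // a no-arg constructor lets a framework create an empty instance to fill

    IntPairSketch(int first, int second) {
        this.first = first;
        this.second = second;
    }

    int getFirst() { return first; }
    int getSecond() { return second; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);   // fields are written in a fixed order...
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();  // ...and must be read back in the same order
        second = in.readInt();
    }

    /** Helper: turns any Writable into a byte array. */
    static byte[] serialize(Writable w) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            w.write(out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Helper: populates an existing Writable from a byte array. */
    static void deserialize(Writable w, byte[] data) {
        try {
            w.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Serializing `new IntPairSketch(1024, 7)` yields 8 bytes, and reading them into an empty instance restores both fields. With Hadoop available, the local interface would be dropped in favor of the real one.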
WritableComparable and comparators
- IntWritable implements the WritableComparable interface, which is just a subinterface of the Writable and java.lang.Comparable interfaces:
package org.apache.hadoop.io;
public interface WritableComparable<T> extends Writable, Comparable<T> { }
- Comparison of types is crucial for MapReduce
- where there is a sorting phase during which keys are compared with one another.
- RawComparator is an optimization that Hadoop provides
- extension of Java's Comparator
- allows implementors to compare records read from a stream without deserializing them into objects
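- Big-endian encoding can also help here: the most significant byte comes first in the stream, so non-negative integers can be ordered by comparing bytes from left to right.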
- WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes.
- It provides two main functions.
- A default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method.
- acts as a factory for RawComparator instances
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));
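What "comparing without deserializing" means can be sketched in plain Java. This is not Hadoop's implementation: `RawIntCompareSketch` is a hypothetical class whose `compareRaw` method merely borrows the argument shape of RawComparator's byte-level `compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)`, and it assumes each record is a single 4-byte big-endian int.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: compare two serialized ints directly from their bytes.
class RawIntCompareSketch {

    /** Serializes an int as 4 big-endian bytes (the same layout DataOutput.writeInt produces). */
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(4).putInt(value).array(); // ByteBuffer is big-endian by default
    }

    /** Decodes a big-endian int starting at the given offset, creating no objects. */
    static int readInt(byte[] bytes, int start) {
        return ((bytes[start] & 0xff) << 24)
             | ((bytes[start + 1] & 0xff) << 16)
             | ((bytes[start + 2] & 0xff) << 8)
             |  (bytes[start + 3] & 0xff);
    }

    /**
     * Compares two records in place. The length arguments are unused here
     * because every record in this sketch is exactly 4 bytes long.
     */
    static int compareRaw(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }
}
```

The sketch decodes primitives instead of comparing the bytes lexicographically because the ints are signed: byte by byte, -1 (`ff ff ff ff`) would sort after 1.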
GenericOptionsParser, Tool interface and ToolRunner
- GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired.
- You don't usually use GenericOptionsParser directly, as it's more convenient to implement the Tool interface and run your application with the ToolRunner
- which uses GenericOptionsParser internally
- You can have your App class derive from Configured, which is an implementation of the Configurable interface.
- All implementations of Tool need to implement Configurable
- and subclassing Configured is often the easiest way to achieve this.
- ToolRunner.run() method takes care of creating a Configuration object for the Tool before calling its run() method.
- ToolRunner also uses a GenericOptionsParser to pick up any standard options specified on the command line and to set them on the Configuration instance.
- -conf <conf file>
- GenericOptionsParser also allows you to set individual properties.
- hadoop ConfigurationPrinter -D color=yellow | grep color
- The -D option is used to set the configuration property with key color to the value yellow.
- Options specified with -D take priority over properties from the configuration files.
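The precedence rule can be illustrated with a small stand-in. This is not GenericOptionsParser itself: `GenericOptionsSketch` and `resolve` are hypothetical, the configuration files are modeled as a plain map, and only the `-D key=value` form shown above is handled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that mimics how -D options override file-based properties.
class GenericOptionsSketch {

    /**
     * Starts from the properties loaded from configuration files, then applies
     * every "-D key=value" pair found on the command line on top of them.
     */
    static Map<String, String> resolve(Map<String, String> fileProperties, String[] args) {
        Map<String, String> conf = new HashMap<>(fileProperties);
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("-D")) {
                String[] keyValue = args[i + 1].split("=", 2);
                if (keyValue.length == 2) {
                    conf.put(keyValue[0], keyValue[1]); // command line wins over files
                }
                i++; // skip the key=value token that was just consumed
            }
        }
        return conf;
    }
}
```

If the files define `color=red` and the command line carries `-D color=yellow`, the resolved value of `color` is `yellow`, while properties not mentioned on the command line keep their file values.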
InputSampler, Sampler
- The InputSampler class defines a nested Sampler interface whose implementations return a sample of keys given an InputFormat and Job
- This interface usually is not called directly by clients. Instead, the writePartitionFile() static method on InputSampler is used, which creates a sequence file to store the keys that define the partitions.
- The sequence file is used by TotalOrderPartitioner to create partitions for the sort job.
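The idea behind sampling for a total sort can be sketched without Hadoop. This is a simplified illustration, not the InputSampler or TotalOrderPartitioner code: `TotalOrderSketch` is a hypothetical class, keys are plain ints, and sending a key equal to a split point to the higher partition is a convention chosen for this sketch.

```java
import java.util.Arrays;

// Hypothetical sketch: derive split points from a key sample, then route keys to partitions.
class TotalOrderSketch {

    /** Picks numPartitions - 1 evenly spaced split points from the sorted sample. */
    static int[] splitPoints(int[] sample, int numPartitions) {
        int[] sorted = sample.clone();
        Arrays.sort(sorted);
        int[] splits = new int[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            splits[i - 1] = sorted[i * sorted.length / numPartitions];
        }
        return splits;
    }

    /**
     * Returns the partition for a key: the number of split points that are <= key.
     * Assumes the split points are distinct and sorted.
     */
    static int partition(int key, int[] splits) {
        int pos = Arrays.binarySearch(splits, key);
        // Found: go to the partition after that split point. Not found: use the insertion point.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }
}
```

Because every key in partition 0 is smaller than every key in partition 1, and so on, sorting each partition separately and concatenating the outputs yields a globally sorted result. A representative sample keeps the partitions roughly equal in size.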