Hadoop
MR
reducer output goes to HDFS, whereas mapper output goes to local disk (unless the MR job has no reducers)
reducer writes are costly due to HDFS replication
combiner function runs on map output before the shuffle to the reducer
Hadoop may call the combiner zero or more times, so it must not change the result
Hadoop Streaming uses Unix pipes to allow non-Java programs to be used in MR
JobControl class allows ordering multiple jobs as a DAG, but evaluates dependencies on the client machine
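A minimal sketch of the pipe model Streaming mimics (roughly `cat input | mapper | sort | reducer`); the mapper and reducer names are illustrative, exchanging tab-separated key/value lines as a Streaming job would on stdin/stdout:

```python
import itertools

def mapper(lines):
    # word-count mapper: emit "word\t1" for every word
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def reducer(sorted_lines):
    # after the sort phase, values for the same key arrive contiguously
    pairs = (l.split("\t") for l in sorted_lines)
    for key, group in itertools.groupby(pairs, key=lambda kv: kv[0]):
        yield f"{key}\t{sum(int(v) for _, v in group)}"

lines = ["the quick fox", "the lazy dog"]
result = list(reducer(sorted(mapper(lines))))
print(result)  # the\t2 appears once, every other word once
```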
Oozie is a more robust solution that runs as a service
HDFS
consists of 1 namenode and multiple datanodes
namenode knows which blocks make up each file, but does not persist block locations
block locations are reconstructed from datanode block reports when the system starts
All metadata would be lost if the namenode were lost, so for resilience:
namenode writes its metadata to multiple places, usually including NFS-mounted storage
a secondary namenode running on a separate machine periodically merges the edit log with the namespace image
data loss is still possible since the secondary namenode lags the primary namenode
a hot standby namenode (instead of a secondary) must use HA shared storage for storing metadata
namenode stores blocks metadata in namespace image and edit log files
Files cannot have multiple concurrent writers and are written in append-only mode.
Not good for low-latency access or large numbers of small files
Applications can direct the namenode to cache specific files for better performance
namenode doesn't itself do any caching but delegates caching to datanodes
Hadoop 2.x introduced HDFS federation, which lets each namespace volume be managed by a separate namenode
keeps a single namenode's memory from growing too big, as was the case in 1.x
failure of one namenode doesn't affect others
since block pool storage is not partitioned, datanodes register with each namenode
Various URI schemes besides hdfs: allow different filesystems behind a uniform interface, eg
file: local, s3a: AWS S3, wasb: Azure, ftp:, viewfs: client-side mount table for federated HDFS
Also available via HTTP REST API for non-java clients either directly or through proxy servers
Clients connect to the namenode first to get a list of blocks with their locations on datanodes, and then connect directly to the datanodes
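A toy model of that two-step read path (all names and the in-memory maps are illustrative, not real APIs): the namenode only answers the metadata lookup; block bytes flow straight from datanodes to the client.

```python
# namenode metadata: file -> ordered list of (block_id, replica datanodes)
namenode = {
    "/logs/a.txt": [("blk_1", ["dn1", "dn2", "dn3"]),
                    ("blk_2", ["dn2", "dn3", "dn4"])],
}
# datanode storage: datanode -> block_id -> bytes
datanodes = {
    "dn1": {"blk_1": b"hello "},
    "dn2": {"blk_1": b"hello ", "blk_2": b"world"},
    "dn3": {"blk_1": b"hello ", "blk_2": b"world"},
    "dn4": {"blk_2": b"world"},
}

def read_file(path):
    data = b""
    for block_id, replicas in namenode[path]:  # 1. metadata lookup on namenode
        dn = replicas[0]                       # 2. pick a replica (ideally closest)
        data += datanodes[dn][block_id]        # 3. read block directly from datanode
    return data

print(read_file("/logs/a.txt"))  # b'hello world'
```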
HA
Hadoop 1.x had long recovery time due to having to replay edit log and getting metadata from all datanodes
Hadoop 2.x introduced standby namenode to reduce startup time upon failure
datanodes must send block reports to both namenodes since block mappings live in namenode memory, not persistent storage
standby namenode also acts as secondary namenode
metadata shared storage can be NFS or QJM (Quorum Journal Manager)
QJM writes to multiple journal nodes, and a write must succeed on a majority of them
Uses ZooKeeper for electing the active namenode
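The majority rule can be sketched in a couple of lines (a simplified model, not QJM's actual protocol):

```python
def quorum_write(acks, total_journal_nodes):
    """An edit is durable once a strict majority of journal nodes ack it."""
    return acks > total_journal_nodes // 2

# with 3 journal nodes, 2 acks suffice; 1 does not
print(quorum_write(2, 3), quorum_write(1, 3))  # True False
```

Requiring a strict majority means any two quorums overlap, so a recovering namenode always reads at least one journal node that saw the latest committed edit.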
Fencing is the process of preventing both namenodes from thinking they are the primary.
QJM enforces this by letting only one namenode perform writes; NFS requires stricter fencing
vs Cloud
When data is small and localized on HDFS (no shuffle), HDFS may be better than Cloud object store
ROT (rule of thumb): keep small data in the cloud but copy it to HDFS before processing
MR jobs running on Hadoop 2.x before 2.7 run faster on HDFS
S3 v/s HDFS
S3 is ~10x cheaper when including human cost, or ~5x cheaper for hardware alone
HDFS can provide up to 6x higher raw performance
S3 comes out ~2x ahead on performance per dollar compared to HDFS
S3 provides more reliable storage than HDFS
HDFS provides transactional writes (through atomic renames); since S3 does not, ensuring data integrity can be complex
YARN
Consists of a resource manager (one per cluster) and node managers (one per node)
node managers run applications in containers
timeline server stores application history
Scheduling policies: FIFO, Capacity and Fair Scheduling
Fair scheduling optionally permits preemption
Application life-cycle
A client asks the resource manager to launch an application, which usually starts as an application master process
the resource manager finds a node manager to run the application
that node manager creates a container and runs the application master
the application master can either perform the processing itself and return the results, or
request the resource manager to allocate more containers to run processes in parallel
Applications can specify locality constraints when requesting containers
a constraint can be a specific node, a rack, or anywhere in the cluster
constraints can be loose or strict, which decides whether the application falls back or fails if locality cannot be fulfilled
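A sketch of the loose-vs-strict distinction (the function, node/rack names, and fallback order are illustrative, not YARN's API): loose constraints relax node → rack → anywhere, strict ones fail instead.

```python
def place_container(preferred_node, free_nodes, rack_of, strict=False):
    # exact node available: best case for data locality
    if preferred_node in free_nodes:
        return preferred_node
    if strict:
        raise RuntimeError("strict locality constraint cannot be met")
    # loose constraint: fall back to the same rack, then anywhere
    same_rack = [n for n in free_nodes
                 if rack_of[n] == rack_of[preferred_node]]
    return same_rack[0] if same_rack else free_nodes[0]

rack_of = {"n1": "r1", "n2": "r1", "n3": "r2"}
print(place_container("n1", ["n2", "n3"], rack_of))  # n2: same rack as n1
```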
Application can run using different models:
One per job, eg map-reduce
One per user session, eg Spark
Shared by multiple users, eg Impala, Slider, etc.
Other
Avro can represent strings as Utf8 or Java String; Utf8 is more efficient because it is decoded lazily
Parquet can efficiently store/retrieve deeply nested fields in columnar format
Flume
high-volume ingestion of event-based data
consists of channels which connect a source to a sink
eg log files as source and HDFS/HBase as sink
fan-out refers to having more than one sink deliver the same data
can also route/filter events by using a multiplexing selector
channel maintains separate transactions for source and sink
channel can be file (durable) or memory (fast)
every event will reach sink at least once
can use batches for efficiency
Sqoop
Uses database specific Connector for bulk data load
Some vendors pre-configure it to use TDCH (Teradata Connector for Hadoop)
Hive
Provides an SQL abstraction over MR
Managed and external tables
LOADing an HDFS file into a managed table actually moves the data into Hive's warehouse directory, so dropping the table later will delete the data
Creating partitions creates nested folders, with each level named column-name=value
CLUSTERED BY (<col>) INTO 4 BUCKETS uses modulo on the hash value of col
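Both layouts can be sketched in a few lines (table/column names are made up, and Python's `hash` stands in for Hive's hash function):

```python
def partition_path(table, partitions):
    # ordered (column, value) pairs become nested col=value directories
    return "/".join([table] + [f"{c}={v}" for c, v in partitions])

def bucket_for(value, num_buckets=4):
    # CLUSTERED BY ... INTO 4 BUCKETS: hash modulo bucket count picks the file
    return hash(value) % num_buckets

print(partition_path("logs", [("year", 2024), ("month", 1)]))
# -> logs/year=2024/month=1
print(bucket_for(10))  # 10 % 4 == 2 (Python hashes small ints to themselves)
```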
Zookeeper
highly available, high performance coordination service
HA works by replication within a cluster of servers called an ensemble
uses the Zab protocol, which is similar to Raft
data is stored in znodes along a hierarchical path -- like a Unix filesystem
data is very small (<1MB) and accessed atomically (all of the data is always read/written at once)
znodes are either ephemeral (deleted when the client session ends) or persistent
a sequential znode has a sequence number automatically appended to its name
useful for imposing global ordering, eg handing out a lock on a resource
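A toy model of the lock recipe (the counter and paths are simulated locally; in real ZooKeeper the server appends the sequence number when a znode is created with the SEQUENTIAL flag): each client creates a sequential znode, and the lowest sequence number holds the lock.

```python
import itertools

counter = itertools.count(0)  # stands in for the server-side per-parent counter

def create_sequential(prefix="/locks/lock-"):
    # ZooKeeper zero-pads the appended sequence number to 10 digits
    return f"{prefix}{next(counter):010d}"

nodes = [create_sequential() for _ in range(3)]  # three competing clients
holder = min(nodes)  # lowest sequence number owns the lock
print(holder)        # /locks/lock-0000000000
```

Each waiter typically watches only the znode immediately below its own, which avoids a thundering herd when the lock is released.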
operations: create, delete, exists, {get,set}ACL, getChildren, {get,set}Data, sync
Can set a watch (triggered only once) on read operations, allowing clients to get notified of changes
Update operations require a version (obtained from a prior exists/getData) that must match the znode's current version
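This is optimistic concurrency control, sketched below with a toy znode class (not ZooKeeper's API): a write with a stale version is rejected, so two clients cannot silently overwrite each other.

```python
class Znode:
    """Toy model of ZooKeeper's version-checked setData."""
    def __init__(self, data):
        self.data, self.version = data, 0

    def set_data(self, data, version):
        if version != self.version:
            raise RuntimeError("BadVersion")  # stale update rejected
        self.data, self.version = data, self.version + 1

z = Znode(b"a")
z.set_data(b"b", version=0)  # succeeds; version becomes 1
```

A second writer still holding version 0 would now get BadVersion and must re-read before retrying.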
clients usually establish a long lived session with any server from the ensemble
must keep the session alive with heartbeats
ephemeral znodes are lost when session terminates
znodes can be protected by ACLs
authentication is performed by ip, sasl (Kerberos), or digest (username/password)
Misc
HDFS block size is a maximum; a file smaller than the block size will not occupy the entire 128MB
HDFS blocks are checksummed and verified on each read.
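A quick sketch of the arithmetic (helper name is made up):

```python
import math

BLOCK = 128 * 1024 * 1024  # default HDFS block size

def blocks_used(file_size_bytes):
    # a file occupies only as many blocks as it needs,
    # and the last block holds only the remaining bytes
    return max(1, math.ceil(file_size_bytes / BLOCK))

print(blocks_used(1 * 1024 * 1024))    # 1: a 1MB file uses one 1MB block
print(blocks_used(300 * 1024 * 1024))  # 3: 128 + 128 + 44 MB
```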
Datanode block scanner is a background process that periodically verifies all blocks in a datanode
bzip2-compressed files are splittable, but files compressed with most other codecs are not
hadoop distcp command provides an efficient way (without flowing data to the client) to copy data from one HDFS to another
ROT: use 20 mappers per node for fast and balanced data copy
YARN doesn't provide a way for applications running in containers to communicate; application-specific methods (such as RPC) are used
NSM (N-ary Storage Model) layout is row-oriented storage
DSM (Decomposition Storage Model) is columnar storage
PAX (Partition Attributes Across) layout:
a batch is a group of rows; within a batch, storage is columnar
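The three layouts can be sketched for the same tiny table (lists of tuples stand in for storage pages):

```python
rows = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]

nsm = rows                 # NSM: whole rows stored together
dsm = list(zip(*rows))     # DSM: one sequence per column

def pax(rows, batch_size=2):
    # PAX: split into row batches, store each batch column-wise
    return [list(zip(*rows[i:i + batch_size]))
            for i in range(0, len(rows), batch_size)]

print(pax(rows))  # [[(1, 2), ('a', 'b')], [(3, 4), ('c', 'd')]]
```

PAX keeps a whole batch's values for one column contiguous (good for scans and compression) while still keeping each row within a single batch (good for reassembling records).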
Protocols like ODBC and JDBC are row-oriented, so transferring data to/from columnar storage systems involves a lot of computation pivoting data from columns to rows and back to columns
ADBC solves this by transferring data in a columnar layout, using Arrow