Architecture

In big data analytics scenarios, data volumes grow exponentially, and the cost of analyzing multi-source, heterogeneous data grows with them. In most of these scenarios, iterative data processing involves a large number of shuffles that go through disk drives. As a result, data analytics takes a long time.

As a performance acceleration component of the big data engine Spark, OmniShuffle runs in big data clusters in the customer's data center. It uses unified addressing of the memory pool, data exchange with memory semantics, and converged shuffle to reduce drive I/O overhead, speed up data analytics, and improve cluster resource utilization.

OmniShuffle supports two modes, RSS and ESS. The two modes differ mainly in how they are deployed and configured, and you can switch between them as needed.

OmniShuffle uses the plugin mechanism provided by Spark: it implements the Shuffle Manager and Broadcast Manager plugin interfaces and replaces the shuffle and broadcast implementations of open source Spark in a non-intrusive manner.
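As a rough illustration of the plugin mechanism, Spark selects its shuffle implementation through the standard `spark.shuffle.manager` property, which accepts a fully qualified class name. The sketch below only assembles that property into `spark-submit` arguments. The class name `com.example.shuffle.ExampleShuffleManager` is a hypothetical placeholder, not the actual OmniShuffle class; the real value and any additional properties should be taken from the OmniShuffle deployment documentation.

```python
def shuffle_plugin_conf(manager_class):
    """Return the Spark property that swaps in a custom ShuffleManager."""
    if not manager_class or " " in manager_class:
        raise ValueError("manager_class must be a fully qualified class name")
    # spark.shuffle.manager is a standard Spark configuration property.
    return {"spark.shuffle.manager": manager_class}


def to_submit_args(conf):
    """Render properties as `--conf key=value` arguments for spark-submit."""
    args = []
    for key in sorted(conf):
        args.extend(["--conf", f"{key}={conf[key]}"])
    return args


# Hypothetical placeholder class name, for illustration only.
EXAMPLE_MANAGER = "com.example.shuffle.ExampleShuffleManager"

if __name__ == "__main__":
    print(" ".join(to_submit_args(shuffle_plugin_conf(EXAMPLE_MANAGER))))
```

Because the implementation is chosen through configuration, the application code itself does not change, which is what makes the replacement non-intrusive.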

ESS Mode

For shuffle-intensive jobs in Spark, a large amount of data needs to be exchanged across nodes after the map process is complete. Statistics show that in many scenarios the shuffle process consumes more time and resources than any other part of a Spark job, and can account for 50% to 80% of the end-to-end time overhead of Spark services. Spark provides the Shuffle Manager plugin interface to optimize the shuffle process. You can use the interfaces defined by Shuffle Manager to implement a custom Shuffle Manager and tailor shuffle policies, methods, and processes.
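To get a feel for what the 50% to 80% figure implies, Amdahl's law gives an upper-level estimate of the end-to-end gain from accelerating only the shuffle portion of a job. The sketch below is a back-of-the-envelope model; the shuffle speedup factor passed in is a hypothetical input, not a measured OmniShuffle result.

```python
def end_to_end_speedup(shuffle_fraction, shuffle_speedup):
    """Amdahl's law: overall speedup when only the shuffle part gets faster.

    shuffle_fraction: share of end-to-end time spent in shuffle (0.0 to 1.0).
    shuffle_speedup:  hypothetical factor by which shuffle itself is sped up.
    """
    if not 0.0 <= shuffle_fraction <= 1.0:
        raise ValueError("shuffle_fraction must be between 0 and 1")
    if shuffle_speedup <= 0:
        raise ValueError("shuffle_speedup must be positive")
    remaining = (1.0 - shuffle_fraction) + shuffle_fraction / shuffle_speedup
    return 1.0 / remaining


if __name__ == "__main__":
    for fraction in (0.5, 0.8):
        print(fraction, round(end_to_end_speedup(fraction, 2.0), 2))
```

Under this model, doubling shuffle speed yields roughly 1.33x end to end when shuffle is 50% of the runtime and roughly 1.67x when it is 80%, which is why shuffle-heavy jobs benefit the most.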

In ESS mode, OmniShuffle enables in-memory shuffle by implementing the Shuffle Manager plugin interface. That is, the shuffle process is completed in the memory pool based on memory semantics, so less shuffle data is flushed to drives. This reduces the time and compute overhead of flushing and reading data, serialization and deserialization, and compression and decompression.

OmniShuffle also implements the Broadcast Manager interface to broadcast variables through the shared memory pool, which improves the transmission efficiency of broadcast variables among executors.

In addition, OmniShuffle supports two network modes: Remote Direct Memory Access (RDMA) and TCP. Compared with TCP, RDMA improves transmission efficiency, requires less computing power, and enables efficient data exchange between nodes.

The ESS mode is more suitable for small clusters that require high performance.

RSS Mode

Data analytics costs have been increasing due to soaring data volumes and heterogeneous data types. Traditional shuffle architectures depend heavily on local storage and do not separate storage from compute. As a result, Reduce nodes perform many fragmented drive reads and writes, which degrades overall read and write efficiency. Traditional shuffle processes are also less reliable. In large-scale service scenarios, shuffle data may be lost when a drive fails, forcing the stage to be recalculated, and when the number of network connections grows too large, shuffle data may fail to be fetched over the network.

The RSS mode is built on a decoupled storage-compute architecture. By optimizing the Spark shuffle write process, BoostRSS saves the data generated in the Map stage to RSS nodes. In this way, the original small files and small I/O operations are aggregated into large files and large, contiguous I/O operations. This significantly improves overall drive read and write efficiency and relieves the I/O burden on compute nodes. As a result, MapReduce task execution performance increases.
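The effect of aggregation can be sketched with a simplified counting model. It assumes every map task produces one shuffle block per reduce partition and that the remote service merges all blocks of a partition into one contiguous file. This is an illustration of the idea only, not BoostRSS's actual storage layout, and the task counts and block size are hypothetical.

```python
from collections import defaultdict


def map_output_blocks(num_map_tasks, num_partitions, block_bytes):
    """Yield (map_id, partition_id, size) for every shuffle block."""
    for map_id in range(num_map_tasks):
        for partition_id in range(num_partitions):
            yield map_id, partition_id, block_bytes


def aggregate_by_partition(blocks):
    """Merge all blocks of a partition into one logical file (total bytes)."""
    merged = defaultdict(int)
    for _map_id, partition_id, size in blocks:
        merged[partition_id] += size
    return dict(merged)


def summarize(num_map_tasks, num_partitions, block_bytes):
    """Compare read counts with and without partition-level aggregation."""
    blocks = list(map_output_blocks(num_map_tasks, num_partitions, block_bytes))
    merged = aggregate_by_partition(blocks)
    return {
        "reads_without_aggregation": len(blocks),
        "reads_with_aggregation": len(merged),
        "bytes_per_aggregated_read": max(merged.values()) if merged else 0,
    }


if __name__ == "__main__":
    # Hypothetical job: 1000 map tasks, 200 reduce partitions, 4 KiB blocks.
    print(summarize(1000, 200, 4096))
```

In this model the number of reads drops from one per (map task, partition) pair to one per partition, while each read grows proportionally, which is the shift from many small I/O operations to fewer large ones.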

The RSS mode is more suitable for medium- and large-sized clusters that require high reliability.

Overall Solution

OmniShuffle enables in-memory shuffle by implementing the Spark Shuffle Manager plugin interface. Figure 1 shows the overall solution architecture of OmniShuffle. Table 1 describes the subsystems.

Figure 1 Logical architecture of OmniShuffle

Table 1 Subsystems

Subsystem            Description
Memory pool kit      Provides the distributed shared memory infrastructure and basic memory semantics.
Metadata             Stores basic information about shuffle data and nodes.
Shuffle semantics    Provides APIs for implementing Spark shuffle semantics.
Broadcast semantics  Provides APIs for implementing Spark broadcast semantics.

Service Process

OmniShuffle accelerates Spark shuffle based on the ShuffleManager APIs provided by Spark and the memory pool kit. Figure 2 and Figure 3 show the OmniShuffle service process in ESS mode and RSS mode, respectively.

Figure 2 Service flowchart in ESS mode
Figure 3 Service flowchart in RSS mode

After connecting OmniShuffle to Spark, you can access the Spark CLI to view the cluster status. Administrators and O&M personnel can view the cluster status on the OmniShuffle CLI.

The metadata service is started by the driver and serves only the current application, so it is isolated from other applications.

Combining with OmniOperator

Most big data engines use Java or Scala operators, which cannot fully utilize the CPU. These operators are also poorly suited to heterogeneous computing and cannot fully exploit the performance of the hardware. OmniOperator uses native code to make full use of hardware, especially in heterogeneous computing.

OmniOperator improves operator execution efficiency, and OmniShuffle optimizes the data exchange process. Together, they improve the end-to-end query performance of the engine.