Get Our e-AlertsSubmit Manuscript
Intelligent Computing / 2022 / Article

Research Article | Open Access

Volume 2022 |Article ID 9820424 | https://doi.org/10.34133/2022/9820424

Shixin Huang, Chao Chen, Gangya Zhu, Jinhan Xin, Zheng Wang, Kai Hwang, Zhibin Yu, "Resource Configuration Tuning for Stream Data Processing Systems via Bayesian Optimization", Intelligent Computing, vol. 2022, Article ID 9820424, 16 pages, 2022. https://doi.org/10.34133/2022/9820424

Resource Configuration Tuning for Stream Data Processing Systems via Bayesian Optimization

Received04 Jul 2022
Accepted09 Aug 2022
Published06 Oct 2022

Abstract

Stream data processing systems are becoming increasingly popular in the big data era. Systems such as Apache Flink typically provide a number (e.g., 30) of configuration parameters to flexibly specify the amount of resources (e.g., CPU cores and memory) allocated for tasks. These parameters significantly affect task performance. However, it is hard to manually tune them for optimal performance for an unknown program running on a given cluster. An automatic as well as fast resource configuration tuning approach is therefore desired. To this end, we propose to leverage Bayesian optimization to automatically tune the resource configurations for stream data processing systems. We first select a machine learning model—Random Forest—to construct accurate performance models for a stream data processing program. We subsequently take the Bayesian optimization (BO) algorithm, along with the performance models, to iteratively search the optimal configurations for a stream data processing program. Experimental results show that our approach improves the 99th-percentile tail latency by a factor of 2.62× on average and up to 5.26× overall. Furthermore, our approach improves throughput by a factor of 1.05× on average and up to 1.21× overall.

1. Introduction

Stream data processing systems, such as Apache Flink [1], Apache Storm [2], Spark Streaming [3], and Apache Heron [4], are being widely deployed in an increasing number of companies (for example, Alibaba and Twitter) as these companies have to deal with continuous stream data and perform real-time data analysis. Flink, one of the latest stream data processing systems, is being widely used for real-time monitoring, real-time warehouse data analysis, and stream-data analysis.

Typically, stream data processing applications repeatedly run on a specific hardware environment for long periods of time (e.g., months or years), with each application being run using different data. Considering these applications’ large-scale deployment and long, even a small performance improvement can bring huge economic benefits (e.g., millions of dollars) for companies. Unlike other big data applications such as batch processing, the higher performance of stream data processing applications implies shorter 99th-percentile latency and higher throughput [5] simultaneously.

Stream data processing systems typically provide a large number (e.g., 30) of resource configuration parameters to specify the amount of resources such as CPU cores and memory used in tasks. These configuration parameters, including those related to I/O behavior and load balancing, significantly affect the performance of a stream data processing application; we call these key-configuration parameters. Tuning these key-configuration parameters can achieve surprisingly high performance.

However, picking key-configuration parameters, as well as finding their optimal values for a stream data processing application, is very challenging. This is because the optimal configuration parameter values depend on various aspects including the physical cluster resources, the characteristics of the application, and its input data. Furthermore, these aspects are usually intertwined in a complex way, making the configuration tuning extremely challenging.

As such, manually tuning these parameters is extremely time-consuming. For a single unknown application, a performance engineer, who has a deep understanding on the stream data processing system, may take several days or even weeks to find its optimal resource configuration. What is worse, when the number of key-configuration parameters becomes high-dimensional (e.g., greater than 30), using traditional methods to seek the optimal optional parameter values to achieve high performance is NP-hard. Consequently, autotuning the resource configuration parameters for optimal performance is desired.

In the present study, we leverage a machine learning-based methodology [6] to adjust the resource allocation parameters automatically and efficiently for stream data processing applications. It takes the speed of input data and key-configuration parameters as input to employ a highly accurate performance model for a stream data processing workload, which outputs the tail latency or throughput of the application. Along with the performance model, we utilize a Bayesian optimization algorithm (BOA) to search the optimal resource configuration parameter values to achieve high performance.

Without loss of generality, we use Apache Flink programs as stream data processing applications for our experiments. Our methodology is not bound to specific parameters or systems. Instead, it can be applied to other stream processing systems such as Apache Storm or Spark Streaming. To the best of our knowledge, this is the first study to tune the resource configuration parameters for stream data processing frameworks by using Bayesian optimization.

Following are the contributions of this study: (i)We propose a methodology to autotune resource configuration parameters of stream data processing frameworks(ii)We propose a Random Forest algorithm to build a highly accurate performance model for a stream data processing program(iii)We leverage the Bayesian optimization algorithm (BOA) to iteratively search the high-dimensional resource configuration space to achieve optimal performance

We demonstrate that our approach significantly improves the 99th-percentile latency of Flink applications compared with default configurations by a factor of 2.62× on average and up to 5.26× overall and improves the throughput by a factor of 1.05× on average and up to 1.21× overall.

The remainder of this paper is organized as follows. In Section 2, we describe materials and methods. Section 3 introduces experimental results and discussion. In Section 4, we present our conclusions.

2. Materials and Methods

2.1. Background and Motivation
2.1.1. Flink Framework

Apache Flink is an open-source real-time data processing engine that can process both batch and stream data [7]. It is based on event-driven, efficient checkpointing mechanism, and exactly once semantics. Flink allows users to write their own applications that can be deployed on multiple distributed compute nodes, which makes the speed of processing massive amounts of data much faster than a single computer node. Unlike Apache Spark, Storm, and other frameworks, Flink provides a unified processing architecture for both batch and stream data processing. As a matter of fact, while processing, it treats batch data as a type of special stream data.

Figure 1 shows the architecture of Flink. The Flink runtime consists of two types of daemons: JobManager and TaskManager. JobManager acts as the master node running on a Flink cluster and is responsible for scheduling tasks, coordinating checkpoints, and executing failover strategies, among other tasks. TaskManager is a worker process on each slave node of the cluster and executes tasks while managing the status of each slave node. In addition to the runtime, Flink uses Client to prepare and send a dataflow to the JobManager.

The entire job submission process is as follows: a user submits a job, which is then transformed into a directed acyclic graph (DAG) [8] of a task for execution. The nodes of a DAG can be classified into three categories: source nodes, sink nodes, and operators. The source nodes read data from the message queue. The operators define how the intermediate data are processed. And the sink nodes output the final processing results. The interaction between the JobManager and the TaskManager and the JobManager and the Client is based on the Akka toolkit.

The submission of a Flink job also involves the creation of the Actor System, the startup of JobManager, and the startup and registration of TaskManager. JobManager and TaskManager communicate with each other via the Actor System to obtain the status of executed tasks and then send it to a Client via the Actor System. During job execution, the JobManager may trigger checkpoint operations. After each TaskManager receives trigger-checkpoint instructions, they finish checkpoint operations. All coordination processes about checkpoint are completed by the JobManager. When a job is completed, Flink feeds the information of the job’s execution back to the Client and releases the resources occupied by the TaskManager for the next submission.

Flink applications can be deployed on mainstream resource managers (e.g., Mesos [9], YARN [10], and Kubernetes [11]). Flink provides hundreds of configuration parameters (e.g., >300) that specify different aspects of one Flink job, including JobManager, TaskManager, network communication, and shuffle behavior. Some of these configuration parameters do not affect the performance of Flink applications. For instance, jobmanager.archive.fs.dir defines the file directory where the JobManager stores the archives of jobs that have been completed, and jobmanager.rpc.port specifies the network port for communication with the JobManager.

However, other configuration parameters, especially the resource-related configuration parameters such as jobmanager.memory.flink.size and akka.framesize, significantly impact a job’s performance. In this example, the parameter jobmanager.memory.flink.size specifies the memory size that the JobManager can consume. It consists of two parts: Java on-heap memory and off-heap memory. If it is too small, the memory buffer cannot cache all the intermediate results, which degrades the performance. The parameter akka.framesize limits the size of the message sent between the JobManager and the TaskManager, which has a significant impact on the network performance. We call these resource-related parameters key-configuration parameters. We will first determine these key-configuration parameters and then focus on tuning them afterward.

2.1.2. Bayesian Optimization

Bayesian optimization [12] is an algorithm for black-box function optimizations, and it has been extensively used for hyperparameter tuning in machine learning. Mathematically, we formulate a global maximization problem of a black-box function as follows: where is our resource configuration space. Bayesian optimization is extremely useful in cases where function is unknown, the cost of evaluating is quite high (such as the learning rate in deep neural networks), or one does not have access to derivatives with respect to .

Bayesian optimization consists of two key parts. The first part is a prior model, also called the probabilistic surrogate model; it consists of a distribution that captures our beliefs about the behavior of a black-box function. The other part is an acquisition function; examples include Thompson sampling (TS), probability of improvement (POI), expected improvement (EI), and upper-confidence-bound (UCB). The advantage of acquisition functions is that they are easy to evaluate compared with the unknown objective function . In most cases, the acquisition function is much easier to optimize compared with the original black-box function. Algorithm 1 is the pseudocode of a Bayesian optimization framework, and Figure 2 illustrates the iterative process of Bayesian optimization.

1 for do
2   select new by optimizing utility function
3   query target function to obtain
4   augment data
5   update statistical model
6 end

The main idea of Bayesian optimization is to build a model that can be easily updated and queried to find a global maximizer in a short time. To explain Bayesian optimization, we take a model parameterized by as an example and let represent the available data. We simply consider the nonparametric case in this example. Because is unobserved, we treat it as a random variable with a prior distribution , which captures our beliefs about the unknown objective function. When we are given data and a likelihood, inferring a posterior distribution is easy by using Bayes’ rule: where represents marginal likelihood. After observing data , we update our beliefs about . The Bayesian optimization algorithm uses Bayes’ rule, which makes the posterior computing analytically and quickly.

2.1.3. Motivation

Flink has more than 300 configuration parameters, and the key-configuration parameters need to be tuned. In this section, we attempt to answer two questions: (1) whether the performance of Flink (such as throughput and latency) is affected by input data speed and (2) whether some approaches on automatic parameter tuning for big data processing systems [1316] (such as Hadoop and Spark) are also applicable to Flink.

2.1.4. Input Data Speed Sensitivity

In theory, when the input data speed increases, the streaming processing system has to adapt itself to handle the increased input data speed and process tuples. When the input data speed is beyond the ability of the system to handle, backpressure arises in the system. To find out whether the input data speed influences throughput and latency of Flink applications, we first select three programs in HiBench [17] as follows: Flink WordCount (Flink-WC), Flink Repartition (Flink-RP), and Flink Identity (Flink-ID). Subsequently, we run these three programs with three different input data speeds—8 K records/s, 80 K records/s, and 160 K records/s—with the default resource configuration on the same cluster. Our goal is to observe the fluctuations of throughput and latency.

Figure 3 shows the fluctuations of throughput and latency. We can clearly find that the input data speed significantly influences both throughput and latency of Flink applications. In addition, throughput and latency are not closely correlated. However, we note that their values increase with the speed of the input data. Here is what explains this phenomenon.

On the one hand, when the input data speed increases, the streaming system has to scale out to process more data, which causes the system to receive data at a much higher rate than it can process it. As a result, backpressure builds in the system. From the moment when the backpressure mechanism starts, the latency of all queue messages increases.

On the other hand, fluctuations of throughput and latency exist because of the nature of In-Memory Computing (IMC) frameworks. As an IMC framework, Flink leverages memory to keep its state. As Figure 4 shows, stateful Flink applications are optimized for local state accesses. Task state is always maintained in memory unless there is not enough memory. In that case, the task state is maintained in access-efficient on-disk data structures. Consequently, computations are performed using local states, which results in low processing latency. Besides, since it periodically checkpoints the local state to the storage, the Flink guarantees exactly once state consistency in case of failures.

2.1.5. Limitations of Prior Works

Popular big data computing frameworks such as Spark and Flink also contain numerous configuration parameters controlling memory, I/O, computing cores, and so on. Herodotou et al. [18] classify common big data parameter autotuning methods into six categories. Rule-based methods assist developers tune some parameters quickly. It relies on expert experience and manual tuning instructions, but a deep understanding of the internal mechanism of the system is required. White-box approaches are broadly used in constructing cost model and analytical model. Large amounts of history information and system logs are needed to be analyzed to build an accurate model. This type of complex analytical approach is not suitable for Flink applications. Simulation-based approaches, meanwhile, cannot tune the resource configuration parameters automatically, and thus, they are not very efficient for searching the optimal configuration for a specific streaming workload.

Though experiment-driven approaches can find optimal settings on real systems, they are time-consuming because repeated experiments on physical machines are necessary. Also, the high-dimensional configuration space makes it impossible to find an optimal configuration to achieve our desired performance. Adaptive approaches can tune resource configurations when an workload is running, but improper settings of some configuration parameters may cause certain system issues. Machine learning-based methods typically abstract a complex system into an invisible black box to tune system parameters. One advantage is they do not require an in-depth understanding of the system mechanism and hardware. The other advantage is that they have the ability to capture complex system dynamics. However, machine learning-based approaches have yet not been applied in stream data processing systems. We therefore choose machine learning approaches to build performance models for our Flink workloads.

2.2. Related Work

For batch processing systems, Herodotou et al. [1921] first proposed using analytical models to predict the performance of the MapReduce framework. Gencer et al. [22] developed the response surface (RS) methodology to build a performance model, after which the model was used in a simulator for MapReduce jobs. Bei et al. [23] suggested using Random Forest to autotune Hadoop’s configurations. Their results showed that RFHOC’s performance increases when the size of the input data set increases.

In-memory cluster (IMC) computing frameworks such as Spark can leverage memory resource to store temporary data and thus perform better than traditional on-disk cluster computing [24, 25]. Yu et al. [26] proposed a DAC, a datasize-aware autotuning methodology to efficiently find the optimal configuration in high-dimensional configuration space for a given IMC program, and compared with default configurations; the authors achieved improved performance of Spark programs by a factor of 30.4× on average and up to 89× overall.

For the stream processing system Apache Storm, Li et al. [27] aimed not only to leverage support vector regression (SVR) to construct the workload performance model but also provided an efficient scheduling solution for Storm. Trotter et al. [28] suggest accelerating the exploration of the configuration space in Storm by employing searching algorithms such as genetic algorithm (GA) or Bayesian optimization algorithm (BOA).

For Spark Streaming, Lin et al. [29] leveraged an executable and reliable model named SSP (stands for Spark Streaming Processing) to simulate Spark Streaming in different scenarios. Vaquro et al. [30] employed a reinforcement learning method to autotune configurations for stream processing engines. Das et al. [31] applied a robust and effective algorithm that adaptively tunes the batch size for promoting the performance of Spark Streaming. Petrov et al. [32] proposed a robust and adaptive performance model for Spark Streaming to achieve the goal of allocating resources dynamically and reducing the total cost.

As for the stream processing system Apache Flink, although there are few studies on scheduling optimization such as operator chain, slot sharing, and checkpoint and some people even propose to change hardware resources to optimize the performance, these methods are extremely time-consuming and not necessarily very effective for dynamic tuning of Flink program performance. In other words, these approaches may be feasible in theory, but they cannot efficiently optimize the performance of Flink applications in reality. Furthermore, they require expert knowledge on the extremely complex Flink system and are not adaptive to system changes.

We propose a machine learning approach based on Bayesian optimization to optimize the performance of Flink. We focus on tuning key-configuration parameters, which significantly improves the performance of Flink programs. Moreover, our approach treats the Flink system as a black box, which means a user is not required to have a deep understanding of Flink. In addition, we take into account the influence of input data speed, and our approach is adaptive to system changes.

2.3. Machine Learning Approach Based on Bayesian Optimization

In this section, we propose a machine learning approach based on Bayesian optimization to optimize the performance for a given Flink application on a given hard cluster by autotuning its key-configuration parameters. Figure 5 depicts an workflow of our methodology. It is composed of five modules: determining, generating, collecting, modeling, and searching.

In the determining module, we pick key-configuration parameters out of more than 300 configuration parameters in Flink using theories and experiments in the ① step.

In the ② step, the generating component automatically generates random configuration parameter values that are employed to learn the potential relationship between configuration parameters and the performance (e.g., tail latency or throughput) of a given Flink application.

Note that in the ③ step, we classify these configurations into two categories: FailConf and PassConf. FailConf represents the configuration parameters with which fail to start a Flink Cluster or submit a job. For example, on the Flink official website, it is not recommended to explicitly configure both total process memory and total Flink memory. It may lead to deployment failures due to potential memory configuration conflicts. Unfortunately, we do not know whether the configuration parameters are proper unless we use them to submit a Flink job.

On the contrary, PassConf represents the configuration parameters with which we can successfully submit a Flink job. By performing experiments with such parameters, we can easily find the PassConf. Later, in the ④ and ⑤ steps, the collecting component collects the performance (such as latency and throughput) of a Flink program with a number of different parameter values in PassConf.

In the ⑥step, the modeling module employs an appropriate machine learning algorithm to construct a performance model that inputs the key-configuration parameters and the speed of input data as inputs and outputs the performance for a given Flink program. Last but not the least, the searching module is employed to automatically search the optimal configurations in the ⑦ step.

In summary, the searching component depends on the accuracy of the performance models built by the modeling component; in turn, the modeling component depends on the training data of the collecting component, and the generating component needs the key-configuration parameters from the determining component to generate proper configuration parameter values. The five components are closely connected for optimal configuration parameter tuning.

2.3.1. Determining Key Configurations

Flink provides more than 300 configuration parameters to tune, and they are stored in flink-conf.yaml, which is a collection of key-value pairs with format key: value. The configuration and its corresponding value is parsed from this file when an Flink process is started. Therefore, changing a configuration file requires restarting Flink processes. These configurations are mainly classified as follows: basic setup, common setup options, security, resource orchestration frameworks, state backends, metrics, history server, and so on. The basic setup includes the ones needed for a basic distributed Flink setup, which includes hostnames/ports, memory sizes, parallelism, checkpointing, and web UI, among others. The basic setup is necessary and significant for starting a Flink cluster.

First of all, we can exclude some configuration parameters that do not affect performance, such as hostnames/ports, YARN, SSL, and web UI. After this, we are left with approximately 60 configuration parameters. Then, we use the single variable principle to perform experiments: when we change a parameter’s value, we set the rest of the parameters as default values. At the same time, we observe the variation of performance and the corresponding resource configurations such as computing resources (the number of cores required for each task), memory resources (memory size and buffer size), network bandwidth utilization, and disk resource utilization. If the performance varies significantly when the parameter’s value changes, we call these parameters key-configuration parameters. Otherwise, we exclude this configuration parameter. Through the experiments, we end up picking 30 key-configuration parameters, as shown in Table 1. Most of these configuration parameters are related to resources such as memory, and thus, we also call them resource configuration parameters.


Configuration parameter-descriptionRangeDefault

parallelism.default: the default parallelism used when no parallelism is specified anywhere1-2001
taskmanager.numberOfTaskSlots: the number of slots that a TaskManager offers. It is denoted as “#slot” by some parameters as the default value, such as taskmanager.network.netty.num-arenas.1-401
execution.buffer-timeout: the maximum time frequency (milliseconds) for the flushing of the output buffers, in ms. A positive value means triggering flushing periodically by that interval, 0 ms means triggering flushing after every record, and -1 ms triggers flushing only when the output buffer is full.-1, 0, 1-500100
execution.checkpointing.interval: the interval in which checkpoints are periodically scheduled, in ms.10-600000(none)
taskmanager.network.numberOfBuffers: the number of buffers available to the network stack28-448002048
jobmanager.memory.flink.size: total Flink Memory size for the JobManager, in MB1024-40960(none)
jobmanager.memory.jvm-metaspace.size: JVM Metaspace size for the JobManager, in MB64-4096256
jobmanager.memory.jvm-overhead.max: Max JVM Overhead size for the JobManager, in MB512-40961024
jobmanager.memory.jvm-overhead.min: Min JVM Overhead size for the JobManager, in MB64-512192
taskmanager.memory.flink.size: total Flink Memory size for the TaskExecutors, in MB1024-40960(none)
taskmanager.memory.jvm-metaspace.size: JVM Metaspace size for the TaskExecutors, in MB64-4096256
taskmanager.memory.jvm-overhead.max: Max JVM Overhead size for the TaskExecutors, in MB512-40961024
taskmanager.memory.jvm-overhead.min: Min JVM Overhead size for the TaskExecutors, in MB64-512192
blob.fetch.backlog: the config parameter defining the backlog of BLOB fetches on the JobManager.200-50001000
blob.fetch.num-concurrent: the config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves10-20050
taskmanager.memory.segment-size: size of memory buffers used by the network stack and the memory manager, in KB4-102432
taskmanager.network.blocking-shuffle.compression.enabled: Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle modeTrue, falseFalse
taskmanager.network.blocking-shuffle.type: the blocking shuffle type, either “mmap” or “file”File, mmapFile
taskmanager.network.netty.client.numThreads: the number of Netty client threads1-40#slot
taskmanager.network.netty.num-arenas: the number of Netty arenas1-40#slot
taskmanager.network.netty.sendReceiveBufferSize: the Netty send and receive buffer size, in MB4-409604
taskmanager.network.netty.server.numThreads: the number of Netty server threads1-40#slot
taskmanager.network.netty.transport: the Netty transport type, either “nio” or “epoll.” The “auto” means selecting the property mode automatically based on the platform.nio, epollAuto
taskmanager.network.request-backoff.initial: minimum backoff in milliseconds for partition requests of input channels10-500100
taskmanager.network.request-backoff.max: maximum backoff in milliseconds for partition requests of input channels1000-2000010000
akka.framesize: maximum size of messages which are sent between the JobManager and the TaskManagers, in MB1-204810
akka.throughput: number of messages that are processed in a batch before returning the thread to the pool1-50015
akka.transport.threshold: threshold for the transport failure detector100-1000300
fs.output.always-create-directory: file writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory.False, trueFalse
fs.overwrite-files: specifies whether file output writers should overwrite existing files by defaultFalse, trueFalse

2.3.2. Generating Resource Configurations

The aim of the generating component is to randomly generate values for resource configuration parameters within their value ranges, and these configurations are written into the configuration file flink-conf.yaml. Here we use the following vector to represent a configuration set: with the configuration set and the value of the configuration parameter in the configuration set. is 30 which corresponds to the 30 key-configuration parameters of Flink, and is the number of configurations. It is worth mentioning that as long as the two values of at least one parameter of two configuration sets are different, we regard them as two different configuration sets.

After generating configurations, we classify these configurations into two categories: FailConf and PassConf. Note that we only use configurations in PassConf to collect performance data, because the FailConf cannot successfully submit a Flink job or start a Flink cluster. Unfortunately, the emergence of FailConf is inevitable because there may be conflicts between different configuration parameters. By submitting a job using generated configurations, we can easily find out whether they belong to PassConf.

2.3.3. Collecting Performance Data

For each set of configurations in PassConf, we run a Flink application in a given cluster and then use HiBench to collect its throughput and 99th-percentile tail latency. Finally, we define the performance of a Flink application as follows: where is the performance of the trial of the application, the throughput (msg/s) of the execution, and the 99th-percentile tail latency (ms) of the execution. denotes the number of executions. Higher is better, and this definition can ensure high throughput and low latency at the same time, which is good for both service providers and end users.

Subsequently, we use these data as training data to build proper performance models. must be large enough to ensure the accuracy of a performance model (e.g., %), and different Flink programs may require different values of for obtaining good accuracy of the performance model. needs a proper value because a larger causes a longer runtime of programs and collect correspond performance, and a smaller may decrease the accuracy of a performance model. Consequently, we leverage experiments to maintain a favourable balance between the cost of time and the accuracy of the performance model in our environment.

2.3.4. Building Performance Models

We consider establishing the following simplified performance model for a given Flink program on a given cluster: with perf the performance, the value of the configuration parameter, and speed the input data speed (byte/s). In order to effectively tune configuration parameters for a given Flink application, we use machine learning (ML) approaches to construct an accurate performance model with high-dimensional parameter space. ML models provide two benefits: (1) they regard the intricate system as a black box; in other words, there is no need to comprehend the internals of the system; (2) they can improve accuracy with more training data. However, we face one challenge using ML: the ML model selection issue.

There exists many ML algorithms for us to choose from. Some well-known ones are (1) Random Forest (RF), which uses the average of the multiple decision trees’ prediction results as the final prediction result; (2) Gradient Boosted Regression Tree (GBRT), which is an iterative decision tree algorithm composed of multiple decision trees; (3) Artificial Neutral Network (ANN), which forms a collection of connected nodes that can transmit signals to each other, leading to some prediction; and (4) Support Vector Machine (SVM), which is aimed at finding the separated hyperplane that can correctly partition the training data set and has the maximum geometric spacing.

Which one is the most suitable for our performance models? In theory, it is particularly challenging to obtain a correct answer to this question. Our goal is to build a sufficiently accurate model that uses as little training data as possible. We implement four different ML algorithms—SVM, ANN, RL, and GBRT—on top of the sklearn Python library [33] and adopt default hyperparameter values. We employ the four machine learning models on the same training data set and evaluate their fitting effect using the mean absolute percentage error (MAPE). MAPE is a mathematical method that widely used to evaluate the fitting quality of regression models. It is defined as follows: where is the performance predicted by the models for a given Flink program, is the actual performance of that program, and is the total number of the testing data. In our experiments, one-fifth of the dataset was used as a testing data and the other four-fifths as training data. Note that there exists no overlap between the testing data and the training data.

Figure 6 shows the MAPE of the models for the program WordCount built by the four algorithms. The horizontal axis is the total number of dataset (where the training data accounts for 80% and the testing data accounts for 20%). As can be seen, the MAPE decreases when the amount of training data increases for all four algorithms. However, the MAPE of RF is the lowest among the experimented algorithms for all the data sets, as shown in Figure 6. We thus choose RF as the performance model.

2.3.5. Searching Optimal Configurations

To search the optimal configuration parameters for a given Flink program, a well-performing search algorithm is urgently needed. We require a robust and reliable searching algorithm. Many searching algorithms exists that can efficiently search a particular parameter space, such as grid search [34], random search [35], tabu search [36], genetic algorithm [37], and Bayesian optimization. Grid search is highly time-consuming, especially when we search in a high-dimensional space. Random search tends to easily get stuck in local optimality. Tabu search is typically affected by local convergence rate and its initial solution space.

The genetic algorithm is an evolutionary and heuristic algorithm that seeks the optimal solution by simulating the selection and heredity mechanism in nature. However, there are some parameters in a genetic algorithm that dramatically affect the effectiveness of solutions, such as crossover rate and mutation rate. At present, it is very challenging to pick appropriate values for these parameters that heavily rely on parameter tuning experience. Moreover, from results in Section 3.2.2, we find that the genetic algorithm needs more training time to obtain more accurate solutions, and thus, its iteration speed is relatively slow. The iteration efficiency of Bayesian optimization, however, is higher.

As a result, we choose Bayesian optimization for our experiment and compare it against the genetic algorithm on the time used to find the optimal configuration and the final speedups. Note that we utilize Matern 5/2 kernel for Bayesian optimization because it is favored on practical functions [12]. Detailed results are presented in Section 3.2.2.

3. Results and Discussion

3.1. Experimental Setup
3.1.1. Cluster Platform

Our experimental hardware environment is made up of 8 Linux compute nodes, and one of them acts as the master node and the others as the slave nodes. Each compute node is mounted with an Intel(R) Xeon(R) CPU Silver-4114 2.20 GHz 10-core processor and 64 GB DRAM. There are in total 80 cores and 512 GB of memory in our hardware cluster. The operating system used in each node is Ubuntu Server 18.04. We choose Flink 1.11 for this experiment. We use a Kafka [38] cluster to generate data streams.

Figure 7 shows the topology of our Kafka and Flink cluster. The cluster consists of 8 servers, where one is used as the Flink master and the others as Flink slaves. Each server runs Kafka and Flink daemons for data transmission and processing. Once we start Kafka, one server will be selected as the leader while the others as followers. The Kafka daemon reads data from the data generator continuously. The Flink daemon processes the stream from Kafka and then outputs the results to Kafka. The latency of each record is computed as the difference between out timestamp and in timestamp.

3.1.2. Representative Programs

We choose all Flink programs from HiBench, as listed in Table 2. Streaming benchmarks are widely used to evaluate streaming processing systems, such as Spark Streaming, Flink, Storm, and Gearpump. The four programs are as follows: WordCount, FixWindow, Repartition, and Identity. WordCount mainly evaluates the performance of the stateful operator and the cost of Checkpoint/Acker during stream processing. FixWindow involves a window operation and thus evaluates the efficiency of window operation. Repartition tests the shuffle performance by changing the parallelism of tasks. Identity evaluates read/write efficiency from Kafka. The input data speeds of the benchmarks are listed in Table 3 and the detailed processes are shown in Figure 8.


Benchmark suiteProgramAbbr.

HiBenchFixWindowFW
WordCountWC
RepartitionRP
IdentityID


ApplicationAbbr.Input data speed

WordCountWC16 K, 80 K, 160 K, 240 K (events/s)
IdentityID16 K, 80 K, 160 K, 200 K (events/s)
RepartitionRP16 K, 80 K, 160 K, 200 K(events/s)
FixWindowFW16 K, 40 K, 80 K, 240 K (events/s)

3.1.3. Configuration Parameters

Based on expert experience and additional experiments, we select 30 configuration parameters that affect the performance of Flink system critically, including network stack, I/O, parallelism, and execution behavior. Table 1 lists 30 key-configuration parameters in detail. The third column of Table 1 demonstrates default configurations that Apache Flink provides for developers [39]. The second column depicts the value range of each configuration.

3.2. Results and Analysis

In this section, we evaluate the impact of execution number and iteration number for Bayesian optimization. Then we present the performance comparison results and analysis among various methods.

3.2.1. Impact of Execution Number

As described in Section 2.3.3, it is extremely important to determine a suitable execution number that affects both model accuracy and time cost. A larger increases the model accuracy at the cost of increasing time for collecting training data and vice versa. To minimize the time cost and maximize model accuracy simultaneously, we carefully determine the trade-off using experiments.

We first build a performance model for a Flink program using 100 different configurations and increase the training dataset by 100 configurations each time. Figure 9 illustrates the relationship between model accuracy and the number of resource configurations in the dataset for the experimented Flink programs. As mentioned earlier, we use mean absolute percentage error (MAPE) to represent the model accuracy, which is a common metric to evaluate the quality of regression models. A MAPE value of 0% indicates a perfect model (i.e., no mean errors), and as MAPE value increases, the modeling quality deteriorates. Note that when the actual value is 0, there is a problem of dividing the denominator by 0, and thus, this formula is no longer available.

For convenience, we only show the maximum (Max), mean (Mean), and minimum (Min) MAPE of the performance models for all the Flink programs in Figure 9. The overall trend is that the MAPE decreases when the number of resource configurations in the training dataset increases. When reaches 1000, the whole curve becomes flat and shows signs of convergence, and thus, we set to 1000 to build the model of each Flink program.

3.2.2. Comparison between Bayesian Optimization and Genetic Algorithm

To choose a suitable search algorithm, we compare the Bayesian optimization algorithm (BOA) with the genetic algorithm (GA). Figure 10 shows that Bayesian optimization has a faster convergence speed, which means it requires lesser iteration time. Besides, we observe that the average iteration time of GA is 1.4× that of BOA. Nonetheless, the final optimal search solution of Bayesian optimization is almost the same as that from GA.

Consequently, we choose Bayesian optimization as our search algorithm by iteratively searching the extremely complex resource configuration space to achieve the optimal performance for given Flink programs.

3.2.3. The Impact of Different Acquisition Functions on Bayesian Optimization

During Bayesian optimization, we use acquisition functions to select a sequence of query points. Besides, acquisition functions are helpful for satisfying our requirement to trade off between exploration and exploitation. There are three widely used acquisition functions: probability of improvement (POI), expected improvement (EI), and upper confidence bound (UCB). We divide these three acquisition functions into two categories [12]: (1)Improvement-Based Policies. POI favors points that are most likely to improve the current target, while EI prefers to choose these points that can maximize the expected improvement which includes the amount of improvement over the current best(2)Optimistic Policies. When we aim to find a minimum over a black-box function, UCB chooses these points whose certainty region has the smallest lower bound. The key idea of this strategy is to always stay optimistic when facing uncertainty. And UCB has been shown to be an efficient approach to negotiate exploration and exploitation

In our experiment, we first consider these three widely used acquisition functions: POI, EI, and UCB. We also perform an experiment to choose an acquisition function that can achieve better iteration results in Bayesian optimization. Figure 11 shows how different acquisition functions impact the iteration of Bayesian optimization. The iteration result of EI is better compared with POI and UCB. Therefore, we choose EI as the acquisition function for our experiment.

3.2.4. Time Overhead

The total time overhead of our method mainly consists three parts: (1) data collection, (2) model building, and (3) searching the best configuration. It should be noted that the time used for data collection dominates the total time overhead. Another problem is that there always exists noise in the real hardware cluster. To reduce the influence of measurement noise, we repeat the experiment for three times to obtain the average value.

For the stream application, we tested in HiBench, we set a run time of 5 min for one trial. And for one given workload, the time cost for data collection is approximately between 3 and 4 days. In contrast, it usually takes less than 3 s to build an accurate model and approximately a few minutes (e.g., 1 or 2 min) to finish searching the best configuration.

3.2.5. Speedup

To clearly show the optimization results, we use the speedup metric that can be defined as follows: where perf is the performance of an Flink application with configurations produced using our proposed approach Bayesian optimization algorithm (BOA), and is the performance obtained using the default configuration. Besides, we also implement modified Hill Climbing (mHC) [5], the state-of-the-art approach to tuning configuration of streaming processing systems, for performance comparison.

For stream processing systems, both throughput and tail latency are crucial performance metrics. However, it is difficult to obtain a clear relationship between them. In many cases, high throughput does not necessarily produce low latency and vice versa. As a consequence, we consider 99th-percentile tail latency, throughput, and the ratio of the throughput to 99th-percentile tail latency defined in Equation (4) (Section 2.3.3) as the optimization metrics in our study.

Figure 12(a) depicts the speedups of 16 program-input pairs with BOA and mHC compared with the default configurations. Regarding the ratio of throughput and latency (the higher the better) as the optimization metric, compared to the default configurations, BOA improves the 16 programs-input pairs by a factor of 2.73× on average and up to 5.46× overall, whereas mHC improves by a factor of 1.95× on average and up to 3.58× overall. In Figure 12(b), we consider 99th-percentile tail latency as the optimization metric (the lower the better). As can be seen, compared with the default configurations, BOA improves the 99th-percentile tail latency by a factor of 2.62× on average and up to 5.26× overall, while mHC improves by a factor of 1.93× on average and up to 3.51× overall.

In Figure 12(c), we take throughput as the optimization metric (the higher the better). We observe that when it comes to throughput, all methods exhibit similar performance: BOA improves the throughput by a factor of 1.05× on average and up to 1.21× overall, whereas mHC improves by a factor of 1.03× on average and up to 1.09× overall.

3.2.6. Future Work

The proposed methodology can adopt to different workloads (for example, time-varying workload). Furthermore, as it is based on a black-box model, specific characteristics of the application do not affect the universality of the method.

In fact, Flink is just one example of our methodology optimized for a stream processing engine. Flink essentially performs stateful stream processing, storing state (intermediate results) either in the memory or in a stable storage file system (e.g., HDFS). It also supports three different time semantics and provides an incremental checkpoint mechanism. Due to the above internal mechanism, Flink can simultaneously ensure high throughput and low tail latency when processing data streams.

However, the proposed method is a parameter-tuning tool independent of the Flink system. It interacts with the Flink system through several APIs to start Flink, run a given workload, and collect performance data. And later, it constructs an accurate machine learning model and leverages Bayesian optimization to achieve automatic parameter tuning. Because of this, it can also be integrated into other stream processing systems (e.g., Spark Streaming and Storm). In future studies, we plan to add more evaluations using various applications and benchmark suites (e.g., TPC-DS and TPC-H).

4. Conclusion

We proposed a machine learning approach based on Bayesian optimization to autotune the key-configuration parameters of stream data processing systems. The speed of input data and 30 key-configuration parameters were needed to build the performance model. We evaluated the proposed method on a cluster with eight servers using four benchmarks, with each benchmark having four different input data speeds. Experiment results revealed that our approach significantly improved the 99th-percentile tail latency of the four Flink programs by a factor of 2.62× on average and up to 5.26× overall and improved throughput by a factor of 1.05× on average and up to 1.21× overall.

Data Availability

Data are available from the corresponding author on reasonable request.

Conflicts of Interest

The authors declare that there is no conflict of interest regarding the publication of this paper.

Authors’ Contributions

S. Huang and C. Chen designed the experiments and wrote the original draft. G. Zhu, J. Xin, and Z. Wang conducted the experiments. K. Hwang reviewed the manuscript. Z. Yu conceived the idea and edited the manuscript.

Acknowledgments

This work was supported by the Key-Area Research and Development Program of Guangdong Province (grant number 2019B010155003), the Guangdong Basic and Applied Basic Research Foundation (grant number 2020B1515120044), and the National Natural Science Foundation of China (grant number NSFC 61902355).

References

  1. A. Flink, 2020, https://flink.apache.org/.
  2. A. Storm, 2020, https://storm.apache.org/.
  3. A. Spark, 2020, https://spark.apache.org/.
  4. A. Heron, 2020, https://incubator.apache.org/projects/heron.html/.
  5. M. Bilal and M. Canini, “Towards automatic parameter tuning of stream processing systems,” in Proceedings of the 2017 Symposium on Cloud Computing, pp. 189–200, New York, NY, USA, 2017. View at: Publisher Site | Google Scholar
  6. M. Abadi, P. Barham, J. Chen et al., “Tensorflow: a system for large-scale machine learning,” in 12th USENIX Symposium on Operating Systems Design and Implementation (OSDI 16), pp. 265–283, Savannah, GA: USENIX Association, 2016, https://www.usenix.org/conference/osdi16/technical-sessions/presentation/abadi/. View at: Google Scholar
  7. P. Carbone, A. Katsifodimos, S. Ewen, V. Markl, S. Haridi, and K. Tzoumas, “Apache flink™: stream and batch processing in a single engine,” Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, vol. 38, pp. 28–38, 2015. View at: Google Scholar
  8. D. Warneke and O. Kao, “Nephele: efficient parallel data processing in the cloud,” in Proceedings of the 2nd Workshop on Many-Task Computing on Grids and Supercomputers, Portland, Oregon, Novembver 2009. View at: Publisher Site | Google Scholar
  9. A. Mesos, 2020, https://mesos.apache.org/.
  10. A. Yarn, 2020, https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarnsite/YARN.html/.
  11. G. Kubernetes, 2020, https://kubernetes.io/.
  12. B. Shahriari, K. Swersky, Z. Wang, R. P. Adams, and N. de Freitas, “Taking the human out of the loop: a review of Bayesian optimization,” Proceedings of the IEEE, vol. 104, no. 1, pp. 148–175, 2016. View at: Publisher Site | Google Scholar
  13. O. Alipourfard, H. H. Liu, J. Chen, S. Venkataraman, M. Yu, and M. Zhang, “Cherrypick: adaptively unearthing the best cloud configurations for big data analytics,” in 14th USENIX Symposium on Networked Systems Design and Implementation (NSDI 17), pp. 469–482, Boston, MA: USENIX Association, March 2017, https://www.usenix.org/conference/nsdi17/technical-sessions/presentation/alipourfard/. View at: Google Scholar
  14. X. Hua, M. C. Huang, and P. Liu, “Hadoop configuration tuning with ensemble modeling and metaheuristic optimization,,” IEEE Access, vol. 6, pp. 44161–44174, 2018. View at: Publisher Site | Google Scholar
  15. S. M. Nabavinejad and M. Goudarzi, “Faster mapreduce computation on clouds through better performance estimation,” IEEE Transactions on Cloud Computing, vol. 7, no. 3, pp. 770–783, 2019. View at: Publisher Site | Google Scholar
  16. A. Gounaris, G. Kougka, R. Tous, C. T. Montes, and J. Torres, “Dynamic configuration of partitioning in spark applications,” IEEE Transactions on Parallel and Distributed Systems, vol. 28, no. 7, pp. 1891–1904, 2017. View at: Publisher Site | Google Scholar
  17. HiBench Suite, 2020, https://github.com/Intel-bigdata/HiBench/.
  18. H. Herodotou, Y. Chen, and J. Lu, “A survey on automatic parameter tuning for big data processing systems,” ACM Computing Surveys (CSUR), vol. 53, no. 2, 2020. View at: Publisher Site | Google Scholar
  19. H. Herodotou and S. Babu, “Profiling, what-if analysis, and cost-based optimization of mapreduce programs,” Proceedings of the VLDB Endowment, vol. 4, no. 11, pp. 1111–1122, 2011. View at: Publisher Site | Google Scholar
  20. H. Herodotou, “Hadoop performance models,” 2011, http://arxiv.org/abs/1106.0940. View at: Google Scholar
  21. H. Herodotou, H. Lim, G. Luo et al., “Starfish: a self-tuning system for big data analytics,” Classless Inter-Domain Routing, vol. 11, pp. 261–272, 2011. View at: Google Scholar
  22. A. E. Gencer, D. Bindel, E. G. Sirer, and R. van Renesse, “Configuring distributed computations using response surfaces,” in Proceedings of the 16th Annual Middleware Conference, pp. 235–246, Vancouver, BC, Canada: Association for Computing Machinery, 2015. View at: Publisher Site | Google Scholar
  23. Z. Bei, Z. Yu, H. Zhang et al., “Rfhoc: a random-forest approach to auto-tuning Hadoop’s configuration,” IEEE Transactions on Parallel and Distributed Systems, vol. 27, no. 5, pp. 1470–1483, 2016. View at: Publisher Site | Google Scholar
  24. Z. Bei, Z. Yu, Q. Liu, C. Xu, S. Feng, and S. Song, “Mest: a model-driven efficient searching approach for mapreduce self-tuning,” IEEE Access, vol. 5, pp. 3580–3593, 2017. View at: Publisher Site | Google Scholar
  25. Z. Bei, Z. Yu, N. Luo, C. Jiang, C. Xu, and S. Feng, “Configuring in-memory cluster computing using random forest,” Future Generation Computer Systems, vol. 79, pp. 1–15, 2018. View at: Publisher Site | Google Scholar
  26. Z. Yu, Z. Bei, and X. Qian, “Datasize-aware high dimensional configurations auto-tuning of in-memory cluster computing,” in Proceedings of the Twenty-Third International Conference on Architectural Support for Programming Languages and Operating Systems, pp. 564–577, Williamsburg, VA, USA, March 2018. View at: Publisher Site | Google Scholar
  27. T. Li, J. Tang, and J. Xu, “Performance modeling and predictive scheduling for distributed stream data processing,” IEEE Transactions on Big Data, vol. 2, no. 4, pp. 353–364, 2016. View at: Publisher Site | Google Scholar
  28. M. Trotter, G. Liu, and T. Wood, “Into the storm: descrying optimal configurations using genetic algorithms and Bayesian optimization,” in 2017 IEEE 2nd International Workshops on Foundations and Applications of Self Systems (FASW), pp. 175–180, Tucson, AZ, USA, September 2017. View at: Publisher Site | Google Scholar
  29. J.-C. Lin, M.-C. Lee, I. C. Yu, and E. B. Johnsen, “Modeling and simulation of spark streaming,” in 2018 IEEE 32nd International Conference on Advanced Information Networking and Applications (AINA), pp. 407–413, Krakow, Poland, May 2018. View at: Publisher Site | Google Scholar
  30. L. M. Vaquero and F. Cuadrado, “Auto-tuning distributed stream processing systems using reinforcement learning,” 2018, http://arxiv.org/abs/1809.05495. View at: Google Scholar
  31. T. Das, Y. Zhong, I. Stoica, and S. Shenker, “Adaptive stream processing using dynamic batch sizing,” in Proceedings of the ACM Symposium on Cloud Computing, pp. 1–13, Seattle, WA, USA, 2014. View at: Publisher Site | Google Scholar
  32. M. Petrov, N. Butakov, D. Nasonov, and M. Melnik, “Adaptive performance model for dynamic scaling apache spark streaming,” Procedia Computer Science, vol. 136, pp. 109–117, 2018. View at: Publisher Site | Google Scholar
  33. “Scikit-learn,” 2020, https://scikit-learn.org/stable/. View at: Google Scholar
  34. J. Y. Hesterman, L. Caucci, M. A. Kupinski, H. H. Barrett, and L. R. Furenlid, “Maximum likelihood estimation with a contracting-grid search algorithm,” IEEE Transactions on Nuclear Science, vol. 57, no. 3, pp. 1077–1084, 2010. View at: Publisher Site | Google Scholar
  35. J. Bergstra and Y. Bengio, “Random search for hyper-parameter optimization,” Journal of Machine Learning Research, vol. 13, no. l, pp. 281–305, 2012. View at: Google Scholar
  36. J. Escobar, R. Linfati, P. Toth, and M. Baldoquin, “A hybrid granular tabu search algorithm for the multi-depot vehicle routing problem,” Journal of Heuristics, vol. 20, no. 5, pp. 483–509, 2014. View at: Publisher Site | Google Scholar
  37. M. Albayrak and N. Allahverdi, “Development a new mutation operator to solve the traveling salesman problem by aid of genetic algorithms,” Expert Systems with Applications, vol. 38, no. 3, pp. 1313–1320, 2011. View at: Publisher Site | Google Scholar
  38. “Apache Flink configuration,” 2020, https://ci.apache.org/projects/flink/flink–docs-release-1.11/ops/config.html/. View at: Google Scholar
  39. A. Kafka, 2020, https://kafka.apache.org/.

Copyright © 2022 Shixin Huang et al. Exclusive Licensee Zhejiang Lab, China. Distributed under a Creative Commons Attribution License (CC BY 4.0).

 PDF Download Citation Citation
Views396
Downloads114
Altmetric Score
Citations