fix: remove delete_function #315
gangliao committed Dec 1, 2021
1 parent 7237860 commit 7deb1b9
Showing 10 changed files with 160 additions and 127 deletions.
80 changes: 49 additions & 31 deletions bench/nexmark/main.rs
@@ -25,8 +25,8 @@ use nexmark::NexMarkSource;
 use runtime::prelude::*;
 use rusoto_core::Region;
 use rusoto_lambda::{
-    CreateFunctionRequest, DeleteFunctionRequest, GetFunctionRequest, InvocationRequest,
-    InvocationResponse, Lambda, LambdaClient, PutFunctionConcurrencyRequest,
+    CreateFunctionRequest, FunctionCode, GetFunctionRequest, InvocationRequest, InvocationResponse,
+    Lambda, LambdaClient, PutFunctionConcurrencyRequest, UpdateFunctionCodeRequest,
 };
 use serde_json::Value;
 use std::sync::Arc;
@@ -265,6 +265,9 @@ async fn set_lambda_concurrency(function_name: String, concurrency: i64) -> Resu

 /// Creates a single lambda function using bootstrap.zip in Amazon S3.
 async fn create_lambda_function(ctx: &ExecutionContext) -> Result<String> {
+    let s3_bucket = globals["lambda"]["s3_bucket"].to_string();
+    let s3_key = globals["lambda"]["s3_nexmark_key"].to_string();
+    let func_name = ctx.name.clone();
     if LAMBDA_CLIENT
         .get_function(GetFunctionRequest {
             function_name: ctx.name.clone(),
@@ -273,39 +276,54 @@ async fn create_lambda_function(ctx: &ExecutionContext) -> Result<String> {
         .await
         .is_ok()
     {
-        // To avoid obsolete code on S3, remove the previous lambda function.
-        LAMBDA_CLIENT
-            .delete_function(DeleteFunctionRequest {
-                function_name: ctx.name.clone(),
-                ..Default::default()
-            })
-            .await
-            .map_err(|e| FlockError::Internal(e.to_string()))?;
-    }
-
-    match LAMBDA_CLIENT
-        .create_function(CreateFunctionRequest {
-            code: lambda::nexmark_function_code(),
-            environment: lambda::environment(&ctx),
-            function_name: ctx.name.clone(),
-            handler: lambda::handler(),
-            memory_size: lambda::memory_size(&ctx),
-            role: lambda::role().await,
-            runtime: lambda::runtime(),
-            ..Default::default()
-        })
-        .await
-    {
-        Ok(config) => {
-            return config.function_arn.ok_or_else(|| {
-                FlockError::Internal("Unable to find lambda function arn.".to_string())
-            })
-        }
-        Err(err) => {
-            return Err(FlockError::Internal(format!(
-                "Failed to create lambda function: {}",
-                err
-            )))
-        }
-    }
+        match LAMBDA_CLIENT
+            .update_function_code(UpdateFunctionCodeRequest {
+                function_name: func_name.clone(),
+                s3_bucket: Some(s3_bucket.clone()),
+                s3_key: Some(s3_key.clone()),
+                ..Default::default()
+            })
+            .await
+        {
+            Ok(config) => {
+                return config.function_name.ok_or_else(|| {
+                    FlockError::Internal("Unable to find lambda function arn.".to_string())
+                })
+            }
+            Err(err) => {
+                return Err(FlockError::Internal(format!(
+                    "Failed to update lambda function: S3 Bucket: {}. S3 Key: {}. {}",
+                    s3_bucket, s3_key, err
+                )))
+            }
+        }
+    } else {
+        match LAMBDA_CLIENT
+            .create_function(CreateFunctionRequest {
+                code: FunctionCode {
+                    s3_bucket: Some(s3_bucket.clone()),
+                    s3_key: Some(s3_key.clone()),
+                    ..Default::default()
+                },
+                function_name: func_name.clone(),
+                handler: lambda::handler(),
+                role: lambda::role().await,
+                runtime: lambda::runtime(),
+                ..Default::default()
+            })
+            .await
+        {
+            Ok(config) => {
+                return config.function_name.ok_or_else(|| {
+                    FlockError::Internal("Unable to find lambda function arn.".to_string())
+                })
+            }
+            Err(err) => {
+                return Err(FlockError::Internal(format!(
+                    "Failed to create lambda function: {}",
+                    err
+                )))
+            }
+        }
+    }
 }
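
The net effect: instead of deleting and re-creating the function on every run, the code now probes with get_function and either swaps the code package in place (update_function_code) or creates the function from scratch. The same upsert pattern can be sketched with the AWS CLI; the function name, bucket, key, runtime, and role ARN below are placeholders, not values from this repository:

if aws lambda get-function --function-name flock_bench >/dev/null 2>&1; then
    # Function already exists: swap in the new bootstrap.zip from S3.
    aws lambda update-function-code --function-name flock_bench \
        --s3-bucket my-flock-bucket --s3-key nexmark/bootstrap.zip
else
    # First deployment: create the function from the same S3 package.
    aws lambda create-function --function-name flock_bench \
        --code S3Bucket=my-flock-bucket,S3Key=nexmark/bootstrap.zip \
        --handler bootstrap --runtime provided.al2 \
        --role arn:aws:iam::123456789012:role/my-lambda-role
fi
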
28 changes: 14 additions & 14 deletions docs/benchmarks/ec2.scrbl
@@ -25,8 +25,8 @@

@section{Install Docker in EC2 Instance}

In this section, we will describe how to deploy a Flink cluster on Amazon EC2. We use a @bold{r4.8xlarge} instance
to run the Flink cluster. Each instance has 32 threads and 244 GB of memory.

The first step is to log in to the EC2 instance. We use the @bold{ssh} command to log in to the instance.
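
A typical invocation looks like the following sketch (the key-pair path is an assumption; the host is the instance's public DNS name, echoed in the banner below):

@bash-repl{
ssh -i ~/.ssh/my-ec2-key.pem -l ec2-user ec2-18-208-228-89.compute-1.amazonaws.com
}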

@@ -45,7 +45,7 @@ Warning: Permanently added 'ec2-18-208-228-89.compute-1.amazonaws.com,18.208.228
https://aws.amazon.com/amazon-linux-2/


[ec2-user ~]$
}

Then, we need to install Docker on the instance. We use the @bold{sudo} command to install Docker.
@@ -57,7 +57,7 @@ sudo service docker start
sudo usermod -a -G docker ec2-user
}

But there is one more subtle detail. Every time your Amazon EC2 instance is rebooted, you want the Docker service to remain
up and running. Therefore, we have one final command to use:

@bash-repl{
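# (assumption: the folded command below is the usual persistence step, so
# Docker starts again after a reboot; on Amazon Linux 2 this is typically)
sudo systemctl enable docker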
@@ -75,9 +75,9 @@ CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES

@section{Install Docker-Compose in EC2 Instance}

@bold{docker-compose} is a very convenient tool to work with. If you have a few Docker images, you quickly get
tired of entering everything via the command line. This is where docker-compose comes in. It allows you to
configure all the images in one place. An application consisting of multiple containers can easily be started
with the command @bold{docker-compose up -d} and stopped with @bold{docker-compose down}.

You can install Docker-Compose using the following commands:
@@ -95,12 +95,12 @@ You can install Docker-Compose using the following commands:
Docker Compose version v2.2.0
}
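
The folded portion of that listing downloads the binary from the project's GitHub releases. A representative sequence (install path and platform are assumptions; the version is pinned to match the output above):

@bash-repl{
sudo curl -L https://github.com/docker/compose/releases/download/v2.2.0/docker-compose-linux-x86_64 \
  -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose version
}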

When you've done all that, you can choose to append those commands to your user data (without sudo) as well.
Now you've got docker-compose as a tool available inside EC2!

@section{How To Pull Docker Images From Amazon ECR}

The first thing you'll probably try is to pull a Docker image from Amazon ECR. If you don't configure anything, it will fail.
There are two things you need to fix to make that work (one of them is sketched after the list below).

@itemlist[#:style 'ordered
@@ -121,8 +121,8 @@
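
One of those fixes is authenticating Docker against ECR. With AWS CLI v2 the standard sequence is the following sketch (region and account ID are placeholders):

@bash-repl{
aws ecr get-login-password --region us-east-1 | \
  docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
}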

@section{How To Run Flink on EC2}

The next step is to run Flink on EC2. We'll use the @bold{flink-1.13.3-scala_2.12-java11} image. To deploy a Flink Session cluster with Docker,
you need to start a JobManager container. To enable communication between the containers, we first set a required
Flink configuration property and create a network:

@bash-repl{
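# (assumption, following the upstream Flink-on-Docker guide: the folded lines
# here set the JobManager address and create the shared network)
FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
docker network create flink-network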
@@ -139,7 +139,7 @@ docker run \
--network flink-network \
--publish 8081:8081 \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:latest jobmanager &
}

and one or more TaskManager containers:
@@ -150,7 +150,7 @@ docker run \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:latest taskmanager &
}

@bash-repl{
@@ -165,5 +165,5 @@ The web interface is now available at localhost:8081.
@image[#:scale 1/2]{img/benchmarks/flink-ui.png}
The Flink web interface

To shut down the cluster, either terminate (e.g. with CTRL-C) the JobManager and TaskManager processes, or
use @bold{docker ps} to identify and @bold{docker stop} to terminate the containers.
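
Using the container names from the commands above, a shutdown sketch:

@bash-repl{
docker stop jobmanager taskmanager
docker network rm flink-network
}
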
40 changes: 20 additions & 20 deletions docs/benchmarks/eks.scrbl
@@ -25,37 +25,37 @@

@section{Overview}

Flink supports different deployment modes when running on @link["https://kubernetes.io/"]{Kubernetes}. We
will show you how to deploy Flink on Kubernetes using the
@link["https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/"]{Native Kubernetes Deployment}.

@section{Amazon EKS}

@link["https://aws.amazon.com/eks/"]{Amazon EKS} is a fully managed Kubernetes service. EKS supports creating and managing spot instances using Amazon EKS
@link["https://aws.amazon.com/eks/"]{Amazon EKS} is a fully managed Kubernetes service. EKS supports creating and managing spot instances using Amazon EKS
managed node groups following Spot best practices. This enables you to take advantage of the steep savings and scale
that Spot Instances provide for interruptible workloads. EKS-managed node groups require less operational effort
that Spot Instances provide for interruptible workloads. EKS-managed node groups require less operational effort
compared to using self-managed nodes.

Flink can run jobs on Kubernetes via Application and Session Modes only.

@section{AWS Spot Instances}

Flink is distributed to manage and process high volumes of data. Designed for failure, it can run on machines with
different configurations, and is inherently resilient and flexible. Spot Instances can optimize runtimes by increasing
throughput, while spending the same (or less). Flink can tolerate interruptions using restart and failover strategies.

Job Manager and Task Manager are key building blocks of Flink. The Task Manager is the compute-intensive part and
the Job Manager is the orchestrator. We will be running the Task Manager on Spot Instances and the Job Manager on
On-Demand Instances.

Flink supports elastic scaling via @link["https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/"]{Reactive Mode}.
This is ideal with Spot Instances as it implements elastic scaling with higher throughput in a cost optimized way.
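
Enabling it is a single scheduler setting in the Flink configuration (key name per the linked Flink documentation; a sketch assuming a standalone cluster):

@bash-repl{
echo "scheduler-mode: reactive" >> conf/flink-conf.yaml
}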

@section{Flink Deployment}

For production use, we recommend deploying Flink applications in the @bold{Application Mode}, as this mode provides
better isolation for the applications. We will be bundling the user code in the Flink image for that purpose and
uploading it to @link["https://aws.amazon.com/ecr/"]{Amazon ECR}. Amazon ECR is a fully managed container registry that
makes it easy to store, manage, share, and deploy your container images and artifacts anywhere.

@itemlist[#:style 'ordered
@@ -76,15 +76,15 @@ Run the following to create the policy. Note the ARN.}
@item{Cluster and node groups deployment
@itemlist[
@item{Create an EKS cluster. The cluster takes approximately 15 minutes to launch.}
@item{Create the node group using the nodeGroup config file. We are using multiple nodeGroups of different
sizes to adapt the Spot best practice of diversification. Replace the <<Policy ARN>> string using the ARN string
from the previous step (an eksctl sketch follows this list).}
@item{Download the Cluster Autoscaler and edit it to add the cluster-name.}
]}

@item{Install the Cluster AutoScaler using the following command: kubectl apply -f cluster-autoscaler-autodiscover.yaml
@itemlist[
@item{Using EKS managed node groups requires significantly less operational effort compared to using self-managed
node groups and enables: 1) Auto enforcement of Spot best practices. 2) Spot Instance lifecycle management. 3)
Auto labeling of Pods.}
@item{eksctl has integrated @link["https://github.com/aws/amazon-ec2-instance-selector"]{amazon-ec2-instance-selector}
@@ -101,13 +101,13 @@ $ kubectl create clusterrolebinding flink-role-binding-flink \

@item{Deploy Flink

This install folder has all the YAML files required to deploy a standalone Flink cluster. Run the install.sh
file. This will deploy the cluster with a JobManager, a pool of TaskManagers and a Service exposing JobManager’s
ports.

@itemlist[
@item{This is a High-Availability(HA) deployment of Flink with the use of Kubernetes high availability service.}
@item{The JobManager runs on OnDemand and TaskManager on Spot. As the cluster is launched in Application Mode,
if a node is interrupted only one job will be restarted.}
@item{Autoscaling is enabled by the use of Reactive Mode. Horizontal Pod Autoscaler is used to monitor the CPU
load and scale accordingly.}
@@ -119,8 +119,8 @@ load and scale accordingly.}
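
As referenced above, a hedged sketch of the cluster and node-group creation (cluster name, region, and config file are placeholders, not files from this repository):

@bash-repl{
eksctl create cluster --name flink-spot --region us-east-1 --without-nodegroup
eksctl create nodegroup --config-file=flink-spot-nodegroups.yaml
}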

@section{Conclusion}

In this post, we demonstrated how you can run Flink workloads on a Kubernetes Cluster using Spot Instances,
achieving scalability, resilience, and cost optimization.


@section[#:style 'unnumbered]{Reference}
26 changes: 13 additions & 13 deletions docs/benchmarks/flink.scrbl
@@ -1,32 +1,32 @@
#lang scribble/manual
@title[#:tag "flink_intro" #:style 'unnumbered]{Introduction to Apache Flink}

@link["https://flink.apache.org/"]{Apache Flink} is a framework and distributed processing engine for stateful
computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster
@link["https://flink.apache.org/"]{Apache Flink} is a framework and distributed processing engine for stateful
computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster
environments, perform computations at in-memory speed and at any scale.

Flink enables you to perform transformations on many different data sources,
such as Amazon Kinesis Streams or the Apache Cassandra database. Flink has some SQL support for these stream and batch datasets.
Flink’s API is categorized into DataSets and DataStreams. DataSets are transformations on sets or collections of
distributed data, while DataStreams are transformations on streaming data like those found in Amazon Kinesis.

The Flink runtime consists of two different types of daemons: @bold{JobManagers}, which are responsible for
coordinating scheduling, checkpoint, and recovery functions, and @bold{TaskManagers}, which are the worker processes
that execute tasks and transfer data between streams in an application. Each application has one JobManager
and at least one TaskManager.


You can use the Docker images to deploy a @bold{Session} or @bold{Application} cluster on Docker.

@itemlist[#:style 'unnumbered

@item{@bold{Application Mode}: This is a lightweight and scalable way to submit an application on Flink and is the preferred way
to launch an application, as it supports better resource isolation. Resource isolation is achieved by running a cluster
per job. Once the application shuts down, all the Flink components are cleaned up (see the Docker sketch after this list).}

@item{@bold{Session Mode}: This is a long-running Kubernetes deployment of Flink. Multiple applications can be launched on a
cluster, and the applications compete for the resources. There may be multiple jobs running on a TaskManager in
parallel. Its main advantage is that it saves time on spinning up a new Flink cluster for new jobs; however, if one
of the TaskManagers fails, it may impact all the jobs running on it.}
]
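
As referenced in the Application Mode item above, a minimal Docker sketch (image name and job class are placeholders; the standalone-job entrypoint is documented in the Flink Docker guide):

@bash-repl{
docker run my-flink-app:latest standalone-job --job-classname com.example.StreamingJob
}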

@@ -39,7 +39,7 @@ For Flink, we have the following questions:

@itemlist[#:style 'ordered

@item{What is the performance and price difference between users deploying Flink to
@link["https://aws.amazon.com/ec2"]{AWS EC2} and directly using serverless services provided by cloud
service providers such as @link["https://aws.amazon.com/kinesis/data-analytics/"]{Amazon Kinesis Data Analytics}?}

@@ -51,5 +51,5 @@ more convenient, effective and affordable for users?}

]

In order to answer these questions, let's talk about how to deploy Flink on Amazon EC2 nodes and how to configure
Amazon Kinesis Data Analytics to run Flink jobs.
2 changes: 0 additions & 2 deletions docs/benchmarks/kinesis.scrbl
@@ -1,4 +1,2 @@
#lang scribble/manual
@title[#:tag "flink_kinesis" #:style 'unnumbered]{Amazon Kinesis Data Analytics for Apache Flink}

