Add Policy Recommendation Spark job and image #16
Conversation
Force-pushed from ad8e9e9 to 9b7034b
Partial review
@@ -0,0 +1,19 @@
FROM gcr.io/spark-operator/spark-py:v3.1.1
TODO for a future PR: consider whether this image can be moved to the antrea Docker Hub, as we did for the other 3rd-party images used by Theia.
Got it, yes, we could tag this image and push it to our Docker Hub too.
"""Returns the model properties as a dict""" | ||
result = {} | ||
|
||
for attr, _ in six.iteritems(self.attribute_types): |
For instance, here you can just use .items() because you don't have to worry about Python 2 compatibility.
In case the image defaults to Python 2, you can change the /usr/bin/python symlink or explicitly trigger the job with python3.
Thanks, changed.
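For reference, a minimal sketch of what the suggested change looks like. The class scaffolding here is hypothetical; only the loop mirrors the hunk above:

class Model:
    """Hypothetical stand-in for the model class in the hunk."""
    attribute_types = {"name": str, "count": int}

    def __init__(self):
        self.name = "demo"
        self.count = 0

    def to_dict(self):
        """Returns the model properties as a dict"""
        result = {}
        # Python 3 only, so dict.items() replaces six.iteritems()
        for attr, _ in self.attribute_types.items():
            result[attr] = getattr(self, attr)
        return result

print(Model().to_dict())  # {'name': 'demo', 'count': 0}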
    print(help_message)
    sys.exit(2)
for opt, arg in opts:
    if opt in ("-h", "--help"):
I'd check for -h before entering the loop; a user may also specify other options, and we don't want to parse them if -h is specified.
Sounds good to me, thanks!
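A sketch of the reordering, assuming the job parses flags with getopt as the hunk suggests; the option names follow the help text later in this PR:

import getopt
import sys

help_message = "Usage: policy_recommendation.py [-h] [-s start_time] [-e end_time]"

def parse_options(argv):
    try:
        opts, _ = getopt.getopt(argv, "hs:e:", ["help", "start_time=", "end_time="])
    except getopt.GetoptError:
        print(help_message)
        sys.exit(2)
    # Handle -h/--help up front: if present, ignore every other option.
    if any(opt in ("-h", "--help") for opt, _ in opts):
        print(help_message)
        sys.exit(0)
    options = {}
    for opt, arg in opts:
        if opt in ("-s", "--start_time"):
            options["start_time"] = arg
        elif opt in ("-e", "--end_time"):
            options["end_time"] = arg
    return options

print(parse_options(["-s", "2022-01-01 00:00:00"]))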
Force-pushed from d5b1586 to 5670828
# Select user trusted denied flows when unprotected equals False
sql_query += " WHERE trusted == 1"
if start_time:
    sql_query += " AND flowEndSeconds >= '{}'".format(start_time)
UX question: the condition above captures flows that completed after the requested start time.
In the case of start_time, would it make sense to instead capture the flows that had already started at start_time?
Sounds good to me, changed it to check flowStartSeconds instead.
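A sketch of the revised filter; the column names come from the hunk, while the base query and sample values are illustrative:

sql_query = "SELECT * FROM flows"  # illustrative base query
sql_query += " WHERE trusted == 1"
start_time = "2022-01-01 00:00:00"
end_time = None
if start_time:
    # Filter on when the flow started rather than when it ended.
    sql_query += " AND flowStartSeconds >= '{}'".format(start_time)
if end_time:
    sql_query += " AND flowEndSeconds < '{}'".format(end_time)
print(sql_query)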
    )
else:
    print("Warning: egress tuple {} has wrong format".format(egress))
    return ""
Should this be considered an error? If so, should we fail the job instead of returning an empty string?
Makes sense to me; I marked this as a fatal error and stopped the Spark job immediately.
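A sketch of the resulting behavior; the '#' delimiter and two-field layout are assumptions for illustration, not the PR's actual format:

import logging
import sys

logger = logging.getLogger(__name__)

def parse_egress_tuple(egress):
    fields = egress.split("#")  # hypothetical delimiter
    if len(fields) != 2:  # malformed input is now fatal
        logger.error("Error: egress tuple {} has wrong format".format(egress))
        sys.exit(1)  # stop the Spark job instead of returning ""
    return fields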
def recommend_antrea_policies(flows_df, option=1, deny_rules=True, to_services=True):
    ingress_rdd = flows_df.filter(flows_df.flowType != "pod_to_external")\
        .rdd.map(map_flow_to_ingress)\
        .reduceByKey(lambda a, b: (a[0]+PEER_DELIMITER+b[0], ""))
(not to be addressed in this PR) You could consider using a NamedTuple for src and dest, so that instead of referring to item 0 and item 1 you can refer to them as "src" and "dest".
In PySpark, I think that to achieve a namedtuple-like data structure I would need to change the current RDD to the DataFrame type. This will involve lots of changes in the computation code. Could we mark this as a TODO for now?
Sure, it can be a TODO; ignore it for this PR.
The namedtuple would actually just be some syntactic sugar, where you access an item in a tuple as if you were accessing an object attribute.
I would not think this requires using a DataFrame, but if that's the case, it's surely not worth the effort.
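For the record, the sugar being described works on plain RDD tuples with no DataFrame involved; a minimal sketch, with illustrative values:

from collections import namedtuple

PEER_DELIMITER = "|"
Peer = namedtuple("Peer", ["src", "dest"])

a = Peer(src="frontend", dest="")
b = Peer(src="backend", dest="")
# A namedtuple is still a tuple, so index access keeps working...
assert a[0] == a.src
# ...but the reduceByKey lambda could read fields by name instead of position:
merged = Peer(a.src + PEER_DELIMITER + b.src, "")
print(merged)  # Peer(src='frontend|backend', dest='')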
.option("password", os.getenv("CH_PASSWORD")) \ | ||
.option("dbtable", table_name) \ | ||
.save() | ||
return recommendation_id |
@yanjunz97 do you think we need to update the clickhouse monitor to periodically clean up recommendation results as well? Do you think perhaps we might need to define an expiration time for results, as perhaps it's not ok to bluntly delete reco results when memory exceeds the threshold?
In any case, I am ok not supporting periodic collection of old reco results in Theia's first release.
As we do not expect the recommendation results to occupy too much space, I think an expiration time might be more reasonable compared to cleanup by the monitor.
But I'm not sure what expiration time should be chosen; a recommended policy might be useful for a long time. Maybe it is more reasonable to delete results only when users trigger a deletion task from the UI?
Thanks @yanjunz97, that's valuable feedback. Nothing we need to address here, but we surely need a mechanism to handle the lifecycle of policy recommendation results.
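One hypothetical shape for such a mechanism, should it land later: a ClickHouse TTL clause on the results table, so rows expire without involving the monitor. The table layout and interval below are purely illustrative; the real schema is not defined in this PR:

CREATE_RECOMMENDATIONS_DDL = """
CREATE TABLE IF NOT EXISTS recommendations (
    id String,
    yamls String,
    timeCreated DateTime
) ENGINE = MergeTree()
ORDER BY (timeCreated)
TTL timeCreated + INTERVAL 30 DAY
"""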
It might also make sense to use the Python logging library instead of printing to stdout.
Sure, I added code to use the Spark logger in place of the print statements.
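A common way to do this from PySpark is to borrow the JVM-side log4j logger. This is a sketch of the pattern, not necessarily the exact code added in the PR; the logger name is illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Reach into the driver JVM for Spark's log4j and get a named logger.
log4j = spark.sparkContext._jvm.org.apache.log4j
logger = log4j.LogManager.getLogger("PolicyRecommendation")
logger.info("Starting policy recommendation job")  # replaces print()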
Signed-off-by: Yongming Ding <dyongming@vmware.com>
Force-pushed from ff8ef3e to 98ef479
]

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("info")
Should we make the log level configurable? It may help for live debugging purposes.
Let's see what @dreamtalen reckons. From what I gather, this logger will only emit what we are logging in this job, and - obviously - we are not logging anything at debug level.
Thanks Salvatore for helping me answer this question.
Yes, I hard-coded the log level to info because I only added logs at the info, warning, and error levels. I also tried changing it to "debug" and could see lots of debug logs automatically generated by Spark.
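If configurability is wanted later, it could be as small as reading the level from the environment. LOG_LEVEL is a hypothetical variable name, not something defined in this PR:

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Default to "info" as in the hunk, but allow an override for live debugging.
spark.sparkContext.setLogLevel(os.getenv("LOG_LEVEL", "info"))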
-s, --start_time=None: The start time of the flow records considered for the policy recommendation.
    Format is YYYY-MM-DD hh:mm:ss in UTC timezone. Default value is None, which means no limit of the start time of flow records.
-e, --end_time=None: The end time of the flow records considered for the policy recommendation.
    Format is YYYY-MM-DD hh:mm:ss in UTC timezone. Default value is None, which means no limit of the end time of flow records.
Since we may not have proper flow records in the DB, maybe we need a warning message to indicate this case?
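A sketch of both pieces: validating the option value against the documented format, and warning when the resulting DataFrame is empty. The function names are illustrative, and `logger` is assumed to be the Spark log4j logger from earlier:

import sys
from datetime import datetime

def parse_time_option(value):
    """Validate a -s/-e value against the documented format."""
    try:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        print("Error: {} is not in 'YYYY-MM-DD hh:mm:ss' format".format(value))
        sys.exit(2)

def warn_if_no_flows(flows_df, logger):
    # isEmpty() short-circuits, unlike a full count().
    if flows_df.rdd.isEmpty():
        logger.warn("No flow records found in the requested time range")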
Code looks pretty much good to me.
There are a few pending questions from Ziyou; waiting on those before approval.
Update regarding logs: I found the Spark logger only works on the driver, and mapped functions running on the executors will hit an error:
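That matches how py4j works: the log4j logger is a proxy into the driver JVM and cannot be pickled into functions that run on executors. One common fallback (a sketch, not necessarily what this PR ended up doing) is to keep driver-side messages on the Spark logger and use Python's own logging inside mapped functions; the function body below is a placeholder:

import logging

def map_flow_to_ingress(flow):
    # Safe on executors: the logger is looked up inside the function,
    # not captured from the driver and pickled.
    logging.getLogger(__name__).debug("processing flow %s", flow)
    return flow  # placeholder transformation for the sketch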
Force-pushed from 63fbf9e to 4b55b90
svc_acnp_list = svc_acnp_rdd.collect()
if deny_rules:
    if option == 1:
        # Recommend deny ANPs for the applied to groups of allow policies
Nit: appliedTo
Thanks Jianjun, addressed.
logger.error("Error: option {} is not valid".format(option))
return []
if option == 3:
    # Recommend k8s native network policies for unprotected flows
Nit: NetworkPolicies
We capitalize the first letter of K8s resource or CRD kinds.
Addressed.
I think code LGTM. Hope issues with logging have been sorted out.
    return flow_df

def write_recommendation_result(spark, result, recommendation_type, db_jdbc_address, table_name, id):
    if not id:
nit (perhaps to be addressed in a future PR): id is a built-in in Python. In this case it will be correctly interpreted, but using it is a risk from a maintainability perspective (e.g.: if we change the param name to something else, no error is thrown when running the code, but "if not id" would then test the built-in function and always be false!)
Thanks Salvatore, that's a fair concern. I renamed this parameter to recommendation_id_input instead.
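A sketch of the effect of the rename, with a simplified signature; the uuid fallback is an assumption for illustration:

import uuid

def write_recommendation_result(result, recommendation_id_input=None):
    # No longer shadows the built-in id(), so this test reliably refers
    # to the argument even if the parameter list changes later.
    if not recommendation_id_input:
        recommendation_id_input = str(uuid.uuid4())
    # ... persist `result` keyed by recommendation_id_input ...
    return recommendation_id_input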
Signed-off-by: Yongming Ding <dyongming@vmware.com>
In this PR, we add the policy recommendation Spark job and image to the Theia repo.
Previously, the policy recommendation Spark job received several reviews on the Antrea repo at antrea-io/antrea#3064. I'm highlighting these changes compared with that closed PR:
Also, this is only the first PR for the Policy Recommendation feature. I will create subsequent PRs, including documentation, Antctl CLI, unit tests, and e2e tests, to complete this new feature.