Skip to content

Commit

Permalink
Merge pull request #12 from freeznet/fix_forward_request_to_leader
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie authored Feb 4, 2021
2 parents b3a2d35 + de0060f commit cb74f28
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1388,11 +1388,6 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
brokerConfig.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(
"c-" + brokerConfig.getClusterName()
+ "-fw-" + hostname
+ "-" + (workerConfig.getTlsEnabled()
? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort()));
// inherit broker authorization setting
workerConfig.setAuthenticationEnabled(brokerConfig.isAuthenticationEnabled());
workerConfig.setAuthenticationProviders(brokerConfig.getAuthenticationProviders());
Expand All @@ -1403,6 +1398,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());

workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
workerConfig.setTlsEnableHostnameVerification(false);
workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());

Expand All @@ -1418,6 +1414,12 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
workerConfig.setFunctionsWorkerServiceNarPackage(
brokerConfig.getFunctionsWorkerServiceNarPackage());
}

workerConfig.setWorkerId(
"c-" + brokerConfig.getClusterName()
+ "-fw-" + hostname
+ "-" + (workerConfig.getTlsEnabled()
? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort()));
return workerConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ void setup() throws Exception {
config.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
config.setFunctionsWorkerEnabled(true);
config.setTlsEnabled(true);

WorkerConfig workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(config, null);
workerConfig.setPulsarFunctionsNamespace("public/functions");
Expand Down Expand Up @@ -190,6 +191,8 @@ public void testFunctionsCreation() throws Exception {
FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, testTenant, "my-ns",
functionName, "my.*", "sink-topic-" + i, "sub-" + i);

log.info(" -------- Start test function : {}", functionName);

pulsarAdmins[i].functions().createFunctionWithUrl(
functionConfig, jarFilePathUrl
);
Expand All @@ -198,6 +201,8 @@ public void testFunctionsCreation() throws Exception {
assertEquals(config.getTenant(), testTenant);
assertEquals(config.getNamespace(), "my-ns");
assertEquals(config.getName(), functionName);

pulsarAdmins[i].functions().deleteFunction(config.getTenant(), config.getNamespace(), config.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public boolean isBrokerClientAuthenticationEnabled() {
private Properties properties = new Properties();

public boolean getTlsEnabled() {
return tlsEnabled || workerPortTls != null;
return tlsEnabled && workerPortTls != null;
}

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private void init() {
handlers.add(stats);
server.setHandler(stats);

if (this.workerConfig.getWorkerPortTls() != null) {
if (this.workerConfig.getTlsEnabled()) {
try {
SslContextFactory sslCtxFactory = SecurityUtility.createSslContextFactory(
this.workerConfig.isTlsAllowInsecureConnection(), this.workerConfig.getTlsTrustCertsFilePath(),
Expand Down

0 comments on commit cb74f28

Please sign in to comment.