Skip to content

Commit

Permalink
Improve pulsar functions logging (apache#165)
Browse files Browse the repository at this point in the history
- inject instance id in thread context. so we can have instance id information in function log
- rename dlog logging to bk logging, so all bk/dlog logging can be routed there
- add more variables to allow overrides by sys variables
- fixes some scripts
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 4396db6 commit 989f59d
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 40 deletions.
4 changes: 4 additions & 0 deletions pulsar-functions/bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,14 @@ OPTS="$OPTS $PULSAR_EXTRA_OPTS"
# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}

#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"

#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
Expand Down
4 changes: 4 additions & 0 deletions pulsar-functions/bin/pulsar-functions
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ OPTS="$OPTS $PULSAR_EXTRA_OPTS"
# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}

#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"

#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
Expand Down
116 changes: 96 additions & 20 deletions pulsar-functions/conf/log4j2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,23 @@
#

Configuration:
name: test
name: pulsar-functions
monitorInterval: 30

Properties:
Property:
- name: "pulsar.log.dir"
value: "logs"
- name: "pulsar.log.file"
value: "pulsar-functions.log"
- name: "pulsar.log.appender"
value: "RoutingAppender"
- name: "pulsar.log.level"
value: "info"
- name: "pulsar.routing.appender.default"
value: "Console"
- name: "bk.log.level"
value: "info"

Appenders:

Expand All @@ -32,9 +48,9 @@ Configuration:
# Rolling file appender configuration
RollingFile:
name: RollingFile
fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.dlog"
filePattern: "/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: false
fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: true
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
Expand All @@ -57,35 +73,95 @@ Configuration:
IfLastModified:
age: 30d

# Rolling file appender configuration for bk
RollingRandomAccessFile:
name: BkRollingFile
fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk"
filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: true
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: 1 GB
# Trigger every day at midnight that also scan
# roll-over strategy that deletes older file
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
# Delete file older than 30days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.log.file}.bk*log.gz"
IfLastModified:
age: 30d

# Routing
Routing:
name: RoutingAppender
Routes:
pattern: "$${ctx:function}"
Route:
-
RollingFile:
name: "Rolling-${ctx:function}"
fileName : "logs/functions/${ctx:function}/function.log"
filePattern : "logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz"
PatternLayout:
Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] %logger{1} - %msg%n"
SizeBasedTriggeringPolicy:
size: "20MB"

- ref: Console
-
Routing:
name: InstanceRoutingAppender
Routes:
pattern: "$${ctx:instance}"
Route:
-
RollingFile:
name: "Rolling-${ctx:function}"
fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/function.log"
filePattern : "${sys:pulsar.log.dir}/functions/${ctx:function}-%d{MM-dd-yyyy}-%i.log.gz"
PatternLayout:
Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: "20MB"
# Trigger every day at midnight that also scan
# roll-over strategy that deletes older file
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
# Delete file older than 30days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.log.file}*log.gz"
IfLastModified:
age: 30d
- ref: "${sys:pulsar.routing.appender.default}"
key: "${ctx:function}"
- ref: "${sys:pulsar.routing.appender.default}"
key: "${ctx:function}"

Loggers:

Root:
level: info
Logger:
name: org.apache.bookkeeper
level: "${sys:bk.log.level}"
additivity: false
AppenderRef:
- ref: "${sys:pulsar.log.appender}"
- ref: BkRollingFile

Logger:
name: org.apache.distributedlog
level: info
level: "${sys:bk.log.level}"
additivity: false
AppenderRef:
- ref: RollingFile
- ref: BkRollingFile

Root:
level: info
AppenderRef:
- ref: "${sys:pulsar.log.appender}"
level: "${sys:pulsar.log.level}"
2 changes: 1 addition & 1 deletion pulsar-functions/run-counter-examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

bin/pulsar-functions functions localrun \
bin/pulsar-functions --admin-url http://localhost:8080 functions localrun \
--function-config conf/example.yml \
--source-topics persistent://sample/standalone/ns1/test_src \
--sink-topic persistent://sample/standalone/ns1/test_result \
Expand Down
37 changes: 37 additions & 0 deletions pulsar-functions/run-instance.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

java -cp runtime/target/java-instance.jar -Dlog4j.configurationFile=java_instance_log4j2.yml org.apache.pulsar.functions.runtime.instance.JavaInstanceMain \
--name example \
--tenant "test" \
--namespace test-namespace \
--source_topics persistent://sample/standalone/ns1/test_src \
--sink_topic persistent://sample/standalone/ns1/test_result \
--input_serde_classnames org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--output_serde_classname org.apache.pulsar.functions.api.utils.Utf8StringSerDe \
--processing_guarantees ATMOST_ONCE \
--instance_id test-instance-id \
--function_id test-function-id \
--function_version "1.0" \
--pulsar_serviceurl "pulsar://localhost:6650/" \
--port 0 \
--max_buffered_tuples 1000 \
--function_classname org.apache.pulsar.functions.api.examples.CounterFunction \
--state_storage_serviceurl localhost:4182 \
--jar `pwd`/java-examples/target/pulsar-functions-api-examples.jar
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class JavaInstanceMain {
protected String userConfig;

@Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
protected String autoAck;
protected String autoAck = "true";

private Thread fnThread;
private JavaInstanceRunnable javaInstanceRunnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
@Override
public void run() {
try {
// initialize the thread context
ThreadContext.put("function", FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));
ThreadContext.put("instance", instanceConfig.getInstanceId());

log.info("Starting Java Instance {}", instanceConfig.getFunctionConfig().getName());

// start the function thread
Expand All @@ -136,9 +140,6 @@ public void run() {
// start the source consumer
startSourceConsumers();

// initialize the thread context
ThreadContext.put("function", FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));

this.outputSerDe = initializeSerDe(instanceConfig.getFunctionConfig().getOutputSerdeClassName(), clsLoader);

javaInstance = new JavaInstance(instanceConfig, clsLoader, client,
Expand Down
Loading

0 comments on commit 989f59d

Please sign in to comment.