Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Log based Pulsar #1

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8448198
Allow to configure the default number of bundles for new namespaces (…
merlimat Oct 26, 2017
eb9e5fc
Introduce auto bundle split and unloading of split bundle in ModularL…
rdhabalia Oct 26, 2017
6bd024d
Fix: compilation error for NamespaceServiceTest (#863)
rdhabalia Oct 26, 2017
3e965b2
Make spark consumer-listener lambda Serializable (#866)
rdhabalia Oct 27, 2017
8ba2a7c
Fix: profobuf interface generation on doc (#864)
rdhabalia Oct 27, 2017
e12d0fd
Pulsar-testclient should not fetch the shaded pulsar-client dependenc…
merlimat Oct 28, 2017
5c58367
Add proxy configuration for kubernetes (#856)
afalko Oct 28, 2017
3428cc6
add dlog impl
ArvinDevel Aug 24, 2017
3c2e17a
Initialize DlogBasedPulsar
ArvinDevel Aug 30, 2017
ab32767
sub update
ArvinDevel Aug 30, 2017
07ef7f2
update impl
ArvinDevel Sep 6, 2017
8f5019f
9-6
ArvinDevel Sep 6, 2017
c3daa0a
update before rebase
ArvinDevel Sep 7, 2017
bb27f58
clear before commit
ArvinDevel Sep 8, 2017
8e1f778
internal update
ArvinDevel Sep 14, 2017
9836883
update dlog impl
ArvinDevel Sep 15, 2017
8c58d8d
fix test error
ArvinDevel Sep 15, 2017
af78365
"enable unit test for DlogBasedManagedLedger"
ArvinDevel Sep 21, 2017
1a7263a
"promote the impl, fix null pointer error"
ArvinDevel Sep 23, 2017
c200831
"change mangedCursor's bk to dlog's bk(cut managedLedgerConfig's comp…
ArvinDevel Sep 26, 2017
6d89b57
"update managedLedger test "
ArvinDevel Sep 27, 2017
0f2d8c0
"fix some error, such as change ManagedLedgerConfig to DlogBased..., …
ArvinDevel Oct 3, 2017
1721a45
"fix some error, enable cache"
ArvinDevel Oct 4, 2017
2e19339
"bump netty to 4.1, move log reader to cache"
ArvinDevel Oct 6, 2017
e1d43cd
"refactor initialize log writer"
ArvinDevel Oct 8, 2017
8883157
"unit test use bkshade-jar"
ArvinDevel Oct 10, 2017
2979c13
"disable numEntriesPerLedger test, because dlog doesn't has correspon…
ArvinDevel Oct 10, 2017
de42a7c
"set read timeout, disable some tests relative to MaxEntriesPerLedger"
ArvinDevel Oct 11, 2017
cd41bc3
"enable write flush, add dlogbased ml standalone setup"
ArvinDevel Oct 12, 2017
cd9df2e
add standalone conf
ArvinDevel Oct 16, 2017
a1f887d
handle exceptions in main, and take care of exceptions of localDLMEmu…
ArvinDevel Oct 18, 2017
32422f9
"fix standalone mode error, enable dlog ByteBuf interface"
ArvinDevel Oct 20, 2017
8adf427
"refactor Position representative, deprecate DlogBasedPosition"
ArvinDevel Oct 20, 2017
6889f51
"reduce dlogBased yahoo-bk dependency, bump AsycHttpClient version to…
ArvinDevel Oct 25, 2017
eeb3604
"remove DlogBasedManagedLedgerConfig"
ArvinDevel Oct 28, 2017
ea8693c
remove deprecated using method
ArvinDevel Nov 14, 2017
b5bd481
normalize the entry and cache impl
ArvinDevel Nov 29, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ OPTS="$OPTS -Dlog4j.configuration=`basename $PULSAR_LOG_CONF`"
OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"

OPTS="-cp $PULSAR_CLASSPATH $OPTS"
#OPTS="-verbose:class $OPTS"
OPTS=" -XX:+PrintCommandLineFlags $OPTS"

OPTS="$OPTS $PULSAR_EXTRA_OPTS"

Expand Down
183 changes: 183 additions & 0 deletions conf/bookie.conf.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#/**
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements. See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership. The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */

## Bookie settings

# Port that bookie server listen on
bookiePort=3181

# TODO: change the journal directory
# Directory Bookkeeper outputs its write ahead log
journalDirectory=/tmp/data/bk/journal

# TODO: change the ledgers directory
# Directory Bookkeeper outputs ledger snapshots
ledgerDirectories=/tmp/data/bk/ledgers

# TODO: change the index directory
# Directory in which index files will be stored.
indexDirectories=/tmp/data/bk/ledgers

# Ledger Manager Class
# What kind of ledger manager is used to manage how ledgers are stored, managed
# and garbage collected. Try to read 'BookKeeper Internals' for detail info.
ledgerManagerType=hierarchical

# Root zookeeper path to store ledger metadata
# This parameter is used by zookeeper-based ledger manager as a root znode to
# store all ledgers.
zkLedgersRootPath=/messaging/bookkeeper/ledgers

# Max file size of entry logger, in bytes
# A new entry log file will be created when the old one reaches the file size limitation
logSizeLimit=1073741823

# Max file size of journal file, in mega bytes
# A new journal file will be created when the old one reaches the file size limitation
#
journalMaxSizeMB=2048

# Max number of old journal file to kept
# Keep a number of old journal files would help data recovery in specia case
#
journalMaxBackups=5

# How long the interval to trigger next garbage collection, in milliseconds
# Since garbage collection is running in background, too frequent gc
# will heart performance. It is better to give a higher number of gc
# interval if there is enough disk capacity.
# gc per 1 hour (aligning with most DL rolling interval)
gcInitialWaitTime=600000
gcWaitTime=3600000
# do minor compaction per 2 hours
minorCompactionInterval=7200
minorCompactionThreshold=0.2
# disable major compaction
majorCompactionInterval=0
# reduce major compaction threshold to a low value to prevent bad force compaction behavior
majorCompactionThreshold=0.3
# Compaction Rate & Max Outstanding
compactionRate=10737418
compactionMaxOutstandingRequests=10737418

# How long the interval to flush ledger index pages to disk, in milliseconds
# Flushing index files will introduce much random disk I/O.
# If separating journal dir and ledger dirs each on different devices,
# flushing would not affect performance. But if putting journal dir
# and ledger dirs on same device, performance degrade significantly
# on too frequent flushing. You can consider increment flush interval
# to get better performance, but you need to pay more time on bookie
# server restart after failure.
#
flushInterval=1000

# Interval to watch whether bookie is dead or not, in milliseconds
#
# bookieDeathWatchInterval=1000

## zookeeper client settings

# A list of one of more servers on which zookeeper is running.
# The server list can be comma separated values, for example:
# zkServers=zk1:2181,zk2:2181,zk3:2181
zkServers=localhost:2181

# ZooKeeper client session timeout in milliseconds
# Bookie server will exit if it received SESSION_EXPIRED because it
# was partitioned off from ZooKeeper for more than the session timeout
# JVM garbage collection, disk I/O will cause SESSION_EXPIRED.
# Increment this value could help avoiding this issue
zkTimeout=30000

## NIO Server settings

# This settings is used to enabled/disabled Nagle's algorithm, which is a means of
# improving the efficiency of TCP/IP networks by reducing the number of packets
# that need to be sent over the network.
# If you are sending many small messages, such that more than one can fit in
# a single IP packet, setting server.tcpnodelay to false to enable Nagle algorithm
# can provide better performance.
# Default value is true.
#
serverTcpNoDelay=true

## ledger cache settings

# Max number of ledger index files could be opened in bookie server
# If number of ledger index files reaches this limitation, bookie
# server started to swap some ledgers from memory to disk.
# Too frequent swap will affect performance. You can tune this number
# to gain performance according your requirements.
openFileLimit=20000

# Size of a index page in ledger cache, in bytes
# A larger index page can improve performance writing page to disk,
# which is efficent when you have small number of ledgers and these
# ledgers have similar number of entries.
# If you have large number of ledgers and each ledger has fewer entries,
# smaller index page would improve memory usage.
pageSize=8192

# How many index pages provided in ledger cache
# If number of index pages reaches this limitation, bookie server
# starts to swap some ledgers from memory to disk. You can increment
# this value when you found swap became more frequent. But make sure
# pageLimit*pageSize should not more than JVM max memory limitation,
# otherwise you would got OutOfMemoryException.
# In general, incrementing pageLimit, using smaller index page would
# gain bettern performance in lager number of ledgers with fewer entries case
# If pageLimit is -1, bookie server will use 1/3 of JVM memory to compute
# the limitation of number of index pages.
pageLimit=131072

#If all ledger directories configured are full, then support only read requests for clients.
#If "readOnlyModeEnabled=true" then on all ledger disks full, bookie will be converted
#to read-only mode and serve only read requests. Otherwise the bookie will be shutdown.
readOnlyModeEnabled=true

# Bookie Journal Settings
writeBufferSizeBytes=262144
journalFlushWhenQueueEmpty=false
journalRemoveFromPageCache=true
journalAdaptiveGroupWrites=true
journalMaxGroupWaitMSec=4
journalBufferedEntriesThreshold=180
journalBufferedWritesThreshold=131072
journalMaxGroupedEntriesToCommit=200
journalPreAllocSizeMB=4

# Sorted Ledger Storage Settings
sortedLedgerStorageEnabled=true
skipListSizeLimit=67108864
skipListArenaChunkSize=2097152
skipListArenaMaxAllocSize=131072
fileInfoCacheInitialCapacity=10000
fileInfoMaxIdleTime=3600

# Bookie Threads Settings (NOTE: change this to align the cpu cores)
numAddWorkerThreads=4
numJournalCallbackThreads=4
numReadWorkerThreads=4
numLongPollWorkerThreads=4

# stats
statsProviderClass=org.apache.bookkeeper.stats.CodahaleMetricsServletProvider
# Exporting codahale stats
codahaleStatsHttpPort=9001
useHostNameAsBookieID=true
allowLoopback=true
21 changes: 17 additions & 4 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
### --- General broker settings --- ###

# Zookeeper quorum connection string
zookeeperServers=
zookeeperServers=hw160:2181

# Global Zookeeper quorum connection string
globalZookeeperServers=
globalZookeeperServers=hw160:2181

# Broker data port
brokerServicePort=6650
Expand All @@ -44,7 +44,7 @@ bindAddress=0.0.0.0
advertisedAddress=

# Name of the cluster to which this broker belongs to
clusterName=
clusterName=ict

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
Expand Down Expand Up @@ -92,6 +92,10 @@ brokerDeduplicationEntriesInterval=1000
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# When a namespace is created without specifying the number of bundle, this
# value will be used as the default
defaultNumberOfNamespaceBundles=4

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down Expand Up @@ -237,6 +241,12 @@ bookkeeperClientIsolationGroups=

### --- Managed Ledger --- ###

# Impl type of ML, 0 indicate bk, 1 for dlog.
managedLedgerDefaultImplType=1

#Dlog's default namespace, when using dlog
dlogDefaultNamespaceURI=default_namespace

# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2

Expand Down Expand Up @@ -287,7 +297,7 @@ managedLedgerMaxUnackedRangesToPersist=10000

# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000


Expand Down Expand Up @@ -330,6 +340,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65
# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=false

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

Expand Down
2 changes: 1 addition & 1 deletion conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

# DEFAULT: console appender only
# Define some default values that can be overridden by system properties
pulsar.root.logger=WARN,CONSOLE
pulsar.root.logger=DEBUG,CONSOLE
pulsar.log.dir=logs
pulsar.log.file=pulsar.log

Expand Down
11 changes: 10 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ brokerDeduplicationEntriesInterval=1000
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# When a namespace is created without specifying the number of bundle, this
# value will be used as the default
defaultNumberOfNamespaceBundles=4

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
Expand Down Expand Up @@ -210,6 +213,9 @@ bookkeeperClientIsolationGroups=

### --- Managed Ledger --- ###

# Impl type of ML, 0 indicate bk, 1 for dlog.
managedLedgerDefaultImplType=1

# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=1

Expand Down Expand Up @@ -260,7 +266,7 @@ managedLedgerMaxUnackedRangesToPersist=10000

# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000


Expand Down Expand Up @@ -303,6 +309,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65
# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=false

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

Expand Down
89 changes: 89 additions & 0 deletions kubernetes/generic/proxy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


apiVersion: v1
kind: ConfigMap
metadata:
name: proxy-config
data:
PULSAR_MEM: "\" -Xms4g -Xmx4g -XX:MaxDirectMemorySize=4g\""
zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
clusterName: us-central
---
##
## Proxy deployment definition
##
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: proxy
spec:
replicas: 5
template:
metadata:
labels:
app: pulsar
component: proxy
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
spec:
containers:
- name: proxy
image: apachepulsar/pulsar:latest
command: ["sh", "-c"]
args:
- >
bin/apply-config-from-env.py conf/proxy.conf &&
bin/apply-config-from-env.py conf/pulsar_env.sh &&
bin/pulsar proxy
ports:
- containerPort: 8080
envFrom:
- configMapRef:
name: proxy-config
---

##
## Expose all nodes on port so that you can reach cluster from outside k8
##
apiVersion: v1
kind: Service
metadata:
name: proxy
labels:
app: pulsar
component: proxy
spec:
type: NodePort
ports:
- name: http
nodePort: 30001
port: 8080
protocol: TCP
- name: tcp
nodePort: 30002
port: 6650
protocol: TCP
selector:
app: pulsar
component: proxy
---
Loading