updating proxy 6.1 for 2 bugfixes #135

Merged: 3 commits, Aug 5, 2022
3 changes: 3 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -285,6 +285,8 @@ pub struct Config {
     pub reactive_memory_lock_tick_interval: ReadableDuration,
     /// Max tick count before reactivating in-memory pessimistic lock.
     pub reactive_memory_lock_timeout_tick: usize,
+
+    pub unreachable_backoff: ReadableDuration,
 }
 
 impl Default for Config {
@@ -378,6 +380,7 @@ impl Default for Config {
             report_min_resolved_ts_interval: ReadableDuration::millis(0),
             check_leader_lease_interval: ReadableDuration::secs(0),
             renew_leader_lease_advance_duration: ReadableDuration::secs(0),
+            unreachable_backoff: ReadableDuration::secs(10),
         }
     }
 }
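This change replaces the hard-coded 10 s unreachable-report backoff with a configurable `unreachable_backoff` field defaulting to `ReadableDuration::secs(10)`. Below is a minimal sketch of overriding it programmatically, assuming `raftstore::store::config::Config` and `tikv_util::config::ReadableDuration` are importable as they are used elsewhere in this diff; how the value is loaded from a config file is not shown in this PR.

```rust
use raftstore::store::config::Config;
use tikv_util::config::ReadableDuration;

fn main() {
    // Start from the defaults introduced by this patch (10s) and shorten the
    // backoff, e.g. so a test cluster re-reports unreachable stores sooner.
    let mut cfg = Config::default();
    cfg.unreachable_backoff = ReadableDuration::secs(1);

    // `.0` exposes the inner std::time::Duration, exactly as
    // on_store_unreachable reads it in store.rs below.
    assert_eq!(cfg.unreachable_backoff.0, std::time::Duration::from_secs(1));
}
```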
1 change: 1 addition & 0 deletions components/raftstore/src/store/fsm/apply.rs
@@ -602,6 +602,7 @@ where
         let is_synced = self.write_to_db();
 
         if !self.apply_res.is_empty() {
+            fail_point!("before_nofity_apply_res");
             let apply_res = mem::take(&mut self.apply_res);
             self.notifier.notify(apply_res);
         }
22 changes: 14 additions & 8 deletions components/raftstore/src/store/fsm/store.rs
@@ -89,7 +89,6 @@ use tikv_util::future::poll_future_notify;
 type Key = Vec<u8>;
 
 pub const PENDING_MSG_CAP: usize = 100;
-const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10);
 const ENTRY_CACHE_EVICT_TICK_DURATION: Duration = Duration::from_secs(1);
 
 pub struct StoreInfo<EK, ER> {
@@ -204,16 +203,21 @@
 {
     fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) {
         for r in apply_res {
-            self.router.try_send(
-                r.region_id,
+            let region_id = r.region_id;
+            if let Err(e) = self.router.force_send(
+                region_id,
                 PeerMsg::ApplyRes {
                     res: ApplyTaskRes::Apply(r),
                 },
-            );
+            ) {
+                error!("failed to send apply result"; "region_id" => region_id, "err" => ?e);
+            }
         }
     }
 
     fn notify_one(&self, region_id: u64, msg: PeerMsg<EK>) {
-        self.router.try_send(region_id, msg);
+        if let Err(e) = self.router.force_send(region_id, msg) {
+            error!("failed to notify apply msg"; "region_id" => region_id, "err" => ?e);
+        }
     }
 
@@ -701,6 +705,7 @@ impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, St
     where
         for<'a> F: FnOnce(&'a BatchSystemConfig),
     {
+        fail_point!("begin_raft_poller");
         self.previous_metrics = self.poll_ctx.raft_metrics.ready.clone();
         self.poll_ctx.pending_count = 0;
         self.poll_ctx.ready_count = 0;
@@ -2595,13 +2600,14 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
 
     fn on_store_unreachable(&mut self, store_id: u64) {
         let now = Instant::now();
+        let unreachable_backoff = self.ctx.cfg.unreachable_backoff.0;
         if self
             .fsm
             .store
             .last_unreachable_report
             .get(&store_id)
-            .map_or(UNREACHABLE_BACKOFF, |t| now.saturating_duration_since(*t))
-            < UNREACHABLE_BACKOFF
+            .map_or(unreachable_backoff, |t| now.saturating_duration_since(*t))
+            < unreachable_backoff
         {
             return;
         }
@@ -2610,11 +2616,11 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
             "store_id" => self.fsm.store.id,
             "unreachable_store_id" => store_id,
         );
+        self.fsm.store.last_unreachable_report.insert(store_id, now);
         // It's possible to acquire the lock and only send notification to
         // involved regions. However loop over all the regions can take a
         // lot of time, which may block other operations.
         self.ctx.router.report_unreachable(store_id);
-        self.fsm.store.last_unreachable_report.insert(store_id, now);
     }
 
     fn on_update_replication_mode(&mut self, status: ReplicationStatus) {
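The apply notifier previously used `try_send`, which fails when a peer's mailbox is full, so the `ApplyRes` (or the message in `notify_one`) could be dropped silently and leave the region stuck; `force_send` bypasses the capacity limit and only fails if the receiver is gone, which is now logged. A rough sketch of that difference using `tikv_util::mpsc::loose_bounded`, the channel type touched later in this PR; the capacity of 1 and the `u64` payload are illustrative only, not the router's real message type.

```rust
use tikv_util::mpsc;

fn main() {
    // A loosely bounded channel with a nominal capacity of 1.
    let (tx, rx) = mpsc::loose_bounded::<u64>(1);

    // try_send enforces the capacity limit (after an internal check interval)
    // and can return TrySendError::Full; a caller that ignores the error
    // effectively drops the message.
    let _ = tx.try_send(1);

    // force_send ignores the capacity limit and only errors if the receiving
    // end has been dropped, so a result message is never lost to a full queue.
    tx.force_send(2).unwrap();

    drop(tx);
    while let Ok(v) = rx.recv() {
        println!("received {}", v);
    }
}
```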
2 changes: 1 addition & 1 deletion components/raftstore/src/store/worker/check_leader.rs
@@ -80,7 +80,7 @@ impl Runner {
         meta.region_ranges
             // get overlapped regions
             .range((Excluded(start_key), Unbounded))
-            .take_while(|(_, id)| end_key > enc_start_key(&meta.regions[id]))
+            .take_while(|(_, id)| end_key > enc_start_key(&meta.regions[*id]))
             // get the min `safe_ts`
             .map(|(_, id)| {
                 registry.get(id).unwrap().safe_ts()
8 changes: 7 additions & 1 deletion components/tikv_util/src/mpsc/mod.rs
@@ -12,6 +12,8 @@ pub mod batch;
 use crossbeam::channel::{
     self, RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError,
 };
+use fail::fail_point;
+
 use std::cell::Cell;
 use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
 use std::sync::Arc;
@@ -234,7 +236,11 @@ impl<T> LooseBoundedSender<T> {
     #[inline]
     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
         let cnt = self.tried_cnt.get();
-        if cnt < CHECK_INTERVAL {
+        let check_interval = || {
+            fail_point!("loose_bounded_sender_check_interval", |_| 0);
+            CHECK_INTERVAL
+        };
+        if cnt < check_interval() {
             self.tried_cnt.set(cnt + 1);
         } else if self.len() < self.limit {
             self.tried_cnt.set(1);
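Wrapping `CHECK_INTERVAL` in a closure gives tests a hook: with the `loose_bounded_sender_check_interval` failpoint set to `return`, `check_interval()` yields 0, so every `try_send` consults the real queue length instead of skipping the check for the first `CHECK_INTERVAL` calls. A hypothetical sketch of using it follows; the test name is made up, it assumes the `fail` crate's `cfg`/`remove` API and a build with failpoints enabled, and the real usage is `test_split_store_channel_full` below.

```rust
#[test]
fn try_send_checks_capacity_immediately() {
    fail::cfg("loose_bounded_sender_check_interval", "return").unwrap();

    let (tx, _rx) = tikv_util::mpsc::loose_bounded::<u64>(1);
    // Fill the channel up to its nominal capacity.
    tx.force_send(0).unwrap();
    // With check_interval() forced to 0, the very next try_send sees the full
    // queue and returns an error instead of over-filling the channel.
    assert!(tx.try_send(1).is_err());

    fail::remove("loose_bounded_sender_check_interval");
}
```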
38 changes: 37 additions & 1 deletion tests/failpoints/cases/test_split_region.rs
@@ -15,7 +15,7 @@ use pd_client::PdClient;
 use raft::eraftpb::MessageType;
 use raftstore::store::config::Config as RaftstoreConfig;
 use raftstore::store::util::is_vote_msg;
-use raftstore::store::Callback;
+use raftstore::store::{Callback, PeerMsg};
 use raftstore::Result;
 use tikv_util::HandyRwLock;
 
@@ -890,3 +890,39 @@ fn test_split_pessimistic_locks_with_concurrent_prewrite() {
     let resp = resp.join().unwrap();
     assert!(resp.get_region_error().has_epoch_not_match(), "{:?}", resp);
 }
+
+#[test]
+fn test_split_store_channel_full() {
+    let mut cluster = new_node_cluster(0, 1);
+    cluster.cfg.raft_store.notify_capacity = 10;
+    cluster.cfg.raft_store.store_batch_system.max_batch_size = Some(1);
+    cluster.cfg.raft_store.messages_per_tick = 1;
+    let pd_client = cluster.pd_client.clone();
+    pd_client.disable_default_operator();
+    cluster.run();
+    cluster.must_put(b"k1", b"v1");
+    cluster.must_put(b"k2", b"v2");
+    let region = pd_client.get_region(b"k2").unwrap();
+    let apply_fp = "before_nofity_apply_res";
+    fail::cfg(apply_fp, "pause").unwrap();
+    let (tx, rx) = mpsc::channel();
+    cluster.split_region(
+        &region,
+        b"k2",
+        Callback::write(Box::new(move |_| tx.send(()).unwrap())),
+    );
+    rx.recv().unwrap();
+    let sender_fp = "loose_bounded_sender_check_interval";
+    fail::cfg(sender_fp, "return").unwrap();
+    let store_fp = "begin_raft_poller";
+    fail::cfg(store_fp, "pause").unwrap();
+    let raft_router = cluster.sim.read().unwrap().get_router(1).unwrap();
+    for _ in 0..50 {
+        raft_router.force_send(1, PeerMsg::Noop).unwrap();
+    }
+    fail::remove(apply_fp);
+    fail::remove(store_fp);
+    sleep_ms(300);
+    let region = pd_client.get_region(b"k1").unwrap();
+    assert_ne!(region.id, 1);
+    fail::remove(sender_fp);
+}