Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make const decoding thread-safe. #51060

Merged
merged 4 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/librustc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#![feature(trace_macros)]
#![feature(trusted_len)]
#![feature(catch_expr)]
#![feature(integer_atomics)]
#![feature(test)]
#![feature(in_band_lifetimes)]
#![feature(macro_at_most_once_rep)]
Expand Down
206 changes: 167 additions & 39 deletions src/librustc/mir/interpret/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ use std::io;
use std::ops::{Deref, DerefMut};
use std::hash::Hash;
use syntax::ast::Mutability;
use rustc_serialize::{Encoder, Decoder, Decodable, Encodable};
use rustc_serialize::{Encoder, Decodable, Encodable};
use rustc_data_structures::sorted_map::SortedMap;
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::sync::{Lock as Mutex, HashMapExt};
use rustc_data_structures::tiny_list::TinyList;
use byteorder::{WriteBytesExt, ReadBytesExt, LittleEndian, BigEndian};
use ty::codec::TyDecoder;
use std::sync::atomic::{AtomicU32, Ordering};
use std::num::NonZeroU32;

#[derive(Clone, Debug, PartialEq, RustcEncodable, RustcDecodable)]
pub enum Lock {
Expand Down Expand Up @@ -204,44 +209,163 @@ pub fn specialized_encode_alloc_id<
Ok(())
}

pub fn specialized_decode_alloc_id<
'a, 'tcx,
D: Decoder,
CACHE: FnOnce(&mut D, AllocId),
>(
decoder: &mut D,
tcx: TyCtxt<'a, 'tcx, 'tcx>,
cache: CACHE,
) -> Result<AllocId, D::Error> {
match AllocKind::decode(decoder)? {
AllocKind::Alloc => {
let alloc_id = tcx.alloc_map.lock().reserve();
trace!("creating alloc id {:?}", alloc_id);
// insert early to allow recursive allocs
cache(decoder, alloc_id);

let allocation = <&'tcx Allocation as Decodable>::decode(decoder)?;
trace!("decoded alloc {:?} {:#?}", alloc_id, allocation);
tcx.alloc_map.lock().set_id_memory(alloc_id, allocation);

Ok(alloc_id)
},
AllocKind::Fn => {
trace!("creating fn alloc id");
let instance = ty::Instance::decode(decoder)?;
trace!("decoded fn alloc instance: {:?}", instance);
let id = tcx.alloc_map.lock().create_fn_alloc(instance);
trace!("created fn alloc id: {:?}", id);
cache(decoder, id);
Ok(id)
},
AllocKind::Static => {
trace!("creating extern static alloc id at");
let did = DefId::decode(decoder)?;
let alloc_id = tcx.alloc_map.lock().intern_static(did);
cache(decoder, alloc_id);
Ok(alloc_id)
},
// Used to avoid infinite recursion when decoding cyclic allocations.
type DecodingSessionId = NonZeroU32;

#[derive(Clone)]
enum State {
Empty,
InProgressNonAlloc(TinyList<DecodingSessionId>),
InProgress(TinyList<DecodingSessionId>, AllocId),
Done(AllocId),
}

pub struct AllocDecodingState {
// For each AllocId we keep track of which decoding state it's currently in.
decoding_state: Vec<Mutex<State>>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I changed this from Mutex<Vec<_>> to Vec<Mutex<_>>. In theory this should reduce contention but it might be overkill.

// The offsets of each allocation in the data stream.
data_offsets: Vec<u32>,
}

impl AllocDecodingState {

pub fn new_decoding_session(&self) -> AllocDecodingSession {
static DECODER_SESSION_ID: AtomicU32 = AtomicU32::new(0);
let counter = DECODER_SESSION_ID.fetch_add(1, Ordering::SeqCst);

// Make sure this is never zero
let session_id = DecodingSessionId::new((counter & 0x7FFFFFFF) + 1).unwrap();

AllocDecodingSession {
state: self,
session_id,
}
}

pub fn new(data_offsets: Vec<u32>) -> AllocDecodingState {
let decoding_state: Vec<_> = ::std::iter::repeat(Mutex::new(State::Empty))
.take(data_offsets.len())
.collect();

AllocDecodingState {
decoding_state: decoding_state,
data_offsets,
}
}
}

#[derive(Copy, Clone)]
pub struct AllocDecodingSession<'s> {
state: &'s AllocDecodingState,
session_id: DecodingSessionId,
}

impl<'s> AllocDecodingSession<'s> {

// Decodes an AllocId in a thread-safe way.
pub fn decode_alloc_id<'a, 'tcx, D>(&self,
decoder: &mut D)
-> Result<AllocId, D::Error>
where D: TyDecoder<'a, 'tcx>,
'tcx: 'a,
{
// Read the index of the allocation
let idx = decoder.read_u32()? as usize;
let pos = self.state.data_offsets[idx] as usize;

// Decode the AllocKind now so that we know if we have to reserve an
// AllocId.
let (alloc_kind, pos) = decoder.with_position(pos, |decoder| {
let alloc_kind = AllocKind::decode(decoder)?;
Ok((alloc_kind, decoder.position()))
})?;

// Check the decoding state, see if it's already decoded or if we should
// decode it here.
let alloc_id = {
let mut entry = self.state.decoding_state[idx].lock();

match *entry {
State::Done(alloc_id) => {
return Ok(alloc_id);
}
ref mut entry @ State::Empty => {
// We are allowed to decode
match alloc_kind {
AllocKind::Alloc => {
// If this is an allocation, we need to reserve an
// AllocId so we can decode cyclic graphs.
let alloc_id = decoder.tcx().alloc_map.lock().reserve();
*entry = State::InProgress(
TinyList::new_single(self.session_id),
alloc_id);
Some(alloc_id)
},
AllocKind::Fn | AllocKind::Static => {
// Fns and statics cannot be cyclic and their AllocId
// is determined later by interning
*entry = State::InProgressNonAlloc(
TinyList::new_single(self.session_id));
None
}
}
}
State::InProgressNonAlloc(ref mut sessions) => {
if sessions.contains(&self.session_id) {
bug!("This should be unreachable")
} else {
// Start decoding concurrently
sessions.insert(self.session_id);
None
}
}
State::InProgress(ref mut sessions, alloc_id) => {
if sessions.contains(&self.session_id) {
// Don't recurse.
return Ok(alloc_id)
} else {
// Start decoding concurrently
sessions.insert(self.session_id);
Some(alloc_id)
}
}
}
};

// Now decode the actual data
let alloc_id = decoder.with_position(pos, |decoder| {
match alloc_kind {
AllocKind::Alloc => {
let allocation = <&'tcx Allocation as Decodable>::decode(decoder)?;
// We already have a reserved AllocId.
let alloc_id = alloc_id.unwrap();
trace!("decoded alloc {:?} {:#?}", alloc_id, allocation);
decoder.tcx().alloc_map.lock().set_id_same_memory(alloc_id, allocation);
Ok(alloc_id)
},
AllocKind::Fn => {
assert!(alloc_id.is_none());
trace!("creating fn alloc id");
let instance = ty::Instance::decode(decoder)?;
trace!("decoded fn alloc instance: {:?}", instance);
let alloc_id = decoder.tcx().alloc_map.lock().create_fn_alloc(instance);
Ok(alloc_id)
},
AllocKind::Static => {
assert!(alloc_id.is_none());
trace!("creating extern static alloc id at");
let did = DefId::decode(decoder)?;
let alloc_id = decoder.tcx().alloc_map.lock().intern_static(did);
Ok(alloc_id)
}
}
})?;

self.state.decoding_state[idx].with_lock(|entry| {
*entry = State::Done(alloc_id);
});

Ok(alloc_id)
}
}

Expand Down Expand Up @@ -340,6 +464,10 @@ impl<'tcx, M: fmt::Debug + Eq + Hash + Clone> AllocMap<'tcx, M> {
bug!("tried to set allocation id {}, but it was already existing as {:#?}", id, old);
}
}

pub fn set_id_same_memory(&mut self, id: AllocId, mem: M) {
self.id_to_type.insert_same(id, AllocType::Memory(mem));
}
}

#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, RustcEncodable, RustcDecodable)]
Expand Down
51 changes: 10 additions & 41 deletions src/librustc/ty/maps/on_disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use hir::def_id::{CrateNum, DefIndex, DefId, LocalDefId,
use hir::map::definitions::DefPathHash;
use ich::{CachingCodemapView, Fingerprint};
use mir::{self, interpret};
use mir::interpret::{AllocDecodingSession, AllocDecodingState};
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::sync::{Lrc, Lock, HashMapExt, Once};
use rustc_data_structures::indexed_vec::{IndexVec, Idx};
use rustc_serialize::{Decodable, Decoder, Encodable, Encoder, opaque,
SpecializedDecoder, SpecializedEncoder,
UseSpecializedDecodable, UseSpecializedEncodable};
use session::{CrateDisambiguator, Session};
use std::cell::RefCell;
use std::mem;
use syntax::ast::NodeId;
use syntax::codemap::{CodeMap, StableFilemapId};
Expand Down Expand Up @@ -77,11 +77,7 @@ pub struct OnDiskCache<'sess> {
// `serialized_data`.
prev_diagnostics_index: FxHashMap<SerializedDepNodeIndex, AbsoluteBytePos>,

// Alloc indices to memory location map
prev_interpret_alloc_index: Vec<AbsoluteBytePos>,

/// Deserialization: A cache to ensure we don't read allocations twice
interpret_alloc_cache: RefCell<FxHashMap<usize, interpret::AllocId>>,
alloc_decoding_state: AllocDecodingState,
}

// This type is used only for (de-)serialization.
Expand All @@ -92,7 +88,7 @@ struct Footer {
query_result_index: EncodedQueryResultIndex,
diagnostics_index: EncodedQueryResultIndex,
// the location of all allocations
interpret_alloc_index: Vec<AbsoluteBytePos>,
interpret_alloc_index: Vec<u32>,
}

type EncodedQueryResultIndex = Vec<(SerializedDepNodeIndex, AbsoluteBytePos)>;
Expand Down Expand Up @@ -149,8 +145,7 @@ impl<'sess> OnDiskCache<'sess> {
query_result_index: footer.query_result_index.into_iter().collect(),
prev_diagnostics_index: footer.diagnostics_index.into_iter().collect(),
synthetic_expansion_infos: Lock::new(FxHashMap()),
prev_interpret_alloc_index: footer.interpret_alloc_index,
interpret_alloc_cache: RefCell::new(FxHashMap::default()),
alloc_decoding_state: AllocDecodingState::new(footer.interpret_alloc_index),
}
}

Expand All @@ -166,8 +161,7 @@ impl<'sess> OnDiskCache<'sess> {
query_result_index: FxHashMap(),
prev_diagnostics_index: FxHashMap(),
synthetic_expansion_infos: Lock::new(FxHashMap()),
prev_interpret_alloc_index: Vec::new(),
interpret_alloc_cache: RefCell::new(FxHashMap::default()),
alloc_decoding_state: AllocDecodingState::new(Vec::new()),
}
}

Expand Down Expand Up @@ -291,7 +285,7 @@ impl<'sess> OnDiskCache<'sess> {
}
for idx in n..new_n {
let id = encoder.interpret_allocs_inverse[idx];
let pos = AbsoluteBytePos::new(encoder.position());
let pos = encoder.position() as u32;
interpret_alloc_index.push(pos);
interpret::specialized_encode_alloc_id(
&mut encoder,
Expand Down Expand Up @@ -424,8 +418,7 @@ impl<'sess> OnDiskCache<'sess> {
file_index_to_file: &self.file_index_to_file,
file_index_to_stable_id: &self.file_index_to_stable_id,
synthetic_expansion_infos: &self.synthetic_expansion_infos,
prev_interpret_alloc_index: &self.prev_interpret_alloc_index,
interpret_alloc_cache: &self.interpret_alloc_cache,
alloc_decoding_session: self.alloc_decoding_state.new_decoding_session(),
};

match decode_tagged(&mut decoder, dep_node_index) {
Expand Down Expand Up @@ -487,9 +480,7 @@ struct CacheDecoder<'a, 'tcx: 'a, 'x> {
synthetic_expansion_infos: &'x Lock<FxHashMap<AbsoluteBytePos, SyntaxContext>>,
file_index_to_file: &'x Lock<FxHashMap<FileMapIndex, Lrc<FileMap>>>,
file_index_to_stable_id: &'x FxHashMap<FileMapIndex, StableFilemapId>,
interpret_alloc_cache: &'x RefCell<FxHashMap<usize, interpret::AllocId>>,
/// maps from index in the cache file to location in the cache file
prev_interpret_alloc_index: &'x [AbsoluteBytePos],
alloc_decoding_session: AllocDecodingSession<'x>,
}

impl<'a, 'tcx, 'x> CacheDecoder<'a, 'tcx, 'x> {
Expand Down Expand Up @@ -612,30 +603,8 @@ implement_ty_decoder!( CacheDecoder<'a, 'tcx, 'x> );

impl<'a, 'tcx, 'x> SpecializedDecoder<interpret::AllocId> for CacheDecoder<'a, 'tcx, 'x> {
fn specialized_decode(&mut self) -> Result<interpret::AllocId, Self::Error> {
let tcx = self.tcx;
let idx = usize::decode(self)?;
trace!("loading index {}", idx);

if let Some(cached) = self.interpret_alloc_cache.borrow().get(&idx).cloned() {
trace!("loading alloc id {:?} from alloc_cache", cached);
return Ok(cached);
}
let pos = self.prev_interpret_alloc_index[idx].to_usize();
trace!("loading position {}", pos);
self.with_position(pos, |this| {
interpret::specialized_decode_alloc_id(
this,
tcx,
|this, alloc_id| {
trace!("caching idx {} for alloc id {} at position {}", idx, alloc_id, pos);
assert!(this
.interpret_alloc_cache
.borrow_mut()
.insert(idx, alloc_id)
.is_none());
},
)
})
let alloc_decoding_session = self.alloc_decoding_session;
alloc_decoding_session.decode_alloc_id(self)
}
}
impl<'a, 'tcx, 'x> SpecializedDecoder<Span> for CacheDecoder<'a, 'tcx, 'x> {
Expand Down
1 change: 1 addition & 0 deletions src/librustc_data_structures/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub mod control_flow_graph;
pub mod flock;
pub mod sync;
pub mod owning_ref;
pub mod tiny_list;
pub mod sorted_map;

pub struct OnDrop<F: Fn()>(pub F);
Expand Down
Loading