Merge branch '1.3_maintenance' into 1.3_cycles_3.6.0
boberfly committed Aug 20, 2023
2 parents 1366279 + 836d2a0 commit c1e7493
Showing 14 changed files with 491 additions and 33 deletions.
17 changes: 16 additions & 1 deletion Changes.md
@@ -4,12 +4,22 @@
Improvements
------------

- Viewer : Added visualisation of light filters for USD lights.
- Viewer :
- Added visualisation of light filters for USD lights.
- Added support for USD lights and shaders in the floating inspector panel.
- ShaderTweaks/ShaderQuery : Added presets for USD light and surface shaders.

Fixes
-----

- Viewer : Fixed crash when visualising lights with a light filter intended for a different renderer.
- Arnold : Fixed screen window export for Lentil cameras.
- Application : Fixed the `-threads` argument to clamp the number of threads to the number of available hardware cores (#5403).

API
---

- ThreadMonitor : Added new class for tracking the threads used to perform processes.

Documentation
-------------
@@ -290,6 +300,11 @@ Build
1.2.10.x (relative to 1.2.10.1)
========

Fixes
-----

- Arnold : Fixed screen window export for Lentil cameras.
- Application : Fixed the `-threads` argument to clamp the number of threads to the number of available hardware cores (#5403).

1.2.10.1 (relative to 1.2.10.0)
========
108 changes: 108 additions & 0 deletions include/Gaffer/ThreadMonitor.h
@@ -0,0 +1,108 @@
//////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above
// copyright notice, this list of conditions and the following
// disclaimer.
//
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided with
// the distribution.
//
// * Neither the name of John Haddon nor the names of
// any other contributors to this software may be used to endorse or
// promote products derived from this software without specific prior
// written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////

#pragma once

#include "Gaffer/Monitor.h"

#include "tbb/enumerable_thread_specific.h"

#include <unordered_map>

namespace Gaffer
{

IE_CORE_FORWARDDECLARE( Plug )

/// A monitor which collects information about which threads
/// initiated processes on each plug.
class GAFFER_API ThreadMonitor : public Monitor
{

public :

ThreadMonitor( const std::vector<IECore::InternedString> &processMask = { "computeNode:compute" } );
~ThreadMonitor() override;

IE_CORE_DECLAREMEMBERPTR( ThreadMonitor )

/// Numeric identifier for a thread. Using our own identifier rather
/// than `std::thread::id` so that we can bind it to Python (and assign
/// human-readable contiguous values).
using ThreadId = int;
/// Returns the `ThreadId` for the calling thread.
static ThreadId thisThreadId();
/// Maps from `ThreadId` to the number of times a process has been
/// invoked on that thread.
using ProcessesPerThread = std::unordered_map<ThreadId, size_t>;
/// Stores per-thread process counts per-plug.
using PlugMap = std::unordered_map<ConstPlugPtr, ProcessesPerThread>;

/// Query functions. These are not thread-safe, and must be called
/// only when the Monitor is not active (as defined by `Monitor::Scope`).
const PlugMap &allStatistics() const;
const ProcessesPerThread &plugStatistics( const Plug *plug ) const;
const ProcessesPerThread &combinedStatistics() const;

protected :

void processStarted( const Process *process ) override;
void processFinished( const Process *process ) override;

private :

const std::vector<IECore::InternedString> m_processMask;

// We collect statistics into a per-thread data structure to avoid contention.
struct ThreadData
{
ThreadData();
using ProcessesPerPlug = std::unordered_map<ConstPlugPtr, size_t>;
ThreadId id;
ProcessesPerPlug processesPerPlug;
};
mutable tbb::enumerable_thread_specific<ThreadData> m_threadData;

// Then when we want to query it, we collate it into `m_statistics`.
void collate() const;
mutable PlugMap m_statistics;
mutable ProcessesPerThread m_combinedStatistics;

};

IE_CORE_DECLAREPTR( ThreadMonitor )

} // namespace Gaffer
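
For context, a minimal sketch of how the new monitor is driven from Python, mirroring the calls exercised in `ThreadMonitorTest.py` further down; `Gaffer.Random` is just a convenient stand-in for any `ComputeNode`.

```python
import Gaffer

node = Gaffer.Random()

# Only processes matching the mask are counted; the default mask is
# { "computeNode:compute" }, so hash processes are ignored.
monitor = Gaffer.ThreadMonitor()

# Statistics are gathered only while the monitor is active.
with monitor :
	node["outFloat"].getValue()

# Per-plug and combined counts are keyed by ThreadId.
print( monitor.plugStatistics( node["outFloat"] ) )  # e.g. { 1 : 1 }
print( monitor.combinedStatistics() )
print( monitor.allStatistics() )
```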
16 changes: 12 additions & 4 deletions python/Gaffer/Application.py
@@ -64,10 +64,12 @@ def __init__( self, description="" ) :
IECore.IntParameter(
name = "threads",
description = "The maximum number of threads used for computation. "
"The default value of zero causes the number of threads to "
" be chosen automatically based on the available hardware.",
"The default value of zero matches the number of threads to "
"the available hardware cores. Negative values specify a thread count "
"relative to the available cores, leaving some in reserve for other "
"applications. Positive values specify the thread count explicitly, "
"but are clamped so it does not exceed the available cores.",
defaultValue = 0,
minValue = 0,
),

IECore.FileNameParameter(
@@ -131,11 +133,17 @@ def _executeStartupFiles( self, applicationName ) :

def __run( self ) :

maxThreads = IECore.hardwareConcurrency()
threads = self.parameters()["threads"].getTypedValue()
if threads <= 0 :
threads = max( maxThreads + threads, 1 )
elif threads > maxThreads :
IECore.msg( IECore.Msg.Level.Warning, "Application", f"Clamping to `-threads {maxThreads}` to avoid oversubscription" )
threads = maxThreads

with IECore.tbb_global_control(
IECore.tbb_global_control.parameter.max_allowed_parallelism,
IECore.hardwareConcurrency() if threads == 0 else threads
threads
) :

self._executeStartupFiles( self.root().getName() )
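
The new `-threads` handling is easiest to see in isolation. The sketch below uses a hypothetical `resolveThreads()` helper written purely for illustration, assuming the same rules as `Application.__run()` above.

```python
import IECore

def resolveThreads( requested ) :

	# Hypothetical helper mirroring the clamping logic in Application.__run();
	# not part of the Gaffer API.
	maxThreads = IECore.hardwareConcurrency()
	if requested <= 0 :
		# Zero means "use all cores"; negative values leave cores in reserve.
		return max( maxThreads + requested, 1 )
	# Positive values are clamped to avoid oversubscription.
	return min( requested, maxThreads )

# On an 8-core machine : resolveThreads( 0 ) == 8, resolveThreads( -2 ) == 6,
# resolveThreads( 16 ) == 8.
```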
9 changes: 7 additions & 2 deletions python/GafferSceneUI/_SceneViewInspector.py
@@ -67,7 +67,8 @@
"dl" : "Delight",
"gl" : "OpenGL",
"osl" : "OSL",
"cycles" : "Cycles"
"cycles" : "Cycles",
"" : "USD",
};

__registeredShaderParameters = OrderedDict()
@@ -176,7 +177,11 @@ def __keyPress( self, gadget, event ) :
@staticmethod
def __attributeLabel( attribute ) :

prefix, name = attribute.split( ":", 1 )
prefix, _, name = attribute.partition( ":" )
if not name :
name = prefix
prefix = ""

prefix = _rendererAttributePrefixes.get( prefix, prefix )
name = " ".join( [ IECore.CamelCase.toSpaced( n ) for n in name.split( ":" ) ] )
return "{} {}".format( prefix, name )
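
A standalone sketch of the `partition()`-based labelling above; the prefix map is abbreviated and the example attribute names are assumptions chosen for illustration.

```python
import IECore

# Abbreviated copy of _rendererAttributePrefixes, for illustration only.
prefixes = { "gl" : "OpenGL", "" : "USD" }

def attributeLabel( attribute ) :

	prefix, _, name = attribute.partition( ":" )
	if not name :
		# No ":" in the attribute, as with USD lights : treat the whole string
		# as the name and map the empty prefix to "USD".
		name = prefix
		prefix = ""
	prefix = prefixes.get( prefix, prefix )
	name = " ".join( IECore.CamelCase.toSpaced( n ) for n in name.split( ":" ) )
	return "{} {}".format( prefix, name )

print( attributeLabel( "gl:visualiser:scale" ) )  # e.g. "OpenGL Visualiser Scale"
print( attributeLabel( "light" ) )                # e.g. "USD Light"
```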
143 changes: 143 additions & 0 deletions python/GafferTest/ThreadMonitorTest.py
@@ -0,0 +1,143 @@
##########################################################################
#
# Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with
# the distribution.
#
# * Neither the name of John Haddon nor the names of
# any other contributors to this software may be used to endorse or
# promote products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
##########################################################################

import threading
import unittest

import IECore

import Gaffer
import GafferTest

class ThreadMonitorTest( GafferTest.TestCase ) :

def testConstruction( self ) :

monitor = Gaffer.ThreadMonitor()
self.assertEqual( monitor.allStatistics(), {} )
self.assertEqual( monitor.plugStatistics( Gaffer.IntPlug() ), {} )
self.assertEqual( monitor.combinedStatistics(), {} )

def testThisThreadId( self ) :

id = Gaffer.ThreadMonitor.thisThreadId()
self.assertEqual( id, Gaffer.ThreadMonitor.thisThreadId() )

ids = { id }
lock = threading.Lock()

def storeId() :
id = Gaffer.ThreadMonitor.thisThreadId()
self.assertEqual( id, Gaffer.ThreadMonitor.thisThreadId() )
with lock :
ids.add( id )

threads = []
for i in range( 0, 5 ) :
thread = threading.Thread( target = storeId )
threads.append( thread )
thread.start()

for thread in threads :
thread.join()

self.assertEqual( len( ids ), 6 )

def testMonitoring( self ) :

random = Gaffer.Random()
monitor = Gaffer.ThreadMonitor()

with monitor :
random["outFloat"].getValue()

self.assertEqual(
monitor.allStatistics(),
{
random["outFloat"] : {
monitor.thisThreadId() : 1
}
}
)
self.assertEqual(
monitor.plugStatistics( random["outFloat"] ),
{ monitor.thisThreadId() : 1 }
)
self.assertEqual(
monitor.combinedStatistics(),
{ monitor.thisThreadId() : 1 }
)

random["seedVariable"].setValue( "test" )
with monitor :
GafferTest.parallelGetValue( random["outFloat"], 100000, "test" )

s = monitor.plugStatistics( random["outFloat"] )
self.assertEqual( len( s ), IECore.tbb_global_control.active_value( IECore.tbb_global_control.parameter.max_allowed_parallelism ) )
self.assertEqual( sum( s.values() ), 100001 )

self.assertEqual( monitor.allStatistics(), { random["outFloat"] : s } )
self.assertEqual( monitor.combinedStatistics(), s )

def testProcessMask( self ) :

for processType in [ "computeNode:hash", "computeNode:compute" ] :

with self.subTest( processType = processType ) :

Gaffer.ValuePlug.clearCache()
Gaffer.ValuePlug.clearHashCache()

random = Gaffer.Random()
threadMonitor = Gaffer.ThreadMonitor( processMask = { processType } )
performanceMonitor = Gaffer.PerformanceMonitor()
context = Gaffer.Context()

with threadMonitor, performanceMonitor, context :
for i in range( 0, 5 ) :
context["i"] = i # Unique context to force hashing
random["outFloat"].getValue()

self.assertEqual( performanceMonitor.plugStatistics( random["outFloat"] ).computeCount, 1 )
self.assertEqual( performanceMonitor.plugStatistics( random["outFloat"] ).hashCount, 5 )

self.assertEqual(
sum( threadMonitor.plugStatistics( random["outFloat"] ).values() ),
1 if processType == "computeNode:compute" else 5
)

if __name__ == "__main__":
unittest.main()
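
One note on interpreting the parallel results above : the number of distinct `ThreadId` keys reported for a parallel evaluation is bounded by TBB's active parallelism limit, which the test queries as shown in this small sketch.

```python
import IECore

# Query the active TBB parallelism limit, as the test above does when checking
# how many threads contributed to the statistics.
limit = IECore.tbb_global_control.active_value(
	IECore.tbb_global_control.parameter.max_allowed_parallelism
)
print( limit )
```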
29 changes: 7 additions & 22 deletions python/GafferTest/ValuePlugTest.py
@@ -41,6 +41,8 @@
import subprocess
import threading
import time
import unittest

import imath

import IECore
@@ -884,37 +886,20 @@ def testExceptionDuringParallelEval( self ) :
with self.assertRaisesRegex( BaseException, "Foo" ):
GafferTest.parallelGetValue( m["product"], 10000, "testVar" )

def testCancellationOfSecondGetValueCall( self ) :

## \todo Should just be checking `tbb.global_control.active_value( max_allowed_parallelism )`
# to get the true limit set by `-threads` argument. But IECore's Python
# binding of `global_control` doesn't expose that yet.
if IECore.hardwareConcurrency() < 3 and "VALUEPLUGTEST_SUBPROCESS" not in os.environ :
def testCancellationOfSecondGetValueCall( self ) :

if IECore.tbb_global_control.active_value( IECore.tbb_global_control.parameter.max_allowed_parallelism ) < 3 :
# This test requires at least 3 TBB threads (including the main
# thread), because we need the second enqueued BackgroundTask to
# start execution before the first one has completed. If we have
# insufficient threads then we end up in deadlock, so to avoid this
# we relaunch in a subprocess with sufficient threads.
# insufficient threads then we end up in deadlock, so in this case
# we skip the test.
#
# Note : deadlock only ensues because the first task will never
# return without cancellation. This is an artificial situation, not
# one that would occur in practical usage of Gaffer itself.

print( "Running in subprocess due to insufficient TBB threads" )

try :
env = os.environ.copy()
env["VALUEPLUGTEST_SUBPROCESS"] = "1"
subprocess.check_output(
[ str( Gaffer.executablePath() ), "test", "-threads", "3", "GafferTest.ValuePlugTest.testCancellationOfSecondGetValueCall" ],
stderr = subprocess.STDOUT,
env = env
)
except subprocess.CalledProcessError as e :
self.fail( e.output )

return
self.skipTest( "Not enough worker threads" )

class InfiniteLoop( Gaffer.ComputeNode ) :

(Diffs for the remaining 8 changed files not shown.)
