Skip to content

Commit

Permalink
fix: finish diag manager reduction methods at the right time (NOAA-GF…
Browse files Browse the repository at this point in the history
  • Loading branch information
uramirez8707 authored and rem1776 committed May 1, 2024
1 parent a3cc6b4 commit 9fedfce
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 78 deletions.
64 changes: 39 additions & 25 deletions diag_manager/fms_diag_file_object.F90
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ module fms_diag_file_object_mod
procedure :: update_current_new_file_freq_index
procedure :: increase_unlim_dimension_level
procedure :: get_unlim_dimension_level
procedure :: get_next_output
procedure :: get_next_next_output
procedure :: close_diag_file
end type fmsDiagFileContainer_type

Expand Down Expand Up @@ -1293,13 +1295,11 @@ end subroutine write_time_metadata
!> \brief Write out the field data to the file
subroutine write_field_data(this, field_obj, buffer_obj)
class(fmsDiagFileContainer_type), intent(in), target :: this !< The diag file object to write to
type(fmsDiagField_type), intent(in), target :: field_obj(:) !< The field object to write from
type(fmsDiagOutputBuffer_type), intent(inout), target :: buffer_obj(:) !< The buffer object with the data
type(fmsDiagField_type), intent(in), target :: field_obj !< The field object to write from
type(fmsDiagOutputBuffer_type), intent(inout), target :: buffer_obj !< The buffer object with the data

class(fmsDiagFile_type), pointer :: diag_file !< Diag_file object to open
class(FmsNetcdfFile_t), pointer :: fms2io_fileobj !< Fileobj to write to
integer :: i !< For do loops
integer :: field_id !< The id of the field writing the data from
logical :: has_diurnal !< indicates if theres a diurnal axis to adjust for

diag_file => this%FMS_diag_file
Expand All @@ -1309,29 +1309,23 @@ subroutine write_field_data(this, field_obj, buffer_obj)
if (diag_file%is_static) then
!< Here the file is static so there is no need for the unlimited dimension
!! as a variables are static
do i = 1, diag_file%number_of_buffers
call buffer_obj(diag_file%buffer_ids(i))%write_buffer(fms2io_fileobj)
enddo
call buffer_obj%write_buffer(fms2io_fileobj)
else
do i = 1, diag_file%number_of_buffers
field_id = buffer_obj(diag_file%buffer_ids(i))%get_field_id()
if (field_obj(field_id)%is_static()) then
!< If the variable is static, only write it the first time
if (field_obj%is_static()) then
!< If the variable is static, only write it the first time
if (diag_file%unlim_dimension_level .eq. 1) &
call buffer_obj%write_buffer(fms2io_fileobj)
else
has_diurnal = buffer_obj%get_diurnal_sample_size() .gt. 1
if (.not. buffer_obj%is_there_data_to_write()) then
! Only print the error message once
if (diag_file%unlim_dimension_level .eq. 1) &
call buffer_obj(diag_file%buffer_ids(i))%write_buffer(fms2io_fileobj)
else
has_diurnal = buffer_obj(diag_file%buffer_ids(i))%get_diurnal_sample_size() .gt. 1
if (.not. buffer_obj(diag_file%buffer_ids(i))%is_there_data_to_write()) then
! Only print the error message once
if (diag_file%unlim_dimension_level .eq. 1) &
call mpp_error(NOTE, "Send data was never called. Writing fill values for variable "//&
field_obj(field_id)%get_varname()//" in mod "//field_obj(field_id)%get_modname())
cycle
endif
call buffer_obj(diag_file%buffer_ids(i))%write_buffer(fms2io_fileobj, &
unlim_dim_level=diag_file%unlim_dimension_level, is_diurnal=has_diurnal)
call mpp_error(NOTE, "Send data was never called. Writing fill values for variable "//&
field_obj%get_varname()//" in mod "//field_obj%get_modname())
endif
enddo
call buffer_obj%write_buffer(fms2io_fileobj, &
unlim_dim_level=diag_file%unlim_dimension_level, is_diurnal=has_diurnal)
endif
endif

end subroutine write_field_data
Expand All @@ -1354,7 +1348,7 @@ logical function is_time_to_write(this, time_step)
class(fmsDiagFileContainer_type), intent(in), target :: this !< The file object
TYPE(time_type), intent(in) :: time_step !< Current model step time

if (time_step >= this%FMS_diag_file%next_output) then
if (time_step > this%FMS_diag_file%next_output) then
is_time_to_write = .true.
if (this%FMS_diag_file%is_static) return
if (time_step > this%FMS_diag_file%next_next_output) &
Expand Down Expand Up @@ -1489,6 +1483,26 @@ pure function get_unlim_dimension_level(this) &
res = this%FMS_diag_file%unlim_dimension_level
end function

!> \brief Get the next_output for the file object
!! \return The next_output
pure function get_next_output(this) &
result(res)
class(fmsDiagFileContainer_type), intent(in), target :: this !< The file object
type(time_type) :: res

res = this%FMS_diag_file%next_output
end function get_next_output

!> \brief Get the next_output for the file object
!! \return The next_output
pure function get_next_next_output(this) &
result(res)
class(fmsDiagFileContainer_type), intent(in), target :: this !< The file object
type(time_type) :: res

res = this%FMS_diag_file%next_next_output
end function get_next_next_output

!< @brief Writes the axis metadata for the file
subroutine write_axis_metadata(this, diag_axis)
class(fmsDiagFileContainer_type), intent(inout), target :: this !< The file object
Expand Down
102 changes: 51 additions & 51 deletions diag_manager/fms_diag_object.F90
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ subroutine fms_diag_object_end (this, time)
!TODO: loop through files and force write
if (.not. this%initialized) return

call this%fms_diag_do_io(is_end_of_run=.true.)
call this%fms_diag_do_io(end_time=time)
!TODO: Deallocate diag object arrays and clean up all memory
do i=1, size(this%FMS_diag_output_buffers)
call this%FMS_diag_output_buffers(i)%flush_buffer()
Expand Down Expand Up @@ -231,23 +231,9 @@ integer function fms_register_diag_field_obj &
file_ids = get_diag_files_id(diag_field_indices)
call fieldptr%set_file_ids(file_ids)

!> Initialize buffer_ids of this field with the diag_field_indices(diag_field_indices)
!! of the sorted variable list
fieldptr%buffer_ids = get_diag_field_ids(diag_field_indices)
do i = 1, size(fieldptr%buffer_ids)
bufferptr => this%FMS_diag_output_buffers(fieldptr%buffer_ids(i))
call bufferptr%set_field_id(this%registered_variables)
call bufferptr%set_yaml_id(fieldptr%buffer_ids(i))
! check if diurnal reduction for this buffer and if so set the diurnal sample size
yamlfptr => diag_yaml%diag_fields(fieldptr%buffer_ids(i))
if( yamlfptr%get_var_reduction() .eq. time_diurnal) then
call bufferptr%set_diurnal_sample_size(yamlfptr%get_n_diurnal())
endif
call bufferptr%init_buffer_time(init_time)
enddo

!> Allocate and initialize member buffer_allocated of this field
fieldptr%buffer_allocated = .false.
fieldptr%buffer_ids = get_diag_field_ids(diag_field_indices)

!> Register the data for the field
call fieldptr%register(modname, varname, diag_field_indices, this%diag_axis, &
Expand Down Expand Up @@ -296,6 +282,22 @@ integer function fms_register_diag_field_obj &
call fileptr%set_file_time_ops (fieldptr%diag_field(i), fieldptr%is_static())
enddo
endif

!> Initialize buffer_ids of this field with the diag_field_indices(diag_field_indices)
!! of the sorted variable list
do i = 1, size(fieldptr%buffer_ids)
bufferptr => this%FMS_diag_output_buffers(fieldptr%buffer_ids(i))
call bufferptr%set_field_id(this%registered_variables)
call bufferptr%set_yaml_id(fieldptr%buffer_ids(i))
! check if diurnal reduction for this buffer and if so set the diurnal sample size
yamlfptr => diag_yaml%diag_fields(fieldptr%buffer_ids(i))
if( yamlfptr%get_var_reduction() .eq. time_diurnal) then
call bufferptr%set_diurnal_sample_size(yamlfptr%get_n_diurnal())
endif
call bufferptr%init_buffer_time(init_time)
call bufferptr%set_next_output(this%FMS_diag_files(file_ids(i))%get_next_output())
enddo

nullify (fileptr)
nullify (fieldptr)
deallocate(diag_field_indices)
Expand Down Expand Up @@ -733,10 +735,9 @@ end subroutine fms_diag_send_complete
!! variable metadata and data when necessary.
!! TODO: passing in the saved mask from the field obj to diag_reduction_done_wrapper
!! for performance
subroutine fms_diag_do_io(this, is_end_of_run)
subroutine fms_diag_do_io(this, end_time)
class(fmsDiagObject_type), target, intent(inout) :: this !< The diag object
logical, optional, intent(in) :: is_end_of_run !< If .true. this is the end of the run,
!! so force write
type(time_type), optional, target, intent(in) :: end_time !< the model end_time
#ifdef use_yaml
integer :: i !< For do loops
class(fmsDiagFileContainer_type), pointer :: diag_file !< Pointer to this%FMS_diag_files(i) (for convenience)
Expand All @@ -749,17 +750,20 @@ subroutine fms_diag_do_io(this, is_end_of_run)
logical :: file_is_opened_this_time_step !< True if the file was opened in this time_step
!! If true the metadata will need to be written
logical :: force_write !< force the last write if at end of run
logical :: is_writing !< true if we are writing the actual field data (metadata is always written)
logical :: finish_writing !< true if finished writing for all the fields
logical :: has_mask !< whether we have a mask
logical, parameter :: DEBUG_REDUCT = .false. !< enables debugging output
class(*), allocatable :: missing_val !< netcdf missing value for a given field
real(r8_kind) :: mval !< r8 copy of missing value
character(len=128) :: error_string !< outputted error string from reducti

force_write = .false.
if (present (is_end_of_run)) force_write = .true.

model_time => this%current_model_time
if (present (end_time)) then
force_write = .true.
model_time => end_time
else
model_time => this%current_model_time
endif

do i = 1, size(this%FMS_diag_files)
diag_file => this%FMS_diag_files(i)
Expand All @@ -774,22 +778,23 @@ subroutine fms_diag_do_io(this, is_end_of_run)
call diag_file%write_time_metadata()
call diag_file%write_field_metadata(this%FMS_diag_fields, this%diag_axis)
call diag_file%write_axis_data(this%diag_axis)
call diag_file%increase_unlim_dimension_level()
endif

is_writing = diag_file%is_time_to_write(model_time)
finish_writing = diag_file%is_time_to_write(model_time)

! finish reduction method if its time to write
buff_reduct: if (is_writing) then
buff_ids = diag_file%FMS_diag_file%get_buffer_ids()
! loop through the buffers and finish reduction if needed
buff_loop: do ibuff=1, SIZE(buff_ids)
diag_buff => this%FMS_diag_output_buffers(buff_ids(ibuff))
field_yaml => diag_yaml%diag_fields(diag_buff%get_yaml_id())
diag_field => this%FMS_diag_fields(diag_buff%get_field_id())
buff_ids = diag_file%FMS_diag_file%get_buffer_ids()
! loop through the buffers and finish reduction if needed
buff_loop: do ibuff=1, SIZE(buff_ids)
diag_buff => this%FMS_diag_output_buffers(buff_ids(ibuff))
field_yaml => diag_yaml%diag_fields(diag_buff%get_yaml_id())
diag_field => this%FMS_diag_fields(diag_buff%get_field_id())

! Go away if there is no data to write
if (.not. diag_buff%is_there_data_to_write()) cycle
! Go away if there is no data to write
if (.not. diag_buff%is_there_data_to_write()) cycle

if ( diag_buff%is_time_to_finish_reduction(end_time)) then
! sets missing value
mval = diag_field%find_missing_value(missing_val)
! time_average and greater values all involve averaging so need to be "finished" before written
Expand All @@ -800,28 +805,25 @@ subroutine fms_diag_do_io(this, is_end_of_run)
if(has_mask) has_mask = diag_field%get_mask_variant()
error_string = diag_buff%diag_reduction_done_wrapper( &
field_yaml%get_var_reduction(), &
mval, has_mask)
mval, has_mask)
endif
endif
!endif
nullify(diag_buff)
nullify(field_yaml)
enddo buff_loop
deallocate(buff_ids)
endif buff_reduct

if (is_writing) then
call diag_file%increase_unlim_dimension_level()
call diag_file%write_field_data(diag_field, diag_buff)
call diag_buff%set_next_output(diag_file%get_next_next_output())
endif
nullify(diag_buff)
nullify(field_yaml)
enddo buff_loop
deallocate(buff_ids)

if (finish_writing) then
call diag_file%write_time_data()
call diag_file%write_field_data(this%FMS_diag_fields, this%FMS_diag_output_buffers)
call diag_file%update_next_write(model_time)
call diag_file%update_current_new_file_freq_index(model_time)
call diag_file%increase_unlim_dimension_level()
if (diag_file%is_time_to_close_file(model_time)) call diag_file%close_diag_file()
else if (force_write) then
if (diag_file%get_unlim_dimension_level() .eq. 0) then
call diag_file%increase_unlim_dimension_level()
call diag_file%write_time_data()
endif
call diag_file%write_time_data()
call diag_file%close_diag_file()
endif
enddo
Expand Down Expand Up @@ -968,6 +970,7 @@ function fms_diag_do_reduction(this, field_data, diag_field_id, oor_mask, weight

!< Determine the reduction method for the buffer
reduction_method = field_yaml_ptr%get_var_reduction()
if (present(time)) new_time = buffer_ptr%update_buffer_time(time)
select case(reduction_method)
case (time_none)
error_msg = buffer_ptr%do_time_none_wrapper(field_data, oor_mask, field_ptr%get_mask_variant(), &
Expand All @@ -994,21 +997,18 @@ function fms_diag_do_reduction(this, field_data, diag_field_id, oor_mask, weight
return
endif
case (time_average)
new_time = buffer_ptr%update_buffer_time(time)
error_msg = buffer_ptr%do_time_sum_wrapper(field_data, oor_mask, field_ptr%get_mask_variant(), &
bounds_in, bounds_out, missing_value, new_time)
if (trim(error_msg) .ne. "") then
return
endif
case (time_power)
new_time = buffer_ptr%update_buffer_time(time)
error_msg = buffer_ptr%do_time_sum_wrapper(field_data, oor_mask, field_ptr%get_mask_variant(), &
bounds_in, bounds_out, missing_value, new_time, pow_value=field_yaml_ptr%get_pow_value())
if (trim(error_msg) .ne. "") then
return
endif
case (time_rms)
new_time = buffer_ptr%update_buffer_time(time)
error_msg = buffer_ptr%do_time_sum_wrapper(field_data, oor_mask, field_ptr%get_mask_variant(), &
bounds_in, bounds_out, missing_value, new_time, pow_value = 2)
if (trim(error_msg) .ne. "") then
Expand Down
32 changes: 30 additions & 2 deletions diag_manager/fms_diag_output_buffer.F90
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ module fms_diag_output_buffer_mod
#ifdef use_yaml
use platform_mod
use iso_c_binding
use time_manager_mod, only: time_type, operator(==), get_ticks_per_second, get_time, operator(>)
use time_manager_mod, only: time_type, operator(==), operator(>=), get_ticks_per_second, get_time, operator(>)
use constants_mod, only: SECONDS_PER_DAY
use mpp_mod, only: mpp_error, FATAL, NOTE
use mpp_mod, only: mpp_error, FATAL, NOTE, mpp_pe, mpp_root_pe
use diag_data_mod, only: DIAG_NULL, DIAG_NOT_REGISTERED, i4, i8, r4, r8, get_base_time, MIN_VALUE, MAX_VALUE, EMPTY, &
time_min, time_max
use fms2_io_mod, only: FmsNetcdfFile_t, write_data, FmsNetcdfDomainFile_t, FmsNetcdfUnstructuredDomainFile_t
Expand Down Expand Up @@ -61,6 +61,7 @@ module fms_diag_output_buffer_mod
!! time and sample size if using a diurnal reduction
logical :: send_data_called !< .True. if send_data has been called
type(time_type) :: time !< The last time the data was received
type(time_type) :: next_output !< The next time to output the data

contains
procedure :: add_axis_ids
Expand All @@ -70,8 +71,10 @@ module fms_diag_output_buffer_mod
procedure :: set_yaml_id
procedure :: get_yaml_id
procedure :: init_buffer_time
procedure :: set_next_output
procedure :: update_buffer_time
procedure :: is_there_data_to_write
procedure :: is_time_to_finish_reduction
procedure :: set_send_data_called
procedure :: is_done_with_math
procedure :: set_done_with_math
Expand Down Expand Up @@ -347,6 +350,14 @@ subroutine init_buffer_time(this, time)
endif
end subroutine init_buffer_time

!> @brief Sets the next output
subroutine set_next_output(this, time)
class(fmsDiagOutputBuffer_type), intent(inout) :: this !< Buffer object
type(time_type), intent(in) :: time !< time to add to the buffer

this%next_output = time
end subroutine set_next_output

!> @brief Update the buffer time if it is a new time
!! @return .true. if the buffer was updated
function update_buffer_time(this, time) &
Expand Down Expand Up @@ -806,6 +817,23 @@ function is_there_data_to_write(this) &
res = this%send_data_called
end function

!> @brief Determine if it is time to finish the reduction method
!! @return .true. if it is time to finish the reduction method
function is_time_to_finish_reduction(this, end_time) &
result(res)
class(fmsDiagOutputBuffer_type), intent(inout) :: this !< Buffer object
type(time_type), optional, intent(in) :: end_time !< The time at the end of the run

logical :: res

res = .false.
if (this%time >= this%next_output) res = .true.

if (present(end_time)) then
if (end_time >= this%next_output) res = .true.
endif
end function is_time_to_finish_reduction

!> @brief Sets send_data_called to .true.
subroutine set_send_data_called(this)
class(fmsDiagOutputBuffer_type), intent(inout) :: this !< Buffer object
Expand Down

0 comments on commit 9fedfce

Please sign in to comment.