Skip to content

Commit

Permalink
[#456] auto-close data objects that go out of scope
Browse files Browse the repository at this point in the history
Previously, open data-object write handles (those with modes including
'w' or 'a') could go out of scope at the wrong time relative to the
session object which managed their connection.

This could result in pending write updates to the data object being
lost, and/or the replica ending up stale.

Now, we can opt-in to use "managed" write handles to ensure f.close()
will ultimately be called for any write handle f persisting to the end
of the Python interpreter's lifetime.  Those that exit scope prior to
exit time are also guaranteed (as much as Python allows) the same
managed clean-up via their "__del__" method.

remove unneeded quotes; unset script executable status
  • Loading branch information
d-w-moore authored and trel committed Oct 13, 2023
1 parent 9c27dd6 commit 2ba0b68
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 10 deletions.
16 changes: 16 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,22 @@ PRC provides `file-like objects <http://docs.python.org/2/library/stdtypes.html#
foo
bar

As of v1.1.9, there is also an auto-close configuration setting for data objects, set to :code:`False` by default,
which may be assigned the value :code:`True` for guaranteed auto-closing of open data object handles at the proper
time.

In a small but illustrative example, the following Python session does not require an explicit call to f.close():

>>> import irods.client_configuration as config, irods.test.helpers as helpers
>>> config.data_objects.auto_close = True
>>> session = helpers.make_session()
>>> f = session.data_objects.open('/{0.zone}/home/{0.username}/new_object.txt'.format(session),'w')
>>> f.write(b'new content.')

This may be useful for Python programs in which frequent flushing of write updates to data objects is undesirable --
with descriptors on such objects possibly being held open for indeterminately long lifetimes -- yet the eventual
application of those updates prior to the teardown of the Python interpreter is required.


Computing and Retrieving Checksums
----------------------------------
Expand Down
50 changes: 50 additions & 0 deletions irods/client_configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import print_function
import ast
import copy
import io
import logging
import re
import sys
import types

logger = logging.Logger(__name__)

class iRODSConfiguration(object):
__slots__ = ()

def getter(category, setting):
return lambda:getattr(globals()[category], setting)

# #############################################################################
#
# Classes for building client configuration categories
# (irods.client_configuration.data_objects is one such category):

class DataObjects(iRODSConfiguration):
__slots__ = ('auto_close',)

def __init__(self):

# Setting it in the constructor lets the attribute be a
# configurable one and allows a default value of False.
#
# Running following code will opt in to the the auto-closing
# behavior for any data objects opened subsequently.
#
# >>> import irods.client_configuration as config
# >>> irods.client_configuration.data_objects.auto_close = True

self.auto_close = False

# #############################################################################
#
# Instantiations of client-configuration categories:

# The usage "irods.client_configuration.data_objects" reflects the commonly used
# manager name (session.data_objects) and is thus understood to influence the
# behavior of data objects.
#
# By design, valid configurable targets (e.g. auto_close) are limited to the names
# listed in the __slots__ member of the category class.

data_objects = DataObjects()
49 changes: 45 additions & 4 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from irods.collection import iRODSCollection
from irods.data_object import (
iRODSDataObject, iRODSDataObjectFileRaw, chunks, irods_dirname, irods_basename)
import irods.client_configuration as client_config
import irods.keywords as kw
import irods.parallel as parallel
from irods.parallel import deferred_call
Expand All @@ -20,6 +21,33 @@
import json
import logging



def call___del__if_exists(super_):
"""
Utility method to call __del__ if it exists anywhere in superclasses' MRO (method
resolution order).
"""
next_finalizer_in_MRO = getattr(super_,'__del__',None)
if next_finalizer_in_MRO:
next_finalizer_in_MRO()

class ManagedBufferedRandom(io.BufferedRandom):

def __init__(self,*a,**kwd):
# Help ensure proper teardown sequence by storing a reference to the session,
# if provided via keyword '_session'.
self._iRODS_session = kwd.pop('_session',None)
super(ManagedBufferedRandom,self).__init__(*a,**kwd)
import irods.session
with irods.session._fds_lock:
irods.session._fds[self] = None

def __del__(self):
if not self.closed:
self.close()
call___del__if_exists(super(ManagedBufferedRandom,self))

MAXIMUM_SINGLE_THREADED_TRANSFER_SIZE = 32 * ( 1024 ** 2)

DEFAULT_NUMBER_OF_THREADS = 0 # Defaults for reasonable number of threads -- optimized to be
Expand Down Expand Up @@ -298,9 +326,14 @@ def open_with_FileRaw(self, *arg, **kw_options):
kw.RESC_HIER_STR_KW
))


def open(self, path, mode, create = True, finalize_on_close = True, returned_values = None, allow_redirect = True, **options):

def open(self, path, mode,
create = True, # (Dis-)allow object creation.
finalize_on_close = True, # For PRC internal use.
auto_close = client_config.getter('data_objects','auto_close'), # The default value will be a lambda returning the
# global setting. Use True or False as an override.
returned_values = None, # Used to update session reference, for forging more conns to same host, in irods.parallel.io_main
allow_redirect = True, # This may be set to False to disallow the client redirect-to-resource.
**options):
_raw_fd_holder = options.get('_raw_fd_holder',[])
# If no keywords are used that would influence the server as to the choice of a storage resource,
# then use the default resource in the client configuration.
Expand Down Expand Up @@ -395,8 +428,16 @@ def make_FileOpenRequest(**extra_opts):
raw.session = directed_sess

(_raw_fd_holder).append(raw)
return io.BufferedRandom(raw)

if callable(auto_close):
# Use case: auto_close has defaulted to the irods.configuration getter.
# access entry in irods.configuration
auto_close = auto_close()

if auto_close:
return ManagedBufferedRandom(raw, _session = self.sess)

return io.BufferedRandom(raw)

def trim(self, path, **options):

Expand Down
21 changes: 15 additions & 6 deletions irods/session.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import absolute_import
import ast
import atexit
import copy
import os
import ast
import json
import errno
import json
import logging
import os
import threading
import weakref
from irods.query import Query
from irods.pool import Pool
from irods.account import iRODSAccount
Expand All @@ -19,25 +21,32 @@
from irods.exception import NetworkException
from irods.password_obfuscation import decode
from irods import NATIVE_AUTH_SCHEME, PAM_AUTH_SCHEME
import threading
import weakref
from . import DEFAULT_CONNECTION_TIMEOUT

_fds = None
_fds_lock = threading.Lock()
_sessions = None
_sessions_lock = threading.Lock()


def _cleanup_remaining_sessions():
for fd in list(_fds.keys()):
if not fd.closed:
fd.close()
# remove refs to session objects no longer needed
fd._iRODS_session = None
for ses in _sessions.copy():
ses.cleanup() # internally modifies _sessions

def _weakly_reference(ses):
global _sessions
global _sessions, _fds
try:
if _sessions is None:
with _sessions_lock:
do_register = (_sessions is None)
if do_register:
_sessions = weakref.WeakKeyDictionary()
_fds = weakref.WeakKeyDictionary()
atexit.register(_cleanup_remaining_sessions)
finally:
_sessions[ses] = None
Expand Down
24 changes: 24 additions & 0 deletions irods/test/data_obj_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import stat
import string
import sys
import subprocess
import time
import unittest
import xml.etree.ElementTree
Expand All @@ -38,6 +39,7 @@ def is_localhost_synonym(name):
from irods.column import Criterion
from irods.data_object import chunks, irods_dirname
import irods.test.helpers as helpers
import irods.test.modules as test_modules
import irods.keywords as kw
from irods.manager import data_object_manager
from irods.message import RErrorStack
Expand Down Expand Up @@ -1888,6 +1890,28 @@ def test_set_and_access_data_comments__issue_450(self):
finally:
d.unlink(force = True)

def _auto_close_test(self, data_object_path, content):
d = None
try:
d = self.sess.data_objects.get(data_object_path)
self.assertEqual(int(d.replicas[0].status), 1)
self.assertEqual(d.open('r').read().decode(), content)
finally:
if d: d.unlink(force = True)

def test_data_objects_auto_close_on_process_exit__issue_456(self):
program = os.path.join(test_modules.__path__[0], 'test_auto_close_of_data_objects__issue_456.py')
# Use the currently running Python interpreter binary to run the script in the child process.
p = subprocess.Popen([sys.executable,program], stdout=subprocess.PIPE)
data_object_path, expected_content = p.communicate()[0].decode().split()
self._auto_close_test(data_object_path, expected_content)

def test_data_objects_auto_close_on_function_exit__issue_456(self):
import irods.test.modules.test_auto_close_of_data_objects__issue_456 as test_module
data_object_path, expected_content = test_module.test(return_locals = ('name','expected_content'))
self._auto_close_test(data_object_path, expected_content)


if __name__ == '__main__':
# let the tests find the parent irods lib
sys.path.insert(0, os.path.abspath('../..'))
Expand Down
Empty file added irods/test/modules/__init__.py
Empty file.
46 changes: 46 additions & 0 deletions irods/test/modules/test_auto_close_of_data_objects__issue_456.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# This helper module can double as a Python script, allowing us to run the below
# test() method either within the current process or in a child process. The
# method in question can thus be called by the following unit tests so that we may assert
# proper data object auto-closing functionality under these respective scenarios:
#
# irods.test.data_obj_test.TestDataObjOps.test_data_objects_auto_close_on_function_exit__issue_456
# irods.test.data_obj_test.TestDataObjOps.test_data_objects_auto_close_on_process_exit__issue_456

from __future__ import print_function
import contextlib
try:
import irods.client_configuration as config
except ImportError:
pass
from datetime import datetime
import os
from irods.test import helpers

@contextlib.contextmanager
def auto_close_data_objects(value):
if 'config' not in globals():
yield
return
ORIGINAL_VALUE = config.data_objects.auto_close
try:
config.data_objects.auto_close = value
yield
finally:
config.data_objects.auto_close = ORIGINAL_VALUE

def test(return_locals = True):
with auto_close_data_objects(True):
expected_content = 'content'
ses = helpers.make_session()
name = '/{0.zone}/home/{0.username}/{1}-object.dat'.format(ses, helpers.unique_name(os.getpid(), datetime.now()))
f = ses.data_objects.open(name,'w')
f.write(expected_content.encode('utf8'))
L=locals()
# By default, ses and f will be automatically exported to calling frame (with L being returned),
# but by specifying a list/tuple of keys we can export only those specific locals by name.
return L if not isinstance(return_locals,(tuple,list)) \
else [ L[k] for k in return_locals ]

if __name__ == '__main__':
test_output = test()
print("{name} {expected_content}".format(**test_output))

0 comments on commit 2ba0b68

Please sign in to comment.