Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 50 additions & 7 deletions tensorboardX/event_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
from __future__ import division
from __future__ import print_function

import atexit
import os
import socket
import threading
import time
import weakref

import six

Expand Down Expand Up @@ -109,7 +111,27 @@ def __init__(self, logdir, max_queue_size=10, flush_secs=120, filename_suffix=''
self._closed = False
self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
flush_secs)

# Make sure that at exit, the thread is shut down cleanly and events
# are flushed to disk. A weak reference is used to not block garbage
# collection of this object and the thread.
def thread_finalizer(ref):
obj = ref()
if obj is not None:
# If the `EventFileWriter` object instance is deleted, then
# `obj` is `None`; do nothing. This means `__del__` has
# already been called and has already called `_close(obj)`.
# This can happen when this thread finalizer is called at
# exit after the thread has already been terminated earlier
# in the code.
EventFileWriter._close(obj)
if hasattr(weakref, 'finalize'):
# Python 3
self._thread_finalizer = weakref.finalize(self._worker,
thread_finalizer,
weakref.ref(self))
else:
# Python 2
atexit.register(thread_finalizer, weakref.ref(self))
self._worker.start()

def get_logdir(self):
Expand Down Expand Up @@ -153,11 +175,24 @@ def close(self):
write/flush worker and closes the file. Call this method when you do not
need the summary writer anymore.
"""
if not self._closed:
self.flush()
self._worker.stop()
self._ev_writer.close()
self._closed = True
EventFileWriter._close(self)

@staticmethod
def _close(obj):
"""This method is static so that a `weakref.finalizer` could run it
without storing a reference to the object that produces the `close()`
method, thus allowing this object to be garbage collected before the
interpreter begins exit procedures.
"""
if not obj._closed:
obj._worker.stop()
obj._worker.join()
obj.flush()
obj._ev_writer.close()
obj._closed = True

def __del__(self):
self.close()


class _EventLoggerThread(threading.Thread):
Expand All @@ -173,6 +208,15 @@ def __init__(self, queue, record_writer, flush_secs):
pending file to disk.
"""
threading.Thread.__init__(self)
# NOTE: although this thread writes to disk, it is a daemon thread
# so that the python interpretor does not wait until the thread
# completes to begin tearing down the environment at the end of a
# script. Instead, a weakref finalizer ensures clean termination
# of this thread at exit before the rest of the environment is torn
# down. If this thread were not a daemon, the user would have to
# manually call close() on the summary writer that spawns this thread
# in order to terminate it and allow the main process to exit at the
# end of a script.
self.daemon = True
self._queue = queue
self._record_writer = record_writer
Expand All @@ -184,7 +228,6 @@ def __init__(self, queue, record_writer, flush_secs):

def stop(self):
self._queue.put(self._shutdown_signal)
self.join()

def run(self):
# Here wait on the queue until an data appears, or till the next
Expand Down