| Trees | Indices | Help |
|---|
|
|
1 # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3
4 # Flumotion - a streaming media server
5 # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L.
6 # Copyright (C) 2010,2011 Flumotion Services, S.A.
7 # All rights reserved.
8 #
9 # This file may be distributed and/or modified under the terms of
10 # the GNU Lesser General Public License version 2.1 as published by
11 # the Free Software Foundation.
12 # This file is distributed without any warranty; without even the implied
13 # warranty of merchantability or fitness for a particular purpose.
14 # See "LICENSE.LGPL" in the source distribution for more information.
15 #
16 # Headers in this file shall remain intact.
17
18 import errno
19 import os
20 import stat
21 import tempfile
22 import threading
23 import time
24
25 from twisted.internet import defer, reactor, threads, abstract
26
27 from flumotion.common import log, common
28 from flumotion.component.misc.httpserver import cachestats
29 from flumotion.component.misc.httpserver import cachemanager
30 from flumotion.component.misc.httpserver import fileprovider
31 from flumotion.component.misc.httpserver import localpath
32 from flumotion.component.misc.httpserver.fileprovider import FileClosedError
33 from flumotion.component.misc.httpserver.fileprovider import FileError
34 from flumotion.component.misc.httpserver.fileprovider import NotFoundError
35
36
37 SEEK_SET = 0 # os.SEEK_SET is not defined in python 2.4
38 FILE_COPY_BUFFER_SIZE = abstract.FileDescriptor.bufferSize
39 MAX_LOGNAME_SIZE = 30 # maximum number of characters to use for logging a path
40
41
42 LOG_CATEGORY = "fileprovider-localcached"
43
44
45 errnoLookup = {errno.ENOENT: fileprovider.NotFoundError,
46 errno.EISDIR: fileprovider.CannotOpenError,
47 errno.EACCES: fileprovider.AccessError}
48
49
51 """
52 @rtype: (file, statinfo)
53 """
54 try:
55 handle = open(path, mode)
56 fd = handle.fileno()
57 except IOError, e:
58 cls = errnoLookup.get(e.errno, fileprovider.FileError)
59 raise cls("Failed to open file '%s': %s" % (path, str(e)))
60 try:
61 info = os.fstat(fd)
62 except OSError, e:
63 handle.close()
64 cls = errnoLookup.get(e.errno, fileprovider.FileError)
65 raise cls("Failed to stat file '%s': %s" % (path, str(e)))
66 return handle, info
67
68
71 """
72
73 WARNING: Currently does not work properly in combination with rate-control.
74
75 I'm caching the files taken from a mounted
76 network file system to a shared local directory.
77 Multiple instances can share the same cache directory,
78 but it's recommended to use slightly different values
79 for the property cleanup-high-watermark.
80 I'm using the directory access time to know when
81 the cache usage changed and keep an estimation
82 of the cache usage for statistics.
83
84 I'm creating a unique thread to do the file copying block by block,
85 for all files to be copied to the cache.
86 Using a thread instead of a reactor.callLater 'loop' allow for
87 higher copy throughput and do not slow down the mail loop when
88 lots of files are copied at the same time.
89 Simulations with real request logs show that using a thread
90 gives better results than the equivalent asynchronous implementation.
91 """
92
93 logCategory = LOG_CATEGORY
94
96 props = args['properties']
97 self._sourceDir = props.get('path')
98 cacheDir = props.get('cache-dir')
99 cacheSizeInMB = props.get('cache-size')
100 if cacheSizeInMB is not None:
101 cacheSize = cacheSizeInMB * 10 ** 6 # in bytes
102 else:
103 cacheSize = None
104 cleanupEnabled = props.get('cleanup-enabled')
105 cleanupHighWatermark = props.get('cleanup-high-watermark')
106 cleanupLowWatermark = props.get('cleanup-low-watermark')
107
108 self._sessions = {} # {CopySession: None}
109 self._index = {} # {path: CopySession}
110
111 self.stats = cachestats.CacheStatistics()
112
113 self.cache = cachemanager.CacheManager(self.stats,
114 cacheDir, cacheSize,
115 cleanupEnabled,
116 cleanupHighWatermark,
117 cleanupLowWatermark)
118
119 common.ensureDir(self._sourceDir, "source")
120
121 # Startup copy thread
122 self._thread = CopyThread(self)
123
125 self.debug('Starting cachedprovider plug for component %r', component)
126 d = self.cache.setUp()
127 d.addCallback(lambda x: self._thread.start())
128 return d
129
131 self.debug('Stopping cachedprovider plug for component %r', component)
132 self._thread.stop()
133 dl = []
134 for s in self._index.values():
135 d = s.close()
136 if d:
137 dl.append(d)
138 if len(dl) != 0:
139 return defer.DeferredList(dl)
140
142 #FIXME: This is temporary. Should be done with plug UI.
143 # Used for the UI to know which plug is used
144 updater.update("provider-name", "fileprovider-localcached")
145 self.stats.startUpdates(updater)
146
149
154
155
156 ## Protected Methods ##
157
159 """
160 Returns a log name for a path, shortened to a maximum size
161 specified by the global variable MAX_LOGNAME_SIZE.
162 The log name will be the filename part of the path postfixed
163 by the id in brackets if id is not None.
164 """
165 filename = os.path.basename(path)
166 basename, postfix = os.path.splitext(filename)
167 if id is not None:
168 postfix += "[%s]" % id
169 prefixMaxLen = MAX_LOGNAME_SIZE - len(postfix)
170 if len(basename) > prefixMaxLen:
171 basename = basename[:prefixMaxLen-1] + "*"
172 return basename + postfix
173
176
178 # First outdate existing session for the path
179 self.outdateCopySession(path)
180 # Then create a new one
181 session = CopySession(self, path, file, info)
182 self._index[path] = session
183 return session
184
189
191 path = session.sourcePath
192 if path in self._index:
193 del self._index[path]
194 self.disableSession(session)
195
197 self.debug("Starting Copy Session '%s' (%d)",
198 session.logName, len(self._sessions))
199 if session in self._sessions:
200 return
201 self._sessions[session] = None
202 self._activateCopyLoop()
203
205 self.debug("Stopping Copy Session '%s' (%d)",
206 session.logName, len(self._sessions))
207 if session in self._sessions:
208 del self._sessions[session]
209 if not self._sessions:
210 self._disableCopyLoop()
211
213 self._thread.wakeup()
214
216 self._thread.sleep()
217
218
220
221 logCategory = LOG_CATEGORY
222
224 localpath.LocalPath.__init__(self, path)
225 self.logName = plug.getLogName(path)
226 self.plug = plug
227
231
235
236
237 ## Private Methods ##
238
247
248
250
251 logCategory = LOG_CATEGORY
252
254 threading.Thread.__init__(self)
255 self.plug = plug
256 self._running = True
257 self._event = threading.Event()
258
263
265 self._event.set()
266
268 self._event.clear()
269
271 while self._running:
272 sessions = self.plug._sessions.keys()
273 for session in sessions:
274 try:
275 session.doServe()
276 except Exception, e:
277 log.warning("Error during async file serving: %s",
278 log.getExceptionMessage(e))
279 try:
280 session.doCopy()
281 except Exception, e:
282 log.warning("Error during file copy: %s",
283 log.getExceptionMessage(e))
284 self._event.wait()
285
286
289
290
292 """
293 I'm serving a file at the same time I'm copying it
294 from the network file system to the cache.
295 If the client ask for data not yet copied, the source file
296 read operation is delegated the the copy thread as an asynchronous
297 operation because file seeking/reading is not thread safe.
298
299 The copy session have to open two times the temporary file,
300 one for read-only and one for write only,
301 because closing a read/write file change the modification time.
302 We want the modification time to be set to a known value
303 when the copy is finished even keeping read access to the file.
304
305 The session manage a reference counter to know how many TempFileDelegate
306 instances are using the session to delegate read operations.
307 This is done for two reasons:
308 - To avoid circular references by have the session manage
309 a list of delegate instances.
310 - If not cancelled, sessions should not be deleted
311 when no delegates reference them anymore. So weakref cannot be used.
312 """
313
314 logCategory = LOG_CATEGORY
315
317 self.plug = plug
318 self.logName = plug.getLogName(sourcePath, sourceFile.fileno())
319 self.copying = None # Not yet started
320 self.sourcePath = sourcePath
321 self.tempPath = plug.cache.getTempPath(sourcePath)
322 self.cachePath = plug.cache.getCachePath(sourcePath)
323 # The size and modification time is not supposed to change over time
324 self.mtime = sourceInfo[stat.ST_MTIME]
325 self.size = sourceInfo[stat.ST_SIZE]
326 self._sourceFile = sourceFile
327 self._cancelled = False # True when a session has been outdated
328 self._wTempFile = None
329 self._rTempFile = None
330 self._allocTag = None # Tag used to identify cache allocations
331 self._waitCancel = None
332 # List of the pending read from source file
333 self._pending = [] # [(position, size, defer),]
334 self._refCount = 0
335 self._copied = 0 # None when the file is fully copied
336 self._correction = 0 # Used to take into account copies data for stats
337 self._startCopyingDefer = self._startCopying()
338
342
344 # If the temporary file is open for reading
345 if self._rTempFile:
346 # And the needed data is already downloaded
347 # Safe to read because it's not used by the copy thread
348 if (self._copied is None) or ((position + size) <= self._copied):
349 try:
350 self._rTempFile.seek(position)
351 data = self._rTempFile.read(size)
352 # Adjust the cache/source values to take copy into account
353 size = len(data)
354 # It's safe to use and modify self._correction even if
355 # it's used by the copy thread because the copy thread
356 # only add and the main thread only subtract.
357 # The only thing that could append it's a less accurate
358 # correction...
359 diff = min(self._correction, size)
360 self._correction -= diff
361 stats.onBytesRead(0, size, diff)
362 return data
363 except Exception, e:
364 self.warning("Failed to read from temporary file: %s",
365 log.getExceptionMessage(e))
366 self._cancelSession()
367 # If the source file is not open anymore, we can't continue
368 if self._sourceFile is None:
369 raise FileError("File caching error, cannot proceed")
370 # Otherwise read the data directly from the source
371 try:
372 # It's safe to not use Lock, because simple type operations
373 # are thread safe, and even if the copying state change
374 # from True to False, _onCopyFinished will be called
375 # later in the same thread and will process pending reads.
376 if self.copying:
377 # If we are currently copying the source file,
378 # we defer the file read to the copying thread
379 # because we can't read a file from two threads.
380 d = defer.Deferred()
381
382 def updateStats(data):
383 stats.onBytesRead(len(data), 0, 0)
384 return data
385
386 d.addCallback(updateStats)
387 self._pending.append((position, size, d))
388 return d
389 # Not copying, it's safe to read directly
390 self._sourceFile.seek(position)
391 data = self._sourceFile.read(size)
392 stats.onBytesRead(len(data), 0, 0)
393 return data
394 except IOError, e:
395 cls = errnoLookup.get(e.errno, FileError)
396 raise cls("Failed to read source file: %s" % str(e))
397
400
402 self._refCount -= 1
403 # If there is only one client and the session has been cancelled,
404 # stop copying and and serve the source file directly
405 if (self._refCount == 1) and self._cancelled:
406 # Cancel the copy and close the writing temporary file.
407 self._cancelCopy(False, True)
408 # We close if the copy is finished (if _copied is None)
409 if (self._refCount == 0) and (self._copied is None):
410 self.close()
411
413 self.log("Closing copy session")
414 # Cancel the copy, close the source file and the writing temp file.
415 self._cancelCopy(True, True)
416 self._closeReadTempFile()
417 self.plug.removeCopySession(self)
418 self.plug = None
419
421 if self._startCopyingDefer:
422 d = self._startCopyingDefer
423 self._startCopyingDefer = None
424 d.addCallback(lambda _: self._close())
425 return d
426
428 if not (self.copying and self._pending):
429 # Nothing to do anymore.
430 return False
431 # We have pending source file read operations
432 position, size, d = self._pending.pop(0)
433 self._sourceFile.seek(position)
434 data = self._sourceFile.read(size)
435 # Call the deferred in the main thread
436 reactor.callFromThread(d.callback, data)
437 return len(self._pending) > 0
438
440 # Called in the copy thread context.
441 if not self.copying:
442 # Nothing to do anymore.
443 return False
444 # Copy a buffer from the source file to the temporary writing file
445 cont = True
446 try:
447 # It's safe to use self._copied, because it's only set
448 # by the copy thread during copy.
449 self._sourceFile.seek(self._copied)
450 self._wTempFile.seek(self._copied)
451 data = self._sourceFile.read(FILE_COPY_BUFFER_SIZE)
452 self._wTempFile.write(data)
453 self._wTempFile.flush()
454 except IOError, e:
455 self.warning("Failed to copy source file: %s",
456 log.getExceptionMessage(e))
457 # Abort copy and cancel the session
458 self.copying = False
459 reactor.callFromThread(self.plug.disableSession, self)
460 reactor.callFromThread(self._cancelSession)
461 # Do not continue
462 cont = False
463 else:
464 size = len(data)
465 self._copied += size
466 self._correction += size
467 if size < FILE_COPY_BUFFER_SIZE:
468 # Stop copying
469 self.copying = False
470 reactor.callFromThread(self.plug.disableSession, self)
471 reactor.callFromThread(self._onCopyFinished)
472 cont = False
473 # Check for cancellation
474 if self._waitCancel and self.copying:
475 # Copy has been cancelled
476 self.copying = False
477 reactor.callFromThread(self.plug.disableSession, self)
478 reactor.callFromThread(self._onCopyCancelled, *self._waitCancel)
479 return False
480 return cont
481
482
483 ## Private Methods ##
484
486 # Retrieve a cache allocation tag, used to track the cache free space
487 return self.plug.cache.allocateCacheSpace(self.size)
488
490 if not (self._cancelled or self._allocTag is None):
491 self.plug.cache.releaseCacheSpace(self._allocTag)
492 self._allocTag = None
493
495 if not self._cancelled:
496 self.log("Canceling copy session")
497 # Not a valid copy session anymore
498 self._cancelled = True
499 # If there is no more than 1 client using the session,
500 # stop copying and and serve the source file directly
501 if self._refCount <= 1:
502 # Cancel and close the temp write file.
503 self._cancelCopy(False, True)
504
506 self._allocTag = tag
507
508 if not tag:
509 # No free space, proxying source file directly
510 self._cancelSession()
511 return
512 self.plug.stats.onCopyStarted()
513 # Then open a transient temporary files
514 try:
515 fd, transientPath = tempfile.mkstemp(".tmp", LOG_CATEGORY)
516 self.log("Created transient file '%s'", transientPath)
517 self._wTempFile = os.fdopen(fd, "wb")
518 self.log("Opened temporary file for writing [fd %d]",
519 self._wTempFile.fileno())
520 self._rTempFile = file(transientPath, "rb")
521 self.log("Opened temporary file for reading [fd %d]",
522 self._rTempFile.fileno())
523 except IOError, e:
524 self.warning("Failed to open temporary file: %s",
525 log.getExceptionMessage(e))
526 self._cancelSession()
527 return
528 # Truncate it to the source size
529 try:
530 self.log("Truncating temporary file to size %d", self.size)
531 self._wTempFile.truncate(self.size)
532 except IOError, e:
533 self.warning("Failed to truncate temporary file: %s",
534 log.getExceptionMessage(e))
535 self._cancelSession()
536 return
537 # And move it to the real temporary file path
538 try:
539 self.log("Renaming transient file to '%s'", self.tempPath)
540 os.rename(transientPath, self.tempPath)
541 except IOError, e:
542 self.warning("Failed to rename transient temporary file: %s",
543 log.getExceptionMessage(e))
544 # And start copying
545 self.debug("Start caching '%s' [fd %d]",
546 self.sourcePath, self._sourceFile.fileno())
547 # Activate the copy
548 self.copying = True
549 self.plug.activateSession(self)
550
552 self.log("Start copy session")
553 # First ensure there is not already a temporary file
554 self._removeTempFile()
555 # Reserve cache space, may trigger a cache cleanup
556 d = self._allocCacheSpace()
557 d.addCallback(self._gotCacheSpace)
558 return d
559
561 if self.copying:
562 self.log("Canceling file copy")
563 if self._waitCancel:
564 # Already waiting for cancellation.
565 return
566 self.debug("Cancel caching '%s' [fd %d]",
567 self.sourcePath, self._sourceFile.fileno())
568 # Disable the copy, we do not modify copying directly
569 # to let the copying thread terminate current operations.
570 # The file close operation are deferred.
571 self._waitCancel = (closeSource, closeTempWrite)
572 return
573 # No pending copy, we can close the files
574 if closeSource:
575 self._closeSourceFile()
576 if closeTempWrite:
577 self._closeWriteTempFile()
578
580 self.log("Copy session cancelled")
581 # Called when the copy thread really stopped to read/write
582 self._waitCancel = None
583 self.plug.stats.onCopyCancelled(self.size, self._copied)
584 # Resolve all pending source read operations
585 for position, size, d in self._pending:
586 if self._sourceFile is None:
587 d.errback(CopySessionCancelled())
588 else:
589 try:
590 self._sourceFile.seek(position)
591 data = self._sourceFile.read(size)
592 d.callback(data)
593 except Exception, e:
594 self.warning("Failed to read from source file: %s",
595 log.getExceptionMessage(e))
596 d.errback(e)
597 self._pending = []
598 # then we can safely close files
599 if closeSource:
600 self._closeSourceFile()
601 if closeTempWrite:
602 self._closeWriteTempFile()
603
605 if self._sourceFile is None:
606 return
607 # Called when the copy thread really stopped to read/write
608 self.debug("Finished caching '%s' [fd %d]",
609 self.sourcePath, self._sourceFile.fileno())
610 self.plug.stats.onCopyFinished(self.size)
611 # Set the copy as finished to prevent the temporary file
612 # to be deleted when closed
613 self._copied = None
614 # Closing source and write files
615 self._closeSourceFile()
616 self._closeWriteTempFile()
617 # Setting the modification time on the temporary file
618 try:
619 mtime = self.mtime
620 atime = int(time.time())
621 self.log("Setting temporary file modification time to %d", mtime)
622 # FIXME: Should use futimes, but it's not wrapped by python
623 os.utime(self.tempPath, (atime, mtime))
624 except OSError, e:
625 if e.errno == errno.ENOENT:
626 # The file may have been deleted by another process
627 self._releaseCacheSpace()
628 else:
629 self.warning("Failed to update modification time of temporary "
630 "file: %s", log.getExceptionMessage(e))
631 self._cancelSession()
632 try:
633 self.log("Renaming temporary file to '%s'", self.cachePath)
634 os.rename(self.tempPath, self.cachePath)
635 except OSError, e:
636 if e.errno == errno.ENOENT:
637 self._releaseCacheSpace()
638 else:
639 self.warning("Failed to rename temporary file: %s",
640 log.getExceptionMessage(e))
641 self._cancelSession()
642 # Complete all pending source read operations with the temporary file.
643 for position, size, d in self._pending:
644 try:
645 self._rTempFile.seek(position)
646 data = self._rTempFile.read(size)
647 d.callback(data)
648 except Exception, e:
649 self.warning("Failed to read from temporary file: %s",
650 log.getExceptionMessage(e))
651 d.errback(e)
652 self._pending = []
653 if self._refCount == 0:
654 # We were waiting for the file to be copied to close it.
655 self.close()
656
658 try:
659 os.remove(self.tempPath)
660 self.log("Deleted temporary file '%s'", self.tempPath)
661 # Inform the plug that cache space has been released
662 self._releaseCacheSpace()
663 except OSError, e:
664 if e.errno == errno.ENOENT:
665 if self._wTempFile is not None:
666 # Already deleted but inform the plug anyway
667 self._releaseCacheSpace()
668 else:
669 self.warning("Error deleting temporary file: %s",
670 log.getExceptionMessage(e))
671
673 if self._sourceFile is not None:
674 self.log("Closing source file [fd %d]", self._sourceFile.fileno())
675 try:
676 try:
677 self._sourceFile.close()
678 finally:
679 self._sourceFile = None
680 except IOError, e:
681 self.warning("Failed to close source file: %s",
682 log.getExceptionMessage(e))
683
685 if self._rTempFile is not None:
686 self.log("Closing temporary file for reading [fd %d]",
687 self._rTempFile.fileno())
688 try:
689 try:
690 self._rTempFile.close()
691 finally:
692 self._rTempFile = None
693 except IOError, e:
694 self.warning("Failed to close temporary file for reading: %s",
695 log.getExceptionMessage(e))
696
698 if self._wTempFile is not None:
699 # If the copy is not finished, remove the temporary file
700 if not self._cancelled and self._copied is not None:
701 self._removeTempFile()
702 self.log("Closing temporary file for writing [fd %d]",
703 self._wTempFile.fileno())
704 try:
705 try:
706 self._wTempFile.close()
707 finally:
708 self._wTempFile = None
709 except Exception, e:
710 self.warning("Failed to close temporary file for writing: %s",
711 log.getExceptionMessage(e))
712
713
715
716 logCategory = LOG_CATEGORY
717
719 self.logName = plug.getLogName(session.sourcePath)
720 self.mtime = session.mtime
721 self.size = session.size
722 self._session = session
723 self._reading = False
724 self._position = 0
725 session.incRef()
726
729
732
734 assert not self._reading, "Simultaneous read not supported"
735 d = self._session.read(self._position, size, stats)
736 if isinstance(d, defer.Deferred):
737 self._reading = True
738 return d.addCallback(self._cbGotData)
739 self._position += len(d)
740 return d
741
746
747
748 ## Private Methods ##
749
754
755
757
758 logCategory = LOG_CATEGORY
759
760 # Default values
761 _file = None
762
764 self.logName = plug.getLogName(path, file.fileno())
765 self._file = file
766 # The size and modification time is not supposed to change over time
767 self.mtime = info[stat.ST_MTIME]
768 self.size = info[stat.ST_SIZE]
769
771 try:
772 return self._file.tell()
773 except IOError, e:
774 cls = errnoLookup.get(e.errno, FileError)
775 raise cls("Failed to tell position in file: %s" % str(e))
776
778 try:
779 self._file.seek(offset, SEEK_SET)
780 except IOError, e:
781 cls = errnoLookup.get(e.errno, FileError)
782 raise cls("Failed to seek in cached file: %s" % str(e))
783
785 try:
786 return self._file.read(size)
787 except IOError, e:
788 cls = errnoLookup.get(e.errno, FileError)
789 raise cls("Failed to read data from file: %s" % str(e))
790
801
802
804
806 data = DirectFileDelegate.read(self, size)
807 stats.onBytesRead(0, len(data), 0)
808 return data
809
814
815
817
818 logCategory = LOG_CATEGORY
819
820 # Overriding parent class properties to become attribute
821 mimeType = None
822
823 # Default values
824 _delegate = None
825
827 self.logName = plug.getLogName(path)
828 self.plug = plug
829 self._path = path
830 self.mimeType = mimeType
831 self.stats = cachestats.RequestStatistics(plug.stats)
832 self._delegate = None
833
835 # Opening source file in a separate thread, as it usually involves
836 # accessing a network filesystem (which would block the reactor)
837 d = threads.deferToThread(open_stat, self._path)
838 d.addCallbacks(self._selectDelegate, self._sourceOpenFailed)
839
840 def _setDelegate(delegate):
841 self._delegate = delegate
842 d.addCallback(_setDelegate)
843 d.addCallback(lambda _: self)
844 return d
845
847 failure.trap(NotFoundError)
848 self.debug("Source file %r not found", self._path)
849 self.plug.outdateCopySession(self._path)
850 cachedPath = self.plug.cache.getCachePath(self._path)
851 self._removeCachedFile(cachedPath)
852 raise failure
853
856
858 if self._delegate is None:
859 raise FileClosedError("File closed")
860 return self._delegate.mtime
861
863 if self._delegate is None:
864 raise FileClosedError("File closed")
865 return self._delegate.size
866
868 if self._delegate is None:
869 raise FileClosedError("File closed")
870 return self._delegate.tell()
871
873 if self._delegate is None:
874 raise FileClosedError("File closed")
875 return self._delegate.seek(offset)
876
878 if self._delegate is None:
879 raise FileClosedError("File closed")
880 try:
881 d = self._delegate.read(size, self.stats)
882 if isinstance(d, defer.Deferred):
883 return d
884 return defer.succeed(d)
885 except IOError, e:
886 cls = errnoLookup.get(e.errno, FileError)
887 return defer.fail(cls("Failed to read cached data: %s", str(e)))
888 except:
889 return defer.fail()
890
892 if self._delegate:
893 self.stats.onClosed()
894 self._delegate.close()
895 self._delegate = None
896
898 self.close()
899
902
903
904 ## Private Methods ##
905
907 self.log("Closing source file [fd %d]", sourceFile.fileno())
908 try:
909 sourceFile.close()
910 except Exception, e:
911 self.warning("Failed to close source file: %s",
912 log.getExceptionMessage(e))
913
915 sourcePath = self._path
916 self.log("Selecting delegate for source file %r [fd %d]",
917 sourcePath, sourceFile.fileno())
918 # Update the log name
919 self.logName = self.plug.getLogName(self._path, sourceFile.fileno())
920 # Opening cached file
921 cachedPath = self.plug.cache.getCachePath(sourcePath)
922 try:
923 cachedFile, cachedInfo = open_stat(cachedPath)
924 self.log("Opened cached file [fd %d]", cachedFile.fileno())
925 except NotFoundError:
926 self.debug("Did not find cached file '%s'", cachedPath)
927 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
928 except FileError, e:
929 self.debug("Failed to open cached file: %s", str(e))
930 self._removeCachedFile(cachedPath)
931 return self._tryTempFile(sourcePath, sourceFile, sourceInfo)
932 # Found a cached file, now check the modification time
933 self.debug("Found cached file '%s'", cachedPath)
934 sourceTime = sourceInfo[stat.ST_MTIME]
935 cacheTime = cachedInfo[stat.ST_MTIME]
936 if sourceTime != cacheTime:
937 # Source file changed, remove file and start caching again
938 self.debug("Cached file out-of-date (%d != %d)",
939 sourceTime, cacheTime)
940 self.stats.onCacheOutdated()
941 self.plug.outdateCopySession(sourcePath)
942 self._removeCachedFile(cachedPath)
943 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
944 self._closeSourceFile(sourceFile)
945 # We have a valid cached file, just delegate to it.
946 self.debug("Serving cached file '%s'", cachedPath)
947 delegate = CachedFileDelegate(self.plug, cachedPath,
948 cachedFile, cachedInfo)
949 self.stats.onStarted(delegate.size, cachestats.CACHE_HIT)
950 return delegate
951
953 try:
954 os.remove(cachePath)
955 self.debug("Deleted cached file '%s'", cachePath)
956 except OSError, e:
957 if e.errno != errno.ENOENT:
958 self.warning("Error deleting cached file: %s", str(e))
959
961 session = self.plug.getCopySession(sourcePath)
962 if session is None:
963 self.debug("No copy sessions found")
964 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
965 self.debug("Copy session found")
966 if sourceInfo[stat.ST_MTIME] != session.mtime:
967 self.debug("Copy session out-of-date (%d != %d)",
968 sourceInfo[stat.ST_MTIME], session.mtime)
969 self.stats.onCacheOutdated()
970 session.outdate()
971 return self._cacheFile(sourcePath, sourceFile, sourceInfo)
972 self._closeSourceFile(sourceFile)
973 # We have a valid session, just delegate to it.
974 self.debug("Serving temporary file '%s'", session.tempPath)
975 delegate = TempFileDelegate(self.plug, session)
976 self.stats.onStarted(delegate.size, cachestats.TEMP_HIT)
977 return delegate
978
980 session = self.plug.createCopySession(sourcePath, sourceFile,
981 sourceInfo)
982 self.debug("Serving temporary file '%s'", session.tempPath)
983 delegate = TempFileDelegate(self.plug, session)
984 self.stats.onStarted(delegate.size, cachestats.CACHE_MISS)
985 return delegate
986
| Trees | Indices | Help |
|---|
| Generated by Epydoc 3.0.1 on Mon May 11 00:19:42 2015 | http://epydoc.sourceforge.net |