usr/share/time-slider/lib/plugin/rsync/backup.py
1 #!/usr/bin/python2.6
2 #
3 # CDDL HEADER START
4 #
5 # The contents of this file are subject to the terms of the
6 # Common Development and Distribution License (the "License").
7 # You may not use this file except in compliance with the License.
8 #
9 # You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 # or http://www.opensolaris.org/os/licensing.
11 # See the License for the specific language governing permissions
12 # and limitations under the License.
13 #
14 # When distributing Covered Code, include this CDDL HEADER in each
15 # file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 # If applicable, add the following below this CDDL HEADER, with the
17 # fields enclosed by brackets "[]" replaced with your own identifying
18 # information: Portions Copyright [yyyy] [name of copyright owner]
19 #
20 # CDDL HEADER END
21 #
22
23 import os
24 import os.path
25 import fcntl
26 import tempfile
27 import sys
28 import subprocess
29 import statvfs
30 import time
31 import threading
32 import math
33 import syslog
34 import gobject
35 import gio
36 import dbus
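# dbus.service and dbus.mainloop.glib are used below (dbus.service.BusName,
# DBusGMainLoop) and are not guaranteed to be pulled in by "import dbus"
# alone, so import them explicitly.
import dbus.service
import dbus.mainloop.glib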
37 import shutil
38 import copy
39 from bisect import insort, bisect_left
40
41 from time_slider import util, zfs, dbussvc, autosnapsmf, timeslidersmf
42 import rsyncsmf
43
44
45 # Name of the SMF property that enables verbose output when set to "true"
46 verboseprop = "plugin/verbose"
47 propbasename = "org.opensolaris:time-slider-plugin"
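# The per-instance ZFS user property used to tag snapshots is built from this
# base plus the SMF instance name (see BackupQueue.__init__), e.g. for an
# instance named "rsync" it would be "org.opensolaris:time-slider-plugin:rsync"
# (illustrative instance name only).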
48
49 class RsyncError(Exception):
50     """Generic base class for RsyncError
51
52     Attributes:
53         msg -- explanation of the error
54     """
55     def __init__(self, msg):
56         self.msg = msg
57     def __str__(self):
58         return repr(self.msg)
59
60
61 class RsyncTargetDisconnectedError(RsyncError):
62     """Exception raised when the backup device goes offline during
63        the rsync transfer.
64
65     Attributes:
66         msg -- explanation of the error
67     """
68     def __init__(self, source, dest, message):
69         msg = "Target directory error during rsync backup from " \
70               "%s to target \'%s\' Rsync error details:\n%s" \
71               % (source, dest, message)
72         RsyncError.__init__(self, msg)
73
74
75 class RsyncTransferInterruptedError(RsyncError):
76     """Exception raised when the rsync transfer process pid was
77        interrupted or killed during the rsync transfer.
78
79     Attributes:
80         msg -- explanation of the error
81     """
82     def __init__(self, source, dest, message):
83         msg = "Interrupted rsync transfer from %s to %s " \
84               "Rsync error details:\n%s" % (source, dest, message)
85         RsyncError.__init__(self, msg)
86
87
88 class RsyncSourceVanishedError(RsyncError):
89     """Exception raised when rsync could only partially transfer
90        due to the contents of the source directory being removed.
91        Possibly due to a snapshot being destroyed during transfer
92        because of immediate or deferred (holds released) destruction.
93
94     Attributes:
95         msg -- explanation of the error
96     """
97     def __init__(self, source, dest, message):
98         msg = "Rsync source directory vanished during transfer of %s to %s " \
99               "Rsync error details:\n%s" % (source, dest, message)
100         RsyncError.__init__(self, msg)
101
102
103 class RsyncProcess(threading.Thread):
104
105
106     def __init__(self, source, target, latest=None, verbose=False, logfile=None):
107
108         self._sourceDir = source
109         self._backupDir = target
110         self._latest = latest
111         self._verbose = verbose
112         self._proc = None
113         self._forkError = None
114         self._logFile = logfile
115         # Init done. Now initialise threading.
116         threading.Thread.__init__ (self)
117
118     def run(self):
119         try:
120             self._proc = subprocess.Popen(self._cmd,
121                                           stderr=subprocess.PIPE,
122                                           close_fds=True)
123         except OSError as e:
124             # _check_exit_code() will pick up this and raise an
125             # exception in the original thread.
126             self._forkError = "%s: %s" % (self._cmd[0], str(e))
127         else:
128             self._stdout,self._stderr = self._proc.communicate()
129             self._exitValue = self._proc.wait()
130
131     def _check_exit_code(self):
132         if self._forkError:
133             # The rsync process failed to execute, probably
134             # received an OSError exception. Pass it up.
135             raise RsyncError(self._forkError)
136
137         if self._exitValue == 0:
138             return
139         # Non zero return code means rsync encountered an
140         # error which may be transient or require sys-admin
141         # intervention to fix.
142         
143         # This method basically just maps known rsync exit codes
144         # to exception classes.
145         
146         # Rsync exit value codes (non-zero)
147         
148         # 11/12 Indicates backup drive was disconnected during
149         # transfer. Recoverable once drive reconnected:
150         # 11   Error in file I/O
151         # 12   Error in rsync protocol data stream
152         if self._exitValue == 11 or \
153             self._exitValue == 12:
154             raise RsyncTargetDisconnectedError(self._sourceDir,
155                                                self._backupDir,
156                                                self._stderr)
157         # Transfer pid interrupted by SIGUSR1 or SIGINT. Recoverable:
158         # 20   Received SIGUSR1 or SIGINT
159         elif self._exitValue == 20:
160             raise RsyncTransferInterruptedError(self._sourceDir,
161                                                 self._backupDir,
162                                                 self._stderr)
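        # 24 indicates that some source files vanished before they could be
        # transferred, typically because the snapshot was destroyed mid-backup.
        # (Assumes rsync's documented exit code 24: "Partial transfer due to
        # vanished source files".) Raise the matching exception so the caller
        # can treat it as recoverable.
        elif self._exitValue == 24:
            raise RsyncSourceVanishedError(self._sourceDir,
                                           self._backupDir,
                                           self._stderr)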
163
164         # For everything else unknown or unexpected, treat it as 
165         # fatal and provide the rsync stderr output.
166         else:
167             raise RsyncError(self._stderr)
168
169     def start_backup(self):
170         # First, check to see if the rsync destination
171         # directory is accessible.
172         try:
173             os.stat(self._backupDir)
174         except OSError:
175             util.debug("Backup directory is not " \
176                        "currently accessible: %s" \
177                        % (self._backupDir),
178                        self._verbose)
179             #FIXME exit/exception needs to be raised here
180             # or status needs to be set.
181             return
182
183         try:
184             os.stat(self._sourceDir)
185         except OSError:
186             util.debug("Backup source directory is not " \
187                        "currently accessible: %s" \
188                        % (self._sourceDir),
189                        self._verbose)
190             #FIXME exit/exception needs to be raised here
191             # or status needs to be set.
192             return
193
194         if self._latest:
195             self._cmd = ["/usr/bin/rsync", "-a", "--inplace",\
196                    "%s/." % (self._sourceDir), \
197                    "--link-dest=%s" % (self._latest), \
198                    self._backupDir]
199         else:
200             self._cmd = ["/usr/bin/rsync", "-a", "--inplace",\
201                    "%s/." % (self._sourceDir), \
202                    self._backupDir]
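        # Note: "--link-dest" makes rsync hard-link files that are unchanged
        # relative to the previous backup, so each backup directory looks like
        # a complete copy while only changed files consume extra space.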
203
204         if self._logFile:
205             self._cmd.insert(1, "--log-file=%s" % (self._logFile))
206         if self._verbose:
207             self._cmd.insert(1, "-vv")
208
209         self.start()
210
211
212 class BackupQueue():
213
214     def __init__(self, fmri, dbus, mainLoop=None):
215         self._bus = dbus
216         self._mainLoop = mainLoop
217         self._started = False
218         self._pluginFMRI = fmri
219         self._smfInst = rsyncsmf.RsyncSMF(self._pluginFMRI)
220         self._verbose = self._smfInst.get_verbose()
221         self._rsyncVerbose = self._smfInst.get_rsync_verbose()
222         self._propName = "%s:%s" % (propbasename, fmri.rsplit(':', 1)[1])
223
224         # Variables to quickly access time sorted backups and 
225         # to map directory names to mtimes.
226         self._backupDirs = None
227         self._backups = None
228         self._backupTimes = {}
229
230         released = release_held_snapshots(self._propName)
231         for snapName in released:
232             util.debug("Released dangling userref on: " + snapName,
233                        self._verbose)
234
235         self._tempSchedules = None
236         # List of all snapshots currently in our backup queue
237         self._pendingList = list_pending_snapshots(self._propName)
238
239         # Try to backup in sets of snapshots grouped by a common
240         # snapshot label. These get taken from the head of the
241         # pending list. After a working set has been completed,
242         # pending list gets refreshed and a new working set is
243         # extracted.
244         self._currentQueueSet = []
245         self._skipList = []
246         self._queueLength = 0
247
248         self._cleanupThreshold = self._smfInst.get_cleanup_threshold()
249         if self._cleanupThreshold < 1 or \
250            self._cleanupThreshold > 99:
251             # Invalid value. #FIXME Plugin should be checked by SMF start
252             # method and placed into maintenance if this happens.
253             # For now, go with the default.
254             util.log_error(syslog.LOG_ERR,
255                            "Invalid value for SMF property " \
256                            "<rsync/cleanup_threshold>: %d. " \
257                            "Using default value of 95%%" \
258                            % (self._cleanupThreshold))
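            # Fall back to the default so the invalid value is never used in
            # later capacity comparisons (assumption: 95 is the intended
            # default, as stated in the log message above).
            self._cleanupThreshold = 95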
259
260         # Base variables for backup device. Will be initialised
261         # later in _find_backup_device()
262         self._smfTargetKey = self._smfInst.get_target_key()
263         self._nodeName = os.uname()[1]
264         self._rsyncBaseDir = None
265         self._rsyncDir = None
266         self._keyFile = None
267
268         tsSMF = timeslidersmf.TimeSliderSMF()
269         self._labelSeparator = tsSMF.get_separator()
270         del tsSMF
271         # Finally go look for the backup device
272         self._find_backup_device()
273
274     def empty_trash_folders(self):
275         trashDirs = []
276         trashedBackups = []
277         os.chdir(self._rsyncDir)
278         for root, dirs, files in os.walk(self._rsyncDir):
279             if '.time-slider' in dirs:
280                 dirs.remove ('.time-slider')
281                 trashDir = os.path.join(root,
282                                         rsyncsmf.RSYNCTRASHSUFFIX)
283                 if os.path.exists(trashDir):
284                     trashDirs.append(trashDir)
285         for trashDir in trashDirs:
286             os.chdir(trashDir)
287             trashItems = []
288             trashItems = [d for d in os.listdir(trashDir) \
289                           if os.path.isdir(d) and
290                           not os.path.islink(d)]
291             if len(trashItems) > 0:
292                 util.debug("Deleting trash backups in %s" % (trashDir),
293                            self._verbose)
294             for trashItem in trashItems:
295                 util.debug("Deleting trash item: %s" % (trashItem),
296                            self._verbose)
297                 # FIXME add some dbus notification here to let the
298                 # applet know what's going on.
299                 shutil.rmtree(trashItem)
300
301     def _get_temp_schedules(self):
302         # Get retention rule for non archival snapshots as per
303         # rules defined in:
304         # svc://system/filesystem/zfs/auto-snapshot:<schedule>
305         archived = self._smfInst.get_archived_schedules()
306         triggers = self._smfInst.get_trigger_list()
307         defScheds = autosnapsmf.get_default_schedules()
308         customScheds = autosnapsmf.get_custom_schedules()
309         try:
310             triggers.index('all')
311             # Expand the wildcard value 'all' 
312             triggers = [sched for sched,i,p,k in defScheds]
313             customTriggers = [sched for sched,i,p,k in customScheds]
314             triggers.extend(customTriggers)
315         except ValueError:
316             pass
317
318         self._tempSchedules = [schedule for schedule in defScheds if \
319                                schedule[0] not in archived]
320         self._tempSchedules.extend([schedule for schedule in customScheds if \
321                                    schedule[0] not in archived])
322
323     def _remove_dead_backups(self):
324         """
325            Identifies and removes partially completed backups whose origin
326            snapshot is no longer in the pending queue, indicating that the
327            backup will never get completed, in which case it's just a waste
328            of space
329         """
330         backupDirs = []
331         partialDirs = []
332         deadBackups = []
333         os.chdir(self._rsyncDir)
334         for root, dirs, files in os.walk(self._rsyncDir):
335             if '.time-slider' in dirs:
336                 dirs.remove ('.time-slider')
337                 partialDir = os.path.join(root,
338                                           rsyncsmf.RSYNCPARTIALSUFFIX)
339                 partialDirs.append(partialDir)
340         for dirName in partialDirs:
341             if not os.path.exists(dirName):
342                 continue
343             os.chdir(dirName)
344             partials = [d for d in os.listdir(dirName) \
345                         if os.path.isdir(d)]
346             if len(partials) == 0:
347                 continue
348             suffix = rsyncsmf.RSYNCPARTIALSUFFIX
349             prefix = self._rsyncDir
350             # Reconstruct the origin ZFS filesystem name
351             baseName = dirName.replace(prefix, '', 1).lstrip('/')
352             fsName = baseName.replace(suffix, '', 1).rstrip('/')
353             for snapshotLabel in partials:
354                 pending = False
355                 # Reconstruct the origin snapshot name and see
356                 # if it's still pending rsync backup. If it is
357                 # then leave it alone since it can be used to
358                 # resume a partial backup later. Otherwise it's
359                 # never going to be backed up and needs to be
360                 # manually deleted.
361                 snapshotName = "%s@%s" % (fsName, snapshotLabel)
362                 for ctime,name in self._pendingList:
363                     if name == snapshotName:
364                         pending = True
365                         continue
366                 if pending == False:
367                     util.debug("Deleting zombied partial backup: %s" \
368                                % (os.path.abspath(snapshotLabel)),
369                                self._verbose)
370                     shutil.rmtree(snapshotLabel)
371                     # Don't forget the log file too.
372                     logFile = os.path.join(os.path.pardir,
373                                            os.path.pardir,
374                                            rsyncsmf.RSYNCLOGSUFFIX,
375                                            snapshotLabel + ".log")
376                     try:
377                         os.stat(logFile)
378                         util.debug("Deleting zombie log file: %s" \
379                                    % (os.path.abspath(logFile)),
380                                    self._verbose)
381                         os.unlink(logFile)
382                     except OSError:
383                         util.debug("Expected rsync log file not " \
384                                    "found: %s"\
385                                    % (os.path.abspath(logFile)),
386                                    self._verbose)
387
388     def _discover_backups(self):
389         self._backupDirs = []
390         self._backups = []
391         self._backupTimes = {}
392         # backupTimes dictionary is not primarily used
393         # to store directory listings, but to map 
394         # backup directories to their mtimes for quick lookups.
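        # self._backups is kept sorted by mtime (via insort) so that
        # _find_deleteable_backups() can later use bisect_left to slice out
        # all backups older than a given timestamp.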
395         
396         os.chdir (self._rsyncDir)
397         for root, dirs, files in os.walk(self._rsyncDir):
398             if '.time-slider' in dirs:
399                 dirs.remove('.time-slider')
400                 backupDir = os.path.join(root, rsyncsmf.RSYNCDIRSUFFIX)
401                 if os.path.exists(backupDir):
402                     insort(self._backupDirs, os.path.abspath(backupDir))
403         for dirName in self._backupDirs:
404             self._backupTimes[dirName] = {}
405             os.chdir(dirName)
406             dirList = [d for d in os.listdir(dirName) \
407                         if os.path.isdir(d) and
408                         not os.path.islink(d)]
409             for d in dirList:
410                 mtime = os.stat(d).st_mtime
411                 insort(self._backups, [long(mtime), os.path.abspath(d)])
412                 self._backupTimes[dirName][d] = mtime
413
414     def _find_backup_device(self):
415         # Determine the rsync backup dir. This is the target dir
416         # defined by the SMF instance plus the "TIMESLIDER/<nodename>"
417         # suffix. Try finding it at the preconfigured path first,
418         # then failing that, scan removable media mounts, in case it
419         # got remounted under a different path than at setup time.
420         self._rsyncBaseDir = None
421         path = self._smfInst.get_target_dir()
422         if self._validate_rsync_target(path) == True:
423             self._rsyncBaseDir = path
424             util.debug("Backup target device online: %s" % (path),
425                        self._verbose)
426         else:
427             util.debug("Backup target device not mounted at: %s. " \
428                        "Scanning removable devices..." \
429                        % (path),
430                        self._verbose)
431             volMonitor = gio.volume_monitor_get()
432             mounts = volMonitor.get_mounts()
433             for mount in mounts:
434                 root = mount.get_root()
435                 path = root.get_path()
436                 if self._validate_rsync_target(path) == True:
437                     util.debug("Located backup target device at: %s" \
438                                % (path),
439                                self._verbose)
440                     self._rsyncBaseDir = path
441
442         if self._rsyncBaseDir != None:
443             self._rsyncDir = os.path.join(self._rsyncBaseDir,
444                                          rsyncsmf.RSYNCDIRPREFIX,
445                                          self._nodeName)
446             self._keyFile = os.path.join(self._rsyncBaseDir,
447                                          rsyncsmf.RSYNCDIRPREFIX,
448                                          rsyncsmf.RSYNCCONFIGFILE)
449
450         
451     def _validate_rsync_target(self, path):
452         """
453            Tests path to see if it is the pre-configured
454            rsync backup device path.
455            Returns True on success, otherwise False
456         """
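        # The device is recognised by two artifacts written when it was set
        # up: the per-host backup directory and a config file whose
        # "target_key" value must match the key stored in this plugin's SMF
        # configuration, guarding against writing backups to the wrong disk.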
457         # FIXME - this is duplicate in the applet and should
458         # be moved into a shared module
459         if not os.path.exists(path):
460             return False
461         testDir = os.path.join(path,
462                                rsyncsmf.RSYNCDIRPREFIX,
463                                self._nodeName)
464         testKeyFile = os.path.join(path,
465                                    rsyncsmf.RSYNCDIRPREFIX,
466                                    rsyncsmf.RSYNCCONFIGFILE)
467         if os.path.exists(testDir) and \
468             os.path.exists(testKeyFile):
469             targetKey = None
470             f = open(testKeyFile, 'r')
471             for line in f.readlines():
472                 key, val = line.strip().split('=')
473                 if key.strip() == "target_key":
474                     targetKey = val.strip()
475                     break
476             f.close()
477             if targetKey == self._smfTargetKey:
478                 return True
479         return False
480
481     def _find_deleteable_backups(self, timestamp):
482         """
483            Returns a list of backup directory paths that are older than
484            timestamp, and can be deleted in order to make room for newer
485            backups. Certain restrictions apply such as:
486            - The backup cannot be locked (indicating it is being, or is about
487              to be, deleted by another process).
488            - The backup must not be the last remaining backup in its parent
489              directory, which would mean it is the only backup of a given
490              filesystem on the device. We always keep at least 1 full backup
491              of a filesystem on the device if it belongs to a currently selected filesystem.
492         """
493         deleteables = []
494         # This should have already occurred in
495         # backup_snapshot() mainloop method
496         if self._backupDirs == None:
497             self._discover_backups()
498
499         if len(self._backupDirs) == 0:
500             # We were not able to find any backups to delete. Try again later
501             return []
502
503         # Extract the subset of self._backupDirs that are older than timestamp
504         idx = bisect_left(self._backups, [timestamp, ''])
505         subset = self._backups[0:idx]
506
507         # Copy the backupTimes dictionary so that we can
508         # modify it safely.
509         copiedTimes = copy.deepcopy(self._backupTimes)
510         for mtime, dirName in subset:
511             head,tail = os.path.split(dirName)
512             if len(copiedTimes[head]) < 2:
513                 # We can only delete this single backup provided
514                 # its filesystem is no longer tagged for rsync
515                 # replication. Otherwise we need to leave at least
516                 # one backup on the device at all times, and also
517                 # to provide an incremental backup point for
518                 # future backups.
519                 snapName = backup_name_to_snapshot_name(dirName)
520                 snapshot = zfs.Snapshot(snapName)
521                 fs = zfs.Filesystem(snapshot.fsname)
522                 if fs.get_user_property(rsyncsmf.RSYNCFSTAG) == "true":
523                     continue
524                 else:
525                     # If we are going to actually remove this final
526                     # remaining backup of this filesystem then we need
527                     # to unqueue any pending snapshots of it,
528                     # otherwise after deleting this one, we'll just 
529                     # start backing up the older snapshots of this 
530                     # filesystem afterwards, which is a waste of time
531                     # and space.
532                     pending = [name for time,name in \
533                                list_pending_snapshots(self._propName) if \
534                                name.find(snapshot.fsname + '@') == 0]
535                     cmd = [zfs.PFCMD, zfs.ZFSCMD, "inherit", self._propName]
536                     util.debug("Unqueuing pending backups of deselected " \
537                                "filesystem: " + snapshot.fsname + '\n' + \
538                                str(pending),
539                                self._verbose)
540                     cmd.extend(pending)
541                     util.run_command(cmd)
542
543             lockFileDir = os.path.join(head,
544                                        os.path.pardir,
545                                        os.path.pardir,
546                                        rsyncsmf.RSYNCLOCKSUFFIX)
547             lockFile = os.path.join(lockFileDir, tail)
548
549             if not os.path.exists(lockFile):
550                 # No lock file so we are free to delete it.
551                 deleteables.append([mtime, dirName])
552                 # Remove it from copiedTimes so it doesn't get
553                 # double counted
554                 del copiedTimes[head][tail]
555                 continue
556             # A lock file exists, so we probably can't delete this backup,
557             # but try anyway in case the lock is stale/unlocked.
558             try:
559                 lockFp = open(lockFile, 'w')
560                 fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB)
561             except IOError:
562                 util.debug("Can't delete backup %s because it " \
563                            "is locked by another process. " \
564                            "Skipping" % (dirName),
565                            self._verbose)
566                 continue
567             # Ok, we can still delete it, but get rid of the stale lock file
568             lockFp.close()
569             os.unlink(lockFile)
570             deleteables.append([mtime, dirName])
571             # Remove it from copiedTimes so it doesn't get
572             # double counted
573             del copiedTimes[head][tail]
574         return deleteables
575
576     def _recover_space(self, deleteables):
577
578         # Important:
579         # Don't actually loop through this list fully. Break out
580         # as soon as capacity is beneath the threshold level
581         # again.
582         remainder = deleteables[:]
583         for mtime,dirName in deleteables:
584             if util.get_filesystem_capacity(self._rsyncDir) < \
585                self._cleanupThreshold:
586                 # No need to delete anything further
587                 return remainder
588             lockFile = None
589             lockFp = None
590             head,tail = os.path.split(dirName)
591                 
592             # Check if it has a lock file and try to grab it.
593             # If that fails we need to move onto the next backup in
594             # the list.
595             lockFileDir = os.path.join(head,
596                                        os.path.pardir,
597                                        os.path.pardir,
598                                        rsyncsmf.RSYNCLOCKSUFFIX)
599             lockFile = os.path.join(lockFileDir, tail)
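            # Lock protocol: each backup directory has a same-named lock file
            # under the RSYNCLOCKSUFFIX directory; holding an exclusive flock
            # on it tells other consumers (e.g. the deletion GUI) that the
            # backup is in use and must not be removed.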
600
601             if not os.path.exists(lockFileDir):
602                 os.makedirs(lockFileDir, 0755)
603             try:
604                 lockFp = open(lockFile, 'w')
605                 fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB)
606             except IOError:
607                 util.debug("Can't delete backup %s because it " \
608                            "is locked by another process. " \
609                            "Skipping" % (dirName),
610                            self._verbose)
611                 # Remove it from the remainder list
612                 idx = bisect_left(remainder, [mtime, dirName])
613                 del remainder[idx]
614                 continue
615
616             trash = os.path.join(head,
617                                  os.path.pardir,
618                                  os.path.pardir,
619                                  rsyncsmf.RSYNCTRASHSUFFIX)
620             trashDir = os.path.join(trash, tail)
621
622             if not os.path.exists(trash):
623                 os.makedirs(trash, 0755)
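            # The backup is renamed into the trash directory before removal so
            # that a half-deleted tree never appears to be a valid backup;
            # empty_trash_folders() clears out any leftovers on a later run.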
624
625             util.debug("Deleting rsync backup to recover space: %s"\
626                 % (dirName), self._verbose)
627             os.rename(dirName, trashDir)
628             lockFp.close()
629             os.unlink(lockFile)
630             shutil.rmtree(trashDir)
631             # Remove the log file if it exists
632             logFile = os.path.join(head,
633                                    os.path.pardir,
634                                    os.path.pardir,
635                                    rsyncsmf.RSYNCLOGSUFFIX,
636                                    tail + ".log")
637             if os.path.exists(logFile):
638                 os.unlink(logFile)
639             else:
640                 util.debug("Expected to find log file %s when deleting %s " \
641                            "during space recovery" % (logFile, dirName),
642                            self._verbose)
643             # Remove dirName from the backup list
644             idx = bisect_left(self._backups, [mtime, dirName])
645             del self._backups[idx]
646             # Remove it from the remainder list too
647             idx = bisect_left(remainder, [mtime, dirName])
648             del remainder[idx]
649         return remainder
650
651     def backup_snapshot(self):
652         # First, check to see if the rsync destination
653         # directory is accessible.
654
655         if self._rsyncBaseDir == None:
656             util.debug("Backup target device is not " \
657                        "accessible right now: %s" \
658                        % (self._smfInst.get_target_dir()),
659                        self._verbose)
660             self._bus.rsync_unsynced(len(self._pendingList))
661             if self._mainLoop:
662                 self._mainLoop.quit()
663             sys.exit(0)
664
665         # Extra paranoia
666         if self._validate_rsync_target(self._rsyncBaseDir) == False:
667             util.debug("Backup target directory does not " \
668                        "have a matching configuration key. " \
669                        "Possibly old or wrong device: %s" \
670                        % (self._keyFile),
671                        self._verbose)
672             self._bus.rsync_unsynced(len(self._pendingList))
673             if self._mainLoop:
674                 self._mainLoop.quit()
675             sys.exit(0)
676
677         # Before getting started, identify what needs to be cleaned up on
678         # the backup target.
679         if self._tempSchedules == None:
680             self._get_temp_schedules()
681         
682         # Remove partial backups that can never be completed, then find
683         # out what complete backups we already have on the target device.
684         if self._started == False:
685             self._remove_dead_backups()
686             self._discover_backups()
687
688         if len(self._currentQueueSet) == 0:
689             # Means we are just getting started or have just completed
690             # backup of one full set of snapshots. Clear out anything
691             # we may have moved to the trash during the previous 
692             # backup set iterations.
693             self.empty_trash_folders()
694             # Refresh the pending list and build a new working set queue
695             self._pendingList = list_pending_snapshots(self._propName)
696             # Remove skipped items to avoid infinite looping.
697             for item in self._skipList:
698                 try:
699                     self._pendingList.remove(item)
700                 except ValueError:
701                     pass
702             if len(self._pendingList) == 0:
703                 # If something was actually backed up, signal
704                 # that it is now completed.
705                 if self._started == True:
706                     self._bus.rsync_complete(self._rsyncBaseDir)
707                 self._bus.rsync_synced()
708                 # Nothing to do exit
709                 if self._mainLoop:
710                     self._mainLoop.quit()
711                 sys.exit(0)
712             else:
713                 # Construct a new working set queue
714                 # Identify the newest snapshot and then all
715                 # snapshots with a matching snapshot label
716                 self._queueLength = len(self._pendingList)
717                 ctime,headSnapName = self._pendingList[0]
718                 label = headSnapName.rsplit("@", 1)[1]
719                 self._currentQueueSet = \
720                     [(ctime,snapName) for \
721                      ctime,snapName in self._pendingList if \
722                      snapName.rsplit("@", 1)[1] == label]
723
724
725         if len(self._backups) > 0:
726             oldestBackupTime, oldestBackup = self._backups[0]
727             qTime, qItem = self._currentQueueSet[0]
728
729             # If the backup device is nearly full, don't
730             # bother trying to backup anything unless it's
731             # newer than the oldest backup set already on
732             # the device.
733             # This avoids potential situations where if the
734             # backup device has severely limited capacity and
735             # older backups sets were in the queue, newer backups
736             # might get deleted in order to make room for older
737             # ones, creating a downward spiral.
738             capacity = util.get_filesystem_capacity(self._rsyncDir)
739             if capacity > self._cleanupThreshold:
740                 # Find backups older than qTime that could in theory
741                 # be deleted in order to make room for the current
742                 # pending item.
743                 deleteables = self._find_deleteable_backups(qTime)
744
745                 if len(deleteables) == 0 and \
746                    qTime < oldestBackupTime:
747                     util.debug("%s has exceeded %d%% of its capacity. " \
748                                 "Skipping pending backups prior to: " \
749                                 "%s" % (self._rsyncDir,
750                                         self._cleanupThreshold,
751                                         time.ctime(oldestBackupTime)),
752                                 self._verbose)
753                     if self._started == True:
754                         self._bus.rsync_complete(self._rsyncBaseDir)
755                     self._bus.rsync_synced()
756                     # Nothing to do exit
757                     if self._mainLoop:
758                         self._mainLoop.quit()
759                     sys.exit(0)
760
761         if self._started == False:
762             self._started = True
763             self._bus.rsync_started(self._rsyncBaseDir)
764
765         ctime,snapName = self._currentQueueSet[0]
766         snapshot = zfs.Snapshot(snapName, long(ctime))
767         # Make sure the snapshot didn't get destroyed since we last
768         # checked it.
769         remainingList = self._currentQueueSet[1:]
770         if snapshot.exists() == False:
771             util.debug("Snapshot: %s no longer exists. Skipping" \
772                         % (snapName), self._verbose)
773             self._currentQueueSet = remainingList
774             return True
775
776         # Place a hold on the snapshot so it doesn't go anywhere
777         # while rsync is trying to back it up.
778         snapshot.hold(self._propName)
779         self._queueLength -= 1
780         fs = zfs.Filesystem(snapshot.fsname)
781         sourceDir = None
782         if fs.is_mounted() == True:
783             # Get the mountpoint
784             mountPoint = fs.get_mountpoint()
785             sourceDir = "%s/.zfs/snapshot/%s" \
786                         % (mountPoint, snapshot.snaplabel)
787         else:
788             # If the filesystem is not mounted just skip it. If it's
789             # not mounted then nothing is being written to it. And
790             # we can just catch up with it again later if it doesn't
791             # get expired by time-sliderd
792             util.debug("%s is not mounted. Skipping." \
793                         % (snapshot.fsname), self._verbose)
794             snapshot.release(self._propName)
795             self._skipList.append((ctime, snapName))
796             self._currentQueueSet = remainingList
797             return True
798
799         # targetDir is the parent folder of all backups
800         # for a given filesystem
801         targetDir = os.path.join(self._rsyncDir,
802                                  snapshot.fsname,
803                                  rsyncsmf.RSYNCDIRSUFFIX)
804         # partialDir is a separate directory in which
805         # snapshots are initially backed up to. Upon successful
806         # completion they are moved to the backupDir.
807         partialDir = os.path.join(self._rsyncDir,
808                                   snapshot.fsname,
809                                   rsyncsmf.RSYNCPARTIALSUFFIX,
810                                   snapshot.snaplabel)
811         lockFileDir = os.path.join(self._rsyncDir,
812                                    snapshot.fsname,
813                                    rsyncsmf.RSYNCLOCKSUFFIX)
814         logDir = os.path.join(self._rsyncDir,
815                               snapshot.fsname,
816                               rsyncsmf.RSYNCLOGSUFFIX)
817         logFile = os.path.join(logDir,
818                                snapshot.snaplabel + ".log")
819
820         
821         # backupDir is the full directory path where the new
822         # backup will be located ie <targetDir>/<snapshot label>
823         backupDir = os.path.join(targetDir, snapshot.snaplabel)
824
825         # Figure out the closest previous backup. Since we
826         # backup newest first instead of oldest first it's
827         # determined as follows:
828         # If queued backup item is newer than the most recent
829         # backup on the backup target, use the most recent 
830         # backup as the incremental source.
831         # Otherwise identify the backup on the device that is
832         # nearest to but newer than the queued backup.
833         nearestOlder = None
834         nearestNewer = None
835         dirList = []
836
837         if not os.path.exists(partialDir):
838             os.makedirs(partialDir, 0755)
839         if not os.path.exists(logDir):
840             os.makedirs(logDir, 0755)
841
842         if not os.path.exists(targetDir):
843             os.makedirs(targetDir, 0755)
844             # Add the new directory to our internal
845             # mtime dictionary and sorted list.
846             self._backupTimes[targetDir] = {}
847             insort(self._backupDirs, targetDir)
848         else:
849             for name,value in self._backupTimes[targetDir].items():
850                 if ctime > value:
851                     if nearestOlder == None or \
852                        value > nearestOlder[1]:
853                         nearestOlder = [name, value]
854                 else:
855                     if nearestNewer == None or \
856                        value < nearestNewer[1]:
857                         nearestNewer = [name, value]
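        # Example: with existing backups made at 10:00 and 12:00 and a queued
        # snapshot taken at 11:00, nearestNewer is the 12:00 backup and is
        # preferred below as the incremental (--link-dest) reference.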
858
859         os.chdir(targetDir)
860         link = None
861         linkDest = None
862         lockFile = None
863         lockFp = None
864         if nearestNewer:
865             link = nearestNewer[0]
866         elif nearestOlder:
867             link = nearestOlder[0]
868         if link:
869             linkDest = os.path.realpath(link)
870             # Create a lock for linkDest. We need to ensure that
871             # nautilus' restore view or the time-slider-delete
872             # GUI doesn't attempt to delete it or move it to the
873             # trash while it is being used by rsync for incremental
874             # backup.
875             lockFile = os.path.join(lockFileDir,
876                                     link + ".lock")
877
878             if not os.path.exists(lockFileDir):
879                 os.makedirs(lockFileDir, 0755)
880
881             try:
882                 lockFp = open(lockFile, 'w')
883                 fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB)
884             except IOError:
885                 util.debug("Can't perform incremental rsync of %s because " \
886                            "unable to obtain exclusive lock on incremental " \
887                            "backup reference point: %s. Exiting" \
888                            % (sourceDir, lockFile), self._verbose)
889                 os.chdir("/")
890                 snapshot.release(self._propName)
891                 sys.exit(1)
892
893         self._rsyncProc = RsyncProcess(sourceDir,
894                                        partialDir,
895                                        linkDest,
896                                        self._rsyncVerbose,
897                                        logFile)
898
899         # Notify the applet of current status via dbus
900         self._bus.rsync_current(snapshot.name, self._queueLength)
901
902         # Set umask temporarily so that rsync backups are read-only to
903         # the owner by default. Rsync will override this to match the
904         # permissions of each snapshot as appropriate.
905         origmask = os.umask(0222)
906         util.debug("Starting rsync backup of '%s' to: %s" \
907                    % (sourceDir, partialDir),
908                    self._verbose)
909         self._rsyncProc.start_backup()
910
911         warningDone = False
912         while self._rsyncProc.is_alive():
913             if len(self._backups) > 0:
914                 # Monitor backup target capacity while we wait for rsync.
915                 capacity = util.get_filesystem_capacity(self._rsyncDir)
916                 if capacity > self._cleanupThreshold:
917                     # Find backups older than qTime that could in theory
918                     # be deleted in order to make room for the current
919                     # pending item.
920                     deleteables = self._find_deleteable_backups(qTime)
921                     # Only generate annoying debug message once instead of
922                     # every 5 seconds.
923                     if warningDone == False:
924                         util.debug("Backup device capacity exceeds %d%%. " \
925                                    "Found %d deleteable backups for space " \
926                                    "recovery." \
927                                     % (capacity, len(deleteables)),
928                                     self._verbose)
929                         warningDone = True
930                     if len(deleteables) > 0:
931                         deleteables = self._recover_space(deleteables)
932             time.sleep(5)
933
934         try:
935             self._rsyncProc._check_exit_code()
936         except (RsyncTransferInterruptedError,
937                 RsyncTargetDisconnectedError,
938                 RsyncSourceVanishedError) as e:
939             os.chdir("/")
940             snapshot.release(self._propName)
941             util.log_error(syslog.LOG_ERR, str(e))
942             # These are recoverable, so exit for now and try again
943             # later
944             sys.exit(-1)
945
946         except RsyncError as e:
947             # If the backup device went offline we need to chdir
948             # out of it or running further commands might fail.
949             os.chdir("/")
950             util.log_error(syslog.LOG_ERR,
951                            "Unexpected rsync error encountered: \n" + \
952                            str(e))
953             util.log_error(syslog.LOG_ERR,
954                            "Rsync log file location: %s" \
955                            % (os.path.abspath(logFile)))
956             util.log_error(syslog.LOG_ERR,
957                            "Placing plugin into maintenance mode")
958             self._smfInst.mark_maintenance()
959             snapshot.release(self._propName)
960             sys.exit(-1)
961
962         finally:
963             if lockFp:
964                 lockFp.close()
965                 os.unlink(lockFile)
966
967         util.debug("Rsync process exited", self._verbose)
968         os.umask(origmask)
969
970         # Move the completed backup from the partial dir to the
971         # proper backup directory.
972         util.debug("Renaming completed backup from %s to %s" \
973                    % (partialDir, backupDir), self._verbose)
974         os.rename(partialDir, backupDir)
975
976         # Reset the mtime and atime properties of the backup directory so that
977         # they match the snapshot creation time. This is extremely important
978         # because the backup mechanism relies on it to determine backup times
979         # and nearest matches for incremental rsync (linkDest)
980         os.utime(backupDir, (long(ctime), long(ctime)))
981         # Update the dictionary and time sorted list with ctime also
982         self._backupTimes[targetDir][snapshot.snaplabel] = long(ctime)
983         insort(self._backups, [long(ctime), os.path.abspath(backupDir)]) 
984         snapshot.set_user_property(self._propName, "completed")
985         snapshot.release(self._propName)
986         self._currentQueueSet = remainingList
987         
988         # Now is a good time to clean out the directory:
989         # Check to see if the backup just completed belonged to an
990         # auto-snapshot schedule and whether older backups should get
991         # deleted.
992         if snapshot.snaplabel.find(autosnapsmf.SNAPLABELPREFIX) == 0:
993             tempSchedule = None
994             label = None
995             for schedule in self._tempSchedules:
996                 label = "%s%s%s" % (autosnapsmf.SNAPLABELPREFIX,
997                                     self._labelSeparator,
998                                     schedule[0])
999                 if snapshot.snaplabel.find(label) == 0:
1000                     tempSchedule = schedule
1001                     break
1002             if tempSchedule == None:
1003                 # Backup doesn't belong to a temporary schedule so 
1004                 # nothing left to do
1005                 return True
1006
1007             keep = tempSchedule[3] # [schedule,interval,period,keep]
1008             schedBackups = [d for d in os.listdir(targetDir) if 
1009                             d.find(label) == 0]
1010             # The minimum that can be kept around is one:
1011                 # keeping zero is pointless since it might trigger
1012             # a total replication rather than an incremental
1013             # rsync replication.
1014             if len(schedBackups) <= 1:
1015                 return True
1016             if len(schedBackups) <= keep:
1017                 return True
1018
1019             sortedBackupList = []
1020             for backup in schedBackups:
1021                 stInfo = os.stat(backup)
1022                 # List is sorted by mtime, oldest first
1023                 insort(sortedBackupList, [stInfo.st_mtime, backup])
1024             purgeList = sortedBackupList[0:-keep]
1025
1026             trash = os.path.join(self._rsyncDir,
1027                                  snapshot.fsname,
1028                                  rsyncsmf.RSYNCTRASHSUFFIX)
1029             if not os.path.exists(trash):
1030                 os.makedirs(trash, 0755)
1031             for mtime,dirName in purgeList:
1032                 trashDir = os.path.join(trash,
1033                                         dirName)
1034                 # Perform a final sanity check to make sure a backup
1035                 # directory and not a system directory is being purged.
1036                 # If it doesn't contain the RSYNCDIRSUFFIX string a
1037                 # ValueError will be raised.
1038                 try:
1039                     os.getcwd().index(rsyncsmf.RSYNCDIRSUFFIX)
1040                     lockFp = None
1041                     lockFile = os.path.join(lockFileDir,
1042                                             dirName + ".lock")
1043
1044                     if not os.path.exists(lockFileDir):
1045                         os.makedirs(lockFileDir, 0755)
1046
1047                     try:
1048                         lockFp = open(lockFile, 'w')
1049                         fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB)
1050                     except IOError:
1051                         util.debug("Can't move expired backup %s to trash " \
1052                                    "because it is locked by another " \
1053                                    "process. Skipping" % (dirName),
1054                                    self._verbose)
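                        # Another process holds the lock; skip this backup
                        # rather than renaming a directory still in use.
                        continue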
1055
1056                     util.debug("Moving expired rsync backup to trash:" \
1057                                " %s -> %s" % (dirName, trash),
1058                                self._verbose)
1059                     os.rename(dirName, trashDir)
1060                     # Release and delete lock file
1061                     lockFp.close()
1062                     os.unlink(lockFile)
1063                     # Remove its mtime key/value from self._backupTimes
1064                     del self._backupTimes[targetDir][dirName]
1065                     # Log file needs to be deleted too.
1066                     logFile = os.path.join(logDir,
1067                                             dirName + ".log")
1068                     try:
1069                         os.stat(logFile)
1070                         util.debug("Deleting rsync log file: %s" \
1071                                     % (os.path.abspath(logFile)),
1072                                     self._verbose)
1073                         os.unlink(logFile)
1074                     except OSError:
1075                         util.debug("Expected rsync log file not " \
1076                                     "found: %s"\
1077                                     % (os.path.abspath(logFile)),
1078                                     self._verbose)
1079                                                 
1080                 except ValueError:
1081                     util.log_error(syslog.LOG_ALERT,
1082                                     "Invalid attempt to delete " \
1083                                     "non-backup directory: %s\n" \
1084                                     "Placing plugin into " \
1085                                     "maintenance state" % (dirName))
1086                     self._smfInst.mark_maintenance()
1087                     sys.exit(-1)
1088         return True
1089
1090 def release_held_snapshots(propName):
1091     """
1092     Releases dangling user snapshot holds that could
1093     have occurred during abnormal termination of a
1094     previous invocation of this command during a 
1095     previous rsync transfer.
1096     Returns a list of snapshots that had holds matching
1097     propName released.
1098     """ 
1099     # First narrow the list down by finding snapshots
1100     # with userref count > 0
1101     heldList = []
1102     released = []
1103     cmd = [zfs.ZFSCMD, "list", "-H",
1104            "-t", "snapshot",
1105            "-o", "userrefs,name"]
1106     outdata,errdata = util.run_command(cmd)
1107     for line in outdata.rstrip().split('\n'):
1108         holdCount,name = line.split()
1109         if int(holdCount) > 0:
1110             heldList.append(name)
1111     # Now check to see if any of those holds
1112     # match 'propName'
1113     for snapName in heldList:
1114         snapshot = zfs.Snapshot(snapName)
1115         holds = snapshot.holds()
1116         try:
1117             holds.index(propName)
1118             snapshot.release(propName)
1119             released.append(snapName)
1120         except ValueError:
1121             pass
1122     return released
1123
1124
1125 def backup_name_to_snapshot_name(path):
1126     """Reconstructs the original snapshot based on an
1127        rsync backup's full path.
1128        Returns a zfs snapshot name.
1129     """
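    # Illustrative (hypothetical) example: for a backup stored at
    #   <device>/TIMESLIDER/<nodename>/tank/home/<RSYNCDIRSUFFIX>/<label>
    # this returns "tank/home@<label>".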
1130     head,snapLabel = os.path.split(path)
1131     nodeName = os.uname()[1]
1132     prefix = os.path.join(rsyncsmf.RSYNCDIRPREFIX,
1133                           nodeName + '/')
1134     suffix = rsyncsmf.RSYNCDIRSUFFIX
1135
1136     rsyncBaseDir,tail = head.split(prefix , 1)
1137     fsName = tail.split(suffix, 1)[0].rstrip('/')
1138     snapshotName = "%s@%s" % (fsName, snapLabel)
1139     return snapshotName
1140
1141 def list_pending_snapshots(propName):
1142     """
1143     Lists all snapshots which have 'propName' set locally.
1144     The resulting list is returned sorted in descending order
1145     of creation time (i.e. newest first).
1146     Each element in the returned list is a tuple of the form:
1147     (creationtime, snapshotname)
1148     """
1149     results = []
1150     snaplist = []
1151     sortsnaplist = []
1152     # The process for backing up snapshots is:
1153     # Identify all filesystem snapshots that have the (propName)
1154     # property set to "pending" on them. Back them up starting
1155     # with the oldest first.
1156     #
1157     # Unfortunately, there's no single zfs command that can
1158     # output a locally set user property and a creation timestamp
1159     # in one go. So this is done in two passes. The first pass
1160     # identifies snapshots that are tagged as "pending". The 
1161     # second pass uses the filtered results from the first pass
1162     # as arguments to zfs(1) to get creation times.
1163     cmd = [zfs.ZFSCMD, "get", "-H",
1164             "-s", "local",
1165             "-o", "name,value",
1166             propName]
1167     outdata,errdata = util.run_command(cmd)
1168     for line in outdata.rstrip().split('\n'):
1169         if len(line) > 1:
1170             line = line.split()
1171             results.append(line)
1172
1173     for name,value in results:
1174         if value != "pending":
1175             # Already backed up. Skip it.
1176             continue
1177         if name.find('@') == -1:
1178             # Not a snapshot, and should not be set on a filesystem/volume
1179             # Ignore it.
1180             util.log_error(syslog.LOG_WARNING,
1181                            "Dataset: %s shouldn't have local property: %s" \
1182                            % (name, propName))
1183             continue
1184         snaplist.append(name)
1185
1186     # Nothing pending so just return the empty list
1187     if len(snaplist) == 0:
1188         return snaplist
1189
1190     cmd = [zfs.ZFSCMD, "get", "-p", "-H",
1191             "-o", "value,name",
1192             "creation"]
1193     cmd.extend(snaplist)
1194
1195     outdata,errdata = util.run_command(cmd)
1196     for line in outdata.rstrip().split('\n'):
1197         ctimeStr,name = line.split()
1198         insort(sortsnaplist, tuple((long(ctimeStr), name)))
1199     sortsnaplist.reverse()
1200     return sortsnaplist
1201
1202
1203 def main(argv):
1204     # This command needs to be executed by the super user (root) to
1205     # ensure that rsync has permissions to access all local filesystem
1206     # snapshots and to replicate permissions and ownership on the target
1207     # device
1208     if os.geteuid() != 0:
1209         head,tail = os.path.split(sys.argv[0])
1210         sys.stderr.write(tail + " can only be executed by root\n")
1211         sys.exit(-1)
1212
1213     # This process needs to be run as a system wide single instance
1214     # only at any given time. So create a lockfile in /tmp and try
1215     # to obtain an exclusive lock on it. If we can't then another 
1216     # instance is running and already has a lock on it so just exit.
1217     lockFileDir = os.path.normpath(tempfile.gettempdir() + '/' + \
1218                                                         ".time-slider")
1219     if not os.path.exists(lockFileDir):
1220             os.makedirs(lockFileDir, 0755)
1221     lockFile = os.path.join(lockFileDir, 'rsync-backup.lock')
1222
1223     lockFp = open(lockFile, 'w')
1224     try:
1225         fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB)
1226     except IOError:
1227         sys.exit(2)
1228
1229     # The SMF fmri of the time-slider plugin instance associated with
1230     # this command needs to be supplied as the argument immediately
1231     # following the command, i.e. argv[1]
1232     try:
1233         pluginFMRI = sys.argv[1]
1234     except IndexError:
1235         # No FMRI provided. Probably a user trying to invoke the command
1236         # from the command line.
1237         sys.stderr.write("No time-slider plugin SMF instance FMRI defined. " \
1238                          "This plugin does not support command line " \
1239                          "execution. Exiting\n")
1240         sys.exit(-1)
1241
1242     # Open up a syslog session
1243     syslog.openlog(sys.argv[0], 0, syslog.LOG_DAEMON)
1244
1245     gobject.threads_init()
1246     # Tell dbus to use the gobject mainloop for async ops
1247     dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
1248     dbus.mainloop.glib.threads_init()
1249     # Register a bus name with the system dbus daemon
1250     sysbus = dbus.SystemBus()
1251     busName = dbus.service.BusName("org.opensolaris.TimeSlider.plugin.rsync", sysbus)
1252     dbusObj = dbussvc.RsyncBackup(sysbus, \
1253         "/org/opensolaris/TimeSlider/plugin/rsync")
1254
1255     mainLoop = gobject.MainLoop()
1256     backupQueue = BackupQueue(pluginFMRI, dbusObj, mainLoop)
1257     gobject.idle_add(backupQueue.backup_snapshot)
1258     mainLoop.run()
1259     sys.exit(0)
1260