#!/usr/bin/python2.6 # # CDDL HEADER START # # The contents of this file are subject to the terms of the # Common Development and Distribution License (the "License"). # You may not use this file except in compliance with the License. # # You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE # or http://www.opensolaris.org/os/licensing. # See the License for the specific language governing permissions # and limitations under the License. # # When distributing Covered Code, include this CDDL HEADER in each # file and include the License file at usr/src/OPENSOLARIS.LICENSE. # If applicable, add the following below this CDDL HEADER, with the # fields enclosed by brackets "[]" replaced with your own identifying # information: Portions Copyright [yyyy] [name of copyright owner] # # CDDL HEADER END # import os import os.path import fcntl import tempfile import sys import subprocess import statvfs import time import threading import math import syslog import gobject import gio import dbus import shutil import copy from bisect import insort, bisect_left from time_slider import util, zfs, dbussvc, autosnapsmf, timeslidersmf import rsyncsmf # Set to True if SMF property value of "plugin/command" is "true" verboseprop = "plugin/verbose" propbasename = "org.opensolaris:time-slider-plugin" class RsyncError(Exception): """Generic base class for RsyncError Attributes: msg -- explanation of the error """ def __init__(self, msg): self.msg = msg def __str__(self): return repr(self.msg) class RsyncTargetDisconnectedError(RsyncError): """Exception raised when the backup device goes offline during the rsync transfer. Attributes: msg -- explanation of the error """ def __init__(self, source, dest, message): msg = "Target directory error during rsync backup from " \ "%s to target \'%s\' Rsync error details:\n%s" \ % (source, dest, message) RsyncError.__init__(self, msg) class RsyncTransferInterruptedError(RsyncError): """Exception raised when the rsync transfer process pid was interrupted or killed during the rsync transfer. Attributes: msg -- explanation of the error """ def __init__(self, source, dest, message): msg = "Interrputed rsync transfer from %s to %s " \ "Rsync error details:\n%s" % (source, dest, message) RsyncError.__init__(self, msg) class RsyncSourceVanishedError(RsyncError): """Exception raised when rsync could only partially transfer due to the contents of the source directory being removed. Possibly due to a snapshot being destroyed during transfer because of immediate or deferred (holds released) destruction. Attributes: msg -- explanation of the error """ def __init__(self, source, dest, message): msg = "Rsync source directory vanished during transfer of %s to %s" \ "Rsync error details:\n%s" % (source, dest, message) RsyncError.__init__(self, msg) class RsyncProcess(threading.Thread): def __init__(self, source, target, latest=None, verbose=False, logfile=None): self._sourceDir = source self._backupDir = target self._latest = latest self._verbose = verbose self._proc = None self._forkError = None self._logFile = logfile # Init done. Now initiaslise threading. threading.Thread.__init__ (self) def run(self): try: self._proc = subprocess.Popen(self._cmd, stderr=subprocess.PIPE, close_fds=True) except OSError as e: # _check_exit_code() will pick up this and raise an # exception in the original thread. self._forkError = "%s: %s" % (self._cmd[0], str(e)) else: self._stdout,self._stderr = self._proc.communicate() self._exitValue = self._proc.wait() def _check_exit_code(self): if self._forkError: # The rsync process failed to execute, probably # received an OSError exception. Pass it up. raise RsyncError(self._forkError) if self._exitValue == 0: return # Non zero return code means rsync encountered an # error which may be transient or require sys-admin # intervention to fix. # This method basically just maps known rsync exit codes # to exception classes. # Rsync exit value codes (non-zero) # 11/12 Indicates backup drive was disconnected during # transfer. Recoverable once drive reconnected: # 11 Error in file I/O # 12 Error in rsync protocol data stream if self._exitValue == 11 or \ self._exitValue == 12: raise RsyncTargetDisconnectedError(self._sourceDir, self._backupDir, self._stderr) # Transfer pid interrupted by SIGUSR1 or SIGINT. Recoverable: # 20 Received SIGUSR1 or SIGINT elif self._proc.returncode == 20: raise RsyncTransferInterruptedError(self._sourceDir, self._backupDir, self._stderr) # For everything else unknown or unexpected, treat it as # fatal and provide the rsync stderr output. else: raise RsyncError(self._stderr) def start_backup(self): # First, check to see if the rsync destination # directory is accessible. try: os.stat(self._backupDir) except OSError: util.debug("Backup directory is not " \ "currently accessible: %s" \ % (self._backupDir), self._verbose) #FIXME exit/exception needs to be raise here # or status needs to be set. return try: os.stat(self._sourceDir) except OSError: util.debug("Backup source directory is not " \ "currently accessible: %s" \ % (self._sourceDir), self._verbose) #FIXME exit/excpetion needs to be raise here # or status needs to be set. return if self._latest: self._cmd = ["/usr/bin/rsync", "-a", "--inplace",\ "%s/." % (self._sourceDir), \ "--link-dest=%s" % (self._latest), \ self._backupDir] else: self._cmd = ["/usr/bin/rsync", "-a", "--inplace",\ "%s/." % (self._sourceDir), \ self._backupDir] if self._logFile: self._cmd.insert(1, "--log-file=%s" % (self._logFile)) if self._verbose: self._cmd.insert(1, "-vv") self.start() class BackupQueue(): def __init__(self, fmri, dbus, mainLoop=None): self._bus = dbus self._mainLoop = mainLoop self._started = False self._pluginFMRI = fmri self._smfInst = rsyncsmf.RsyncSMF(self._pluginFMRI) self._verbose = self._smfInst.get_verbose() self._rsyncVerbose = self._smfInst.get_rsync_verbose() self._propName = "%s:%s" % (propbasename, fmri.rsplit(':', 1)[1]) # Variables to quickly access time sorted backups and # to map directory names to mtimes. self._backupDirs = None self._backups = None self._backupTimes = {} released = release_held_snapshots(self._propName) for snapName in released: util.debug("Released dangling userref on: " + snapName, self._verbose) self._tempSchedules = None # List of all snapshots currently in our backup queue self._pendingList = list_pending_snapshots(self._propName) # Try to backup in sets of snapshots grouped by a common # snapshot label. These get taken from the head of the # pending list. After a working set has been completed, # pending list gets refreshed and a new working set is # extracted. self._currentQueueSet = [] self._skipList = [] self._queueLength = 0 self._cleanupThreshold = self._smfInst.get_cleanup_threshold() if self._cleanupThreshold < 1 or \ self._cleanupThreshold > 99: # Stupid value. #FIXME Plugin should be checked by SMF start # method and placed into maintenance if this happens. # For now, go with default util.log_error(syslog.LOG_ERR, "Invalid value for SMF property " \ ": %d." \ "Using default value of 95%" \ % (self._cleanupThreshold)) # Base variables for backup device. Will be initialised # later in _find_backup_device() self._smfTargetKey = self._smfInst.get_target_key() sys,self._nodeName,rel,ver,arch = os.uname() self._rsyncBaseDir = None self._rsyncDir = None self._keyFile = None tsSMF = timeslidersmf.TimeSliderSMF() self._labelSeparator = tsSMF.get_separator() del tsSMF # Finally go look for the backup device self._find_backup_device() def empty_trash_folders(self): trashDirs = [] trashedBackups = [] os.chdir(self._rsyncDir) for root, dirs, files in os.walk(self._rsyncDir): if '.time-slider' in dirs: dirs.remove ('.time-slider') trashDir = os.path.join(root, rsyncsmf.RSYNCTRASHSUFFIX) if os.path.exists(trashDir): trashDirs.append(trashDir) for trashDir in trashDirs: os.chdir(trashDir) trashItems = [] trashItems = [d for d in os.listdir(trashDir) \ if os.path.isdir(d) and not os.path.islink(d)] if len(trashItems) > 0: util.debug("Deleting trash backups in %s" % (trashDir), self._verbose) for trashItem in trashItems: util.debug("Deleting trash item: %s" % (trashItem), self._verbose) # FIXME add some dbus notification here to let the # applet know what's going on. shutil.rmtree(trashItem) def _get_temp_schedules(self): # Get retention rule for non archival snapshots as per # rules defined in: # svc://system/filesystem/zfs/auto-snapshot: archived = self._smfInst.get_archived_schedules() triggers = self._smfInst.get_trigger_list() defScheds = autosnapsmf.get_default_schedules() customScheds = autosnapsmf.get_custom_schedules() try: triggers.index('all') # Expand the wildcard value 'all' triggers = [sched for sched,i,p,k in defScheds] customTriggers = [sched for sched,i,p,k in customScheds] triggers.extend(customTriggers) except ValueError: pass self._tempSchedules = [schedule for schedule in defScheds if \ schedule[0] not in archived] self._tempSchedules.extend([schedule for schedule in customScheds if \ schedule[0] not in archived]) def _remove_dead_backups(self): """ Identifies and removes partially completed backups whose origin snapshot is no longer in the pending queue, indicating that the backup will never get completed, in which case it's just a waste of space """ backupDirs = [] partialDirs = [] deadBackups = [] os.chdir(self._rsyncDir) for root, dirs, files in os.walk(self._rsyncDir): if '.time-slider' in dirs: dirs.remove ('.time-slider') partialDir = os.path.join(root, rsyncsmf.RSYNCPARTIALSUFFIX) partialDirs.append(partialDir) for dirName in partialDirs: if not os.path.exists(partialDir): continue os.chdir(dirName) partials = [d for d in os.listdir(dirName) \ if os.path.isdir(d)] if len(partials) == 0: continue suffix = rsyncsmf.RSYNCPARTIALSUFFIX prefix = self._rsyncDir # Reconstruct the origin ZFS filesystem name baseName = dirName.replace(prefix, '', 1).lstrip('/') fsName = baseName.replace(suffix, '', 1).rstrip('/') for snapshotLabel in partials: pending = False # Reconstruct the origin snapshot name and see # if it's still pending rsync backup. If it is # then leave it alone since it can be used to # resume a partial backup later. Otherwise it's # never going to be backed up and needs to be # manually deleted. snapshotName = "%s@%s" % (fsName, snapshotLabel) for ctime,name in self._pendingList: if name == snapshotName: pending = True continue if pending == False: util.debug("Deleting zombied partial backup: %s" \ % (os.path.abspath(snapshotLabel)), self._verbose) shutil.rmtree(snapshotLabel) # Don't forget the log file too. logFile = os.path.join(os.path.pardir, os.path.pardir, rsyncsmf.RSYNCLOGSUFFIX, snapshotLabel + ".log") try: os.stat(logFile) util.debug("Deleting zombie log file: %s" \ % (os.path.abspath(logFile)), self._verbose) os.unlink(logFile) except OSError: util.debug("Expected rsync log file not " \ "found: %s"\ % (os.path.abspath(logFile)), self._verbose) def _discover_backups(self): self._backupDirs = [] self._backups = [] self._backupTimes = {} # backupTimes dictionary is not primarily used # to store directory listings, but to map # mtimes to directories for quick lookups. os.chdir (self._rsyncDir) for root, dirs, files in os.walk(self._rsyncDir): if '.time-slider' in dirs: dirs.remove('.time-slider') backupDir = os.path.join(root, rsyncsmf.RSYNCDIRSUFFIX) if os.path.exists(backupDir): insort(self._backupDirs, os.path.abspath(backupDir)) for dirName in self._backupDirs: self._backupTimes[dirName] = {} os.chdir(dirName) dirList = [d for d in os.listdir(dirName) \ if os.path.isdir(d) and not os.path.islink(d)] for d in dirList: mtime = os.stat(d).st_mtime insort(self._backups, [long(mtime), os.path.abspath(d)]) self._backupTimes[dirName][d] = mtime def _find_backup_device(self): # Determine the rsync backup dir. This is the target dir # defined by the SMF instance plus the "TIMESLIDER/" # suffix. Try finding it at the preconfigured path first, # then failing that, scan removable media mounts, in case it # got remounted under a different path than at setup time. self._rsyncBaseDir = None path = self._smfInst.get_target_dir() if self._validate_rsync_target(path) == True: self._rsyncBaseDir = path util.debug("Backup target device online: %s" % (path), self._verbose) else: util.debug("Backup target device not mounted at: %s" \ "Scanning removable devices.." \ % (path), self._verbose) volMonitor = gio.volume_monitor_get() mounts = volMonitor.get_mounts() for mount in mounts: root = mount.get_root() path = root.get_path() if self._validate_rsync_target(path) == True: util.debug("Located backup target device at: %s" \ % (path), self._verbose) self._rsyncBaseDir = path if self._rsyncBaseDir != None: self._rsyncDir = os.path.join(self._rsyncBaseDir, rsyncsmf.RSYNCDIRPREFIX, self._nodeName) self._keyFile = os.path.join(self._rsyncBaseDir, rsyncsmf.RSYNCDIRPREFIX, rsyncsmf.RSYNCCONFIGFILE) def _validate_rsync_target(self, path): """ Tests path to see if it is the pre-configured rsync backup device path. Returns True on success, otherwise False """ # FIXME - this is duplicate in the applet and should # be moved into a shared module if not os.path.exists(path): return False testDir = os.path.join(path, rsyncsmf.RSYNCDIRPREFIX, self._nodeName) testKeyFile = os.path.join(path, rsyncsmf.RSYNCDIRPREFIX, rsyncsmf.RSYNCCONFIGFILE) if os.path.exists(testDir) and \ os.path.exists(testKeyFile): testKeyVal = None f = open(testKeyFile, 'r') for line in f.readlines(): key, val = line.strip().split('=') if key.strip() == "target_key": targetKey = val.strip() break f.close() if targetKey == self._smfTargetKey: return True return False def _find_deleteable_backups(self, timestamp): """ Returns a list of backup directory paths that are older than timestamp, and can be deleted in order to make room for newer backups. Certain restrictions apply such as: - The backup cannot be locked (indicating it's being or about to be deleted by another process - The last remaining backup in it's parent directory, which would mean it is the only backup of a given fileystem on the device. We always keep at least 1 full backup of a filesystem on the device if it belongs to a currently selected filesystem. """ deleteables = [] # This should have already occured in # backup_snapshot() mainloop method if self._backupDirs == None: self._discover_backups() if len(self._backupDirs) == 0: # We were not able to find any backups to delete. Try again later return [] # Extract the subset of self._backupDirs that are older than timestamp idx = bisect_left(self._backups, [timestamp, '']) subset = self._backups[0:idx] # Copy the backupTimes dictionary since so we can # modify it safely. copiedTimes = copy.deepcopy(self._backupTimes) for mtime, dirName in subset: head,tail = os.path.split(dirName) if len(copiedTimes[head]) < 2: # We can only delete this single backup provided # it's filesystem is no longer tagged for rsync # replication. Othewise we need to leave at least # one backup on the device at all times, and also # to provide an incremental backup point for # future backups. snapName = backup_name_to_snapshot_name(dirName) snapshot = zfs.Snapshot(snapName) fs = zfs.Filesystem(snapshot.fsname) if fs.get_user_property(rsyncsmf.RSYNCFSTAG) == "true": continue else: # If we are going to actually remove this final # remaining backup of this filesystem then we need # to unqueue any pending snapshots of it, # otherwise after deleting this one, we'll just # start backing up the older snapshots of this # filesystem afterwards, which is a waste of time # and space. pending = [name for time,name in \ list_pending_snapshots(self._propName) if \ name.find(snapshot.fsname + '@') == 0] cmd = [zfs.PFCMD, zfs.ZFSCMD, "inherit", self._propName] util.debug("Unqueuing pending backups of deselected " \ "filesystem: " + snapshot.fsname + '\n' + \ str(pending), self._verbose) cmd.extend(pending) util.run_command(cmd) lockFileDir = os.path.join(head, os.path.pardir, os.path.pardir, rsyncsmf.RSYNCLOCKSUFFIX) lockFile = os.path.join(lockFileDir, tail) if not os.path.exists(lockFile): # No lock file so we are free to delete it. deleteables.append([mtime, dirName]) # Remove it from copiedTimes so it doesn't get # double counted del copiedTimes[head][tail] continue # Lock file so probably can't delete this, but try it out # anyway incase it's stale/unlocked try: lockFp = open(lockFile, 'w') fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: util.debug("Can't delete backup %s because it " \ "is locked by another process." \ "Skipping" % (dirName), self._verbose) continue # Ok, we can still delete it, but get rid of the stale lock file lockFp.close() os.unlink(lockFile) deleteabled.append([mtime, dirName]) # Remove it from copiedTimes so it doesn't get # double counted del copiedTimes[head][tail] return deleteables def _recover_space(self, deleteables): # Important: # Don't actually loop throught this list fully. Break out # as soon as pool capacity is beneath the threshhold level # again. remainder = deleteables[:] for mtime,dirName in deleteables: if util.get_filesystem_capacity(self._rsyncDir) < \ self._cleanupThreshold: # No need to delete anything further return remainder lockFile = None lockFp = None head,tail = os.path.split(dirName) # Check if it has a lock file and try to grab it. # If that fails we need to move onto the next backup in # the list. lockFileDir = os.path.join(head, os.path.pardir, os.path.pardir, rsyncsmf.RSYNCLOCKSUFFIX) lockFile = os.path.join(lockFileDir, tail) if not os.path.exists(lockFileDir): os.makedirs(lockFileDir, 0755) try: lockFp = open(lockFile, 'w') fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: util.debug("Can't delete backup %s because it " \ "is locked by another process." \ "Skipping" % (dirName), self._verbose) # Remove if from the remainder list idx = bisect_left(remainder, [mtime, dirName]) del remainder[idx] continue trash = os.path.join(head, os.path.pardir, os.path.pardir, rsyncsmf.RSYNCTRASHSUFFIX) trashDir = os.path.join(trash, tail) if not os.path.exists(trash): os.makedirs(trash, 0755) util.debug("Deleting rsync backup to recover space: %s"\ % (dirName), self._verbose) os.rename(dirName, trashDir) lockFp.close() os.unlink(lockFile) shutil.rmtree(trashDir) # Remove the log file if it exists logFile = os.path.join(head, os.path.pardir, os.path.pardir, rsyncsmf.RSYNCLOGSUFFIX, tail + ".log") if os.path.exists(logFile): os.unlink(logFile) else: util.debug("Expected to find log file %s when deleting %s " \ "during space recovery" % (logFile, dirName), self._verbose) # Remove dirName from the backup list idx = bisect_left(self._backups, [mtime, dirName]) del self._backups[idx] # Remove if from the remainder list too idx = bisect_left(remainder, [mtime, dirName]) del remainder[idx] return remainder def backup_snapshot(self): # First, check to see if the rsync destination # directory is accessible. if self._rsyncBaseDir == None: util.debug("Backup target device is not " \ "accessible right now: %s" \ % (self._smfInst.get_target_dir()), self._verbose) self._bus.rsync_unsynced(len(self._pendingList)) if self._mainLoop: self._mainLoop.quit() sys.exit(0) # Extra paranoia if self._validate_rsync_target(self._rsyncBaseDir) == False: util.debug("Backup target directory does not " \ "have a matching configuration key. " \ "Possibly old or wrong device: %s" \ % (self._keyFile), self._verbose) self._bus.rsync_unsynced(len(self._pendingList)) if self._mainLoop: self._mainLoop.quit() sys.exit(0) # Before getting started. Identify what needs to be cleaned up on # the backup target. if self._tempSchedules == None: self._get_temp_schedules() # Remove incompleteable partial backups, then find out what # complete backups we already have on the target device if self._started == False: self._remove_dead_backups() self._discover_backups() if len(self._currentQueueSet) == 0: # Means we are just getting started or have just completed # backup of one full set of snapshots. Clear out anything # we may have moved to the trash during the previous # backup set iterations. self.empty_trash_folders() # Refresh the pending list and build a new working set queue self._pendingList = list_pending_snapshots(self._propName) # Remove skipped items to avoid infinite looping. for item in self._skipList: try: self._pendingList.remove(item) except ValueError: pass if len(self._pendingList) == 0: # If something was actually backed up, signal # that it is now completed. if self._started == True: self._bus.rsync_complete(self._rsyncBaseDir) self._bus.rsync_synced() # Nothing to do exit if self._mainLoop: self._mainLoop.quit() sys.exit(0) else: # Construct a new working set queue # Identify the newest snapshot and then all # snapshots with a matching snapshot label self._queueLength = len(self._pendingList) ctime,headSnapName = self._pendingList[0] label = headSnapName.rsplit("@", 1)[1] self._currentQueueSet = \ [(ctime,snapName) for \ ctime,snapName in self._pendingList if \ snapName.rsplit("@", 1)[1] == label] if len(self._backups) > 0: oldestBackupTime, oldestBackup = self._backups[0] qTime, qItem = self._currentQueueSet[0] # If the backup device is nearly full, don't # bother trying to backup anything unless it's # newer than the oldest backup set already on # the device. # This avoids potential situations where if the # backup device has severely limited capacity and # older backups sets were in the queue, newer backups # might get deleted in order to make room for older # ones, creating a downward spiral. capacity = util.get_filesystem_capacity(self._rsyncDir) if capacity > self._cleanupThreshold: # Find backups older than qTime that could in theory # be deleted in order to make room for the curtent # pending item. deleteables = self._find_deleteable_backups(qTime) if len(deleteables) == 0 and \ qTime < oldestBackupTime: util.debug("%s has exceeded %d%% of its capacity. " \ "Skipping pending backups prior to: " \ "%s" % (self._rsyncDir, self._cleanupThreshold, time.ctime(oldestBackupTime)), self._verbose) if self._started == True: self._bus.rsync_complete(self._rsyncBaseDir) self._bus.rsync_synced() # Nothing to do exit if self._mainLoop: self._mainLoop.quit() sys.exit(0) if self._started == False: self._started = True self._bus.rsync_started(self._rsyncBaseDir) ctime,snapName = self._currentQueueSet[0] snapshot = zfs.Snapshot(snapName, long(ctime)) # Make sure the snapshot didn't get destroyed since we last # checked it. remainingList = self._currentQueueSet[1:] if snapshot.exists() == False: util.debug("Snapshot: %s no longer exists. Skipping" \ % (snapName), self._verbose) self._currentQueueSet = remainingList return True # Place a hold on the snapshot so it doesn't go anywhere # while rsync is trying to back it up. snapshot.hold(self._propName) self._queueLength -= 1 fs = zfs.Filesystem(snapshot.fsname) sourceDir = None if fs.is_mounted() == True: # Get the mountpoint mountPoint = fs.get_mountpoint() sourceDir = "%s/.zfs/snapshot/%s" \ % (mountPoint, snapshot.snaplabel) else: # If the filesystem is not mounted just skip it. If it's # not mounted then nothing is being written to it. And # we can just catch up with it again later if it doesn't # get expired by time-sliderd util.debug("%s is not mounted. Skipping." \ % (snapshot.fsname), self._verbose) snapshot.release(self._propName) self._skipList.append(ctime, snapName) self._currentQueueSet = remainingList return True # targetDir is the parent folder of all backups # for a given filesystem targetDir = os.path.join(self._rsyncDir, snapshot.fsname, rsyncsmf.RSYNCDIRSUFFIX) # partialDir is a separate directory in which # snapshots are initially backed up to. Upon successful # completion they are moved to the backupDir. partialDir = os.path.join(self._rsyncDir, snapshot.fsname, rsyncsmf.RSYNCPARTIALSUFFIX, snapshot.snaplabel) lockFileDir = os.path.join(self._rsyncDir, snapshot.fsname, rsyncsmf.RSYNCLOCKSUFFIX) logDir = os.path.join(self._rsyncDir, snapshot.fsname, rsyncsmf.RSYNCLOGSUFFIX) logFile = os.path.join(logDir, snapshot.snaplabel + ".log") # backupDir is the full directory path where the new # backup will be located ie / backupDir = os.path.join(targetDir, snapshot.snaplabel) # Figure out the closest previous backup. Since we # backup newest first instead of oldest first it's # determined as follows: # If queued backup item is newer than the most recent # backup on the backup target, use the most recent # backup as the incremental source. # Othewise identify the backup on the device that is # nearest to but newer than the queued backup. nearestOlder = None nearestNewer = None dirList = [] if not os.path.exists(partialDir): os.makedirs(partialDir, 0755) if not os.path.exists(logDir): os.makedirs(logDir, 0755) if not os.path.exists(targetDir): os.makedirs(targetDir, 0755) # Add the new directory to our internal # mtime dictionary and sorted list. self._backupTimes[targetDir] = {} insort(self._backupDirs, targetDir) else: for name,value in self._backupTimes[targetDir].items(): if ctime > value: if nearestOlder == None or \ value > nearestOlder[1]: nearestOlder = [name, value] else: if nearestNewer == None or \ value < nearestNewer[1]: nearestNewer = [name, value] os.chdir(targetDir) link = None linkDest = None lockFile = None lockFp = None if nearestNewer: link = nearestNewer[0] elif nearestOlder: link = nearestOlder[0] if link: linkDest = os.path.realpath(link) # Create a lock for linkDest. We need to ensure that # nautilus' restore view or the time-slider-delete # GUI doesn't attempt to delete it or move it to the # trash while it is being used by rsync for incremental # backup. lockFile = os.path.join(lockFileDir, link + ".lock") if not os.path.exists(lockFileDir): os.makedirs(lockFileDir, 0755) try: lockFp = open(lockFile, 'w') fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: util.debug("Can't perform incremental rsync of %s because " \ "unable to obtain exclusive lock on incremental " \ "backup reference point: %s. Exiting" \ % (lockFile), self._verbose) os.chdir("/") snapshot.release(self._propName) sys.exit(1) self._rsyncProc = RsyncProcess(sourceDir, partialDir, linkDest, self._rsyncVerbose, logFile) # Notify the applet of current status via dbus self._bus.rsync_current(snapshot.name, self._queueLength) # Set umask temporarily so that rsync backups are read-only to # the owner by default. Rync will override this to match the # permissions of each snapshot as appropriate. origmask = os.umask(0222) util.debug("Starting rsync backup of '%s' to: %s" \ % (sourceDir, partialDir), self._verbose) self._rsyncProc.start_backup() warningDone = False while self._rsyncProc.is_alive(): if len(self._backups) > 0: # Monitor backup target capacity while we wait for rsync. capacity = util.get_filesystem_capacity(self._rsyncDir) if capacity > self._cleanupThreshold: # Find backups older than qTime that could in theory # be deleted in order to make room for the curtent # pending item. deleteables = self._find_deleteable_backups(qTime) # Only generate annoying debug message once instead of # every 5 seconds. if warningDone == False: util.debug("Backup device capacity exceeds %d%%. " \ "Found %d deleteable backups for space " \ "recovery." \ % (capacity, len(deleteables)), self._verbose) warningDone = True if len(deleteables) > 0: deleteables = self._recover_space(deleteables) time.sleep(5) try: self._rsyncProc._check_exit_code() except (RsyncTransferInterruptedError, RsyncTargetDisconnectedError, RsyncSourceVanishedError) as e: os.chdir("/") snapshot.release(self._propName) util.log_error(syslog.LOG_ERR, str(e)) # These are recoverable, so exit for now and try again # later sys.exit(-1) except RsyncError as e: # If the backup device went offline we need to chdir # out of it or running further commands might fail. os.chdir("/") util.log_error(syslog.LOG_ERR, "Unexpected rsync error encountered: \n" + \ str(e)) util.log_error(syslog.LOG_ERR, "Rsync log file location: %s" \ % (os.path.abspath(logFile))) util.log_error(syslog.LOG_ERR, "Placing plugin into maintenance mode") self._smfInst.mark_maintenance() snapshot.release(self._propName) sys.exit(-1) finally: if lockFp: lockFp.close() os.unlink(lockFile) util.debug("Rsync process exited", self._verbose) os.umask(origmask) # Move the completed backup from the partial dir to the # the propert backup directory util.debug("Renaming completed backup from %s to %s" \ % (partialDir, backupDir), self._verbose) os.rename(partialDir, backupDir) # Reset the mtime and atime properties of the backup directory so that # they match the snapshot creation time. This is extremely important # because the backup mechanism relies on it to determine backup times # and nearest matches for incremental rsync (linkDest) os.utime(backupDir, (long(ctime), long(ctime))) # Update the dictionary and time sorted list with ctime also self._backupTimes[targetDir][snapshot.snaplabel] = long(ctime) insort(self._backups, [long(ctime), os.path.abspath(backupDir)]) snapshot.set_user_property(self._propName, "completed") snapshot.release(self._propName) self._currentQueueSet = remainingList # Now is a good time to clean out the directory: # Check to see if the backup just completed belonged to an # auto-snapshot schedule and whether older backups should get # deleted. if snapshot.snaplabel.find(autosnapsmf.SNAPLABELPREFIX) == 0: tempSchedule = None label = None for schedule in self._tempSchedules: label = "%s%s%s" % (autosnapsmf.SNAPLABELPREFIX, self._labelSeparator, schedule[0]) if snapshot.snaplabel.find(label) == 0: tempSchedule = schedule break if tempSchedule == None: # Backup doesn't belong to a temporary schedule so # nothing left to do return True keep = tempSchedule[3] # [schedule,interval,period,keep] schedBackups = [d for d in os.listdir(targetDir) if d.find(label) == 0] schedBackups = [d for d in dirList if d.find(label) == 0] # The minimum that can be kept around is one: # keeping zero is stupid since it might trigger # a total replication rather than an incremental # rsync replication. if len(schedBackups) <= 1: return True if len(schedBackups) <= keep: return True sortedBackupList = [] for backup in schedBackups: stInfo = os.stat(backup) # List is sorted by mtime, oldest first insort(sortedBackupList, [stInfo.st_mtime, backup]) purgeList = sortedBackupList[0:-keep] trash = os.path.join(self._rsyncDir, snapshot.fsname, rsyncsmf.RSYNCTRASHSUFFIX) if not os.path.exists(trash): os.makedirs(trash, 0755) for mtime,dirName in purgeList: trashDir = os.path.join(trash, dirName) # Perform a final sanity check to make sure a backup # directory and not a system directory is being purged. # If it doesn't contain the RSYNCDIRSUFFIX string a # ValueError will be raised. try: os.getcwd().index(rsyncsmf.RSYNCDIRSUFFIX) lockFp = None lockFile = os.path.join(lockFileDir, dirName + ".lock") if not os.path.exists(lockFileDir): os.makedirs(lockFileDir, 0755) try: lockFp = open(lockFile, 'w') fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: util.debug("Can't move expired backup %s to trash " \ "because it is locked by another " \ "process. Skipping" % (dirName), self._verbose) util.debug("Moving expired rsync backup to trash:" \ " %s -> %s" % (dirName, trash), self._verbose) os.rename(dirName, trashDir) # Release and delete lock file lockFp.close() os.unlink(lockFile) # Remove its mtime key/value from self._backupTimes del self._backupTimes[targetDir][dirName] # Log file needs to be deleted too. logFile = os.path.join(logDir, dirName + ".log") try: os.stat(logFile) util.debug("Deleting rsync log file: %s" \ % (os.path.abspath(logFile)), self._verbose) os.unlink(logFile) except OSError: util.debug("Expected rsync log file not " \ "found: %s"\ % (os.path.abspath(logFile)), self._verbose) except ValueError: util.log_error(syslog.LOG_ALERT, "Invalid attempt to delete " \ "non-backup directory: %s\n" \ "Placing plugin into " \ "maintenance state" % (dirName)) self._smfInst.mark_maintenance() sys.exit(-1) return True def release_held_snapshots(propName): """ Releases dangling user snapshot holds that could have occured during abnormal termination of a previous invocation of this command during a previous rsync transfer. Returns a list of snapshots that had holds mathcing propName released. """ # First narrow the list down by finding snapshots # with userref count > 0 heldList = [] released = [] cmd = [zfs.ZFSCMD, "list", "-H", "-t", "snapshot", "-o", "userrefs,name"] outdata,errdata = util.run_command(cmd) for line in outdata.rstrip().split('\n'): holdCount,name = line.split() if int(holdCount) > 0: heldList.append(name) # Now check to see if any of those holds # match 'propName' for snapName in heldList: snapshot = zfs.Snapshot(snapName) holds = snapshot.holds() try: holds.index(propName) snapshot.release(propName) released.append(snapName) except ValueError: pass return released def backup_name_to_snapshot_name(path): """Reconstructs the original snapshot based on an rsync backup's full path. Returns a zfs snapshot name. """ head,snapLabel = os.path.split(path) sys,nodeName,rel,ver,arch = os.uname() prefix = os.path.join(rsyncsmf.RSYNCDIRPREFIX, nodeName + '/') suffix = rsyncsmf.RSYNCDIRSUFFIX rsyncBaseDir,tail = head.split(prefix , 1) fsName = tail.split(suffix, 1)[0].rstrip('/') snapshotName = "%s@%s" % (fsName, snapLabel) return snapshotName def list_pending_snapshots(propName): """ Lists all snaphots which have 'propName" set locally. Resulting list is returned sorted in descending order of creation time (ie.newest first). Each element in the returned list is tuple of the form: [creationtime, snapshotname] """ results = [] snaplist = [] sortsnaplist = [] # The process for backing up snapshots is: # Identify all filesystem snapshots that have the (propName) # property set to "pending" on them. Back them up starting # with the oldest first. # # Unfortunately, there's no single zfs command that can # output a locally set user property and a creation timestamp # in one go. So this is done in two passes. The first pass # identifies snapshots that are tagged as "pending". The # second pass uses the filtered results from the first pass # as arguments to zfs(1) to get creation times. cmd = [zfs.ZFSCMD, "get", "-H", "-s", "local", "-o", "name,value", propName] outdata,errdata = util.run_command(cmd) for line in outdata.rstrip().split('\n'): if len(line) > 1: line = line.split() results.append(line) for name,value in results: if value != "pending": # Already backed up. Skip it." continue if name.find('@') == -1: # Not a snapshot, and should not be set on a filesystem/volume # Ignore it. util.log_error(syslog.LOG_WARNING, "Dataset: %s shouldn't have local property: %s" \ % (name, propName)) continue snaplist.append(name) # Nothing pending so just return the empty list if len(snaplist) == 0: return snaplist cmd = [zfs.ZFSCMD, "get", "-p", "-H", "-o", "value,name", "creation"] cmd.extend(snaplist) outdata,errdata = util.run_command(cmd) for line in outdata.rstrip().split('\n'): ctimeStr,name = line.split() insort(sortsnaplist, tuple((long(ctimeStr), name))) sortsnaplist.reverse() return sortsnaplist def main(argv): # This command needs to be executed by the super user (root) to # ensure that rsync has permissions to access all local filesystem # snapshots and to replicate permissions and ownership on the target # device if os.geteuid() != 0: head,tail = os.path.split(sys.argv[0]) sys.stderr.write(tail + " can only be executed by root") sys.exit(-1) # This process needs to be run as a system wide single instance # only at any given time. So create a lockfile in /tmp and try # to obtain an exclusive lock on it. If we can't then another # instance is running and already has a lock on it so just exit. lockFileDir = os.path.normpath(tempfile.gettempdir() + '/' + \ ".time-slider") if not os.path.exists(lockFileDir): os.makedirs(lockFileDir, 0755) lockFile = os.path.join(lockFileDir, 'rsync-backup.lock') lockFp = open(lockFile, 'w') try: fcntl.flock(lockFp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: sys.exit(2) # The SMF fmri of the time-slider plugin instance associated with # this command needs to be supplied as the argument immeditately # proceeding the command. ie. argv[1] try: pluginFMRI = sys.argv[1] except IndexError: # No FMRI provided. Probably a user trying to invoke the command # from the command line. sys.stderr.write("No time-slider plugin SMF instance FMRI defined. " \ "This plugin does not support command line " \ "execution. Exiting\n") sys.exit(-1) # Open up a syslog session syslog.openlog(sys.argv[0], 0, syslog.LOG_DAEMON) gobject.threads_init() # Tell dbus to use the gobject mainloop for async ops dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) dbus.mainloop.glib.threads_init() # Register a bus name with the system dbus daemon sysbus = dbus.SystemBus() busName = dbus.service.BusName("org.opensolaris.TimeSlider.plugin.rsync", sysbus) dbusObj = dbussvc.RsyncBackup(sysbus, \ "/org/opensolaris/TimeSlider/plugin/rsync") mainLoop = gobject.MainLoop() backupQueue = BackupQueue(pluginFMRI, dbusObj, mainLoop) gobject.idle_add(backupQueue.backup_snapshot) mainLoop.run() sys.exit(0)