"""
Copy Center

Description:
Python script to copy data between different datastores.

Author:
Sebastian Sauer

Copyright:
Dual-licensed under LGPL v2+higher and the BSD license.
"""

import os
import re
import sys
import traceback
import xml.dom.minidom


class CopyCenter:
    """Collect the copy plugins found in a script directory.

    Attributes set by the constructor:
        scriptpath -- directory that is scanned for plugin files.
        homepath   -- result of getHomePath().
        plugins    -- dict mapping plugin name to a CopyCenter.Plugin.
    """

    class Plugin:
        """Wrap one plugin object and its optional source/destination parts."""

        def __init__(self, plugin):
            self.plugin = plugin
            self.name = plugin.name
            # Either part is None when the plugin does not provide it or
            # when instantiating it failed.
            self.source = self.load("Source")
            self.destination = self.load("Destination")

        def load(self, plugintype):
            """Instantiate the plugin attribute named `plugintype`.

            Returns the new instance, or None when the plugin has no such
            attribute or its constructor raised (the traceback is printed).
            """
            try:
                if hasattr(self.plugin, plugintype):
                    return getattr(self.plugin, plugintype)(self.plugin)
            except Exception:
                # A broken plugin part must not prevent the others from
                # loading; report it and carry on.
                print(traceback.format_exc())
            return None

    def __init__(self, scriptpath):
        """Scan `scriptpath` for CopyCenterPlugin*.py files and load them."""
        self.scriptpath = scriptpath
        self.homepath = self.getHomePath()
        self.plugins = {}

        if not os.path.exists(scriptpath):
            print("The Path %s does not exist" % scriptpath)
            return

        regexp = re.compile('^CopyCenterPlugin(.*)\\.py$')
        for f in os.listdir(scriptpath):
            filename = os.path.join(scriptpath, f)
            if not os.path.isfile(filename):
                continue
            m = regexp.match(f)
            if not m:
                continue
            print("Plugin name=%s file=%s" % (m.group(1), filename))
            mylocals = {}
            try:
                with open(filename, "rb") as fh:
                    code = compile(fh.read(), filename, 'exec')
                # NOTE(review): plugin files are executed with this module's
                # globals; only trusted files should live in scriptpath.
                exec(code, globals(), mylocals)
                if "CopyCenterPlugin" in mylocals:
                    plugin = mylocals.get("CopyCenterPlugin")(self)
                    self.plugins[plugin.name] = self.Plugin(plugin)
            except Exception:
                print("Failed to import file=%s" % filename)
                print(traceback.format_exc())

    def getHomePath(self):
        """Return the home directory.

        Uses $HOME when set, otherwise the password database entry of
        $USER/$LOGNAME (or of the current uid). Falls back to os.curdir when
        the entry is missing or the pwd module is unavailable.
        """
        try:
            home = os.getenv("HOME")
            if not home:
                import pwd  # not available on every platform
                user = os.getenv("USER") or os.getenv("LOGNAME")
                if not user:
                    pwent = pwd.getpwuid(os.getuid())
                else:
                    pwent = pwd.getpwnam(user)
                # pw_dir is the home directory (index 6 would be the shell).
                home = pwent.pw_dir
            return home
        except (KeyError, ImportError):
            return os.curdir


class Copierer:
    """Callback sink handed to the plugins' init(); all hooks are no-ops.

    ProgressDialog.startCopy() replaces the three hooks on an instance with
    its own bound methods.
    """

    def __init__(self):
        pass

    def appendProgressMessage(self, messagetext):
        pass

    def writeSuccess(self, record, rowcount):
        pass

    def writeFailed(self, record):
        pass


def runGuiApp(copycenter, name):
    """Build the Copy Center dialog and run it modally.

    copycenter -- the CopyCenter instance providing plugins and paths.
    name       -- "__main__" creates a new TQApplication, any other value
                  reuses the existing tqt.tqApp.
    """
    from TQt import tqt

    def printTraceback():
        # Print the traceback of the exception currently being handled.
        print(traceback.format_exc())

    #--------------------------------------------------------------------

    class ListViewDialog(tqt.TQDialog):
        """Modal dialog with a list view plus Ok and Cancel buttons."""

        def __init__(self, parent, caption):
            tqt.TQDialog.__init__(self, parent, "ProgressDialog", 1)
            self.parent = parent
            self.setCaption(caption)
            layout = tqt.TQVBoxLayout(self)
            box = tqt.TQVBox(self)
            box.setMargin(2)
            layout.addWidget(box)
            self.listview = tqt.TQListView(box)
            self.listview.setAllColumnsShowFocus(True)
            self.listview.header().setStretchEnabled(True, 0)
            btnbox = tqt.TQHBox(box)
            btnbox.setMargin(6)
            btnbox.setSpacing(6)
            # The Ok button is exposed as self.okbtn but not connected to
            # any slot in this class.
            self.okbtn = tqt.TQPushButton(btnbox)
            self.okbtn.setText("Ok")
            self.cancelbtn = tqt.TQPushButton(btnbox)
            self.cancelbtn.setText("Cancel")
            tqt.TQObject.connect(self.cancelbtn, tqt.SIGNAL("clicked()"), self.close)
            box.setMinimumSize(tqt.TQSize(460, 380))

        def addItem(self, valuelist, afteritem=None):
            """Append a row whose column texts are taken from `valuelist`."""
            if afteritem is None:
                item = tqt.TQListViewItem(self.listview)
            else:
                item = tqt.TQListViewItem(self.listview, afteritem)
            for column, value in enumerate(valuelist):
                item.setText(column, value)
            return item

    #--------------------------------------------------------------------

    class CopyJobWidget(tqt.TQVBox):
        """The "Jobs" page: load, show, edit and save copy-job XML files."""

        def __init__(self, dialog, parent):
            self.dialog = dialog
            tqt.TQVBox.__init__(self, parent)
            self.setSpacing(6)

            typebox = tqt.TQHBox(self)
            typebox.setSpacing(6)
            label = tqt.TQLabel("Job File:", typebox)
            self.jobfilecombobox = tqt.TQComboBox(typebox)
            typebox.setStretchFactor(self.jobfilecombobox, 1)
            self.jobfilecombobox.setEditable(True)
            self.jobfilecombobox.insertItem("")
            label.setBuddy(self.jobfilecombobox)
            tqt.TQObject.connect(self.jobfilecombobox, tqt.SIGNAL("textChanged(const TQString&)"), self.jobfilecomboboxChanged)

            # Offer every job file found directly in the home directory.
            homepath = self.dialog.copycenter.homepath
            jobfilepattern = re.compile(".+\\.copycenterjob\\.xml$")
            for f in os.listdir(homepath):
                filename = os.path.join(homepath, f)
                if os.path.isfile(filename) and jobfilepattern.search(f):
                    self.jobfilecombobox.insertItem(filename)

            loadbtn = tqt.TQPushButton(typebox)
            loadbtn.setText("Open...")
            tqt.TQObject.connect(loadbtn, tqt.SIGNAL("clicked()"), self.openClicked)
            savebtn = tqt.TQPushButton(typebox)
            savebtn.setText("Save...")
            tqt.TQObject.connect(savebtn, tqt.SIGNAL("clicked()"), self.saveClicked)

            self.listview = tqt.TQListView(self)
            self.listview.setAllColumnsShowFocus(True)
            self.listview.setSorting(-1)
            self.listview.setDefaultRenameAction(tqt.TQListView.Reject)
            self.listview.header().setClickEnabled(False)
            self.listview.addColumn("Name")
            self.listview.addColumn("Value")
            tqt.TQObject.connect(self.listview, tqt.SIGNAL("doubleClicked(TQListViewItem*, const TQPoint&, int)"), self.doubleClicked)

        def doubleClicked(self, **args):
            """Start editing the value column of the selected option row."""
            print("CopyJobWidget.doubleClicked")
            item = self.listview.selectedItem()
            # Only child rows (the options) are editable, not plugin headers.
            if item and item.parent():
                item.startRename(1)

        def readOptions(self, domnode, plugininst):
            """Copy the "value" attribute of each child element into
            plugininst.options, keyed by the element name."""
            print("CopyJobWidget.readOptions plugintype=\"%s\"" % plugininst.plugintype)
            for node in domnode.childNodes:
                if node.nodeType == node.ELEMENT_NODE:
                    v = node.getAttribute("value")
                    plugininst.options[node.nodeName] = v
                    print("Option \"%s\" has value \"%s\" now." % (node.nodeName, v))

        def jobfilecomboboxChanged(self, **args):
            """Load the job file named in the combobox, if it exists.

            Raises Exception when the file is not a valid copy job or names
            a plugin that is not listed in the matching combobox.
            """
            print("CopyJobWidget.jobfilecomboboxChanged")
            filename = str(self.jobfilecombobox.currentText())
            if not os.path.isfile(filename):
                return
            domdoc = xml.dom.minidom.parse(filename)
            try:
                elements = domdoc.getElementsByTagName("CopyCenterJob")[0]
                sourcenode = elements.getElementsByTagName("Source")[0]
                destinationnode = elements.getElementsByTagName("Destination")[0]
            except IndexError:
                raise Exception("The XML-file \"%s\" does not contain a valid copy-job." % filename)

            sourcepluginname = str(sourcenode.getAttribute('plugin'))
            if not self.dialog.sourcedata.combobox.listBox().findItem(sourcepluginname, tqt.TQt.ExactMatch):
                raise Exception("There exists no plugin with the name \"%s\"." % sourcepluginname)
            self.dialog.sourcedata.combobox.setCurrentText(sourcepluginname)

            destinationpluginname = str(destinationnode.getAttribute('plugin'))
            if not self.dialog.destinationdata.combobox.listBox().findItem(destinationpluginname, tqt.TQt.ExactMatch):
                raise Exception("There exists no plugin with the name \"%s\"." % destinationpluginname)
            self.dialog.destinationdata.combobox.setCurrentText(destinationpluginname)

            self.readOptions(sourcenode, self.dialog.getSourcePluginImpl())
            self.readOptions(destinationnode, self.dialog.getDestinationPluginImpl())
            self.maybeUpdate()

        def openClicked(self):
            """Ask for a job file and put its name into the combobox."""
            text = str(self.jobfilecombobox.currentText())
            if text == "":
                text = self.dialog.copycenter.homepath
            filename = str(tqt.TQFileDialog.getOpenFileName(text, "*.copycenterjob.xml;;*", self.dialog))
            if filename != "":
                self.jobfilecombobox.setCurrentText(filename)

        def escape(self, s):
            """Return `s` with the five XML special characters replaced by
            their predefined entities."""
            # "&" must be replaced first so the entities added afterwards
            # are not escaped a second time.
            return (s.replace("&", "&amp;")
                     .replace("'", "&apos;")
                     .replace("<", "&lt;")
                     .replace(">", "&gt;")
                     .replace('"', "&quot;"))

        def writeOptions(self, writer, pluginname, plugininst):
            """Write one <plugintype plugin="..."> element with one child
            element per option to `writer`."""
            print("CopyJobWidget.writeOptions")
            writer.write("<%s plugin=\"%s\">\n" % (plugininst.plugintype, self.escape(str(pluginname))))
            for optionname in plugininst.options:
                # Escape the text itself; the file object handles encoding.
                value = self.escape(str(plugininst.options[optionname]))
                writer.write("\t<%s value=\"%s\" />\n" % (optionname, value))
            writer.write("</%s>\n" % plugininst.plugintype)

        def saveClicked(self):
            """Ask for a file name and save the current job as XML."""
            text = str(self.jobfilecombobox.currentText())
            if text == "":
                text = os.path.join(self.dialog.copycenter.homepath, "default.copycenterjob.xml")
            filename = str(tqt.TQFileDialog.getSaveFileName(text, "*.copycenterjob.xml;;*", self.dialog))
            if filename == "":
                return
            # The root element must be "CopyCenterJob": that is what
            # jobfilecomboboxChanged() looks for when reading the file back.
            with open(filename, "w", encoding="utf-8") as f:
                f.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
                f.write("<CopyCenterJob>\n")
                sourcepluginname = self.dialog.sourcedata.combobox.currentText()
                self.writeOptions(f, sourcepluginname, self.dialog.getSourcePluginImpl())
                destinationpluginname = self.dialog.destinationdata.combobox.currentText()
                self.writeOptions(f, destinationpluginname, self.dialog.getDestinationPluginImpl())
                f.write("</CopyCenterJob>\n")
            print("File \"%s\" successfully written." % filename)

        def addItem(self, pluginimpl, afteritem=None, parentitem=None):
            """Create a list view row bound to `pluginimpl`.

            Without `parentitem` a top-level row is created, otherwise a
            child row (optionally placed after `afteritem`) whose value
            column can be renamed.
            """

            class ListViewItem(tqt.TQListViewItem):
                def __init__(self, pluginimpl, listview, parentitem=None, afteritem=None):
                    self.pluginimpl = pluginimpl
                    if parentitem is None:
                        tqt.TQListViewItem.__init__(self, listview)
                        self.setOpen(True)
                    else:
                        if afteritem is None:
                            tqt.TQListViewItem.__init__(self, parentitem)
                        else:
                            tqt.TQListViewItem.__init__(self, parentitem, afteritem)
                        self.setRenameEnabled(1, True)

                def startRename(self, columnindex):
                    tqt.TQListViewItem.startRename(self, columnindex)

                def okRename(self, columnindex):
                    # Store the edited value back into the plugin's options.
                    if columnindex == 1:
                        n = str(self.text(0))
                        if n not in self.pluginimpl.options:
                            raise Exception("No such option \"%s\"" % n)
                        tqt.TQListViewItem.okRename(self, columnindex)
                        v = str(tqt.TQListViewItem.text(self, 1))
                        print("Option \"%s\" has value \"%s\" now." % (n, v))
                        self.pluginimpl.options[n] = v

                def text(self, columnindex):
                    # Mask the value of any option whose name contains
                    # "password"; the stored text stays untouched.
                    if columnindex == 1:
                        if tqt.TQListViewItem.text(self, 0).contains("password"):
                            return "*" * len(str(tqt.TQListViewItem.text(self, 1)))
                    return tqt.TQListViewItem.text(self, columnindex)

            return ListViewItem(pluginimpl, self.listview, parentitem, afteritem)

        def updateItem(self, pluginname, pluginimpl):
            """Add a header row for the plugin and one child row per option."""
            if pluginimpl is None:
                return
            item = self.addItem(pluginimpl)
            item.setText(0, "%s: %s" % (pluginimpl.plugintype, pluginname))
            afteritem = None
            for i in pluginimpl.options:
                afteritem = self.addItem(pluginimpl, afteritem, item)
                afteritem.setText(0, str(i))
                afteritem.setText(1, str(pluginimpl.options[i]))
                print("CopyJobWidget.updateItem Added item with name \"%s\" and value \"%s\"" % (str(i), str(pluginimpl.options[i])))

        def maybeUpdate(self):
            """Rebuild the list view from the currently selected plugins."""
            print("CopyJobWidget.maybeUpdate")
            self.listview.clear()
            try:
                self.updateItem(self.dialog.getDestinationPluginName(), self.dialog.getDestinationPluginImpl())
                self.updateItem(self.dialog.getSourcePluginName(), self.dialog.getSourcePluginImpl())
            except Exception:
                printTraceback()
                # Do not leave a half-filled view behind.
                self.listview.clear()

    #--------------------------------------------------------------------

    class ProgressDialog(tqt.TQDialog):
        """Modal dialog that runs the copy and shows its progress."""

        def __init__(self, dialog):
            self.dialog = dialog
            # None while no copy is running; updateStates() checks this.
            self.starttime = None
            tqt.TQDialog.__init__(self, dialog, "ProgressDialog", 1)
            self.setCaption("Copying...")
            layout = tqt.TQVBoxLayout(self)
            box = tqt.TQVBox(self)
            box.setSpacing(6)
            box.setMargin(6)
            layout.addWidget(box)
            self.textbrowser = tqt.TQTextBrowser(box)
            self.textbrowser.setWordWrap(tqt.TQTextEdit.WidgetWidth)
            self.textbrowser.setTextFormat(tqt.TQt.RichText)

            statusbox = tqt.TQFrame(box)
            layout = tqt.TQGridLayout(statusbox, 4, 2, 0, 2)
            layout.addWidget(tqt.TQLabel("Number of records done:", statusbox), 0, 0)
            self.donecounter = 0
            self.donelabel = tqt.TQLabel("-", statusbox)
            layout.addWidget(self.donelabel, 0, 1)
            layout.addWidget(tqt.TQLabel("Successfully copied records:", statusbox), 1, 0)
            self.successcounter = 0
            self.successlabel = tqt.TQLabel("-", statusbox)
            layout.addWidget(self.successlabel, 1, 1)
            layout.addWidget(tqt.TQLabel("Failed to copy records:", statusbox), 2, 0)
            self.failedcounter = 0
            self.failedlabel = tqt.TQLabel("-", statusbox)
            layout.addWidget(self.failedlabel, 2, 1)
            layout.addWidget(tqt.TQLabel("Elapsed time in seconds:", statusbox), 3, 0)
            self.elapsedlabel = tqt.TQLabel("-", statusbox)
            layout.addWidget(self.elapsedlabel, 3, 1)

            btnbox = tqt.TQHBox(box)
            btnbox.setSpacing(6)
            self.donebtn = tqt.TQPushButton(btnbox)
            self.donebtn.setText("Done")
            self.donebtn.setEnabled(False)
            tqt.TQObject.connect(self.donebtn, tqt.SIGNAL("clicked()"), self.close)
            self.cancelbtn = tqt.TQPushButton(btnbox)
            self.cancelbtn.setText("Cancel")
            tqt.TQObject.connect(self.cancelbtn, tqt.SIGNAL("clicked()"), self.close)
            box.setMinimumSize(tqt.TQSize(500, 380))

        def updateStates(self):
            """Refresh the counter and elapsed-time labels during a copy."""
            if self.starttime is not None:
                self.donelabel.setText(str(self.donecounter))
                self.failedlabel.setText(str(self.failedcounter))
                self.successlabel.setText(str(self.successcounter))
                self.elapsedlabel.setText(str(self.starttime.elapsed() / 1000))
                self.donelabel.update()
                self.failedlabel.update()
                self.successlabel.update()
                self.elapsedlabel.update()

        def writeSuccess(self, record, rowcount):
            """Copierer hook: count `rowcount` successfully written records."""
            self.donecounter += rowcount
            self.successcounter += rowcount
            tqt.tqApp.processEvents()

        def writeFailed(self, record):
            """Copierer hook: count one failed record."""
            self.donecounter += 1
            self.failedcounter += 1
            tqt.tqApp.processEvents()

        def startCopy(self):
            """Read every record from the source and write it to the
            destination, reporting progress in this dialog."""
            # Set up the timer before anything can fail so the cleanup at
            # the end of this method always finds it.
            self.starttime = tqt.TQTime()
            self.updatetimer = tqt.TQTimer(self)
            tqt.TQObject.connect(self.updatetimer, tqt.SIGNAL("timeout()"), self.updateStates)
            try:
                copierer = Copierer()
                copierer.appendProgressMessage = self.textbrowser.append
                copierer.writeSuccess = self.writeSuccess
                copierer.writeFailed = self.writeFailed

                # Initialize the source
                sourcename = self.dialog.getSourcePluginName()
                sourceimpl = self.dialog.getSourcePluginImpl()
                self.textbrowser.append("Source: %s" % sourcename)
                if sourceimpl is None:
                    raise Exception("No such source.")
                try:
                    sourceimpl.init(copierer)

                    # Initialize the destination
                    destinationname = self.dialog.getDestinationPluginName()
                    destinationimpl = self.dialog.getDestinationPluginImpl()
                    # The browser is in RichText format, so the line break
                    # has to be markup.
                    self.textbrowser.append("<br>Destination: %s" % destinationname)
                    if destinationimpl is None:
                        raise Exception("No such destination.")
                    try:
                        destinationimpl.init(copierer)

                        self.starttime.start()
                        self.updatetimer.start(500)
                        tqt.tqApp.processEvents()

                        # Copy the records
                        self.textbrowser.append("<br>Copy the records...")
                        while True:
                            record = sourceimpl.read()
                            if record is None:
                                break
                            destinationimpl.write(record)
                        self.updateStates()
                    finally:
                        destinationimpl.finish()
                finally:
                    sourceimpl.finish()

                self.setCaption("Copy done")
                self.textbrowser.append("<br>Copy done.")
            except Exception:
                self.setCaption("Copy failed")
                self.textbrowser.append("Error: %s" % sys.exc_info()[0])
                printTraceback()

            self.donebtn.setEnabled(True)
            self.cancelbtn.setEnabled(False)
            self.updatetimer.stop()
            self.starttime = None

        def show(self):
            """Show the dialog and start the copy shortly afterwards."""
            tqt.TQDialog.show(self)
            tqt.TQTimer.singleShot(10, self.startCopy)
            tqt.tqApp.processEvents()

        def closeEvent(self, closeevent):
            """Ask before aborting a running copy, then finish both plugins."""
            # A missing or unloadable plugin must not make the dialog
            # impossible to close.
            try:
                sourceimpl = self.dialog.getSourcePluginImpl()
            except KeyError:
                sourceimpl = None
            if sourceimpl is not None and not sourceimpl.isFinished():
                if tqt.TQMessageBox.warning(self, "Abort?", "Abort the copy?", tqt.TQMessageBox.Yes, tqt.TQMessageBox.No) != tqt.TQMessageBox.Yes:
                    closeevent.ignore()
                    return
                sourceimpl.finish()
                try:
                    destinationimpl = self.dialog.getDestinationPluginImpl()
                except KeyError:
                    destinationimpl = None
                if destinationimpl is not None:
                    destinationimpl.finish()
            closeevent.accept()

    #--------------------------------------------------------------------

    class DataSelector(tqt.TQVGroupBox):
        """Group box with a plugin combobox and the chosen plugin's widget."""

        def __init__(self, plugintype, title, caption, parent, dialog, items):
            # plugintype is the CopyCenter.Plugin attribute to read
            # ("source" or "destination", see Dialog.__init__).
            self.plugintype = plugintype
            self.pluginimpl = None
            self.dialog = dialog
            self.mainbox = None

            tqt.TQVGroupBox.__init__(self, title, parent)
            self.setInsideMargin(6)
            self.setInsideSpacing(0)

            typebox = tqt.TQHBox(self)
            label = tqt.TQLabel(caption, typebox)
            self.combobox = tqt.TQComboBox(typebox)
            for item in items:
                self.combobox.insertItem(str(item))
            label.setBuddy(self.combobox)
            typebox.setStretchFactor(self.combobox, 1)

            self.scrollview = tqt.TQScrollView(self)
            try:
                self.scrollview.setResizePolicy(tqt.TQScrollView.AutoOne)
                self.scrollview.setFrameStyle(tqt.TQFrame.NoFrame)
                self.scrollview.setResizePolicy(tqt.TQScrollView.AutoOneFit)
                self.scrollview.viewport().setPaletteBackgroundColor(self.paletteBackgroundColor())
            except Exception:
                printTraceback()
            tqt.TQObject.connect(self.combobox, tqt.SIGNAL("activated(int)"), self.activated)

        def updatePlugin(self):
            """Set self.pluginimpl from the combobox selection (None when the
            selected name is not a known plugin)."""
            print("DataSelector.updatePlugin")
            self.pluginimpl = None
            text = str(self.combobox.currentText())
            plugin = self.dialog.copycenter.plugins.get(text)
            if plugin is not None:
                self.pluginimpl = getattr(plugin, self.plugintype)

        def removeMainBox(self):
            """Detach and drop the current plugin widget container, if any."""
            if self.mainbox is None:
                return
            try:
                self.scrollview.removeChild(self.mainbox)
                self.mainbox.destroy()
            except Exception:
                # Deliberately best-effort: the reference is dropped below
                # whether or not the toolkit calls succeed.
                pass
            self.mainbox = None

        def updateMainBox(self):
            """Recreate the container and let the plugin fill it."""
            print("DataSelector.updateMainBox")
            self.removeMainBox()
            self.mainbox = tqt.TQVBox(self.scrollview.viewport())
            self.mainbox.setSpacing(2)
            if self.pluginimpl is not None:
                try:
                    self.pluginimpl.createWidget(self.dialog, self.mainbox)
                except Exception:
                    printTraceback()
            # An empty stretching widget pushes the plugin widgets upwards.
            self.mainbox.setStretchFactor(tqt.TQWidget(self.mainbox), 1)
            self.mainbox.show()
            self.scrollview.addChild(self.mainbox)

        def activated(self, **args):
            self.updatePlugin()
            self.updateMainBox()

        def maybeUpdate(self):
            """Drop the current widget and rebuild it shortly afterwards."""
            print("DataSelector.maybeUpdate")
            self.removeMainBox()
            tqt.TQTimer.singleShot(50, self.activated)

        def maybeDone(self):
            """Copy the values shown in the plugin widget into its options."""
            print("DataSelector.maybeDone")
            # No plugin has been activated yet, or it has no widget:
            # there is nothing to read back.
            if self.pluginimpl is None or self.pluginimpl.widget is None:
                return
            for optionname in self.pluginimpl.options:
                self.pluginimpl.options[optionname] = self.pluginimpl.widget.getOptionValue(optionname)

    #--------------------------------------------------------------------

    class Dialog(tqt.TQDialog):
        """Main window with the "Jobs", "Copy" and "Help" tabs."""

        def __init__(self, copycenter, parent):
            self.copycenter = copycenter
            # Stored on the instance; not used elsewhere in this file.
            self.ListViewDialog = ListViewDialog

            tqt.TQDialog.__init__(self, parent, "Dialog", 1, tqt.TQt.WDestructiveClose)
            self.setCaption("Copy Center")
            layout = tqt.TQVBoxLayout(self)
            box = tqt.TQVBox(self)
            box.setMargin(6)
            box.setSpacing(6)
            layout.addWidget(box)
            self.tab = tqt.TQTabWidget(box)
            self.tab.setMargin(6)
            box.setStretchFactor(self.tab, 1)

            self.jobsbox = CopyJobWidget(self, self.tab)
            self.tab.addTab(self.jobsbox, "Jobs")

            self.splitter = tqt.TQSplitter(self.tab)

            sourceplugins = []
            destinationplugins = []
            for pluginname in self.copycenter.plugins:
                if self.copycenter.plugins[pluginname].source is not None:
                    sourceplugins.append(pluginname)
                if self.copycenter.plugins[pluginname].destination is not None:
                    destinationplugins.append(pluginname)
            sourceplugins.sort()
            destinationplugins.sort()

            self.sourcedata = DataSelector(
                "source",  # id
                "Read Data From",  # title
                "Source:",  # caption
                self.splitter, self, sourceplugins)
            self.destinationdata = DataSelector(
                "destination",  # id
                "Write Data to",  # title
                "Destination:",  # caption
                self.splitter, self, destinationplugins)

            btnbox = tqt.TQHBox(box)
            btnbox.setSpacing(6)
            okbtn = tqt.TQPushButton(btnbox)
            okbtn.setText("Start Copy")
            okbtn.setDefault(True)
            tqt.TQObject.connect(okbtn, tqt.SIGNAL("clicked()"), self.startCopy)
            cancelbtn = tqt.TQPushButton(btnbox)
            cancelbtn.setText("Cancel")
            tqt.TQObject.connect(cancelbtn, tqt.SIGNAL("clicked()"), self.close)

            self.tab.addTab(self.splitter, "Copy")
            self.tab.setCurrentPage(1)

            self.helpbrowser = tqt.TQTextBrowser(self.tab)
            self.helpbrowser.setLinkUnderline(False)
            self.helpbrowser.setUndoRedoEnabled(False)
            self.tab.addTab(self.helpbrowser, "Help")
            tqt.TQObject.connect(self.tab, tqt.SIGNAL("currentChanged(TQWidget*)"), self.currentTabChanged)

            box.setMinimumSize(tqt.TQSize(760, 500))

            defaultfile = os.path.join(self.copycenter.homepath, "default.copycenterjob.xml")
            if os.path.isfile(defaultfile):
                print("Reading default copy job file: %s" % defaultfile)
                self.jobsbox.jobfilecombobox.setCurrentText(defaultfile)

        def getSourcePluginName(self):
            return str(self.sourcedata.combobox.currentText())

        def getSourcePluginImpl(self):
            return self.copycenter.plugins[self.getSourcePluginName()].source

        def getDestinationPluginName(self):
            return str(self.destinationdata.combobox.currentText())

        def getDestinationPluginImpl(self):
            return self.copycenter.plugins[self.getDestinationPluginName()].destination

        def currentTabChanged(self, widget):
            """Synchronise the page that has just become current."""
            if self.tab.currentPage() == self.jobsbox:
                # The "Copy" page is done
                self.sourcedata.maybeDone()
                self.destinationdata.maybeDone()
                # Update the "Jobs" page
                self.jobsbox.maybeUpdate()
            elif self.tab.currentPage() == self.splitter:
                # Update the "Copy" page
                self.sourcedata.maybeUpdate()
                self.destinationdata.maybeUpdate()
            elif self.tab.currentPage() == self.helpbrowser and self.helpbrowser.lines() <= 1:
                # Update the "Help" page
                readme = os.path.join(self.copycenter.scriptpath, "readme.html")
                if not os.path.isfile(readme):
                    return
                with open(readme, 'r') as fh:
                    self.helpbrowser.setText(fh.read())

        def startCopy(self):
            dlg = ProgressDialog(self)
            dlg.show()

    #--------------------------------------------------------------------

    if name == "__main__":
        tqtapp = tqt.TQApplication(sys.argv)
    else:
        tqtapp = tqt.tqApp
    dialog = Dialog(copycenter, tqtapp.mainWidget())
    dialog.exec_loop()


if __name__ == "__main__":
    scriptpath = os.getcwd()
else:
    scriptpath = os.path.dirname(__name__)

copycenter = CopyCenter(scriptpath)
runGuiApp(copycenter, __name__)