Logo Search packages:      
Sourcecode: zope version File versions

regressionCatalog.py

#!/usr/bin/env python

# Regression test for ZCatalog


import os,sys
sys.path.insert(0,'.')

try:
    import Testing
except ImportError:
    sys.path[0] = "../../.."
    import Testing

os.environ['STUPID_LOG_FILE']= "debug.log"

here = os.getcwd()

import Zope
import ZODB, ZODB.FileStorage
from Products.ZCatalog import ZCatalog,Vocabulary
from Products.ZCatalog.Catalog import CatalogError
import Persistence
import ExtensionClass
from Testing import dispatcher
import keywords
from zLOG import LOG


import getopt,whrandom,time,string,mailbox,rfc822
import unittest_patched as unittest

# maximum number of files to read for the test suite
maxFiles = 1000

# maximum number of threads for stress testa
numThreads = 4


# number of iterations for searches
searchIterations = 1000

# number of iterations for catalog/uncatalog operations
updateIterations = 100

# input mailbox file
mbox   = os.environ.get("TESTCATALOG_MBOX","/usr/home/andreas/zope.mbox")
mbox2  = os.environ.get("TESTCATALOG_MBOX2", "/usr/home/andreas/python.mbox")

dataDir = ""


#
# Don't change anything below
#


00058 class testZODB:
    """ some wrapper stuff around ZODB """

    def __init__(self, file = "data/work/Data.fs",open=1):

        self.db = ZODB.DB( ZODB.FileStorage.FileStorage(file) )

        if open==1:
            self.connection = self.db.open()
            self.root = self.connection.root()


    def write(self,name,obj):
        self.root[name] = obj
        get_transaction().commit()


    def read(self,name):
        return self.root[name]


    def __del__(self):
        self.db.close()



00084 class testCatalog(Persistence.Persistent,unittest.TestCase):
    """ Wrapper around the catalog stuff """

    def __init__(self,mboxname,maxfiles):
        self.msg_ids = []
        self.num_files = 0
        self.keywords = []
        self.maxfiles = maxfiles

        self._vocabulary = Vocabulary.Vocabulary('Vocabulary',
                            'Vocabulary', globbing=1)
        self._catalog    = ZCatalog.ZCatalog("zcatalog")
        self._catalog.addIndex('to',      'TextIndex')
        self._catalog.addIndex('sender',  'TextIndex')
        self._catalog.addIndex('subject', 'TextIndex')
        self._catalog.addIndex('content', 'TextIndex')
        self._catalog.addIndex('file_id', 'TextIndex')
        self._catalog.addColumn('file_id')
        self._catalog.addIndex('length',  'FieldIndex')
        self._catalog.addColumn('length')
        self._catalog.addIndex('date',    'FieldIndex')
        self._catalog.addIndex('keywords', "KeywordIndex")

        self.build_catalog(mboxname)


    def build_catalog(self,mboxname):

        mb = mailbox.UnixMailbox(open(mboxname,"r"))
        i = 0

        msg = mb.next()
        while msg and self.num_files<self.maxfiles:

            try:
                self.catMessage(msg)
                self.msg_ids.append(msg.dict["message-id"])
            except:
                msg = mb.next()
                continue


            msg = mb.next()
            self.num_files = self.num_files + 1
            if self.num_files % 100==0: print self.num_files

            try:
                sub = string.split(msg.dict.get("subject",""))
            except:
                msg = mb.next()
                continue

            for s in sub:
                if not s in self.keywords: self.keywords.append(s)

        self._catalog.aq_parent = None


    def catMessage(self,m):
        self._catalog.catalogObject( testMessage(m) ,
                                    m.dict["message-id"] )

    def uncatMessage(self,uid):
        self._catalog.uncatalogObject( uid )


class testMessage(ExtensionClass.Base):

    def __init__(self,msg,modify_doc=0):

        self.sender  = msg.dict.get("from","")
        self.subject = msg.dict.get("subject","")
        self.to      = msg.dict.get("to","")
        self.content = str(msg)
        self.keywords= string.split(self.subject , " ")

        if modify_doc !=0:
            self.keywords = map(self.reverse,self.keywords)


        self.file_id = msg.dict.get("message-id","")

        self.length  = len(str(msg))
        date         = msg.dict.get("date","")
        try:
            self.date    =  time.mktime(rfc822.parsedate(date)[:9])
        except: pass

    def reverse(self,s):
        l = list(s)
        l.reverse()
        return string.join(l,"")


    def __del__(self):
        pass


00182 class BuildEnv(dispatcher.Dispatcher,unittest.TestCase):
    """ build environment """

    def __init__(self,func,*args,**kw):

        unittest.TestCase.__init__(self,func,args,kw)
        dispatcher.Dispatcher.__init__(self,func)

        self.init_phase = 0

        self.setlog( open("dispatcher.log","a") )
        self.logn('treads=%d  searchiterations=%d' %
                    (numThreads,searchIterations))
        self.logn('updateiterations=%d  maxfiles=%d' %
                    (updateIterations,maxFiles))

    #############################################################
    # Build up ZODB
    #############################################################


    def buildTestEnvironment(self,args,kw):
        self.init_phase = 1
        self.dispatcher("funcTestEnvironment",("funcTestEnvironment",1,args,kw))


    def funcTestEnvironment(self,dataDir,maxFiles):

        env = self.th_setup()

        if not os.path.exists(dataDir): os.makedirs(dataDir)

        os.system("rm -f %s/*" % dataDir)
        zodb = testZODB("%s/Data_orig.fs" % dataDir)

        print "parsing and reading mailbox file %s....please wait" % mbox
        tc = testCatalog( mbox,maxFiles )

        print "writing Catalog to ZODB"
        zodb.write("catalog" , tc)

        print "Creating keywords file"
        kw = keywords.Keywords()
        kw.build(mbox,1000)


        print tc.num_files, "files read"
        print "Initalization complete"

        self.th_teardown(env)


00234 class testSearches(dispatcher.Dispatcher,unittest.TestCase):
    """ test searches """

    def __init__(self,func,*args,**kw):

        unittest.TestCase.__init__(self,func,args,kw)
        dispatcher.Dispatcher.__init__(self,func)

        self.init_phase = 0

        self.setlog( open("dispatcher.log","a") )


    def setUp(self):

        os.system("rm -fr data/work")
        if not os.path.exists("data/work"): os.makedirs("data/work")
        assert os.system("cp %s/Data_orig.fs data/work/Data.fs" % dataDir)==0, \
            "Error while replicating original data"

        self.zodb               = testZODB("data/work/Data.fs",open=0)
        self.threads    = {}
        self.init_zodb_size = self.zodb_size()

        kw = keywords.Keywords()
        kw.reload()
        self.keywords  = kw.keywords()

        self.logn("-" * 80)
        self.logn('treads=%d  searchiterations=%d' %
                    (numThreads,searchIterations))
        self.logn('updateiterations=%d  maxfiles=%d' %
                    (updateIterations,maxFiles))


    def tearDown(self):
        self.log_zodb_size("before",self.init_zodb_size)
        self.log_zodb_size("after ",self.zodb_size())
        del self.zodb
        self.zodb = self.catalog = None

    def log_zodb_size(self,s,n):
        self.logn("Size of ZODB (data/work/Data.fs) %s test : %s" % (s,n) )

    def zodb_size(self):
        return self.size2size(os.stat("data/work/Data.fs")[6])


    def size2size(self,n):
        import math
        if n <1024.0: return "%8.3lf Bytes" % n
        if n <1024.0*1024.0: return "%8.3lf KB" % (1.0*n/1024.0)
        if n <1024.0*1024.0*1024.0: return "%8.3lf MB" % (1.0*n/1024.0/1024.0)



    #############################################################
    # Fulltext test
    #############################################################


00295     def testFulltextIndex(self,args,kw):
        """ benchmark FulltextIndex """
        self.dispatcher('funcFulltextIndex' ,
            ('funcFulltextIndex', kw["numThreads"] , () , {} ) )


00301     def funcFulltextIndex(self,*args):
        """ benchmark FulltextIndex """

        cat,msg_ids = self.get_catalog()

        env = self.th_setup()

        for kw in self.keywords:
            res = cat.searchResults( {"content" : kw } )

        self.th_teardown(env)


    #############################################################
    # Field index test
    #############################################################

00318     def testFieldIndex(self,args,kw):
        """ benchmark field index"""
        self.dispatcher('funcFieldIndex' ,
            ('funcFieldIndex',kw["numThreads"] , () , {} ) )


00324     def funcFieldIndex(self,*args):
        """ benchmark FieldIndex """

        cat,msg_ids = self.get_catalog()

        env = self.th_setup()

        for i in range(0,searchIterations):

            res = cat.searchResults( {"length" : i } )
            for r in res:
                assert i==r.length , "%s should have size %d but is %s" %  \
                    (r.file_id,i,r.length)

        self.th_teardown(env)

    #############################################################
    # Keyword index test
    #############################################################

00344     def testKeywordIndex(self,args,kw):
        """ benchmark Keyword index"""
        self.dispatcher('funcKeywordIndex' ,
            ('funcKeywordIndex', kw["numThreads"] , () , {} ) )


00350     def funcKeywordIndex(self,*args):
        """ benchmark KeywordIndex """

        cat,msg_ids = self.get_catalog()

        env = self.th_setup()

        for kw in self.keywords:
            res = cat.searchResults( {"subject" : kw } )
#            assert len(res) != 0 , "Search result for keyword '%s' is empty" % kw

        self.th_teardown(env)

    #############################################################
    # Field range index test
    #############################################################

00367     def testFieldRangeIndex(self,args,kw):
        """ benchmark field range index"""
        self.dispatcher('funcFieldRangeIndex' ,
            ('funcFieldRangeIndex', kw["numThreads"] , () , {} ) )


00373     def funcFieldRangeIndex(self,*args):
        """ benchmark FieldRangeIndex """

        cat,msg_ids = self.get_catalog()

        env = self.th_setup()

        rg = []
        for i in range(searchIterations):
            m = whrandom.randint(0,10000)
            n = m + 200
            rg.append((m,n))


        for i in range(searchIterations):
            for r  in cat.searchResults( {"length" : rg[i],"length_usage" : "range:min:max" } ):
                size = r.length
                assert rg[i][0]<=size and size<=rg[i][1] , \
                "Filesize of %s is out of range (%d,%d) %d" % (r.file_id,rg[i][0],rg[i][1],size)

        self.th_teardown(env)



    #############################################################
    # Keyword + range index test
    #############################################################

00401     def testKeywordRangeIndex(self,args,kw):
        """ benchmark Keyword range index"""
        self.dispatcher('funcKeywordRangeIndex' ,
            ('funcKeywordRangeIndex', kw["numThreads"] , () , {} ) )


00407     def funcKeywordRangeIndex(self,*args):
        """ benchmark Keyword & IndexRange search """

        cat,msg_ids = self.get_catalog()

        rg = []
        for i in range(len(self.keywords)):
            m = whrandom.randint(0,10000)
            n = m + 200
            rg.append( (m,n) )

        env = self.th_setup()

        results = []
        for i in range(len(self.keywords)):
            results.append( cat.searchResults( {"keywords":self.keywords[i],
                                                "length" : rg[i],
                                                "length_usage" : "range:min:max" } )
                                            )

        self.th_teardown(env)


    #############################################################
    # Test full reindexing
    #############################################################

00434     def testUpdates(self,args,kw):
        """ benchmark concurrent catalog/uncatalog operations """
        self.dispatcher("testUpdates" ,
            ("funcUpdates", kw["numThreads"] , args, kw ))


00440     def funcUpdates(self,*args,**kw):
        """ benchmark concurrent catalog/uncatalog operations """

        uncat_conflicts = cat_conflicts = 0
        cat,msg_ids = self.get_catalog()


        msgs = self.setupUpdatesMethod(kw["numUpdates"])
        keys = msgs.keys()

        rdgen = whrandom.whrandom()
        rdgen.seed(int(time.time()) % 256,int(time.time()) % 256,int(time.time()) % 256)

        env = self.th_setup()

        for i in range(len(keys)):

            r = rdgen.randint(0,len(msgs)-1)

            mid = keys[r]
            obj = msgs[mid]

            try:
                cat.uncatalog_object(mid)

                if kw.get("commit",1)==1:
                    get_transaction().commit()
                    time.sleep(0.1)
            except ZODB.POSException.ConflictError:
                uncat_conflicts = uncat_conflicts + 1

            try:
                cat.catalog_object(obj,mid)

                if kw.get("commit",1)==1:
                    get_transaction().commit()
                    time.sleep(0.1)

            except ZODB.POSException.ConflictError:
                cat_conflicts = cat_conflicts + 1

        try:
            get_transaction().commit()
        except: pass


        self.th_teardown(env,cat_conflicts=cat_conflicts,uncat_conflicts=uncat_conflicts)


00489     def setupUpdatesMethod(self,numUpdates):
        """ this method prepares a datastructure for the updates test.
            we are reading the first n mails from the primary mailbox.
            they are used for the update test
        """

        i = 0
        dict = {}

        mb = mailbox.UnixMailbox(open(mbox,"r"))

        msg = mb.next()
        while msg and i<numUpdates:

            obj = testMessage(msg)

            mid = msg.dict.get("message-id",None)
            if mid:
                dict[mid] = obj
                i = i+1

            msg = mb.next()

        return dict



    #############################################################
    # Test full reindexing
    #############################################################

00520     def testReindexing(self,args,kw):
        """ test reindexing of existing data """
        self.dispatcher("testReindexing" ,
            ("funcReindexing",kw["numThreads"] , (mbox,1000) , {} ))

00525     def testReindexingAndModify(self,args,kw):
        """ test reindexing of existing data but with modifications"""
        self.dispatcher("testReindexing" ,
            ("funcReindexing",kw["numThreads"] , (mbox,1000,1) , {} ))


00531     def funcReindexing(self,mbox,numfiles=100,modify_doc=0):
        """ test reindexing of existing data """

        cat_conflicts = 0
        cat,msg_ids = self.get_catalog()

        env = self.th_setup()

        mb = mailbox.UnixMailbox(open(mbox,"r"))
        i = 0

        msg = mb.next()

        while msg and i<numfiles:

            obj = testMessage(msg,modify_doc)
            if msg.dict.has_key("message-id"):
                mid = msg.dict["message-id"]
            else:
                msg = mb.next()
                continue

            try:
                cat.catalogObject(obj,mid)
                get_transaction().commit()
            except:
                cat_conflicts = cat_conflicts + 1

            msg = mb.next()
            i = i+1
            if i%100==0: print i

        self.th_teardown(env,cat_conflicts=cat_conflicts)


    #############################################################
    # Test full reindexing
    #############################################################

00570     def testIncrementalIndexing(self,args,kw):
        """ testing incremental indexing """
        self.dispatcher("testIncrementalIndexing" ,
            ("funcReindexing",kw["numThreads"], (mbox2,1000) , {}))


00576     def get_catalog(self):
        """ return a catalog object """

        # depended we are running in multithreaded mode we must take
        # care how threads open the ZODB

        connection  = self.zodb.db.open()
        root        = connection.root()
        cat         = root["catalog"]._catalog
        msg_ids     = root['catalog'].msg_ids

        return cat,msg_ids



def usage(program):
    print "Usage: "
    print
    print "initalize the test catalog:   %s -i -f <maximum number files to use> " % program
    print "to run the basic tests:       %s -b -f <maximum number files to use> " % program
    print "to run the advanced tests:    %s -a -f <maximum number files to use> " % program


def main():

    global dataDir,maxFiles

    opts,args = getopt.getopt(sys.argv[1:],"hiabf:xp",['help'])
    opts.sort()

    optsLst = map(lambda x: x[0],opts)

    if optsLst==[]: usage(os.path.basename(sys.argv[0])); sys.exit(0)

    for k,v in opts:
        if k in ['-h','--help'] : usage(os.path.basename(sys.argv[0])); sys.exit(0)
        if k == "-f":   maxFiles    = string.atoi(v)

    dataDir = os.path.join("data",str(maxFiles))

    if '-i' in optsLst:
        unittest.TextTestRunner().run(get_tests('init'))


    if '-b' in optsLst:
        unittest.TextTestRunner().run(get_tests('bench1'))


    if '-a' in optsLst:
        unittest.TextTestRunner().run(get_tests('bench2'))


    if '-x' in optsLst:
        unittest.TextTestRunner().run(get_tests('exp'))



    if '-p' in optsLst:
        unittest.TextTestRunner().run(test_suite())

def test_suite():

    return get_tests('basic')


def get_tests(what):
    global dataDir,maxFiles

    if what=='basic':
        maxFiles = 100
        dataDir = 'data/%d' % maxFiles

    t_aj = (
         BuildEnv('buildTestEnvironment',dataDir,maxFiles),
         testSearches("testFulltextIndex",numThreads=1),
         testSearches("testFieldIndex",numThreads= 1),
         testSearches("testFieldRangeIndex",numThreads=1),
         testSearches("testKeywordIndex",numThreads= 1),
         testSearches("testKeywordRangeIndex",numThreads= 1)
    )

    bench1_tests = (
         testSearches("testFulltextIndex",numThreads=1),
         testSearches("testFulltextIndex",numThreads= 4),
         testSearches("testFieldIndex",numThreads= 1),
         testSearches("testFieldIndex",numThreads= 4),
         testSearches("testFieldRangeIndex",numThreads=1),
         testSearches("testFieldRangeIndex",numThreads= 4),
         testSearches("testKeywordIndex",numThreads= 1),
         testSearches("testKeywordIndex",numThreads= 4),
         testSearches("testKeywordRangeIndex",numThreads= 1),
         testSearches("testKeywordRangeIndex",numThreads=4)
    )

    bench2_tests = (
#       testSearches("testReindexing",numThreads=1),
#        testSearches("testIncrementalIndexing",numThreads=1),
        testSearches("testUpdates",numThreads=2,numUpdates=200),
#        testSearches("testUpdates",numThreads=4,numUpdates=200)
    )

    exp_tests = (
#        testRS("testRangeSearch"),
#       testSearches("testReindexing",numThreads=1),
         testSearches("testReindexingAndModify",numThreads=1),
#        testSearches("testUpdates",numThreads=10,numUpdates=100),
    )

    init_tests = (
        BuildEnv("buildTestEnvironment",dataDir,maxFiles) ,
    )

    ts = unittest.TestSuite()
    for x in eval('%s_tests' % what): ts.addTest(x)
    return ts

    return



def pdebug():
    import pdb
    test_suite()

def debug():
    test_suite().debug()

def pdebug():
    import pdb
    pdb.run('debug()')


if __name__ == '__main__':
    main()

Generated by  Doxygen 1.6.0   Back to index