Codebase list python-cx-oracle / master test / AQ.py
master

Tree @master (Download .tar.gz)

AQ.py @masterraw · history · blame

#------------------------------------------------------------------------------
# Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------

"""Module for testing AQ objects."""

import TestEnv

import cx_Oracle
import decimal
import threading

class TestCase(TestEnv.BaseTestCase):
    bookData = [
            ("Wings of Fire", "A.P.J. Abdul Kalam",
                    decimal.Decimal("15.75")),
            ("The Story of My Life", "Hellen Keller",
                    decimal.Decimal("10.50")),
            ("The Chronicles of Narnia", "C.S. Lewis",
                    decimal.Decimal("25.25"))
    ]

    def __clearBooksQueue(self):
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        options = self.connection.deqoptions()
        options.wait = cx_Oracle.DEQ_NO_WAIT
        options.deliverymode = cx_Oracle.MSG_PERSISTENT_OR_BUFFERED
        options.visibility = cx_Oracle.ENQ_IMMEDIATE
        props = self.connection.msgproperties()
        while self.connection.deq("BOOKS", options, props, book):
            pass

    def __deqInThread(self, results):
        connection = TestEnv.GetConnection()
        booksType = connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        options = connection.deqoptions()
        options.wait = 10
        props = connection.msgproperties()
        if connection.deq("BOOKS", options, props, book):
            results.append((book.TITLE, book.AUTHORS, book.PRICE))
        connection.commit()

    def __verifyAttribute(self, obj, attrName, value):
        setattr(obj, attrName, value)
        self.assertEqual(getattr(obj, attrName), value)

    def testDeqEmpty(self):
        "test dequeuing an empty queue"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        options = self.connection.deqoptions()
        options.wait = cx_Oracle.DEQ_NO_WAIT
        props = self.connection.msgproperties()
        messageId = self.connection.deq("BOOKS", options, props, book)
        self.assertTrue(messageId is None)

    def testDeqEnq(self):
        "test enqueuing and dequeuing multiple messages"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        options = self.connection.enqoptions()
        props = self.connection.msgproperties()
        for title, authors, price in self.bookData:
            book = booksType.newobject()
            book.TITLE = title
            book.AUTHORS = authors
            book.PRICE = price
            self.connection.enq("BOOKS", options, props, book)
        options = self.connection.deqoptions()
        options.navigation = cx_Oracle.DEQ_FIRST_MSG
        options.wait = cx_Oracle.DEQ_NO_WAIT
        results = []
        while self.connection.deq("BOOKS", options, props, book):
            row = (book.TITLE, book.AUTHORS, book.PRICE)
            results.append(row)
        self.connection.commit()
        self.assertEqual(results, self.bookData)

    def testDeqModeRemoveNoData(self):
        "test dequeuing with DEQ_REMOVE_NODATA option"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        title, authors, price = self.bookData[1]
        book.TITLE = title
        book.AUTHORS = authors
        book.PRICE = price
        options = self.connection.enqoptions()
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", options, props, book)
        options = self.connection.deqoptions()
        options.navigation = cx_Oracle.DEQ_FIRST_MSG
        options.wait = cx_Oracle.DEQ_NO_WAIT
        options.mode = cx_Oracle.DEQ_REMOVE_NODATA
        book = booksType.newobject()
        messageId = self.connection.deq("BOOKS", options, props, book)
        self.connection.commit()
        self.assertTrue(messageId is not None)
        self.assertEqual(book.TITLE, "")

    def testDeqOptions(self):
        "test getting/setting dequeue options attributes"
        options = self.connection.deqoptions()
        self.__verifyAttribute(options, "condition", "TEST_CONDITION")
        self.__verifyAttribute(options, "consumername", "TEST_CONSUMERNAME")
        self.__verifyAttribute(options, "correlation", "TEST_CORRELATION")
        self.__verifyAttribute(options, "mode", cx_Oracle.DEQ_LOCKED)
        self.__verifyAttribute(options, "navigation",
                cx_Oracle.DEQ_NEXT_TRANSACTION)
        self.__verifyAttribute(options, "transformation",
                "TEST_TRANSFORMATION")
        self.__verifyAttribute(options, "visibility", cx_Oracle.ENQ_IMMEDIATE)
        self.__verifyAttribute(options, "wait", 1287)
        self.__verifyAttribute(options, "msgid", b'mID')

    def testDeqWithWait(self):
        "test waiting for dequeue"
        self.__clearBooksQueue()
        results = []
        thread = threading.Thread(target = self.__deqInThread,
                args = (results,))
        thread.start()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        title, authors, price = self.bookData[0]
        book.TITLE = title
        book.AUTHORS = authors
        book.PRICE = price
        options = self.connection.enqoptions()
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", options, props, book)
        self.connection.commit()
        thread.join()
        self.assertEqual(results, [(title, authors, price)])

    def testEnqOptions(self):
        "test getting/setting enqueue options attributes"
        options = self.connection.enqoptions()
        self.__verifyAttribute(options, "visibility", cx_Oracle.ENQ_IMMEDIATE)

    def testErrorsForInvalidValues(self):
        "test errors for invalid values for options"
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        options = self.connection.enqoptions()
        props = self.connection.msgproperties()
        self.assertRaises(TypeError, self.connection.deq, "BOOKS", options,
                props, book)
        options = self.connection.deqoptions()
        self.assertRaises(TypeError, self.connection.enq, "BOOKS", options,
                props, book)

    def testMsgProps(self):
        "test getting/setting message properties attributes"
        props = self.connection.msgproperties()
        self.__verifyAttribute(props, "correlation", "TEST_CORRELATION")
        self.__verifyAttribute(props, "delay", 60)
        self.__verifyAttribute(props, "exceptionq", "TEST_EXCEPTIONQ")
        self.__verifyAttribute(props, "expiration", 30)
        self.assertEqual(props.attempts, 0)
        self.__verifyAttribute(props, "priority", 1)
        self.__verifyAttribute(props, "msgid", b'mID')
        self.assertEqual(props.state, cx_Oracle.MSG_READY)
        self.assertEqual(props.deliverymode, 0)

    def testVisibilityModeCommit(self):
        "test enqueue visibility option - ENQ_ON_COMMIT"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        enqOptions = self.connection.enqoptions()
        enqOptions.visibility = cx_Oracle.ENQ_ON_COMMIT
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        messageId = otherConnection.deq("BOOKS", deqOptions, props, book)
        self.assertTrue(messageId is None)
        self.connection.commit()
        messageId = otherConnection.deq("BOOKS", deqOptions, props, book)
        self.assertTrue(messageId is not None)

    def testVisibilityModeImmediate(self):
        "test enqueue visibility option - ENQ_IMMEDIATE"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        enqOptions = self.connection.enqoptions()
        enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.visibility = cx_Oracle.DEQ_ON_COMMIT
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        otherConnection.deq("BOOKS", deqOptions, props, book)
        results = (book.TITLE, book.AUTHORS, book.PRICE)
        otherConnection.commit()
        self.assertEqual(results, self.bookData[0])

    def testDeliveryModeSameBuffered(self):
        "test enqueue/dequeue delivery modes identical - buffered"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        enqOptions = self.connection.enqoptions()
        enqOptions.deliverymode = cx_Oracle.MSG_BUFFERED
        enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.deliverymode = cx_Oracle.MSG_BUFFERED
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        otherConnection.deq("BOOKS", deqOptions, props, book)
        results = (book.TITLE, book.AUTHORS, book.PRICE)
        otherConnection.commit()
        self.assertEqual(results, self.bookData[0])

    def testDeliveryModeSamePersistent(self):
        "test enqueue/dequeue delivery modes identical - persistent"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        enqOptions = self.connection.enqoptions()
        enqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT
        enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        otherConnection.deq("BOOKS", deqOptions, props, book)
        results = (book.TITLE, book.AUTHORS, book.PRICE)
        otherConnection.commit()
        self.assertEqual(results, self.bookData[0])

    def testDeliveryModeSamePersistentBuffered(self):
        "test enqueue/dequeue delivery modes identical - persistent/buffered"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        enqOptions = self.connection.enqoptions()
        enqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT_OR_BUFFERED
        enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT_OR_BUFFERED
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        otherConnection.deq("BOOKS", deqOptions, props, book)
        results = (book.TITLE, book.AUTHORS, book.PRICE)
        otherConnection.commit()
        self.assertEqual(results, self.bookData[0])

    def testDeliveryModeDifferent(self):
        "test enqueue/dequeue delivery modes different"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        enqOptions = self.connection.enqoptions()
        enqOptions.deliverymode = cx_Oracle.MSG_BUFFERED
        enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        messageId = otherConnection.deq("BOOKS", deqOptions, props, book)
        self.assertTrue(messageId is None)

    def testDequeueTransformation(self):
        "test dequeue transformation"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        expectedPrice = book.PRICE + 10
        enqOptions = self.connection.enqoptions()
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)
        self.connection.commit()

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
        deqOptions.transformation = "%s.transform2" % self.connection.username
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        otherConnection.deq("BOOKS", deqOptions, props, book)
        otherPrice = book.PRICE
        self.assertEqual(otherPrice, expectedPrice)

    def testEnqueueTransformation(self):
        "test enqueue transformation"
        self.__clearBooksQueue()
        booksType = self.connection.gettype("UDT_BOOK")
        book = booksType.newobject()
        book.TITLE, book.AUTHORS, book.PRICE = self.bookData[0]
        expectedPrice = book.PRICE + 5
        enqOptions = self.connection.enqoptions()
        enqOptions.transformation = "%s.transform1" % self.connection.username
        props = self.connection.msgproperties()
        self.connection.enq("BOOKS", enqOptions, props, book)
        self.connection.commit()

        otherConnection = TestEnv.GetConnection()
        deqOptions = otherConnection.deqoptions()
        deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
        deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
        deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
        booksType = otherConnection.gettype("UDT_BOOK")
        book = booksType.newobject()
        props = otherConnection.msgproperties()
        otherConnection.deq("BOOKS", deqOptions, props, book)
        otherPrice = book.PRICE
        self.assertEqual(otherPrice, expectedPrice)

if __name__ == "__main__":
    TestEnv.RunTestCases()