Custom DataReader Class for SQL databases

I’m using a custom reader_class to read test data from a database rather than a CSV file. A simplified version of my test looks like this:

*** Settings ***
Library             DataDriver    reader_class=sql_reader
...                     dbname=${CUSTOMER}
...                     client_encoding=latin1
...                     query=SELECT pay_period_date, employee_code, posted_flag FROM table
...                     title_str=Period %s Employee %s Posted %s
...                     title_vars=pay_period_date employee_code posted_flag
...                     extra_vars=method: Detailed, Summary; work_flag: Yes; xml_posted_flag: Yes, No, Yes
Test Template       Print Payroll Reports Diff

*** Variables ***
${work_flag}            Yes


*** Test Cases ***
Print Payroll Reports Diff N    Default    Default    Default    Default    Default    Default

*** Keywords ***
Print Payroll Reports Diff
    [Arguments]
    ...    ${pay_period_date}
    ...    ${employee_code}
    ...    ${posted_flag}
    ...    ${method}
    ...    ${work_flag}
    ...    ${xml_posted_flag}
    # ... rest of the test runs from here

So the reader_class runs the SQL query and returns a TestCaseData object for each row, with args containing the values from that row: pay_period_date, employee_code and posted_flag.
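For illustration, a row with made-up values like this:

row = {"pay_period_date": "2024-01-31", "employee_code": "E001", "posted_flag": "Y"}

...is turned into the args dict that DataDriver passes to the test template:

args = {
    "${pay_period_date}": "2024-01-31",
    "${employee_code}": "E001",
    "${posted_flag}": "Y",
}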

It can also append additional static arguments, given in extra_vars, which is ;-delimited to separate the arguments. Each argument contains several comma-separated values, and the custom reader assigns each extra_var one of its options per row, cycling through them by row index (a bit like walking a truth table).
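As a concrete example, the extra_vars string from the settings above parses into this dict, and each row then picks option row_index % len(options):

# "method: Detailed, Summary; work_flag: Yes; xml_posted_flag: Yes, No, Yes"
extra_vars = {
    "method": ["Detailed", "Summary"],
    "work_flag": ["Yes"],
    "xml_posted_flag": ["Yes", "No", "Yes"],
}

# row 0 -> method=Detailed, work_flag=Yes, xml_posted_flag=Yes
# row 1 -> method=Summary,  work_flag=Yes, xml_posted_flag=No
# row 2 -> method=Detailed, work_flag=Yes, xml_posted_flag=Yes
# row 3 -> method=Summary,  work_flag=Yes, xml_posted_flag=Yes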

The sql_reader is implemented with the psycopg Python module, since I only need it for PostgreSQL.

The code is here if anyone finds it useful:

from DataDriver.AbstractReaderClass import AbstractReaderClass  # base class for custom readers
from DataDriver.ReaderConfig import TestCaseData  # return a list of TestCaseData to DataDriver
import psycopg
from psycopg.rows import dict_row


class sql_reader(AbstractReaderClass):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dbname = self.kwargs["dbname"]
        self.query = self.kwargs["query"]
        self.title_str = self.kwargs["title_str"]
        self.title_vars = self.kwargs["title_vars"]
        self.client_encoding = self.kwargs["client_encoding"]
        self.extra_vars = {}
        if "extra_vars" in self.kwargs:
            # extra_vars is a string that we need to parse into a python dict
            # string format: "key1: value1, value2; key2: value1, value2"
            # result: {'key1': ['value1', 'value2'], 'key2': ['value1', 'value2']}
            for variable in self.kwargs["extra_vars"].split(";"):
                key, values = variable.split(":", 1)
                self.extra_vars[key.strip()] = [v.strip() for v in values.split(",")]

    def get_data_from_source(self):
        print(f"Connecting to database {self.dbname}")
        dbconn = psycopg.connect(
            dbname=self.dbname, user="username", client_encoding=self.client_encoding
        )
        test_data = []
        cursor = dbconn.cursor(row_factory=dict_row)  # rows come back as dicts keyed by column name
        print(f"Fetching test cases with query: {self.query}")
        cursor.execute(self.query)
        rows = cursor.fetchall()
        total_rows = len(rows)
        for row_i, row in enumerate(rows):
            # build the test name from the title template and the title columns
            title_values = tuple(row[c] for c in self.title_vars.split())
            test_name = self.title_str % title_values
            args = {"${" + key + "}": value for key, value in row.items()}
            # append the next selection from each extra_var using the modulus of the row index
            for extra_key, extra_options in self.extra_vars.items():
                selected_option = extra_options[row_i % len(extra_options)]
                args["${" + extra_key + "}"] = selected_option
                test_name += f" {extra_key} {selected_option}"
            # add a TestCaseData object to the list of tests
            test_data.append(
                TestCaseData(test_name, args, [""], f"{row_i + 1}/{total_rows}")
            )
        dbconn.close()
        return test_data  # return the list of TestCaseData to DataDriver
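With the title settings above, a row with made-up values would produce a test named along these lines:

Period 2024-01-31 Employee E001 Posted Y method Detailed work_flag Yes xml_posted_flag Yes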

Now this might be too niche to add to the DataDriver module itself, but perhaps something similar and more generic could be added that works with all of the major relational databases. I noticed that pandas is already a dependency of DataDriver, so that might work without adding more dependencies. Otherwise, perhaps sqlalchemy or records?
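To make the idea concrete, here is a rough, untested sketch of what a database-agnostic reader could look like using pandas with a SQLAlchemy connection URL. The connection_url option and the row-based test names are my own invention for illustration, not anything DataDriver provides today:

import pandas as pd
from sqlalchemy import create_engine

from DataDriver.AbstractReaderClass import AbstractReaderClass
from DataDriver.ReaderConfig import TestCaseData


class generic_sql_reader(AbstractReaderClass):
    def get_data_from_source(self):
        # connection_url is a hypothetical reader option, e.g.
        # "postgresql+psycopg://user@host/dbname" or "sqlite:///tests.db";
        # SQLAlchemy picks the driver from the URL scheme
        engine = create_engine(self.kwargs["connection_url"])
        df = pd.read_sql(self.kwargs["query"], engine)
        test_data = []
        total_rows = len(df)
        for row_i, row in enumerate(df.to_dict(orient="records")):
            args = {"${" + key + "}": value for key, value in row.items()}
            # generic test name built from all column values of the row
            test_name = " ".join(f"{k}={v}" for k, v in row.items())
            test_data.append(
                TestCaseData(test_name, args, [""], f"{row_i + 1}/{total_rows}")
            )
        return test_data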

Anyone else using SQL data-driven tests?

@René would you be open to a PR for this?


Interesting idea.

I was also thinking about the external dependencies.
What about using the DataBase library for this?

This could handle multiple different flavours.

And I am wondering if these “extra_vars” really make sense.
I think a proper, understandable and generic interface is really important if it should become a reader in the core.

And the biggest problem I see is that we need tests for these.
If I cannot test or fix it, I would maybe not merge it.
But I think you could definitely add a paragraph in the readme/docs that lists “third-party DataReaders” and add yours to it.
I am happy to direct people to other readers!

Thanks for this contribution!