Data-driven Testing with 'pytest', Part Three: Chained Operations

My previous post discussed the iterative process of arriving at a test harness with enough design abstraction to capture the complexity of its corresponding ETL application. This post discusses the first major breaking version update I made to the test case schema, why it was necessary, and how it has functioned in practice.

Initial Model

After I validated pytest.mark.parametrize() can disambiguate test cases and pass anything to a custom-designed stub metafunc, I went ahead with designing the actual schema I would pass as input parameters. After some cursory munging, the original schema looked something like this:

{
  "ARGS": ["-s", "./_data/csv/some.csv", "-d", "database://::table"],
  "SOURCE": {
    "PATH": "relative"
  },
  "DESTINATION": {
    "PATH": "absolute"
  },
  "ORDER_BY": "$COLUMN_NAME",
  "ORDER_BY_DIRECTION": "ascending",
  "MODE": "CREATE",
  "ASSERT": "SUCCESS",
  "DESCRIPTION_LONG": "$LONG_DESCRIPTION",
  "DESCRIPTION_SHORT": "$SHORT_DESCRIPTION",
  "VERSION": 0.1,
  "TIMESTAMP": "$ISO_8601_TIMESTAMP",
  "JIRA_TICKET": "$JIRA_TICKET"
}

This test case would pass in the arguments found in "ARGS", pre-processed by configuration options in "SOURCE" and "DESTINATION", into the ETL tool via the Python-based CLI call, then assert a particular test condition based on a per-record comparison of the original source and the resultant destination data as fetched into memory by conditions listed in "ORDER_BY" and "ORDER_BY_DIRECTION". Additional metadata related to the test case, such as the JIRA ticket number the test covers, short and long descriptions of the test case, and the unique identifier as an ISO-8601 formatted timestamp, help users parse the test case by eye.

It worked fine in the beginning. If you're doing a basic ingest of a CSV file into a database table, the information stored in this schema would more or less fully model the transfer through the ETL tool as done through the CLI. I happily migrated over most of the code-based tests to this framework without incident.

New Limitations

That happy time ended before I could migrate all the tests over to this new framework. I didn't meet my own needs through this schema, for several reasons:

  • What if you're trying to ingest data from a source database, or an in-memory cluster, or something that is process-based (the data can only be fetched through a particular process)? I didn't think the test harness should simply assume that a resource required in a test exists, because it makes reproducibility by other users on other machine instances more difficult. For example, if you had to ingest a PostgreSQL table, you don't want to go find out what the original data exactly was, then figure out how the original tester had loaded it into the table, to ensure the source data is exactly how the original tester ingested it to ensure you assert the specified assertion in the same test context. The test data should be easily accessible by testers and the way it is loaded into the source data format should be clearly documented, if not modeled by the test itself.

  • How do you test appending data to an existing destination, versus writing to a brand new destination? Again, the test harness does not assume a resource exists outside the test context, and destinations are dropped after the test has finished executing. Yet with only one operation available, you can only create a destination with this limitation.

  • How do you compare different destination instances, generated by ingesting the same source through different paths, for correctness? One major push for improving performance in this ETL tool was to integrate Apache Spark to act as the data handling layer, with the Python logic to act as the request layer. This job is made more complicated as the CLI functionality implemented for one paradigm isn't entirely compatible with the new paradigm (e.g. Python neither has nor needs logic to handle Spark memory allocation). Among other reasons, this meant if you want to be backwards compatible, you keep the old path as well. So both paths need to be tested, and compared against each other to ensure data is copied over correctly and any migrated features behave the same way.

  • How are operations besides the test execution and comparison operations handled? For example, PostGIS ships with a tool called shp2pgsql, a bash-compatible tool which ingests ESRI shapefiles into PostGIS. Using this tool for testing purposes would be preferable to building out a separate path within the ETL tool to ingest ESRI shapefiles into PostGIS as it does not contribute source code and technical debt, and because it's a common tool tested by other users already (which helps with robustness). If you only have "ARGS" available, you assume the execution operation, and if you have "ORDER_BY" and "ORDER_BY_DIRECTION" available, you assume the comparison operation, at the cost of excluding all other possible operation types.

These cases, and more that I likely didn't think about, clearly proved this schema wasn't flexible enough. Some error conditions simply couldn't be modeled with any of the three test harnesses I had to maintain due to this failed migration. This situation wasn't sustainable, and required an update to the test schema and test harness.

On one hand, additional design work may have caught these problems. On the other, identifying and prioritizing these problems took real-world experience, and I hoped having a more lightweight schema evolve based on problems identified in the field would create a final schema with a high degree of efficacy, contrasted with an overly complicated schema incurring a linearly scaling testing implementation slowdown (e.g. copying unhelpful keys again and again). Time constraints also contributed.

Okay, so how do Updates Work?

I'm very happy I added in the "VERSION" keyword to my original schema, because I could add in different execution paths in my test harness for new tests without old tests breaking. I didn't want to write a migration script for these tests because the scripts wouldn't be useful after the tests were migrated (there wouldn't be a need to roll back the migration after the code to handle the new schemas was checked in, changes would simply be patched in the code), and because I wanted to see how easily a green developer assigned to this ETL tool could contribute a new schema.

I was pleasantly surprised at how adaptable the pytest.mark.parametrize() metafunc model could adapt to a new schema. The single stub method to be parametrized, which contained all the logic for the original test execution, became a simple case/switch tree:

def test_single_test_case(test_case):
    """docstring
    """
    if test_case[test_args.VERSION] == 0.1:
        stub_version_zero_one(test_case)
    elif test_case[test_args.VERSION] == 0.2:
        stub_version_zero_two(test_case)
    else:
        raise odo.KIOValueError(
            '$ERROR_MESSAGE'
        )

This likely isn't the best way to add test cases, but it is stupidly simple and understandable along the likeliest path of extension (adding in an additional version involves adding another elif statement and another sub-stub method), and that fits in with the design goals of this ETL tool (be simple to contribute to).

One extreme difficulty I encountered was reconciling the code for validating different test schemas, as the code I wanted to reuse was hardcoded to understand the original schema. I didn't end up implementing a great solution to this; it ended up being a lot of if/else nonsense. During the next refactoring pass, I would probably add a reference schema, a JSON file that looked something like this:

{
  "ARGS": {
    "REQUIRED": true
  },
  "SOURCE": {
    "REQUIRED": true,
    "KEYS": {
      "PATH": {
        "REQUIRED": true,
        "OPTIONS": ["relative", "absolute"]
      }
    }
  },
  "DESTINATION": {
    "REQUIRED": true,
    "KEYS": {
      "PATH": {
        "REQUIRED": true,
        "OPTIONS": ["relative", "absolute"]
      }
    }
  },
  "ORDER_BY": {
    "REQUIRED": true
  },
  "ORDER_BY_DIRECTION": {
    "REQUIRED": true,
    "OPTIONS": ["ascending", "descending"]
  },
  "MODE": {
    "REQUIRED": true,
    "OPTIONS": ["CREATE", "APPEND"]
  },
  "ASSERT": {
    "REQUIRED": true,
    "OPTIONS": ["SUCCESS", "FAILURE"]
  },
  "DESCRIPTION_LONG": {
    "REQUIRED": false
  },
  "DESCRIPTION_SHORT": {
    "REQUIRED": false
  },
  "VERSION": {
    "REQUIRED": true
  },
  "TIMESTAMP": {
    "REQUIRED": true
  },
  "JIRA_TICKET": {
    "REQUIRED": false
  }
}

for every schema version, and just traverse the schema and recursively validate each key. This would make the validation logic generic to different schema versions, and more data-driven.

It was much less painful than upgrading the first test harness I built. There was almost no new code needed, beyond migrating the existing schema validation and execution logic (less than a thousand lines of Python total).

A New Schema

Some insights and requirements informed the new schema design:

  • This schema design should be the final-ish schema design -- I should not be revisiting the schema design unless a major feature requirement is implemented causing the testing challenge to change dimensionally. I want to spend my time building out features and providing business value, and while testing robustness and coverage form a crucial part of that, design sessions for the test harness do not.

  • There's a clear distinction between the information contained in the schema used by the test harness and the information for the human reader.

  • It isn't possible to predict what types of operations are necessary ahead of time. Originally, only execution and comparison operations existed; however, other operations, such as arbitrary bash executions, may become necessary. Adding types of operations should be as easy as adding numbers of operations. Hence, operations should remain generic to the test harness.

  • Operation execution order should be self-documented in the test case, and a total ordering should be enforced by the validation logic. Ordering of operations should be arbitrary with regards to the operation type; you should be able to execute a comparison in the middle of the test before continuing executions, for example to ensure for lossy ingest of bad data, the correct number of records is ingested at every stage of the ETL pipeline. While the ETL tool does support concurrent ingest at some points in the data layer, it does not do so from the CLI; multiple ETL jobs executed concurrently results in undefined behavior, due to limitations from third-party dependencies. Even in this case, though, concurrent operations should "play nice" with any other operations in play during the life of the test, concurrent or otherwise.

  • Although the ETL tool remains more or less a product of functional programming (a good thing), there may be situations where there is state generated on the fly that should not be persisted. For example, the test harness must have the ability to record an expected failure for a particular execution and pass the exception message to a comparison operation down the line, to ensure the correct exception was thrown and incorrect exceptions are failed. Therefore, the test harness must keep a global state store for the context through the lifetime of the test.

After some additional design munging, the next schema iteration was born:

{
  "OPERATIONS": [
    {
      "TYPE": "EXECUTION",
      "ARGS": [
        "-s",
        "./_data/parquet/some.parquet",
        "-d",
        "kinetica://::some_table"
      ],
      "SOURCE": {
        "PATH": "relative"
      },
      "DESTINATION": {
        "PATH": "absolute"
      }
    },
    {
      "TYPE": "EXECUTION",
      "ARGS": [
        "-s",
        "kinetica://::some_table",
        "-d",
        "kinetica://::some_other_table"
      ],
      "SOURCE": {
        "PATH": "absolute"
      },
      "DESTINATION": {
        "PATH": "absolute"
      }
    },
    {
      "TYPE": "COMPARISON",
      "SOURCE": {
        "URI": "kinetica://::some_table",
        "PATH": "absolute"
      },
      "DESTINATION": {
        "URI": "kinetica://::some_other_table",
        "PATH": "absolute"
      },
      "ORDER_BY": "$COLUMN_NAME",
      "ORDER_BY_DIRECTION": "ascending",
      "ASSERT": "SUCCESS"
    }
  ],
  "DESCRIPTION_SHORT": "$SHORT_DESCRIPTION",
  "DESCRIPTION_LONG": "$LONG_DESCRIPTION",
  "VERSION": 0.2,
  "TIMESTAMP": "$ISO_8601_TIMESTAMP"
}

This particular test case takes an Apache Parquet file, ingests into the database, then duplicates the generated table and compares the two tables to ensure correctness of the duplication logic. This schema effectively satisfies the original requirements that came out of the originally encountered problems:

  • Namespaces all keys used by the test harness into the "OPERATIONS" top-level key, with all other keys being for humans only.

  • Specifies operation type as the value of key "TYPE", keeping operation type configurable.

  • Uses a list to preserve ordering of operations.

While the schema might look simple, I have found the design simplicity is directly proportional to the amount of thinking focused on the underlying problem; this effort was no different.

I have used this schema for several months now, and no major changes have been required, although minor improvements such as bash execution functionality have been added. As the harness matures and bugs in the harness are patched, the robustness and trustworthiness of the harness should improve.

I would very much like to continue iterating on the effectiveness of this test harness. In particular, I would very much like to execute parametrized regressions in parallel with pytest-xdist, as it would allow me to use more data per regression and execute more regressions in less time. Seeing only one CPU out of the 8 CPUs I have being used during testing via gtop is quite painful. Maybe another time :blush:

Click here to read Part Four