This documentation describes an older release, version 3.6.0. The latest release is 3.6.1.

Injecting Python Code

Certain Zuar Runner inputters, transforms, and steps allow the injection of Python code into a running job:

  • inputter - ExampleInput

  • transform - PythonTransform

  • step - PythonStep


General

The value of python_code can be any of the following:

  1. A string naming a file in /var/mitto/data that contains valid Python code.

  2. A string giving the fully-qualified path to a file that contains valid Python code.

  3. A list of one or more strings, with each string being a line of valid Python. The individual strings are joined into a single string that is passed to the Python exec function.

Depending upon where the python_code is used, additional constraints may be placed on the code.
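
The three forms above can be sketched in Python as follows. This is a minimal illustration, not Zuar Runner's implementation: load_python_code is a hypothetical helper, joining the list with newlines is an assumption, and any per-line preprocessing of the list form is left out.

```python
import os

DATA_DIR = "/var/mitto/data"  # directory named in the text above


def load_python_code(value, data_dir=DATA_DIR):
    """Hypothetical helper: turn a python_code value into one source string."""
    if isinstance(value, list):
        # Form 3: a list of lines, joined into a single string.
        # Joining with newlines is an assumption of this sketch.
        return "\n".join(value)
    # Forms 1 and 2: a file name inside data_dir, or a fully-qualified path.
    # os.path.join returns `value` unchanged when it is already absolute.
    path = os.path.join(data_dir, value)
    with open(path, encoding="utf-8") as handle:
        return handle.read()


# The resulting string is what would be handed to exec.
source = load_python_code(["x = 1", "y = x + 1"])
namespace = {}
exec(source, namespace)
```

After the exec call, namespace["y"] holds 2, showing that the joined lines run as one program.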

Formatting the List of Strings

When python_code is a list of strings, a non-standard formatting convention is used due to inconsistent handling of indentation by HJSON. This is best explained by example:

{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        # Executed in the context of an instance of the PythonStep class
        # Because this uses the store as input, the job must be configured
        # with a store.
        def _dynamic_step(self):
        .    logging.info("start")
        .    from mitto.iov2.input import StoreInput
        .    from mitto.io.db.utils import (DEFAULT_ENCODE_ERRORS, to_copyfrom_line)
        .    from mitto.io.db.redshift import StreamIter
        .    streamer = StreamIter(
        .        to_copyfrom_line(record, DEFAULT_ENCODE_ERRORS).encode("utf-8")
        .        for record in self.environ[STORE].list()
        .    )
        .    data = streamer.read()
        .    logging.info("stop")
        # Function must be assigned to `step`
        self.step = _dynamic_step
    ]
}

Things to note:

  • The first non-space character on a line is treated as “column 1”; the spaces before it are ignored.

  • If the first non-space character is a period (.), it is converted to a space, so the spaces that follow it are kept as Python indentation.

  • Python comments can be used.

  • The variables available for use depend upon the context of execution.
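
The column and period rules above can be sketched in Python. This is an illustration of the convention as described, not the code Zuar Runner runs: normalize_lines is a hypothetical helper, and joining the lines with newlines is an assumption.

```python
def normalize_lines(lines):
    """Hypothetical helper applying the period convention to python_code lines."""
    normalized = []
    for line in lines:
        # The first non-space character becomes column 1.
        stripped = line.lstrip(" ")
        if stripped.startswith("."):
            # The leading period becomes a space; the spaces after it survive.
            stripped = " " + stripped[1:]
        normalized.append(stripped)
    return "\n".join(normalized)


lines = [
    "        def double(value):",
    "        .    return value * 2",
    "        result = double(21)",
]
namespace = {}
exec(normalize_lines(lines), namespace)
print(namespace["result"])  # prints 42
```

The period marks where indentation starts, so the body of double stays indented even though everything before column 1 is discarded.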

Execution Context and Other Requirements

PythonStep

When using the PythonStep step, python_code must define a function that will be valid as a method of the PythonStep class. The function must:

  • Accept a single argument: self

  • Expect to be called once during the execution of the job

  • Not return a value

  • Be assigned to the step attribute of the class instance
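
The requirements above can be illustrated with a small stand-in. FakeStep is hypothetical and is not the PythonStep class. The sketch assumes that the injected code runs with self and logging in scope, and that the host calls the function with the instance passed explicitly.

```python
import logging


class FakeStep:
    """Hypothetical stand-in for PythonStep; not the Zuar Runner class."""

    def __init__(self, environ, source):
        self.environ = environ
        # Assumption: the injected code runs with `self` and `logging` in scope.
        exec(source, {"self": self, "logging": logging})

    def run(self):
        # Assumption: the instance is passed explicitly, because a plain
        # function stored on an instance is not bound automatically.
        self.step(self)


SOURCE = "\n".join([
    "def _dynamic_step(self):",
    "    logging.info('start')",
    "    self.environ['ran'] = True",
    "self.step = _dynamic_step",
])

step = FakeStep({"ran": False}, SOURCE)
step.run()  # called once; the function returns nothing
print(step.environ["ran"])  # prints True
```

If the final assignment to self.step is omitted, the stand-in has no step attribute to call, which is why the assignment is listed as a requirement.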

PythonTransform

When using the PythonTransform transform, python_code must define a function that will be valid as a method of the PythonTransform class. The function must:

  • Accept two arguments: self and record

  • Expect to be called once for each row of data

  • Return record or a modified version of record

  • Be assigned to the transform_ attribute of the class instance
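
The same contract can be sketched for a transform. FakeTransform is a hypothetical stand-in, not the PythonTransform class. Treating each record as a dict, and passing the instance explicitly when calling the function, are assumptions of this sketch.

```python
class FakeTransform:
    """Hypothetical stand-in for PythonTransform; not the Zuar Runner class."""

    def __init__(self, source):
        # Assumption: the injected code runs with `self` in scope.
        exec(source, {"self": self})

    def apply(self, records):
        # Called once per row; the explicit instance argument is an assumption.
        return [self.transform_(self, record) for record in records]


SOURCE = "\n".join([
    "def transform_(self, record):",
    "    record['name'] = record['name'].strip().title()",
    "    return record",
    "self.transform_ = transform_",
])

rows = [{"name": "  ada lovelace "}, {"name": "alan turing"}]
print(FakeTransform(SOURCE).apply(rows))
# prints [{'name': 'Ada Lovelace'}, {'name': 'Alan Turing'}]
```

Returning the record from every call matters here: a function that forgets the return statement would hand None to whatever consumes the rows next.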

Tips and Tricks

  1. If you are running the job manually from the CLI with job_io.py config.json, you can invoke the Python debugger, for example:

    {
        use: mitto.iov2.steps.builtin#PythonStep
        python_code: [
            import pdb; pdb.set_trace()
        ]
    }
    

    Note: This is not possible when the job is run from the UI, the scheduler, a sequence, or via mitto run.

  2. You can easily add logging statements.

    To log every row at a certain point in a set of transforms:

    {
        use: mitto.iov2.transform.builtin#PythonTransform
        python_code: [
            def transform_(self, record):
            .   logging.info("record=%s", record)
            .   return record
            self.transform_ = transform_
        ]
    }
    

    To log the job execution environment at a certain point in the steps:

    {
        use: mitto.iov2.steps.builtin#PythonStep
        python_code: [
            logging.info("environ=%s", self.environ)
        ]
    }