This documentation describes an old release, version 3.5.0. Documentation for the latest release, 3.6.1, can be found here.
Injecting Python Code¶
Certain Zuar Runner inputters, transforms, and steps allow the injection of Python code into a running job:
inputter - ExampleInput
transform - PythonTransform
step - PythonStep
TODO: add links to relevant docs
General¶
The value of python_code can be any of the following:
A string containing the name of a file located in /var/mitto/data containing valid Python code.
A string containing the fully-qualified path to a file containing valid Python code.
A list of one or more strings, with each string being a line of valid Python. The individual strings are joined into a single string that is passed to the Python exec function.
Depending upon where python_code is used, additional constraints may be placed on the code.
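To make the list form concrete, the following minimal sketch mimics the joining step in plain Python. It is not Runner's implementation: the newline separator, the `namespace` dictionary, and the sample lines are assumptions made purely for illustration.

```python
# Hypothetical list-of-strings value, as it might appear in python_code.
python_code = [
    "greeting = 'hello'",
    "result = greeting.upper()",
]

# Assumption: the lines are joined with newlines before execution.
source = "\n".join(python_code)

# exec() runs the joined source; names it defines land in `namespace`.
namespace = {}
exec(source, namespace)

print(namespace["result"])
```

Because every string becomes one line of the final source, each list entry should be a complete line of Python.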
Formatting the List of Strings¶
When python_code is a list of strings, a non-standard formatting convention is used due to inconsistent handling of indentation by HJSON. This is best explained by example:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        # Executed in the context of an instance of the PythonStep class
        # Because this uses the store as input, the job must be configured
        # with a store.
        def _dynamic_step(self):
        . logging.info("start")
        . from mitto.iov2.input import StoreInput
        . from mitto.io.db.utils import (DEFAULT_ENCODE_ERRORS, to_copyfrom_line)
        . from mitto.io.db.redshift import StreamIter
        . streamer = StreamIter(
        . to_copyfrom_line(record, DEFAULT_ENCODE_ERRORS).encode("utf-8")
        . for record in self.environ[STORE].list()
        . )
        . data = streamer.read()
        . logging.info("stop")
        # Function must be assigned to `step`
        self.step = _dynamic_step
    ]
}
Things to note:
The first non-space character on the line is considered to be “column 1”.
If the first non-space character is a period (.), it is converted to a space.
Python comments can be used.
The variables available for use depend upon the context of execution.
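The "column 1" and leading-period rules above can be sketched in a few lines of Python. This is only an approximation of the behavior described, not Runner's actual code; the helper names `normalize_line` and `build_source` are hypothetical.

```python
def normalize_line(line):
    """Drop leading whitespace, then turn a leading '.' into a space."""
    stripped = line.lstrip()          # first non-space character is "column 1"
    if stripped.startswith("."):
        stripped = " " + stripped[1:]  # the period becomes indentation
    return stripped


def build_source(lines):
    """Join the normalized lines into one block of Python source."""
    return "\n".join(normalize_line(line) for line in lines)


# Hypothetical python_code entries using the leading-period convention.
python_code = [
    "    def double(x):",
    "    . return x * 2",
]

source = build_source(python_code)
namespace = {}
exec(source, namespace)
print(namespace["double"](21))
```

Under these assumptions, whitespace before the period is discarded, so the period and the characters after it determine the indentation Python sees.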
Execution Context and Other Requirements¶
PythonStep¶
When using the PythonStep step, python_code must define a function that will be valid as a method of the PythonStep class. The function must:
Accept a single argument: self
Expect to be called once during the execution of the job
Not return a value
Be assigned to the step attribute of the class instance
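One way to try out a step function against these requirements before placing it in a job is a small stand-in harness. Everything about the harness is an assumption: `FakeStep` is a hypothetical class, not the real PythonStep; the `seen` attribute exists only for this demonstration; and how Runner binds and invokes the function is not described here, so the sketch binds it explicitly with `types.MethodType`.

```python
import logging
import types


class FakeStep:
    """Hypothetical stand-in for PythonStep, used only for local testing."""

    def __init__(self, environ):
        self.environ = environ
        self.seen = []  # demo-only attribute, not part of Runner


# The source a python_code list might produce once its lines are joined.
source = "\n".join([
    "def _dynamic_step(self):",
    "    logging.info('environ=%s', self.environ)",
    "    self.seen.append('ran')",
    "self.step = _dynamic_step",
])

instance = FakeStep({"job": "demo"})

# Assumption: `self` and `logging` are in scope when the code executes.
exec(source, {"self": instance, "logging": logging})

# A function stored on an instance is not bound automatically in Python,
# so this sketch binds it to the instance before calling it once.
step = types.MethodType(instance.step, instance)
result = step()

print(result, instance.seen)
```

The harness checks the four requirements in order: the function takes only `self`, runs once, returns nothing, and ends up on the `step` attribute.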
PythonTransform¶
When using the PythonTransform transform, python_code must define a function that will be valid as a method of the PythonTransform class. The function must:
Accept two arguments: self and record
Expect to be called once for each row of data
Return record or a modified version of record
Be assigned to the transform_ attribute of the class instance
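As an illustration of a transform that returns a modified record, the sketch below cleans up a name field. It rests on several assumptions: records are treated as dictionaries, the `name` field is invented, and `FakeTransform` is a hypothetical stand-in rather than the real PythonTransform class. The explicit binding with `types.MethodType` is likewise a choice made for this sketch, not a description of Runner internals.

```python
import types


class FakeTransform:
    """Hypothetical stand-in for PythonTransform, used only for local testing."""


# The source a python_code list might produce once its lines are joined.
source = "\n".join([
    "def transform_(self, record):",
    "    record = dict(record)  # copy so the input row is left untouched",
    "    record['name'] = record['name'].strip().title()",
    "    return record",
    "self.transform_ = transform_",
])

instance = FakeTransform()

# Assumption: `self` is in scope when the code executes.
exec(source, {"self": instance})

# Bind the function to the instance, then call it once per row.
transform = types.MethodType(instance.transform_, instance)
rows = [{"name": "  ada lovelace "}, {"name": "alan TURING"}]
cleaned = [transform(row) for row in rows]

print(cleaned)
```

Returning a copy rather than mutating the incoming record is a design choice for the sketch; the requirements above allow either, as long as a record is returned for every row.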
Tips and Tricks¶
If you are running the job manually using the CLI via job_io.py config.json, you can invoke the Python debugger via, e.g.:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        import pdb; pdb.set_trace()
    ]
}
Note: this is not possible when the job is being run from the UI, the scheduler, a sequence, or via mitto run.
You can easily add logging statements.
To log every row at a certain point in a set of transforms:
{
    use: mitto.iov2.transform.builtin#PythonTransform
    python_code: [
        def transform_(self, record):
        . logging.info("record=%s", record)
        . return record
        self.transform_ = transform_
    ]
}
To log the job execution environment at a certain point in the steps:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        logging.info("environ=%s", self.environ)
    ]
}