sparkdl package

Submodule Functions & Classes

sparkdl.udf.keras_image_model.registerKerasImageUDF(udf_name, keras_model_or_file_path, preprocessor=None)

Registers a Keras image model as a Spark SQL UDF. The UDF takes a column (formatted in sparkdl.image.imageIO.imageSchema) and produces the output of the given Keras model (e.g. for Inception V3 it produces a real-valued score vector over the ImageNet object categories). For other models the output can have different meanings; please consult the specification of the actual model.

An existing Keras model object can be passed directly, as follows.

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

registerKerasImageUDF("udf_name", InceptionV3(weights="imagenet"))

To use a custom Keras model, save it to an HDF5 file and pass the file path as the parameter instead.

# Assume we have a compiled and trained Keras model
model.save('path/to/my/model.h5')

registerKerasImageUDF("my_custom_keras_model_udf", "path/to/my/model.h5")

If further preprocessing steps are required to prepare the images, the user can provide a preprocessing function, preprocessor, which converts a file path into an image array. Such a function is common in Keras workflows, as in the following example.

Warning

Using a preprocessor incurs a performance penalty, because each image is first converted into a file buffer and then reloaded. This preserves compatibility with the way Keras model inputs are usually preprocessed. Please consider performing the preprocessing directly with Keras/TensorFlow layers instead.

def keras_load_img(fpath):
    from keras.preprocessing.image import load_img, img_to_array
    import numpy as np
    # Load the file and resize it to InceptionV3's 299x299 input size
    img = load_img(fpath, target_size=(299, 299))
    return img_to_array(img).astype(np.uint8)

registerKerasImageUDF("my_inception_udf", InceptionV3(weights="imagenet"), keras_load_img)

If no preprocessor is provided, the UDF is assumed to be applied to a (struct) column encoded in sparkdl.image.imageIO.imageSchema. The output is a single (struct) column containing the resulting tensor data.
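To illustrate the kind of encoding an image struct column involves, here is a minimal sketch that packs a NumPy image array into a plain dict and unpacks it again. The field names (height, width, nChannels, data) and the row-major height × width × channel byte layout are assumptions modeled loosely on an image struct, not a copy of sparkdl.image.imageIO.imageSchema; array_to_struct and struct_to_array are hypothetical helpers.

```python
import numpy as np


def array_to_struct(img):
    """Pack an HxW or HxWxC uint8 array into a dict (hypothetical, assumed field layout)."""
    if img.ndim == 2:
        img = img[:, :, np.newaxis]  # treat a grayscale image as a single channel
    height, width, n_channels = img.shape
    return {
        "height": height,
        "width": width,
        "nChannels": n_channels,
        # Raw pixel bytes in row-major (height, width, channel) order.
        "data": np.ascontiguousarray(img, dtype=np.uint8).tobytes(),
    }


def struct_to_array(row):
    """Rebuild the HxWxC uint8 array from the dict produced by array_to_struct."""
    flat = np.frombuffer(row["data"], dtype=np.uint8)
    return flat.reshape(row["height"], row["width"], row["nChannels"])
```

Keeping the shape fields next to the flat byte buffer is what allows the array to be rebuilt without any other metadata.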

Parameters:
  • udf_name – str, name of the UserDefinedFunction. If the name exists, it will be overwritten.
  • keras_model_or_file_path – str or KerasModel, either a path to the HDF5 Keras model file or an actual loaded Keras model
  • preprocessor – function, optional; converts an image file path into an image tensor/ndarray with the shape the Keras model expects as input
Returns:

GraphFunction, the graph function for the Keras image model

class sparkdl.estimators.keras_image_file_estimator.KerasImageFileEstimator(*args, **kwargs)

Bases: pyspark.ml.base.Estimator, sparkdl.param.shared_params.HasInputCol, sparkdl.param.image_params.HasInputImageNodeName, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.shared_params.HasOutputNodeName, sparkdl.param.shared_params.HasLabelCol, sparkdl.param.shared_params.HasKerasModel, sparkdl.param.shared_params.HasKerasOptimizer, sparkdl.param.shared_params.HasKerasLoss, sparkdl.param.image_params.CanLoadImage, sparkdl.param.image_params.HasOutputMode

Builds an Estimator from a Keras model.

First, create a model and save it to the file system.

from keras.applications.resnet50 import ResNet50
model = ResNet50(weights=None)
model.save("path_to_my_model.h5")

Then, create an image-loading function that reads image data from a URI, preprocesses it, and returns the numerical tensor.

def load_image_and_process(uri):
    import numpy as np
    import PIL.Image
    from keras.applications.imagenet_utils import preprocess_input

    original_image = PIL.Image.open(uri).convert('RGB')
    # LANCZOS is the filter formerly exposed as ANTIALIAS (removed in Pillow 10)
    resized_image = original_image.resize((224, 224), PIL.Image.LANCZOS)
    image_array = np.array(resized_image).astype(np.float32)
    # Add a leading batch dimension: shape (1, 224, 224, 3)
    image_tensor = preprocess_input(image_array[np.newaxis, :])
    return image_tensor
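For reference, the following is a minimal NumPy sketch of what preprocess_input does to the batch above, assuming Keras's default mode="caffe" for keras.applications.imagenet_utils: the channels are reordered from RGB to BGR and each channel is zero-centered with ImageNet means, without any scaling. caffe_style_preprocess is a hypothetical helper; other Keras modes ("tf", "torch") scale the values differently.

```python
import numpy as np

# Per-channel ImageNet means in BGR order, as used by caffe-mode preprocessing.
BGR_MEANS = np.array([103.939, 116.779, 123.68], dtype=np.float32)


def caffe_style_preprocess(batch_rgb):
    """Sketch of caffe-mode preprocessing on a (N, H, W, 3) float RGB batch."""
    batch_bgr = batch_rgb[..., ::-1]  # RGB -> BGR channel order
    return batch_bgr - BGR_MEANS      # zero-center each channel; no rescaling
```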

Assume the image URIs live in the following DataFrame.

from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer, OneHotEncoder

original_dataset = spark.createDataFrame([
    Row(imageUri="image1_uri", imageLabel="image1_label"),
    Row(imageUri="image2_uri", imageLabel="image2_label"),
    # and more rows ...
])
stringIndexer = StringIndexer(inputCol="imageLabel", outputCol="categoryIndex")
indexed_dataset = stringIndexer.fit(original_dataset).transform(original_dataset)
# dropLast=False keeps one slot per label, matching a softmax output over all classes
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec", dropLast=False)
image_dataset = encoder.transform(indexed_dataset)
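The two feature stages above can be mimicked in plain Python to show what ends up in categoryVec. This sketch assumes StringIndexer's default frequency-descending ordering with ties broken alphabetically, and OneHotEncoder with dropLast=False; Spark itself produces sparse vectors, whereas index_labels and one_hot here are hypothetical helpers returning plain lists.

```python
from collections import Counter


def index_labels(labels):
    """Map each distinct label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, size):
    """Dense one-hot vector with a 1.0 at `index` (no slot dropped)."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec


labels = ["cat", "dog", "cat", "bird"]
mapping = index_labels(labels)
vectors = [one_hot(mapping[label], len(mapping)) for label in labels]
```

With the last slot kept, every label, including the least frequent one, gets a non-zero entry, which is what a categorical cross-entropy loss over all classes expects.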

We can then create a Keras estimator that takes our saved model file and trains it using Spark.

from sparkdl.estimators.keras_image_file_estimator import KerasImageFileEstimator

estimator = KerasImageFileEstimator(inputCol="imageUri",
                                    outputCol="name_of_result_column",
                                    labelCol="categoryVec",
                                    imageLoader=load_image_and_process,
                                    kerasOptimizer="adam",
                                    kerasLoss="categorical_crossentropy",
                                    kerasFitParams={"epochs": 5, "batch_size": 64},
                                    modelFile="path_to_my_model.h5")

transformer = estimator.fit(image_dataset)
fit(dataset, params=None)

Fits a model to the input dataset with optional parameters.

Warning

Each trained model is returned to the driver as a byte-serialized HDF5 file. If the model files are large, the driver may run out of memory. Because a sufficiently large (and writable) shared file system cannot be assumed, users are advised not to train too many models in a single Spark job.
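A rough back-of-the-envelope estimate helps judge how many models the driver can hold. The sketch below assumes float32 weights (4 bytes each) and optionally counts per-weight optimizer state saved in the file (for example 2 extra copies for Adam's first- and second-moment estimates); it ignores HDF5 metadata and the serialized architecture, so treat the result as a lower bound. estimate_driver_bytes is a hypothetical helper. Keras's ResNet50 has 25,636,712 parameters.

```python
def estimate_driver_bytes(num_params, num_models, bytes_per_param=4, optimizer_slots=0):
    """Rough lower bound on serialized weight bytes collected on the driver.

    Assumes each weight is stored once, plus `optimizer_slots` extra copies of
    per-weight optimizer state; file metadata and architecture are ignored.
    """
    per_model = num_params * bytes_per_param * (1 + optimizer_slots)
    return per_model * num_models


# Ten ResNet50-sized models, weights only: about 1 GB on the driver.
print(estimate_driver_bytes(25_636_712, num_models=10))  # 1025468480
```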

Parameters:
  • dataset – input dataset, an instance of pyspark.sql.DataFrame. The column inputCol should contain the image URIs that imageLoader reads (as in the example above).
  • params – An optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.
Returns:

fitted model(s). If params includes a list of param maps, the order of these models matches the order of the param maps.
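The ordering guarantee for a list of param maps can be modeled in a few lines of plain Python. This is a sketch of the documented behavior only, not PySpark's implementation; fit_with_param_maps and toy_fit are hypothetical, and a real param map is keyed by Param objects rather than strings.

```python
def fit_with_param_maps(fit_one, dataset, params=None):
    """One fitted model per param map, returned in the same order as the maps."""
    if isinstance(params, (list, tuple)):
        return [fit_one(dataset, param_map) for param_map in params]
    return fit_one(dataset, params or {})


# A toy "fit" that just records the epoch count it was given.
def toy_fit(dataset, param_map):
    return {"epochs": param_map.get("epochs", 1)}


models = fit_with_param_maps(toy_fit, dataset=None, params=[{"epochs": 5}, {"epochs": 10}])
```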

setParams(*args, **kwargs)
setParams(self, inputCol=None, inputImageNodeName=None, outputCol=None, outputNodeName=None, outputMode="vector", labelCol=None, modelFile=None, imageLoader=None, kerasOptimizer=None, kerasLoss=None, kerasFitParams=None)

sparkdl Module Contents

sparkdl.imageType(imageRow)

Get type information about the image.

Parameters: imageRow – Spark image row.
Returns: ImageType
sparkdl.readImages(imageDirectory, numPartition=None)

Read a directory of images (or a single image) into a DataFrame.

Parameters:
  • imageDirectory – str, path to an image directory or a single image file.
  • numPartition – int, number of partitions to use when reading files.
Returns:

DataFrame, with columns: (filepath: str, image: imageSchema).

class sparkdl.TFImageTransformer(*args, **kwargs)

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.image_params.HasOutputMode

Applies a TensorFlow graph to the image column of a DataFrame.

Restrictions of the current API:

  • Does not use minibatches; adding them is a major, low-hanging opportunity for better performance.
  • Only one output node can be specified.
  • The output is expected to be an image or a 1-d vector.
  • All images in the DataFrame are expected to be of the same numerical data type (i.e. the dtype of the values in the numpy array representation is the same).

We assume all graphs have a “minibatch” dimension (i.e. an unknown leading dimension) in the tensor shapes.
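Because each image is processed on its own, it has to be given a leading batch dimension of size 1 before it can be fed to such a graph. The sketch below shows this with NumPy and checks the result against a tensor shape in which None marks the unknown minibatch dimension; the (None, 299, 299, 3) shape is a hypothetical InceptionV3-style input, and add_batch_dim and shape_compatible are illustrative helpers.

```python
import numpy as np


def add_batch_dim(image):
    """Turn a single HxWxC image into a batch of one with shape (1, H, W, C)."""
    return image[np.newaxis, ...]


def shape_compatible(array_shape, tensor_shape):
    """Check an array shape against a tensor shape; None marks an unknown dimension."""
    if len(array_shape) != len(tensor_shape):
        return False
    return all(expected is None or expected == actual
               for actual, expected in zip(array_shape, tensor_shape))


GRAPH_INPUT_SHAPE = (None, 299, 299, 3)  # hypothetical input placeholder shape
image = np.zeros((299, 299, 3), dtype=np.float32)
batch = add_batch_dim(image)
```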

Note

The input TensorFlow graph should have its weights converted to constants (i.e. frozen), since this transformer creates a new session in which the graph's variables are not initialized.

NEW_OUTPUT_PREFIX = 'sdl_flattened'
USER_GRAPH_NAMESPACE = 'given'
getGraph()
getInputTensor()
getOutputTensor()
graph = Param(parent='undefined', name='graph', doc='A TensorFlow computation graph')
inputTensor = Param(parent='undefined', name='inputTensor', doc='A TensorFlow tensor object or name representing the input image')
outputTensor = Param(parent='undefined', name='outputTensor', doc='A TensorFlow tensor object or name representing the output')
setGraph(value)
setInputTensor(value)
setOutputTensor(value)
setParams(*args, **kwargs)
setParams(self, inputCol=None, outputCol=None, graph=None, inputTensor=utils.IMAGE_INPUT_PLACEHOLDER_NAME, outputTensor=None, outputMode="vector")
class sparkdl.DeepImagePredictor(*args, **kwargs)

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol

Applies the model specified by its popular name to the image column of a DataFrame. The input image column should be 3-channel SpImage. The output is an MLlib Vector.

decodePredictions = Param(parent='undefined', name='decodePredictions', doc='If true, output predictions in the (class, description, probability) format')
getModelName()
modelName = Param(parent='undefined', name='modelName', doc='A deep learning model name')
setModelName(value)
setParams(*args, **kwargs)
setParams(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5)
topK = Param(parent='undefined', name='topK', doc='How many classes to return if decodePredictions is True')
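To show what decodePredictions and topK mean for a single prediction vector, here is a minimal NumPy sketch that picks the topK highest scores and returns (class, description, probability) triples. The class IDs and descriptions below are hypothetical placeholders; for ImageNet models the real mapping comes from the ImageNet class index, and decode_top_k is an illustrative helper rather than sparkdl's code.

```python
import numpy as np


def decode_top_k(scores, class_ids, descriptions, k=5):
    """Return the k highest-scoring (class, description, probability) triples."""
    scores = np.asarray(scores)
    top = np.argsort(scores)[::-1][:k]  # indices of the largest scores, descending
    return [(class_ids[i], descriptions[i], float(scores[i])) for i in top]


scores = [0.1, 0.6, 0.3]
class_ids = ["n0", "n1", "n2"]           # hypothetical class identifiers
descriptions = ["apple", "bus", "cat"]   # hypothetical human-readable labels
top2 = decode_top_k(scores, class_ids, descriptions, k=2)
```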
class sparkdl.DeepImageFeaturizer(*args, **kwargs)

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol

Applies the model specified by its popular name, with its prediction layer(s) removed, to the image column of a DataFrame. The output is an MLlib Vector so that DeepImageFeaturizer can be used in an MLlib Pipeline. The input image column should be 3-channel SpImage.

getModelName()
modelName = Param(parent='undefined', name='modelName', doc='A deep learning model name')
setModelName(value)
setParams(self, inputCol=None, outputCol=None, modelName=None)
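The difference between DeepImageFeaturizer and DeepImagePredictor can be pictured with a toy NumPy network: the featurizer returns the activations that feed the prediction layer, while the predictor also applies that final layer and a softmax. The two-layer network, its random weights, and the helper names are hypothetical; real models are far deeper, and which layers are removed is determined by sparkdl's own model definitions.

```python
import numpy as np

rng = np.random.default_rng(0)
W_hidden = rng.normal(size=(12, 4))  # toy "backbone" weights: 12 inputs -> 4 features
W_head = rng.normal(size=(4, 3))     # toy prediction layer: 4 features -> 3 classes


def features(x):
    """Featurizer-style output: ReLU activations just before the prediction layer."""
    return np.maximum(x @ W_hidden, 0.0)


def predict(x):
    """Predictor-style output: prediction layer plus softmax on top of the features."""
    logits = features(x) @ W_head
    exp = np.exp(logits - logits.max(axis=-1, keepdims=True))  # numerically stable softmax
    return exp / exp.sum(axis=-1, keepdims=True)
```

The feature vectors are typically passed to a downstream MLlib estimator, such as a classifier trained on the new task.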
class sparkdl.KerasImageFileTransformer(*args, **kwargs)

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.image_params.CanLoadImage, sparkdl.param.shared_params.HasKerasModel, sparkdl.param.image_params.HasOutputMode

Applies a TensorFlow-backed Keras model (specified by a file name) to images (specified by the URIs in the inputCol column) in a DataFrame.

Restrictions of the current API:
  • See TFImageTransformer.
  • Only supports TensorFlow-backed Keras models (no Theano).
setParams(*args, **kwargs)
setParams(self, inputCol=None, outputCol=None, modelFile=None, imageLoader=None, outputMode="vector")
sparkdl.imageInputPlaceholder(nChannels=None)