sparkdl package¶
Submodule Functions & Classes¶
-
sparkdl.udf.keras_image_model.registerKerasImageUDF(udf_name, keras_model_or_file_path, preprocessor=None)[source]¶

Create a Keras image model as a Spark SQL UDF. The UDF takes a column (formatted in sparkdl.image.imageIO.imageSchema) and produces the output of the given Keras model (e.g. for Inception V3 it produces a real-valued score vector over the ImageNet object categories). For other models, the output could have different meanings; please consult the actual model's specification.

The user can provide an existing Keras model as follows.
    from keras.applications import InceptionV3

    registerKerasImageUDF("udf_name", InceptionV3(weights="imagenet"))
To use a customized Keras model, we can save it and pass the file path as a parameter.
    # Assume we have a compiled and trained Keras model
    model.save('path/to/my/model.h5')

    registerKerasImageUDF("my_custom_keras_model_udf", "path/to/my/model.h5")
If further preprocessing steps are required to prepare the images, the user has the option to provide a preprocessing function, preprocessor. The preprocessor converts a file path into an image array. Such a function is usually part of a Keras workflow, as in the following example.

Warning

There is a performance penalty to using a preprocessor, as the image is first converted into a file buffer and then loaded back. This provides compatibility with the usual way Keras model inputs are preprocessed. Please consider using Keras/TensorFlow layers directly for this purpose.

    def keras_load_img(fpath):
        from keras.preprocessing.image import load_img, img_to_array
        import numpy as np
        img = load_img(fpath, target_size=(299, 299))
        return img_to_array(img).astype(np.uint8)

    registerKerasImageUDF("my_inception_udf", InceptionV3(weights="imagenet"), keras_load_img)
If the preprocessor is not provided, the UDF is assumed to be applied to a (struct) column encoded in sparkdl.image.imageIO.imageSchema. The output will be a single (struct) column containing the resulting tensor data.
Parameters: - udf_name – str, name of the UserDefinedFunction. If the name exists, it will be overwritten.
- keras_model_or_file_path – str or KerasModel, either a path to the HDF5 Keras model file or an actual loaded Keras model
- preprocessor – function, optional, a function that converts an image file path to an image tensor/ndarray in the correct shape to be served as input to the Keras model
Returns: GraphFunction, the graph function for the Keras image model
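Once registered, the UDF can be invoked from Spark SQL. The following is a minimal sketch; the directory path, view name, and UDF name are hypothetical, and a SparkSession spark is assumed to be in scope.

    from keras.applications import InceptionV3
    from sparkdl import readImages
    from sparkdl.udf.keras_image_model import registerKerasImageUDF

    registerKerasImageUDF("inceptionV3_udf", InceptionV3(weights="imagenet"))

    # Hypothetical image directory; readImages yields (filepath, image) columns.
    image_df = readImages("/data/sample_images")
    image_df.createOrReplaceTempView("sample_images")
    predictions_df = spark.sql("SELECT inceptionV3_udf(image) AS predictions FROM sample_images")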
-
class sparkdl.estimators.keras_image_file_estimator.KerasImageFileEstimator(*args, **kwargs)[source]¶

Bases: pyspark.ml.base.Estimator, sparkdl.param.shared_params.HasInputCol, sparkdl.param.image_params.HasInputImageNodeName, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.shared_params.HasOutputNodeName, sparkdl.param.shared_params.HasLabelCol, sparkdl.param.shared_params.HasKerasModel, sparkdl.param.shared_params.HasKerasOptimizer, sparkdl.param.shared_params.HasKerasLoss, sparkdl.param.image_params.CanLoadImage, sparkdl.param.image_params.HasOutputMode
Build an Estimator from a Keras model.
First, create a model and save it to the file system.
    from keras.applications.resnet50 import ResNet50

    model = ResNet50(weights=None)
    model.save("path_to_my_model.h5")
Then, create an image loading function that reads image data from a URI, preprocesses it, and returns a numerical tensor.
    def load_image_and_process(uri):
        import numpy as np
        import PIL.Image
        from keras.applications.imagenet_utils import preprocess_input

        original_image = PIL.Image.open(uri).convert('RGB')
        resized_image = original_image.resize((224, 224), PIL.Image.ANTIALIAS)
        image_array = np.array(resized_image).astype(np.float32)
        image_tensor = preprocess_input(image_array[np.newaxis, :])
        return image_tensor
Assume the image URIs live in the following DataFrame.
    from pyspark.sql import Row
    from pyspark.ml.feature import OneHotEncoder, StringIndexer

    original_dataset = spark.createDataFrame([
        Row(imageUri="image1_uri", imageLabel="image1_label"),
        Row(imageUri="image2_uri", imageLabel="image2_label"),
        # and more rows ...
    ])
    stringIndexer = StringIndexer(inputCol="imageLabel", outputCol="categoryIndex")
    indexed_dataset = stringIndexer.fit(original_dataset).transform(original_dataset)
    encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
    image_dataset = encoder.transform(indexed_dataset)
We can then create a Keras estimator that takes our saved model file and trains it using Spark.
    from sparkdl.estimators.keras_image_file_estimator import KerasImageFileEstimator

    estimator = KerasImageFileEstimator(inputCol="imageUri",
                                        outputCol="name_of_result_column",
                                        labelCol="categoryVec",
                                        imageLoader=load_image_and_process,
                                        kerasOptimizer="adam",
                                        kerasLoss="categorical_crossentropy",
                                        kerasFitParams={"epochs": 5, "batch_size": 64},
                                        modelFile="path_to_my_model.h5")
    transformers = estimator.fit(image_dataset)
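The result of fit can then be applied to new data like any Spark ML model. A minimal follow-up sketch, assuming a hypothetical test_dataset DataFrame with the same imageUri column as the training data:

    # transformers is the fitted model from above; test_dataset is hypothetical.
    predictions = transformers.transform(test_dataset)  # adds "name_of_result_column"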
-
fit(dataset, params=None)[source]¶

Fits a model to the input dataset with optional parameters.
Warning
This returns the byte-serialized HDF5 file for each model to the driver. If the model file is large, the driver might run out of memory. As we cannot assume the existence of a sufficiently large (and writable) file system, users are advised not to train too many models in a single Spark job.
Parameters: - dataset – input dataset, which is an instance of pyspark.sql.DataFrame. The column inputCol should be of type sparkdl.image.imageIO.imageSchema.
- params – an optional param map that overrides embedded params. If a list/tuple of param maps is given, this calls fit on each param map and returns a list of models.
Returns: fitted model(s). If params includes a list of param maps, the order of these models matches the order of the param maps.
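For illustration, a hedged sketch of fitting with a list of param maps, reusing the estimator from the example above. It assumes the estimator exposes a kerasFitParams Param matching its constructor argument; the values shown are arbitrary.

    # Each param map overrides the embedded params; fit returns one model per map,
    # in the same order as the maps.
    paramMaps = [
        {estimator.kerasFitParams: {"epochs": 5, "batch_size": 32}},
        {estimator.kerasFitParams: {"epochs": 5, "batch_size": 64}},
    ]
    models = estimator.fit(image_dataset, paramMaps)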
-
sparkdl Module Contents¶
-
sparkdl.imageType(imageRow)[source]¶

Get type information about the image.

Parameters: imageRow – Spark image row.
Returns: ImageType
-
sparkdl.readImages(imageDirectory, numPartition=None)[source]¶

Read a directory of images (or a single image) into a DataFrame.
Parameters: - imageDirectory – str, file path.
- numPartition – int, number of partitions to use for reading files.
Returns: DataFrame, with columns: (filepath: str, image: imageSchema).
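A minimal usage sketch combining the two functions above; the directory path is hypothetical, and imageType is assumed to take the image struct from a row of the returned DataFrame:

    from sparkdl import imageType, readImages

    image_df = readImages("/data/sample_images", numPartition=16)
    image_df.printSchema()             # filepath: string, image: struct

    first_row = image_df.first()
    print(imageType(first_row.image))  # type/channel information of the decoded image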
-
class sparkdl.TFImageTransformer(*args, **kwargs)[source]¶

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.image_params.HasOutputMode
Applies the TensorFlow graph to the image column in a DataFrame.
Restrictions of the current API:
- Does not use minibatches, which is a major low-hanging fruit for performance.
- Only one output node can be specified.
- The output is expected to be an image or a 1-d vector.
- All images in the DataFrame are expected to be of the same numerical data type (i.e. the dtype of the values in the numpy array representation is the same).
We assume all graphs have a “minibatch” dimension (i.e. an unknown leading dimension) in the tensor shapes.
Note
The input TensorFlow graph should have its weights converted to constants (i.e. a frozen graph), since a new session is created inside this transformer.
-
NEW_OUTPUT_PREFIX = 'sdl_flattened'¶
-
USER_GRAPH_NAMESPACE = 'given'¶
-
graph = Param(parent='undefined', name='graph', doc='A TensorFlow computation graph')¶
-
inputTensor = Param(parent='undefined', name='inputTensor', doc='A TensorFlow tensor object or name representing the input image')¶
-
outputTensor = Param(parent='undefined', name='outputTensor', doc='A TensorFlow tensor object or name representing the output')¶
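For illustration, a sketch of applying a small TensorFlow 1.x graph with this transformer, modeled on the project's README-style usage. The graph-freezing helpers (sparkdl.graph.utils.strip_and_freeze_until, sparkdl.transformers.utils.imageInputPlaceholder) and the directory path are assumptions based on that usage, not part of this page's API listing.

    import tensorflow as tf
    from sparkdl import TFImageTransformer, readImages
    import sparkdl.graph.utils as tfx
    from sparkdl.transformers import utils

    graph = tf.Graph()
    with tf.Session(graph=graph) as sess:
        image_arr = utils.imageInputPlaceholder()
        resized_images = tf.image.resize_images(image_arr, (299, 299))
        # Freeze the graph so its weights are constants (see the Note above).
        frozen_graph = tfx.strip_and_freeze_until([resized_images], graph, sess,
                                                  return_graph=True)

    transformer = TFImageTransformer(inputCol="image", outputCol="transformed_img",
                                     graph=frozen_graph,
                                     inputTensor=image_arr, outputTensor=resized_images,
                                     outputMode="image")
    image_df = readImages("/data/sample_images")  # hypothetical directory
    processed_df = transformer.transform(image_df)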
-
class sparkdl.DeepImagePredictor(*args, **kwargs)[source]¶

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol
Applies the model specified by its popular name to the image column in a DataFrame. The input image column should be a 3-channel SpImage. The output is an MLlib Vector.
-
decodePredictions = Param(parent='undefined', name='decodePredictions', doc='If true, output predictions in the (class, description, probability) format')¶
-
modelName = Param(parent='undefined', name='modelName', doc='A deep learning model name')¶
-
setParams(*args, **kwargs)[source]¶

setParams(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5)
-
topK = Param(parent='undefined', name='topK', doc='How many classes to return if decodePredictions is True')¶
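A minimal usage sketch, assuming a hypothetical image directory; "InceptionV3" is one of the supported popular model names.

    from sparkdl import DeepImagePredictor, readImages

    image_df = readImages("/data/sample_images")
    predictor = DeepImagePredictor(inputCol="image", outputCol="predicted_labels",
                                   modelName="InceptionV3",
                                   decodePredictions=True, topK=10)
    predictions_df = predictor.transform(image_df)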
-
-
class sparkdl.DeepImageFeaturizer(*args, **kwargs)[source]¶

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol
Applies the model specified by its popular name, with its prediction layer(s) chopped off, to the image column in a DataFrame. The output is an MLlib Vector so that DeepImageFeaturizer can be used in an MLlib Pipeline. The input image column should be a 3-channel SpImage.
-
modelName = Param(parent='undefined', name='modelName', doc='A deep learning model name')¶
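A minimal sketch of transfer learning with DeepImageFeaturizer inside an MLlib Pipeline; train_images_df is a hypothetical DataFrame with image and label columns, and the LogisticRegression hyperparameters are arbitrary.

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from sparkdl import DeepImageFeaturizer

    featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features",
                                     modelName="InceptionV3")
    lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3,
                            labelCol="label")
    pipeline = Pipeline(stages=[featurizer, lr])
    pipeline_model = pipeline.fit(train_images_df)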
-
-
class sparkdl.KerasImageFileTransformer(*args, **kwargs)[source]¶

Bases: pyspark.ml.base.Transformer, sparkdl.param.shared_params.HasInputCol, sparkdl.param.shared_params.HasOutputCol, sparkdl.param.image_params.CanLoadImage, sparkdl.param.shared_params.HasKerasModel, sparkdl.param.image_params.HasOutputMode
Applies the TensorFlow-backed Keras model (specified by a file name) to images (specified by the URI in the inputCol column) in the DataFrame.
Restrictions of the current API:
- See TFImageTransformer.
- Only supports TensorFlow-backed Keras models (no Theano).
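A minimal usage sketch, assuming a saved InceptionV3 model file and hypothetical image URIs; a SparkSession spark is assumed to be in scope.

    import numpy as np
    from keras.applications.inception_v3 import preprocess_input
    from keras.preprocessing.image import img_to_array, load_img
    from pyspark.sql.types import StringType
    from sparkdl import KerasImageFileTransformer

    def load_and_preprocess(uri):
        # A typical way to load and prepare images in Keras.
        image = img_to_array(load_img(uri, target_size=(299, 299)))
        image = np.expand_dims(image, axis=0)
        return preprocess_input(image)

    transformer = KerasImageFileTransformer(inputCol="uri", outputCol="predictions",
                                            modelFile="/tmp/model-full.h5",  # hypothetical path
                                            imageLoader=load_and_preprocess,
                                            outputMode="vector")

    uri_df = spark.createDataFrame(["image1_uri", "image2_uri"], StringType()).toDF("uri")
    predictions_df = transformer.transform(uri_df)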