Source code for sparkdl.transformers.named_image

# Copyright 2017 Databricks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from keras.applications.imagenet_utils import decode_predictions
import numpy as np

from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.sql.functions import udf
from pyspark.sql.types import (ArrayType, FloatType, StringType, StructField, StructType)

import sparkdl.graph.utils as tfx
from sparkdl.image.imageIO import resizeImage
import sparkdl.transformers.keras_applications as keras_apps
from sparkdl.param import (
    keyword_only, HasInputCol, HasOutputCol, SparkDLTypeConverters)
from sparkdl.transformers.tf_image import TFImageTransformer


SUPPORTED_MODELS = ["InceptionV3", "Xception", "ResNet50"]


[docs]class DeepImagePredictor(Transformer, HasInputCol, HasOutputCol): """ Applies the model specified by its popular name to the image column in DataFrame. The input image column should be 3-channel SpImage. The output is a MLlib Vector. """ modelName = Param(Params._dummy(), "modelName", "A deep learning model name", typeConverter=SparkDLTypeConverters.supportedNameConverter(SUPPORTED_MODELS)) decodePredictions = Param(Params._dummy(), "decodePredictions", "If true, output predictions in the (class, description, probability) format", typeConverter=TypeConverters.toBoolean) topK = Param(Params._dummy(), "topK", "How many classes to return if decodePredictions is True", typeConverter=TypeConverters.toInt) @keyword_only def __init__(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5): """ __init__(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5) """ super(DeepImagePredictor, self).__init__() self._setDefault(decodePredictions=False) self._setDefault(topK=5) kwargs = self._input_kwargs self.setParams(**kwargs)
[docs] @keyword_only def setParams(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5): """ setParams(self, inputCol=None, outputCol=None, modelName=None, decodePredictions=False, topK=5) """ kwargs = self._input_kwargs self._set(**kwargs) return self
[docs] def setModelName(self, value): return self._set(modelName=value)
[docs] def getModelName(self): return self.getOrDefault(self.modelName)
def _transform(self, dataset): transformer = _NamedImageTransformer(inputCol=self.getInputCol(), outputCol=self._getIntermediateOutputCol(), modelName=self.getModelName(), featurize=False) transformed = transformer.transform(dataset) if self.getOrDefault(self.decodePredictions): return self._decodeOutputAsPredictions(transformed) else: return transformed.withColumnRenamed( self._getIntermediateOutputCol(), self.getOutputCol()) def _decodeOutputAsPredictions(self, df): # If we start having different weights than imagenet, we'll need to # move this logic to individual model building in NamedImageTransformer. # Also, we could put the computation directly in the main computation # graph or use a scala UDF for potentially better performance. topK = self.getOrDefault(self.topK) def decode(predictions): pred_arr = np.expand_dims(np.array(predictions), axis=0) decoded = decode_predictions(pred_arr, top=topK)[0] # convert numpy dtypes to python native types return [(t[0], t[1], t[2].item()) for t in decoded] decodedSchema = ArrayType( StructType([StructField("class", StringType(), False), StructField("description", StringType(), False), StructField("probability", FloatType(), False)])) decodeUDF = udf(decode, decodedSchema) interim_output = self._getIntermediateOutputCol() return ( df.withColumn(self.getOutputCol(), decodeUDF(df[interim_output])) .drop(interim_output) ) def _getIntermediateOutputCol(self): return "__tmp_" + self.getOutputCol()
# TODO: give an option to take off multiple layers so it can be used in tuning # (could be the name of the layer or int for how many to take off).
[docs]class DeepImageFeaturizer(Transformer, HasInputCol, HasOutputCol): """ Applies the model specified by its popular name, with its prediction layer(s) chopped off, to the image column in DataFrame. The output is a MLlib Vector so that DeepImageFeaturizer can be used in a MLlib Pipeline. The input image column should be 3-channel SpImage. """ modelName = Param(Params._dummy(), "modelName", "A deep learning model name", typeConverter=SparkDLTypeConverters.supportedNameConverter(SUPPORTED_MODELS)) @keyword_only def __init__(self, inputCol=None, outputCol=None, modelName=None): """ __init__(self, inputCol=None, outputCol=None, modelName=None) """ super(DeepImageFeaturizer, self).__init__() kwargs = self._input_kwargs self.setParams(**kwargs)
[docs] @keyword_only def setParams(self, inputCol=None, outputCol=None, modelName=None): """ setParams(self, inputCol=None, outputCol=None, modelName=None) """ kwargs = self._input_kwargs self._set(**kwargs) return self
[docs] def setModelName(self, value): return self._set(modelName=value)
[docs] def getModelName(self): return self.getOrDefault(self.modelName)
def _transform(self, dataset): transformer = _NamedImageTransformer(inputCol=self.getInputCol(), outputCol=self.getOutputCol(), modelName=self.getModelName(), featurize=True) return transformer.transform(dataset)
class _NamedImageTransformer(Transformer, HasInputCol, HasOutputCol): """ For internal use only. NamedImagePredictor and NamedImageFeaturizer are the recommended classes to use. Applies the model specified by its popular name to the image column in DataFrame. There are two output modes: predictions or the featurization from the model. In either case the output is a MLlib Vector. """ modelName = Param(Params._dummy(), "modelName", "A deep learning model name", typeConverter=SparkDLTypeConverters.supportedNameConverter(SUPPORTED_MODELS)) featurize = Param(Params._dummy(), "featurize", "If true, output features. If false, output predictions. Either way the output is a vector.", typeConverter=TypeConverters.toBoolean) @keyword_only def __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False): """ __init__(self, inputCol=None, outputCol=None, modelName=None, featurize=False) """ super(_NamedImageTransformer, self).__init__() kwargs = self._input_kwargs self.setParams(**kwargs) self._inputTensorName = None self._outputTensorName = None self._outputMode = None @keyword_only def setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False): """ setParams(self, inputCol=None, outputCol=None, modelName=None, featurize=False) """ kwargs = self._input_kwargs self._set(**kwargs) return self def setModelName(self, value): return self._set(modelName=value) def getModelName(self): return self.getOrDefault(self.modelName) def setFeaturize(self, value): return self._set(featurize=value) def getFeaturize(self): return self.getOrDefault(self.featurize) def _transform(self, dataset): modelGraphSpec = _buildTFGraphForName(self.getModelName(), self.getFeaturize()) inputCol = self.getInputCol() resizedCol = "__sdl_imagesResized" tfTransformer = TFImageTransformer(inputCol=resizedCol, outputCol=self.getOutputCol(), graph=modelGraphSpec["graph"], inputTensor=modelGraphSpec["inputTensorName"], outputTensor=modelGraphSpec["outputTensorName"], outputMode=modelGraphSpec["outputMode"]) resizeUdf = resizeImage(modelGraphSpec["inputTensorSize"]) result = tfTransformer.transform(dataset.withColumn(resizedCol, resizeUdf(inputCol))) return result.drop(resizedCol) def _buildTFGraphForName(name, featurize): """ Currently only supports pre-trained models from the Keras applications module. """ modelData = keras_apps.getKerasApplicationModel(name).getModelData(featurize) sess = modelData["session"] outputTensorName = modelData["outputTensorName"] graph = tfx.strip_and_freeze_until([outputTensorName], sess.graph, sess, return_graph=True) modelData["graph"] = graph return modelData