[Python, Data Mining, Big Data, Data Engineering, TensorFlow] Coins classifier Neural Network: Head or Tail?
The overall goal of this series is to build a coin classifier capable of scanning your pocket change and finding rare or valuable coins. This is the second article in the series, so let me recap what happened earlier.
In the previous step we collected a rather large dataset of image pairs, downloaded from the online coin site meshok.ru. Those images were uploaded by people we do not know, and although each pair is supposed to show the coin's head in one image and its tail in the other, we cannot rule out pairs with two heads and no tail, or vice versa. Also, at the moment we have no idea which image shows the head and which shows the tail; this may matter when we feed data to our final classifier.
So let's write a program that distinguishes heads from tails. It is a rather simple task, involving a convolutional neural network that uses transfer learning.
As before, we are going to use the Google Colab environment, taking advantage of the free GPU it gives us access to. We will store data on Google Drive, so the first thing we need is to let Colab access the Drive:
from google.colab import drive
drive.mount("/content/drive/", force_remount=True)
Next, we install EfficientNet. This is the pretrained network (remember the transfer learning mentioned above?) that we use as a starting point, rather than training a network from scratch.
!pip install -q efficientnet
import efficientnet.tfkeras as efn
Next, I usually have a large import section. Please note that some of the imported modules are not actually used; feel free to delete them:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import sys
import random
import os
from os import listdir
from os.path import isfile, join
from tensorflow.keras import regularizers
from tensorflow.keras.optimizers import Adamax
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import array_to_img, img_to_array
from tensorflow.keras import backend as K
from tensorflow.keras.applications.vgg16 import VGG16,preprocess_input
from tensorflow.keras.applications import InceptionResNetV2, Xception, NASNetLarge
from mpl_toolkits.mplot3d import Axes3D
from sklearn.manifold import TSNE
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dense, Activation, Dropout, Flatten, Lambda, concatenate, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.callbacks import LambdaCallback
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import Sequential
from sklearn.neighbors import NearestNeighbors
import seaborn as sns
import cv2
from tensorflow.keras.utils import Sequence
import re
Let's check which version of TensorFlow is in use. This step is important, because the version preinstalled in Colab can be upgraded without notice:
import tensorflow as tf
print(tf.__version__)
tf.test.gpu_device_name()
The output in my case was:
2.4.0
'/device:GPU:0'
Then we do some additional initialization: we set the project directory and the paths of the weight files stored during training:
working_path = "/content/drive/My Drive/02_avers_or_revers/"
best_weights_filepath = working_path + "models/01_avers_or_revers.h5"
# Separate file for the last-epoch weights, so that they do not overwrite the best ones
last_weights_filepath = working_path + "models/01_avers_or_revers_last.h5"
We only need to train once; there is no reason to do it on every run. So we use a boolean flag: if it is False, training has already been done, the weights are stored in files, and instead of retraining we can simply load them:
bDoTraining = True
We are going to scale images down to 256x256, use a batch size of 8 during training, and so on: here are the constants we will need. The names are self-explanatory. We are also going to split our data into training images (used to tune the network's weights), validation images (used to measure performance on data the net never saw) and the rest (test data, used to check the final result).
IMAGE_SIZE = 256
input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3)
BATCH_SIZE = 8
embedding_model = 0
alpha = 0.4
TRAINING_IMAGES_PERCENT = 0.6
VALIDATION_IMAGES_PERCENT = 0.2
IMAGE_ROTATION_ANGLE = 180
We have two classes for our classifier to distinguish between:
# Class name corresponds to a folder.
# Image path is "images" + class name + image name
arrClasses = ["head", "tail"]
Let's load data by reading the "head" and "tail" folders' content:
if(bDoTraining):
    pdLabels = pd.get_dummies(arrClasses)
    arrLabeledData = []
    for cls in arrClasses:
        arrImageNames = [f for f in listdir(working_path + "images/" + cls) if isfile(join(working_path, "images/", cls, f))]
        arrLabeledData.append(
            {
                'class':cls,
                'image_names':arrImageNames
            })
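To make the label format concrete, here is a minimal pure-Python sketch of the one-hot table that `pd.get_dummies(arrClasses)` is used for above. The helper name `one_hot_table` is my own, for illustration only; it is not part of pandas or of the original notebook.

```python
def one_hot_table(class_names):
    """Map each class name to a one-hot list, in the order the names are given."""
    return {
        name: [1 if i == j else 0 for j in range(len(class_names))]
        for i, name in enumerate(class_names)
    }

labels = one_hot_table(["head", "tail"])
print(labels["head"], labels["tail"])
```

Each label vector has one position per class, which is the shape a softmax output with categorical cross-entropy loss expects.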
Function to load images:
def loadImage(path):
    img=cv2.imread(str(path))
    #img = rotate_bound(img, angle)
    img = cv2.resize(img, (IMAGE_SIZE, IMAGE_SIZE))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32)/255.
    img = img.reshape(input_shape)
    return img
For one class entry (its file names and so on), get the minimum and maximum indexes of either the training or the validation subset; everything after the validation subset is test data:
def getClassMinMax(cls, bIsTrain):
    nLen = len(cls['image_names'])
    if(bIsTrain):
        nMinIdx = 0
        nMaxIdx = nLen * TRAINING_IMAGES_PERCENT
    else:
        nMinIdx = nLen * TRAINING_IMAGES_PERCENT + 1
        nMaxIdx = nLen * (TRAINING_IMAGES_PERCENT + VALIDATION_IMAGES_PERCENT)
    return int(nMinIdx), int(nMaxIdx)
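The same 60/20/20 split can also be written with half-open ranges, which makes it easy to check that the three subsets never overlap. This is only a sketch under that assumption; `split_ranges` is a hypothetical helper, not a function from the notebook, and it is meant for `range(start, stop)` style indexing rather than the inclusive `random.randint`.

```python
def split_ranges(n_items, train_frac=0.6, valid_frac=0.2):
    """Return half-open (start, stop) index ranges for train, validation and test."""
    train_stop = int(n_items * train_frac)
    valid_stop = int(n_items * (train_frac + valid_frac))
    return (0, train_stop), (train_stop, valid_stop), (valid_stop, n_items)

train, valid, test = split_ranges(1000)
print(train, valid, test)
```

Because each range starts exactly where the previous one stops, every image index belongs to exactly one subset.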
It is always a good idea to make sure everything works as intended, so let's test image loading:
if(bDoTraining):
    nClassIdx = np.random.randint(len(arrLabeledData))
    cls = arrLabeledData[nClassIdx]
    nMinIdx, nMaxIdx = getClassMinMax(cls, False)
    nImageIdx = random.randint(nMinIdx, nMaxIdx)
    arrLabeledData[0]['class']
    img = loadImage(join(working_path, "images/", cls['class'], cls['image_names'][nImageIdx]))#, 0)
    #img = img.reshape((IMAGE_SIZE, IMAGE_SIZE))
    print(cls['class'])
    plt.imshow(img)
    plt.show()
To make our dataset more diverse (augmentation), we might want to add noise to images:
def add_noise(img):
    '''Add random noise to an image'''
    VARIABILITY = 40
    deviation = VARIABILITY*random.random() / 255.
    noise = np.random.normal(0, deviation, img.shape)
    img += noise
    # np.clip returns a new array, so the result has to be assigned
    img = np.clip(img, 0., 1.)
    return img
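The idea behind the noise augmentation can be shown without NumPy, on a flat list of pixel values in [0, 1]. This is a sketch only; `add_noise_clipped` and its parameters are hypothetical names used for illustration.

```python
import random

def add_noise_clipped(pixels, max_sigma=40 / 255.0):
    """Add Gaussian noise with a randomly chosen strength, then clip to [0, 1]."""
    sigma = max_sigma * random.random()  # a different noise level on every call
    noisy = [p + random.gauss(0.0, sigma) for p in pixels]
    # The clipped values are what we return; clipping without keeping the result does nothing
    return [min(1.0, max(0.0, v)) for v in noisy]

sample = add_noise_clipped([0.0, 0.25, 0.5, 0.75, 1.0])
print(sample)
```

Clipping matters mostly for pixels near 0 or 1, where the added noise would otherwise push values outside the valid range.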
We will need the ImageDataGenerator to produce augmented images:
if(bDoTraining):
    datagen = ImageDataGenerator(
        samplewise_center=True,
        rotation_range=IMAGE_ROTATION_ANGLE,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1 #[1, 1.2],
        #preprocessing_function=add_noise
    )
The following function is used to get an image by index from data we loaded earlier, using image data generator we just created:
def getImage(cClass, nImageIdx, datagen):
    image_name = cClass['image_names'][nImageIdx]
    #angle = random.randint(-180, 180)
    img = loadImage(join(working_path, "images/", cClass['class'], cClass['image_names'][nImageIdx]))#, angle)
    arrImg = img_to_array(img)
    arrImg = datagen.random_transform(arrImg) # augmentation
    arrImg = add_noise(arrImg)
    return np.array(arrImg, dtype="float32")
Again, we need to make sure everything works, so let's see what this function returns:
if(bDoTraining):
    nClassIdx = np.random.randint(len(arrLabeledData))
    cls = arrLabeledData[nClassIdx]
    img = getImage(cls, 0, datagen)
    print(cls['class'])
    plt.imshow(img) #, cmap='gray')
    plt.show()
If we are training and for some reason want to start over, we need to delete the network we have saved so far:
def deleteSavedNet(best_weights_filepath):
    if(os.path.isfile(best_weights_filepath)):
        os.remove(best_weights_filepath)
        print("deleteSavedNet():File removed")
    else:
        print("deleteSavedNet():No file to remove")
As we train our network, it accumulates a "history". It is a good idea to be able to plot it as a chart, because this often shows whether training can be improved:
def plotHistory(history, strParam1, strParam2):
    plt.plot(history.history[strParam1], label=strParam1)
    plt.plot(history.history[strParam2], label=strParam2)
    #plt.title('strParam1')
    #plt.ylabel('Y')
    #plt.xlabel('Epoch')
    plt.legend(loc="best")
    plt.show()
def plotFullHistory(history):
    arrHistory = []
    for i,his in enumerate(history.history):
        arrHistory.append(his)
    # Keys are expected in the order: loss, accuracy, val_loss, val_accuracy
    plotHistory(history, arrHistory[0], arrHistory[2])
    plotHistory(history, arrHistory[1], arrHistory[3])
Now a function that creates a model. It loads EfficientNet without its last layers (the classifier) and attaches our own classifier, the one we are going to train:
def createModel(nL2, optimizer):
    global embedding_model
    inputs = keras.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
    # Freeze the pretrained backbone itself: setting .trainable on its output tensor would not freeze anything
    base_b0 = efn.EfficientNetB0(weights='imagenet', include_top=False)
    base_b0.trainable = False
    model_b0 = base_b0(inputs)
    model_concat = model_b0 #layers.concatenate([model_b0, model_vgg16]) #, model_x]) #model_b0
    model_classifier = layers.Flatten(name="Flatten")(model_concat)
    model_classifier = layers.Dense(32, kernel_regularizer=regularizers.l2(nL2), activation='relu', name="Dense128")(model_classifier)
    model_classifier = layers.LeakyReLU(alpha=0.1, name="LeakyReLU")(model_classifier)
    model_classifier = layers.Dropout(0.4, name="Dropout")(model_classifier)
    base_model = layers.Dense(len(arrClasses), activation="softmax", kernel_regularizer=regularizers.l2(nL2), name="DenseEmbedding")(model_classifier)
    embedding_model = keras.Model(inputs=inputs, outputs=base_model, name="embedding_model")
    embedding_model.compile(loss=keras.losses.CategoricalCrossentropy(), optimizer=optimizer, metrics=["accuracy"])
    return embedding_model
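The last layer of the classifier is a softmax over the two classes, and the loss is categorical cross-entropy. What these two do numerically can be sketched in plain Python; the helper names and the example logits below are mine and are illustrative only, they are not the Keras implementation.

```python
import math

def softmax(logits):
    """Turn raw scores into probabilities that sum to 1."""
    top = max(logits)  # subtract the maximum for numerical stability
    exps = [math.exp(x - top) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def categorical_crossentropy(one_hot, probs, eps=1e-7):
    """Loss is small when the probability of the true class is close to 1."""
    return -sum(t * math.log(max(p, eps)) for t, p in zip(one_hot, probs))

probs = softmax([2.0, 0.5])  # hypothetical scores for ("head", "tail")
loss_if_head = categorical_crossentropy([1, 0], probs)
loss_if_tail = categorical_crossentropy([0, 1], probs)
print(probs, loss_if_head, loss_if_tail)
```

The network here favours "head", so the loss is lower when the true label is "head" than when it is "tail".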
The following class produces the batches of images (and labels) that are used during training. Its parent, the Keras Sequence class, is the standard way of feeding data to Keras if you don't want to use tf.data; it is convenient and parallelizes well:
from skimage.io import imread
from skimage.transform import resize
import numpy as np
# Images are drawn at random from arrLabeledData:
# first a random class, then a random image inside that class.
class MyImageDataGenerator(Sequence):
    def __init__(self, bIsTrain):
        self.batch_size = BATCH_SIZE
        self.bIsTrain = bIsTrain
        nNumOfTrainSamples = 10000
        for cls in arrLabeledData:
            nMin, nMax = getClassMinMax(cls, True)
            nNumOfTrainSamples = min(nNumOfTrainSamples, nMax - nMin)
        if(self.bIsTrain):
            self.STEP_SIZE = nNumOfTrainSamples // BATCH_SIZE
        else:
            nNumOfValidSamples = int(nNumOfTrainSamples * VALIDATION_IMAGES_PERCENT / TRAINING_IMAGES_PERCENT)
            self.STEP_SIZE = nNumOfValidSamples // BATCH_SIZE
        if(self.STEP_SIZE < 100):
            self.STEP_SIZE = 100
        print("STEP_SIZE: ", self.STEP_SIZE, " (bIsTrain: ", bIsTrain, ")")
    def __len__(self):
        return self.STEP_SIZE
    def __getitem__(self, idx):
        arrBatchImages = []
        arrBatchLabels = []
        for i in range(self.batch_size):
            arrClassIdx = np.random.randint(len(arrLabeledData))
            cls = arrLabeledData[arrClassIdx]
            nMinIdx, nMaxIdx = getClassMinMax(cls, self.bIsTrain)
            nImageIdx = random.randint(nMinIdx, nMaxIdx)
            img = getImage(cls, nImageIdx, datagen)
            strLabel = cls['class']
            arrBatchImages.append(img)
            arrBatchLabels.append(pdLabels[strLabel].to_list())
        return np.array(arrBatchImages), np.array(arrBatchLabels)
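Stripped of image loading, the sampling scheme of this class looks as follows. The sketch assumes nothing about Keras: `BalancedBatchSampler` and the file names are hypothetical, and the class only returns (class, file name) pairs.

```python
import random

class BalancedBatchSampler:
    """Pick a random class first, then a random item of that class."""
    def __init__(self, items_per_class, batch_size, steps):
        self.items_per_class = items_per_class
        self.batch_size = batch_size
        self.steps = steps

    def __len__(self):
        # Number of batches per epoch
        return self.steps

    def __getitem__(self, idx):
        # idx is ignored: every batch is drawn at random
        batch = []
        for _ in range(self.batch_size):
            cls = random.choice(list(self.items_per_class))
            batch.append((cls, random.choice(self.items_per_class[cls])))
        return batch

sampler = BalancedBatchSampler({"head": ["h0.png", "h1.png"], "tail": ["t0.png"]}, batch_size=8, steps=100)
batch = sampler[0]
print(len(sampler), batch)
```

Choosing the class before the image means both classes are seen about equally often, even if one folder holds many more files than the other.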
We will need two objects of this class, one for training and one for validation:
if(bDoTraining):
    gen_train = MyImageDataGenerator(True)
    gen_valid = MyImageDataGenerator(False)
As usual, we need a function to show an image obtained this way:
def ShowImg(img, label):
    print(label)
    fig = plt.figure()
    fig.add_subplot(1, 1, 1)
    plt.imshow(img) #, cmap='gray')
    plt.show()
    plt.close()
And (again, as usual) we want to test the result:
if(bDoTraining):
    (images, labels) = gen_valid.__getitem__(0) #next(gen_train)
    for i, img in enumerate(images):
        ShowImg(img, labels[i])
        break
We want to be able to stop training at any time and later continue from where we left off, so we need to save the weights at the end of each epoch. To do that, we create a list of callbacks and pass it to the training call.
def getCallbacks(monitor, mode):
    checkpoint = ModelCheckpoint(best_weights_filepath, monitor=monitor, save_best_only=True, save_weights_only=True, mode=mode, verbose=1)
    save_model_at_epoch_end_callback = LambdaCallback(on_epoch_end=lambda epoch, logs: embedding_model.save_weights(last_weights_filepath))
    callbacks_list = [checkpoint, save_model_at_epoch_end_callback] # , early]
    return callbacks_list
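The "save best only" behaviour of the checkpoint callback boils down to remembering the best value of the monitored metric. Here is a minimal sketch of that logic; `BestTracker` and the accuracy values are made up for illustration and are not the Keras implementation.

```python
class BestTracker:
    """Remember the best metric value seen so far."""
    def __init__(self, mode="max"):
        self.mode = mode
        self.best = None

    def update(self, value):
        """Return True when value improves on the best one so far."""
        improved = (
            self.best is None
            or (self.mode == "max" and value > self.best)
            or (self.mode == "min" and value < self.best)
        )
        if improved:
            self.best = value
        return improved

tracker = BestTracker("max")
val_accuracy_per_epoch = [0.71, 0.80, 0.78, 0.85]  # hypothetical values
saved_epochs = [epoch for epoch, acc in enumerate(val_accuracy_per_epoch) if tracker.update(acc)]
print(saved_epochs)  # → [0, 1, 3]
```

With mode "max" (as used for val_accuracy) the weights would be written after epochs 0, 1 and 3, while the "last" weights are written after every epoch regardless.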
Also, we need to be able to load the model (to continue training or to do testing):
def loadModel(embedding_model, bBest):
    if(bBest):
        path = best_weights_filepath
        strMessage = "load best model"
    else:
        path = last_weights_filepath
        strMessage = "load last model"
    if(os.path.isfile(path)):
        embedding_model.load_weights(path)
        print(strMessage, ": File loaded")
    else:
        print(strMessage, ": No file to load")
    return embedding_model
The following function does actual training:
def trainNetwork(EPOCHS, nL2, optimizer, bCumulativeLearning = False):
    global embedding_model
    global history
    global arrImages
    global arrLabels
    if(bCumulativeLearning == False):
        deleteSavedNet(best_weights_filepath)
    random.seed(7)
    embedding_model = createModel(nL2, optimizer)
    print("Model created")
    callbacks_list = getCallbacks("val_accuracy", 'max')
    if(bCumulativeLearning == True):
        loadModel(embedding_model, False)
    nNumOfTrainSamples = 10000
    for cls in arrLabeledData:
        nMin, nMax = getClassMinMax(cls, True)
        nNumOfTrainSamples = min(nNumOfTrainSamples, nMax - nMin)
    STEP_SIZE_TRAIN = nNumOfTrainSamples // BATCH_SIZE
    if(STEP_SIZE_TRAIN < 100):
        STEP_SIZE_TRAIN = 100
    nNumOfValidSamples = int(nNumOfTrainSamples * VALIDATION_IMAGES_PERCENT / TRAINING_IMAGES_PERCENT)
    STEP_SIZE_VALID = nNumOfValidSamples // BATCH_SIZE
    if(STEP_SIZE_VALID < 100):
        STEP_SIZE_VALID = 100
    print(STEP_SIZE_TRAIN, STEP_SIZE_VALID)
    print("Available metrics: ", embedding_model.metrics_names)
    history = embedding_model.fit(gen_train,
        validation_data=gen_valid, verbose=0,
        epochs=EPOCHS, steps_per_epoch=STEP_SIZE_TRAIN,
        validation_steps=STEP_SIZE_VALID, callbacks=callbacks_list)
    print(nL2)
    plotFullHistory(history)
    # TBD: here, return best model, not last one
    return embedding_model
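The step-size arithmetic used in the training function (and in the generator class) fits into one small helper. This is a sketch; `steps_per_epoch` as a function name and the sample counts are my own choices for illustration.

```python
def steps_per_epoch(n_samples, batch_size, min_steps=100):
    """Whole batches per epoch, but never fewer than min_steps."""
    return max(n_samples // batch_size, min_steps)

large = steps_per_epoch(6000, 8)  # 6000 // 8 = 750 batches
small = steps_per_epoch(300, 8)   # 300 // 8 = 37, raised to the floor of 100
print(large, small)  # → 750 100
```

The floor of 100 steps is safe here only because batches are sampled at random: on a small dataset the same images are simply drawn (and augmented) more than once per epoch.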
As you can see, it does some initialization and then calls the Keras "fit" function. Next comes another data generator. This one reads the images that we use AFTER the network has been trained. The network does not need labels here, as we deal with the test set (or pretend it is test data); class names are only passed along so that the results can be checked later.
def data_generator_simple(arrAllImageNames, arrAllImageClasses):
    i = 0
    arrImages = []
    arrImageLabels = []
    arrImageClasses = []
    for nImageIdx in range(len(arrAllImageNames)):
        if(i == 0):
            arrImages = []
            arrImageNames = []
            arrImageClasses = []
        i += 1
        strClass = arrAllImageClasses[nImageIdx]
        strImageName = arrAllImageNames[nImageIdx]
        #angle = random.randint(0, 90)
        img = loadImage(join(working_path, "images/", strClass, strImageName)) #, angle)
        arrImg = img_to_array(img)
        #arrImg = datagen.random_transform(arrImg) #/ 255.
        #arrImg = add_noise(arrImg)
        arrImg = np.array(arrImg, dtype="float32")
        arrImages.append(arrImg)
        arrImageNames.append(strImageName)
        arrImageClasses.append(strClass)
        if i == BATCH_SIZE:
            i = 0
            yield np.array(arrImages), arrImageNames, arrImageClasses
    # A generator ends by returning: raising StopIteration inside it is an error in Python 3.7+.
    # Note that images left over in an incomplete last batch are not yielded.
    return
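The batching pattern itself is independent of images and can be sketched on any sequence. The version below is an illustration under my own naming (`iter_batches` is hypothetical); unlike the function above, it also yields the incomplete last batch. In Python 3.7 and later a generator should finish by returning (or simply reaching its end) rather than by raising StopIteration.

```python
def iter_batches(items, batch_size):
    """Yield lists of up to batch_size items, including a shorter final batch."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # leftover items that did not fill a whole batch
    # Reaching the end of the function finishes the generator

batches = list(iter_batches(range(10), 4))
print(batches)  # → [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Whether the short final batch should be kept depends on the use case; for prediction on a test set, keeping it means every image gets a prediction.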
As usual, we need a helper that shows an image obtained from this generator:
def ShowImgSimple(img, label):
    print(label)
    fig = plt.figure()
    fig.add_subplot(1, 1, 1)
    plt.imshow(img, cmap='gray')
    plt.show()
    plt.close()
And display it:
Using the generator above, we can load all test images and run prediction on them:
def getAllTestImages():
    global embedding_model
    arrAllImageNames = []
    arrAllImageClasses = []
    for cClass in arrLabeledData:
        for nIdx in range(int(len(cClass['image_names']) * (TRAINING_IMAGES_PERCENT + VALIDATION_IMAGES_PERCENT)), len(cClass['image_names'])):
            arrAllImageNames.append(cClass['image_names'][nIdx])
            arrAllImageClasses.append(cClass['class'])
    test_preds = []
    test_file_names = []
    test_class_names = []
    for imgs, fnames, classes in data_generator_simple(arrAllImageNames, arrAllImageClasses):
        predicts = embedding_model.predict(imgs)
        predicts = predicts.tolist()
        test_preds += predicts
        test_file_names += fnames
        test_class_names += classes
    test_preds = np.array(test_preds)
    return test_preds, test_file_names, test_class_names
By the way, we can compute the accuracy over all our predictions, displaying every misclassified image along the way:
def getAccuracy(test_preds, test_file_names, test_class_names):
    nTotalSuccess = 0
    for i, arrPredictedProbabilities in enumerate(test_preds):
        nPredictedClassIdx = arrPredictedProbabilities.argmax()
        gt_class = test_class_names[i]
        predicted_class = arrClasses[nPredictedClassIdx]
        if(predicted_class == gt_class):
            nTotalSuccess += 1
        else:
            print("GT: ", gt_class, "; Pred: ", predicted_class, "; Probabilities: ", arrPredictedProbabilities[0], ", ", arrPredictedProbabilities[1])
            img = loadImage(join(working_path, "images/", gt_class, test_file_names[i]))#, 0)
            plt.imshow(img)
            plt.show()
    nSuccess = nTotalSuccess / (i+1)
    return nSuccess
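Without the plotting, the accuracy computation reduces to an argmax per prediction and a comparison with the ground truth. A minimal sketch, with a hypothetical helper name and made-up probabilities:

```python
def accuracy(pred_rows, true_labels, class_names):
    """Fraction of rows whose most probable class matches the true label."""
    if not pred_rows:
        return 0.0
    hits = 0
    for probs, truth in zip(pred_rows, true_labels):
        best_idx = max(range(len(probs)), key=probs.__getitem__)  # argmax
        if class_names[best_idx] == truth:
            hits += 1
    return hits / len(pred_rows)

preds = [[0.9, 0.1], [0.2, 0.8], [0.6, 0.4]]  # hypothetical softmax outputs
acc = accuracy(preds, ["head", "tail", "tail"], ["head", "tail"])
print(acc)
```

Here the third image is a tail predicted as a head, so two of the three predictions are correct.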
Finally, here is the function that STARTS the training. It has the somewhat confusing name "test", because after training it also evaluates the best model on the test set:
def test(EPOCHS, nL2, optimizer, learning_rate, bCumulativeLearning):
    global embedding_model
    embedding_model = trainNetwork(EPOCHS, nL2, optimizer, bCumulativeLearning)
    print("loading best model")
    embedding_model = loadModel(embedding_model, True)
    test_preds, test_file_names, test_class_names = getAllTestImages()
    # print("test_preds[0], test_file_names[0], test_class_names[0]: ", test_preds[0], test_file_names[0], test_class_names[0])
    nSuccess = getAccuracy(test_preds, test_file_names, test_class_names)
    print(">>> Accuracy on test set:", nSuccess, "<<<")
We can now call this function and therefore start training:
opt = tf.keras.optimizers.Adam(0.0002) ##Adamax(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0)
nL2 = 0.4
if(bDoTraining):
    EPOCHS = 50
    learning_rate=0.001
    np.random.seed(7)
    test(EPOCHS, nL2, opt, learning_rate, bCumulativeLearning=False)
    embedding_model = loadModel(embedding_model, True)
    embedding_model.save(best_weights_filepath) # A full model is saved
After training is complete, we can run predictions over the held-out index range of a randomly chosen class and display every image the network gets wrong:
if(bDoTraining):
    nClassIdx = np.random.randint(len(arrLabeledData))
    cls = arrLabeledData[nClassIdx]
    nMinIdx, nMaxIdx = getClassMinMax(cls, False)
    nImageIdx = random.randint(nMinIdx, nMaxIdx)
    for i, nImageIdx in enumerate(range(nMinIdx, nMaxIdx)):
        print(i+1, "of", nMaxIdx - nMinIdx)
        img = loadImage(join(working_path, "images/", arrLabeledData[nClassIdx]['class'], arrLabeledData[nClassIdx]['image_names'][nImageIdx]))#, 0)
        arrImg = img_to_array(img)
        arrImg = np.array(arrImg, dtype="float32")
        # ---
        test_preds = embedding_model.predict(arrImg.reshape(1, IMAGE_SIZE, IMAGE_SIZE, 3))
        nIdx = test_preds.argmax()
        if(nClassIdx != nIdx):
            print("GT: ", arrLabeledData[nClassIdx]['class'], "; Pred: ", arrClasses[nIdx])
            plt.imshow(img)
            plt.show()
OK, our model is trained and checked on a test data set. Now we can actually USE it: we can load a HUGE set of images and classify them (see the comments in the code). Note that this code relies on the image file naming convention from the previous step:
# Same as above in "test" section, but this time we process images from output folder
# The "/content/drive/My Drive/01_Output/" is the output of the previous step: remember, we got pairs of images, and
# now need to figure out which ones are avers and which ones are revers
images_source_path = "/content/drive/My Drive/01_Output/"
# We will save images by new names (with "head" or "tail" suffix) in this folder
images_dest_path = working_path + "images_processed/"
arrSourceImageNames = [f for f in listdir(images_source_path) if isfile(join(images_source_path, f))]
# Create model and load its weights (ones we got during training)
embedding_model = createModel(nL2, opt)
embedding_model = loadModel(embedding_model, True)
# Dictionary will store image names and counter: see below for details
dictNames = {}
nTotal = len(arrSourceImageNames)
for i, file_name in enumerate(arrSourceImageNames):
    image_path = join(images_source_path, file_name)
    img = loadImage(image_path)
    arrImg = img_to_array(img)
    arrImg = np.array(arrImg, dtype="float32")
    # ---
    # For image, predict its class
    test_preds = embedding_model.predict(arrImg.reshape(1, IMAGE_SIZE, IMAGE_SIZE, 3))
    nIdx = test_preds.argmax()
    #print(i+1, "of", nTotal, ": ", arrClasses[nIdx])
    #plt.imshow(img)
    #plt.show()
    # Split image name
    word_list = file_name.split(".") # ['0_000_00', 'png']
    image_name = word_list[0]
    image_ext = word_list[1]
    plt.imsave(images_dest_path + image_name + "_" + arrClasses[nIdx] + ".png", img)
    # Now we need to move source file to trash, but make it zero size first so it doesn't take space there
    open(image_path, 'w').close() #overwrite and make the file blank instead
    os.remove(image_path)
    if(i%100 == 0):
        print(i, " of ", nTotal)
    # File names look like 123496110_07_03.
    # Here 123496110 is the file root name, 07 is number of a coin in that image (some images contain >1 coins), and 03 is a number of images of that coin
    # (Say, we have 169860023_000.jpg, 169860023_001.jpg, 169860023_002.jpg, one coin per image, tail-tail-head. Then at step 1 we will get
    # 169860023_00_00, 169860023_00_01, and 169860023_00_02)
    # We append _head or _tail: 169860023_00_00_tail(.png)
    # In dictNames we keep pairs 169860023_00 + flag. Flag == 0 if no heads, no tails, 1 if heads / no tails, 2 if tails / no heads and 3 if has both
    # When scanning is complete, we delete files that have flag != 3
    arrImageNameParts = image_name.split("_") # ['169860023', '000', '00']
    # We do not need "000" here, as it is just number of an image in a group of images for that coin. We need name (169860023) of course, plus
    # number of a coin (00)
    coin_name = arrImageNameParts[0] + "_" + arrImageNameParts[2]
    if(arrClasses[nIdx] == "head"):
        if coin_name in dictNames:
            dictNames[coin_name] = dictNames[coin_name] | 1
        else:
            dictNames[coin_name] = 1
    else:
        if coin_name in dictNames:
            dictNames[coin_name] = dictNames[coin_name] | 2
        else:
            dictNames[coin_name] = 2
#print(dictNames)
# Now we need to delete all files for which dictNames[coin_name] != 3
print("Deleting files that do not have both head and tail")
nDeleted = 0
for i, file_name in enumerate(arrSourceImageNames):
    word_list = file_name.split(".") # ['0_000_00', 'png']
    image_name = word_list[0]
    image_ext = word_list[1]
    arrImageNameParts = image_name.split("_") # ['169860023', '000', '00']
    # We do not need "000" here, as it is just number of an image in a group of images for that coin. We need name (169860023) of course, plus
    # number of a coin (00)
    coin_name = arrImageNameParts[0] + "_" + arrImageNameParts[2]
    if (coin_name not in dictNames) or (dictNames[coin_name] != 3):
        # Processed files were saved as <image_name>_<class>.png, so look for both suffixes
        for strClass in arrClasses:
            image_path = join(images_dest_path, image_name + "_" + strClass + ".png")
            if isfile(image_path):
                open(image_path, 'w').close() #overwrite and make the file blank instead
                os.remove(image_path)
                nDeleted += 1
    if(i%100 == 0):
        print(i, " of ", nTotal)
print("Deleted", nDeleted)
As a result, we have file names with a "_head" or "_tail" suffix, and coins that lack either side are removed.
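The pairing logic relies on two bit flags per coin: 1 for "a head was seen" and 2 for "a tail was seen", combined with a bitwise OR. It can be sketched separately from the file handling; the constant names, the helper and the second coin id below are hypothetical.

```python
HAS_HEAD = 1
HAS_TAIL = 2

def collect_flags(classified):
    """classified: iterable of (coin_name, side) pairs, side being 'head' or 'tail'."""
    flags = {}
    for coin_name, side in classified:
        bit = HAS_HEAD if side == "head" else HAS_TAIL
        flags[coin_name] = flags.get(coin_name, 0) | bit
    return flags

flags = collect_flags([
    ("169860023_00", "tail"),
    ("169860023_00", "head"),
    ("170000001_00", "head"),  # made-up coin with two heads and no tail
    ("170000001_00", "head"),
])
complete = [name for name, flag in flags.items() if flag == HAS_HEAD | HAS_TAIL]
print(complete)  # → ['169860023_00']
```

Only coins whose flag equals 3 (both bits set) are kept; a coin photographed twice from the same side never reaches 3, however many images it has.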
===========
Source:
habr.com
===========
Похожие новости:
- [Машинное обучение] Что такое TinyML и что в нем такого важного? (перевод)
- [Python, Программирование, Алгоритмы, Машинное обучение, Искусственный интеллект] Собираем нейросети. Классификатор животных из мультфильмов. Без данных и за 5 минут. CLIP: Обучение без Обучения + код
- [Python, Профессиональная литература] Разбор популярных книг по Python
- [Python, Изучение языков] Где используется python (разбор его направлений)
- [PHP, Python, Карьера в IT-индустрии, Изучение языков] PHP или Python, что лучше учить
- [Информационная безопасность, Open source, DevOps] Автоматизируем поиск секретов в git и ansible
- [Python] Уроки компьютерного зрения на Python + OpenCV с самых азов
- [Data Mining, Разработка под Android, Тестирование мобильных приложений] Telegram объявил два новых конкурса для разработчиков
- Выпуск Python-библиотеки для научных вычислений NumPy 1.20.0
- [Занимательные задачки, Python, Программирование, Математика] L-системы и что они себе позволяют
Теги для поиска: #_python, #_data_mining, #_big_data, #_data_engineering, #_tensorflow, #_deep_learning, #_neural, #_image, #_python, #_data_mining, #_big_data, #_data_engineering, #_tensorflow
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 20:17
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Home of this article.The global objective of these articles is to build a coin classifier, capable of scanning your pocket change and find rare / valuable coins. This is a second article in a series, so let me remind you what happened earlier. During previous step we got a rather large dataset composed of pairs of images, loaded from an online coins site meshok.ru. Those images were uploaded to the Internet by people we do not know, and though they are supposed to contain coin's head in one image and tail in the other, we can not rule out a situation when we have two heads and no tail and vice versa. Also at the moment we have no idea which image contains head and which contains tail: this might be important when we feed data to our final classifier.So let's write a program to distinguish heads from tails. It is a rather simple task, involving a convolutional neural network that is using transfer learning.Same way as before, we are going to use Google Colab environment, taking the advantage of a free video card they grant us an access to. We will store data on a Google Drive, so first thing we need is to allow Colab to access the Drive: from google.colab import drive
drive.mount("/content/drive/", force_remount=True) !pip install -q efficientnet
import efficientnet.tfkeras as efn import numpy as np
import pandas as pd import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers import sys import random import os from os import listdir from os.path import isfile, join from tensorflow.keras import regularizers from tensorflow.keras.optimizers import Adamax from tensorflow.keras.preprocessing.image import ImageDataGenerator from tensorflow.keras.preprocessing.image import array_to_img, img_to_array from tensorflow.keras import backend as K from tensorflow.keras.applications.vgg16 import VGG16,preprocess_input from tensorflow.keras.applications import InceptionResNetV2, Xception, NASNetLarge from mpl_toolkits.mplot3d import Axes3D from sklearn.manifold import TSNE from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dense, Activation, Dropout, Flatten, Lambda, concatenate, BatchNormalization, GlobalAveragePooling2D from tensorflow.keras.callbacks import LambdaCallback from tensorflow.keras.callbacks import ModelCheckpoint from tensorflow.keras.models import Sequential from sklearn.neighbors import NearestNeighbors import seaborn as sns import cv2 from tensorflow.python.keras.utils.data_utils import Sequence import re import tensorflow as tf
print(tf.__version__) tf.test.gpu_device_name() 2.4.0
'/device:GPU:0' working_path = "/content/drive/My Drive/02_avers_or_revers/"
best_weights_filepath = working_path + "models/01_avers_or_revers.h5" last_weights_filepath = working_path + "models/01_avers_or_revers.h5" bDoTraining = True
IMAGE_SIZE = 256
input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3) BATCH_SIZE = 8 embedding_model = 0 alpha = 0.4 TRAINING_IMAGES_PERCENT = 0.6 VALIDATION_IMAGES_PERCENT = 0.2 IMAGE_ROTATION_ANGLE = 180 # Class name corresponds to a folder.
# Image path is "images" + class name + image name arrClasses = ["head", "tail"] if(bDoTraining):
pdLabels = pd.get_dummies(arrClasses) arrLabeledData = [] for cls in arrClasses: arrImageNames = [f for f in listdir(working_path + "images/" + cls) if isfile(join(working_path, "images/", cls, f))] arrLabeledData.append( { 'class':cls, 'image_names':arrImageNames }) def loadImage(path):
img=cv2.imread(str(path)) #img = rotate_bound(img, angle) img = cv2.resize(img, (IMAGE_SIZE, IMAGE_SIZE)) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img = img.astype(np.float32)/255. img = img.reshape(input_shape) return img def getClassMinMax(cls, bIsTrain):
nLen = len(cls['image_names']) if(bIsTrain): nMinIdx = 0 nMaxIdx = nLen * TRAINING_IMAGES_PERCENT else: nMinIdx = nLen * TRAINING_IMAGES_PERCENT + 1 nMaxIdx = nLen * (TRAINING_IMAGES_PERCENT + VALIDATION_IMAGES_PERCENT) return int(nMinIdx), int(nMaxIdx) if(bDoTraining):
nClassIdx = np.random.randint(len(arrLabeledData)) cls = arrLabeledData[nClassIdx] nMinIdx, nMaxIdx = getClassMinMax(cls, False) nImageIdx = random.randint(nMinIdx, nMaxIdx) arrLabeledData[0]['class'] img = loadImage(join(working_path, "images/", cls['class'], cls['image_names'][nImageIdx]))#, 0) #img = img.reshape((IMAGE_SIZE, IMAGE_SIZE)) print(cls['class']) plt.imshow(img) plt.show() To make our dataset more diverse (augmentation), we might want to add noise to images: def add_noise(img):
'''Add random noise to an image''' VARIABILITY = 40 deviation = VARIABILITY*random.random() / 255. noise = np.random.normal(0, deviation, img.shape) img += noise np.clip(img, 0., 1.) return img if(bDoTraining):
datagen = ImageDataGenerator( samplewise_center=True, rotation_range=IMAGE_ROTATION_ANGLE, width_shift_range=0.1, height_shift_range=0.1, zoom_range=0.1 #[1, 1.2], #preprocessing_function=add_noise ) def getImage(cClass, nImageIdx, datagen):
image_name = cClass['image_names'][nImageIdx] #angle = random.randint(-180, 180) img = loadImage(join(working_path, "images/", cClass['class'], cClass['image_names'][nImageIdx]))#, angle) arrImg = img_to_array(img) arrImg = datagen.random_transform(arrImg) # augmentation arrImg = add_noise(arrImg) return np.array(arrImg, dtype="float32") if(bDoTraining):
nClassIdx = np.random.randint(len(arrLabeledData)) cls = arrLabeledData[nClassIdx] img = getImage(cls, 0, datagen) print(cls['class']) plt.imshow(img) #, cmap='gray') plt.show() If we do training and for some reason want to start it over, we need to delete network we saved by that time: def deleteSavedNet(best_weights_filepath):
if(os.path.isfile(best_weights_filepath)): os.remove(best_weights_filepath) print("deleteSavedNet():File removed") else: print("deleteSavedNet():No file to remove") def plotHistory(history, strParam1, strParam2):
plt.plot(history.history[strParam1], label=strParam1) plt.plot(history.history[strParam2], label=strParam2) #plt.title('strParam1') #plt.ylabel('Y') #plt.xlabel('Epoch') plt.legend(loc="best") plt.show() def plotFullHistory(history): arrHistory = [] for i,his in enumerate(history.history): arrHistory.append(his) plotHistory(history, arrHistory[0], arrHistory[2]) plotHistory(history, arrHistory[1], arrHistory[3]) def createModel(nL2, optimizer):
global embedding_model inputs = keras.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3)) model_b0 = efn.EfficientNetB0(weights='imagenet', include_top=False)(inputs) model_b0.trainable = False model_concat = model_b0 #layers.concatenate([model_b0, model_vgg16]) #, model_x]) #model_b0 model_classifier = layers.Flatten(name="Flatten")(model_concat) model_classifier = layers.Dense(32, kernel_regularizer=regularizers.l2(nL2), activation='relu', name="Dense128")(model_classifier) model_classifier = layers.LeakyReLU(alpha=0.1, name="LeakyReLU")(model_classifier) model_classifier = layers.Dropout(0.4, name="Dropout")(model_classifier) base_model = layers.Dense(len(arrClasses), activation="softmax", kernel_regularizer=regularizers.l2(nL2), name="DenseEmbedding")(model_classifier) embedding_model = keras.Model(inputs=inputs, outputs=base_model, name="embedding_model") embedding_model.compile(loss=keras.losses.CategoricalCrossentropy(), optimizer=optimizer, metrics=["accuracy"]) return embedding_model from skimage.io import imread
from skimage.transform import resize import numpy as np # Here, `x_set` is list of path to the images # and `y_set` are the associated classes. class MyImageDataGenerator(Sequence): def __init__(self, bIsTrain): self.batch_size = BATCH_SIZE self.bIsTrain = bIsTrain nNumOfTrainSamples = 10000 for cls in arrLabeledData: nMin, nMax = getClassMinMax(cls, True) nNumOfTrainSamples = min(nNumOfTrainSamples, nMax - nMin) if(self.bIsTrain): self.STEP_SIZE = nNumOfTrainSamples // BATCH_SIZE else: nNumOfValidSamples = int(nNumOfTrainSamples * VALIDATION_IMAGES_PERCENT / TRAINING_IMAGES_PERCENT) self.STEP_SIZE = nNumOfValidSamples // BATCH_SIZE if(self.STEP_SIZE < 100): self.STEP_SIZE = 100 print("STEP_SIZE: ", self.STEP_SIZE, " (bIsTrain: ", bIsTrain, ")") def __len__(self): return self.STEP_SIZE def __getitem__(self, idx): arrBatchImages = [] arrBatchLabels = [] for i in range(self.batch_size): arrClassIdx = np.random.randint(len(arrLabeledData)) cls = arrLabeledData[arrClassIdx] nMinIdx, nMaxIdx = getClassMinMax(cls, self.bIsTrain) nImageIdx = random.randint(nMinIdx, nMaxIdx) img = getImage(cls, nImageIdx, datagen) strLabel = cls['class'] arrBatchImages.append(img) arrBatchLabels.append(pdLabels[strLabel].to_list()) return np.array(arrBatchImages), np.array(arrBatchLabels) if(bDoTraining):
    gen_train = MyImageDataGenerator(True)
    gen_valid = MyImageDataGenerator(False)

def ShowImg(img, label):
    print(label)
    fig = plt.figure()
    fig.add_subplot(1, 1, 1)
    plt.imshow(img) #, cmap='gray')
    plt.show()
    plt.close()

if(bDoTraining):
    # Show the first image of one validation batch as a sanity check
    (images, labels) = gen_valid[0]
    for i, img in enumerate(images):
        ShowImg(img, labels[i])
        break

We want to be able to stop training at any time and later resume from where we left off, so we need to save the weights at the end of each epoch. To do this, we create a list of callbacks and pass it to the training call.

def getCallbacks(monitor, mode):
    # Keeps the best weights seen so far, according to the monitored metric
    checkpoint = ModelCheckpoint(best_weights_filepath, monitor=monitor, save_best_only=True, save_weights_only=True, mode=mode, verbose=1)
    # Saves the latest weights after every epoch, so training can be resumed
    save_model_at_epoch_end_callback = LambdaCallback(on_epoch_end=lambda epoch, logs: embedding_model.save_weights(last_weights_filepath))

    callbacks_list = [checkpoint, save_model_at_epoch_end_callback] # , early]
    return callbacks_list

def loadModel(embedding_model, bBest):
    if(bBest):
        path = best_weights_filepath
        strMessage = "load best model"
    else:
        path = last_weights_filepath
        strMessage = "load last model"

    if(os.path.isfile(path)):
        embedding_model.load_weights(path)
        print(strMessage, ": File loaded")
    else:
        print(strMessage, ": No file to load")

    return embedding_model

def trainNetwork(EPOCHS, nL2, optimizer, bCumulativeLearning = False):
    global embedding_model
    global history
    global arrImages
    global arrLabels

    # Start from scratch unless we are continuing a previous run
    if(bCumulativeLearning == False):
        deleteSavedNet(best_weights_filepath)

    random.seed(7)

    embedding_model = createModel(nL2, optimizer)
    print("Model created")

    callbacks_list = getCallbacks("val_accuracy", 'max')

    if(bCumulativeLearning == True):
        loadModel(embedding_model, False)

    # Same step size computation as in MyImageDataGenerator
    nNumOfTrainSamples = 10000
    for cls in arrLabeledData:
        nMin, nMax = getClassMinMax(cls, True)
        nNumOfTrainSamples = min(nNumOfTrainSamples, nMax - nMin)

    STEP_SIZE_TRAIN = nNumOfTrainSamples // BATCH_SIZE
    if(STEP_SIZE_TRAIN < 100):
        STEP_SIZE_TRAIN = 100

    nNumOfValidSamples = int(nNumOfTrainSamples * VALIDATION_IMAGES_PERCENT / TRAINING_IMAGES_PERCENT)
    STEP_SIZE_VALID = nNumOfValidSamples // BATCH_SIZE
    if(STEP_SIZE_VALID < 100):
        STEP_SIZE_VALID = 100

    print(STEP_SIZE_TRAIN, STEP_SIZE_VALID)
    print("Available metrics: ", embedding_model.metrics_names)

    history = embedding_model.fit(gen_train,
        validation_data=gen_valid, verbose=0,
        epochs=EPOCHS, steps_per_epoch=STEP_SIZE_TRAIN,
        validation_steps=STEP_SIZE_VALID, callbacks=callbacks_list)

    print(nL2)
    plotFullHistory(history)

    # TBD: here, return best model, not last one
    return embedding_model

def data_generator_simple(arrAllImageNames, arrAllImageClasses):
    i = 0
    arrImages = []
    arrImageNames = []
    arrImageClasses = []

    for nImageIdx in range(len(arrAllImageNames)):
        # Start a new batch
        if(i == 0):
            arrImages = []
            arrImageNames = []
            arrImageClasses = []

        i += 1

        strClass = arrAllImageClasses[nImageIdx]
        strImageName = arrAllImageNames[nImageIdx]

        #angle = random.randint(0, 90)
        img = loadImage(join(working_path, "images/", strClass, strImageName)) #, angle)
        arrImg = img_to_array(img)
        #arrImg = datagen.random_transform(arrImg) #/ 255.
        #arrImg = add_noise(arrImg)
        arrImg = np.array(arrImg, dtype="float32")

        arrImages.append(arrImg)
        arrImageNames.append(strImageName)
        arrImageClasses.append(strClass)

        if i == BATCH_SIZE:
            i = 0
            yield np.array(arrImages), arrImageNames, arrImageClasses

    # Yield the last, incomplete batch so that the remaining test images are not skipped
    if i > 0:
        yield np.array(arrImages), arrImageNames, arrImageClasses

    # A generator ends by returning; raising StopIteration inside one is a RuntimeError since Python 3.7
    return

def ShowImgSimple(img, label):
    print(label)
    fig = plt.figure()
    fig.add_subplot(1, 1, 1)
    plt.imshow(img, cmap='gray')
    plt.show()
    plt.close()

Using the generator above, we can load all test images and run prediction on them:

def getAllTestImages():
    global embedding_model

    arrAllImageNames = []
    arrAllImageClasses = []

    # Test images are the ones left after the training and validation ranges of each class
    for cClass in arrLabeledData:
        for nIdx in range(int(len(cClass['image_names']) * (TRAINING_IMAGES_PERCENT + VALIDATION_IMAGES_PERCENT)), len(cClass['image_names'])):
            arrAllImageNames.append(cClass['image_names'][nIdx])
            arrAllImageClasses.append(cClass['class'])

    test_preds = []
    test_file_names = []
    test_class_names = []

    for imgs, fnames, classes in data_generator_simple(arrAllImageNames, arrAllImageClasses):
        predicts = embedding_model.predict(imgs)
        predicts = predicts.tolist()
        test_preds += predicts
        test_file_names += fnames
        test_class_names += classes

    test_preds = np.array(test_preds)

    return test_preds, test_file_names, test_class_names

def getAccuracy(test_preds, test_file_names, test_class_names):
    nTotalSuccess = 0

    for i, arrPredictedProbabilities in enumerate(test_preds):
        nPredictedClassIdx = arrPredictedProbabilities.argmax()
        gt_class = test_class_names[i]
        predicted_class = arrClasses[nPredictedClassIdx]

        if(predicted_class == gt_class):
            nTotalSuccess += 1
        else:
            # Print and show every misclassified image
            print("GT: ", gt_class, "; Pred: ", predicted_class, "; Probabilities: ", arrPredictedProbabilities[0], ", ", arrPredictedProbabilities[1])
            img = loadImage(join(working_path, "images/", gt_class, test_file_names[i]))#, 0)
            plt.imshow(img)
            plt.show()

    # Share of correctly classified test images
    nSuccess = nTotalSuccess / len(test_preds)

    return nSuccess

def test(EPOCHS, nL2, optimizer, learning_rate, bCumulativeLearning):
    global embedding_model

    embedding_model = trainNetwork(EPOCHS, nL2, optimizer, bCumulativeLearning)

    print("loading best model")
    embedding_model = loadModel(embedding_model, True)

    test_preds, test_file_names, test_class_names = getAllTestImages()
    # print("test_preds[0], test_file_names[0], test_class_names[0]: ", test_preds[0], test_file_names[0], test_class_names[0])

    nSuccess = getAccuracy(test_preds, test_file_names, test_class_names)
    print(">>> Accuracy on test set:", nSuccess, "<<<")

opt = tf.keras.optimizers.Adam(0.0002) ##Adamax(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0)
nL2 = 0.4

if(bDoTraining):
    EPOCHS = 50
    # Passed to test() but not used there: the actual learning rate is the one set in `opt` above
    learning_rate=0.001

    np.random.seed(7)

    test(EPOCHS, nL2, opt, learning_rate, bCumulativeLearning=False)

    embedding_model = loadModel(embedding_model, True)
    embedding_model.save(best_weights_filepath) # A full model is saved

if(bDoTraining):
    # Pick a random class and run the model on all validation images of that class
    nClassIdx = np.random.randint(len(arrLabeledData))
    cls = arrLabeledData[nClassIdx]
    nMinIdx, nMaxIdx = getClassMinMax(cls, False)
    nImageIdx = random.randint(nMinIdx, nMaxIdx)

    for i, nImageIdx in enumerate(range(nMinIdx, nMaxIdx)):
        print(i+1, "of", nMaxIdx - nMinIdx)

        img = loadImage(join(working_path, "images/", arrLabeledData[nClassIdx]['class'], arrLabeledData[nClassIdx]['image_names'][nImageIdx]))#, 0)
        arrImg = img_to_array(img)
        arrImg = np.array(arrImg, dtype="float32")

        # ---

        test_preds = embedding_model.predict(arrImg.reshape(1, IMAGE_SIZE, IMAGE_SIZE, 3))
        nIdx = test_preds.argmax()

        # Show only the images that were classified incorrectly
        if(nClassIdx != nIdx):
            print("GT: ", arrLabeledData[nClassIdx]['class'], "; Pred: ", arrClasses[nIdx])
            plt.imshow(img)
            plt.show()

# Same as above in "test" section, but this time we process images from output folder
# The "/content/drive/My Drive/01_Output/" folder is the output of the previous step. Remember, we got pairs of images
# and now need to figure out which ones are heads (obverse) and which ones are tails (reverse).
images_source_path = "/content/drive/My Drive/01_Output/"
# We will save images under new names (with a "head" or "tail" suffix) in this folder
images_dest_path = working_path + "images_processed/"

arrSourceImageNames = [f for f in listdir(images_source_path) if isfile(join(images_source_path, f))]

# Create the model and load its weights (the ones we got during training)
embedding_model = createModel(nL2, opt)
embedding_model = loadModel(embedding_model, True)

# Dictionary will store coin names and a flag for each: see below for details
dictNames = {}

nTotal = len(arrSourceImageNames)
for i, file_name in enumerate(arrSourceImageNames):
    image_path = join(images_source_path, file_name)

    img = loadImage(image_path)
    arrImg = img_to_array(img)
    arrImg = np.array(arrImg, dtype="float32")

    # ---

    # For the image, predict its class
    test_preds = embedding_model.predict(arrImg.reshape(1, IMAGE_SIZE, IMAGE_SIZE, 3))
    nIdx = test_preds.argmax()

    #print(i+1, "of", nTotal, ": ", arrClasses[nIdx])
    #plt.imshow(img)
    #plt.show()

    # Split image name
    word_list = file_name.split(".") # ['0_000_00', 'png']
    image_name = word_list[0]
    image_ext = word_list[1]

    plt.imsave(images_dest_path + image_name + "_" + arrClasses[nIdx] + ".png", img)

    # Now we need to move the source file to trash, but make it zero size first so it doesn't take space there
    open(image_path, 'w').close() #overwrite and make the file blank instead
    os.remove(image_path)

    if(i%100 == 0):
        print(i, " of ", nTotal)

    # File names look like 123496110_07_03.
    # Here 123496110 is the file root name, 07 is the number of a coin in that image (some images contain more than one coin),
    # and 03 is the number of the image of that coin.
    # (Say, we have 169860023_000.jpg, 169860023_001.jpg, 169860023_002.jpg, one coin per image, tail-tail-head.
    # Then at step 1 we will get 169860023_00_00, 169860023_00_01, and 169860023_00_02.)
    # We append _head or _tail: 169860023_00_00_tail(.png)
    # In dictNames we keep pairs 169860023_00 + flag. The flag is 0 if there are no heads and no tails, 1 if heads only,
    # 2 if tails only, and 3 if the coin has both.
    # When scanning is complete, we delete files that have flag != 3
    arrImageNameParts = image_name.split("_") # ['169860023', '000', '00']
    # We do not need "000" here, as it is just the number of an image in a group of images for that coin.
    # We need the name (169860023) of course, plus the number of the coin (00).
    # NOTE: this treats the third field as the coin number, while the example above lists the coin number second;
    # check which layout the files from step 1 actually use.
    coin_name = arrImageNameParts[0] + "_" + arrImageNameParts[2]

    # Bit 1 marks "head seen", bit 2 marks "tail seen"
    if(arrClasses[nIdx] == "head"):
        dictNames[coin_name] = dictNames.get(coin_name, 0) | 1
    else:
        dictNames[coin_name] = dictNames.get(coin_name, 0) | 2

#print(dictNames)

# Now we need to delete all files for which dictNames[coin_name] != 3
print("Deleting files that do not have both head and tail")

nDeleted = 0
for i, file_name in enumerate(arrSourceImageNames):
    word_list = file_name.split(".") # ['0_000_00', 'png']
    image_name = word_list[0]

    arrImageNameParts = image_name.split("_") # ['169860023', '000', '00']
    coin_name = arrImageNameParts[0] + "_" + arrImageNameParts[2]

    if (coin_name not in dictNames) or (dictNames[coin_name] != 3):
        # The processed copy was saved with a class suffix ("_head" or "_tail"), so try each class name
        for strClass in arrClasses:
            image_path = join(images_dest_path, image_name + "_" + strClass + ".png")
            if isfile(image_path):
                open(image_path, 'w').close() #overwrite and make the file blank instead
                os.remove(image_path)
                nDeleted += 1

    if(i%100 == 0):
        print(i, " of ", nTotal)

print("Deleted", nDeleted)

=========== Source: habr.com ===========
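The head/tail bookkeeping used in the final loop can be tried in isolation. Below is a minimal sketch of the same bit-flag idea, assuming the classifier output is already available as (coin name, predicted class) pairs. The names `HEAD_FLAG`, `TAIL_FLAG`, `collect_flags`, `complete_coins` and the sample coin names are hypothetical and are not part of the original notebook.

```python
# Hypothetical stand-in for the classifier output: one (coin_name, predicted_class) pair per image.
HEAD_FLAG = 1  # bit 0: at least one image of the coin was classified as "head"
TAIL_FLAG = 2  # bit 1: at least one image of the coin was classified as "tail"


def collect_flags(predictions):
    """Return {coin_name: flag}, where the flag ORs together the sides seen for that coin."""
    flags = {}
    for coin_name, predicted_class in predictions:
        bit = HEAD_FLAG if predicted_class == "head" else TAIL_FLAG
        flags[coin_name] = flags.get(coin_name, 0) | bit
    return flags


def complete_coins(flags):
    """Return the sorted names of coins that have both a head and a tail image (flag == 3)."""
    return sorted(name for name, flag in flags.items() if flag == (HEAD_FLAG | TAIL_FLAG))


sample_predictions = [
    ("coinA", "head"), ("coinA", "tail"),  # both sides present: kept
    ("coinB", "tail"), ("coinB", "tail"),  # two tails, no head: dropped
    ("coinC", "head"),                     # a single image: dropped
]

sample_flags = collect_flags(sample_predictions)
print(sample_flags)
print(complete_coins(sample_flags))
```

Because the flags are combined with a bitwise OR, seeing the same side twice does not change the value, so only coins with at least one head and at least one tail reach 3.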