R-CNN Code Reproduction

1. Project Structure

The directory layout, as reflected by the listings below:

py/
├── bbox_regression.py
├── car_detector.py
├── finetune.py
├── linear_svm.py
├── selectivesearch.py
├── image_handler_show.py
└── utils/
    ├── util.py
    └── data/
        ├── create_bbox_regression_data.py
        ├── create_classifier_data.py
        ├── create_finetune_data.py
        ├── custom_batch_sampler.py
        ├── custom_bbox_regression_dataset.py
        ├── custom_classifier_dataset.py
        ├── custom_finetune_dataset.py
        ├── custom_hard_negative_mining_dataset.py
        ├── pascal_voc.py
        └── pascal_voc_car.py

2. Source Code Walkthrough

py/

bbox_regression.py

import os
import copy
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.models import AlexNet
from visdom import Visdom

from utils.data.custom_bbox_regression_dataset import BBoxRegressionDataset
import utils.util as util


def load_data(data_root_dir):
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((227, 227)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    data_set = BBoxRegressionDataset(data_root_dir, transform=transform)
    data_loader = DataLoader(data_set, batch_size=128, shuffle=True, num_workers=8)

    return data_loader


def train_model(data_loader, feature_model, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=None):
    since = time.time()

    model.train()  # Set model to training mode
    loss_list = list()
    viz = Visdom()  # initialize the Visdom client once, before the epoch loop
    viz.line(Y=[0.], X=[0.], win="train loss",
             opts=dict(title='train loss', xlabel='epoch', ylabel='loss'))
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        running_loss = 0.0
        batch_i = 0
        # Iterate over data.
        for inputs, targets in data_loader:
            inputs = inputs.to(device)
            targets = targets.float().to(device)

            # extract frozen AlexNet conv features; only the linear head is trained
            features = feature_model.features(inputs)
            features = torch.flatten(features, 1)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            outputs = model(features)
            loss = criterion(outputs, targets)

            loss.backward()
            optimizer.step()

            # statistics
            running_loss += loss.item() * inputs.size(0)

            batch_i += 1
            print("batch", batch_i, "running_loss_adds=", running_loss)
        lr_scheduler.step()  # decay the learning rate once per epoch

        epoch_loss = running_loss / len(data_loader.dataset)
        loss_list.append(epoch_loss)

        print('{} Loss: {:.4f}'.format(epoch, epoch_loss))

        # plot the regression loss
        viz.line(Y=[epoch_loss], X=[epoch + 1], win="train loss", update="append",
                 opts=dict(title='train loss', xlabel='epoch', ylabel='loss'))

        # save after every epoch
        util.save_model(model, './models/bbox_regression_%d.pth' % epoch)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

    return loss_list


def get_model(device=None):
    # load the fine-tuned CNN as a frozen feature extractor
    model = AlexNet(num_classes=2)
    model.load_state_dict(torch.load('./models/best_linear_svm_alexnet_car.pth'))
    model.eval()

    # freeze all parameters: no gradient tracking
    for param in model.parameters():
        param.requires_grad = False
    if device:
        model = model.to(device)

    return model


if __name__ == '__main__':
    data_loader = load_data('./data/bbox_regression')

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    feature_model = get_model(device)

    # AlexNet's last pooling layer outputs 256*6*6 features
    in_features = 256 * 6 * 6
    out_features = 4
    model = nn.Linear(in_features, out_features)
    model.to(device)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

    loss_list = train_model(data_loader, feature_model, model, criterion, optimizer, lr_scheduler, device=device,
                            num_epochs=5)
    util.plot_loss(loss_list)
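
This script only trains the regressor; nothing here applies it to a proposal. A minimal sketch of how the predicted offsets (t_x, t_y, t_w, t_h) would refine a box at inference time, inverting the targets computed in custom_bbox_regression_dataset.py below. refine_box is a name introduced here for illustration, not part of the repo; boxes are assumed to be [xmin, ymin, xmax, ymax]:

import numpy as np

def refine_box(box, deltas):
    """Apply predicted (t_x, t_y, t_w, t_h) offsets to one proposal box."""
    xmin, ymin, xmax, ymax = box
    p_w, p_h = xmax - xmin, ymax - ymin
    p_x, p_y = xmin + p_w / 2, ymin + p_h / 2

    t_x, t_y, t_w, t_h = deltas
    g_x = p_w * t_x + p_x       # shift the center
    g_y = p_h * t_y + p_y
    g_w = p_w * np.exp(t_w)     # rescale width/height
    g_h = p_h * np.exp(t_h)

    return [g_x - g_w / 2, g_y - g_h / 2, g_x + g_w / 2, g_y + g_h / 2]

# zero offsets leave the box unchanged
print(refine_box([10, 10, 50, 50], [0., 0., 0., 0.]))  # [10.0, 10.0, 50.0, 50.0]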

car_detector.py

import time
import copy
import cv2
import numpy as np
import torch
import torch.nn as nn
from torchvision.models import alexnet
import torchvision.transforms as transforms
import selectivesearch

import utils.util as util


def get_device():
    return torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


def get_transform():
    # data transform (same preprocessing as at training time)
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((227, 227)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    return transform


def get_model(device=None):
    # load the CNN classifier (AlexNet fine-tuned, then SVM-trained)
    model = alexnet()
    num_classes = 2
    num_features = model.classifier[6].in_features
    model.classifier[6] = nn.Linear(num_features, num_classes)
    model.load_state_dict(torch.load('./models/best_linear_svm_alexnet_car.pth'))
    model.eval()

    # freeze all parameters: no gradient tracking
    for param in model.parameters():
        param.requires_grad = False
    if device:
        model = model.to(device)

    return model


def draw_box_with_text(img, rect_list, score_list):
    """
    Draw bounding boxes and their classification probabilities
    :param img: image to draw on
    :param rect_list: boxes as [xmin, ymin, xmax, ymax]
    :param score_list: per-box probabilities
    """
    for i in range(len(rect_list)):
        xmin, ymin, xmax, ymax = rect_list[i]
        score = score_list[i]

        cv2.rectangle(img, (xmin, ymin), (xmax, ymax), color=(0, 0, 255), thickness=1)
        cv2.putText(img, "{:.3f}".format(score), (xmin, ymin), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)


def nms(rect_list, score_list):
    """
    Non-maximum suppression
    :param rect_list: list of shape [N, 4]
    :param score_list: list of shape [N]
    """
    nms_rects = list()
    nms_scores = list()

    rect_array = np.array(rect_list)
    score_array = np.array(score_list)

    # a single sort is enough:
    # order by classification probability, highest first
    idxs = np.argsort(score_array)[::-1]
    rect_array = rect_array[idxs]
    score_array = score_array[idxs]

    thresh = 0.15
    while len(score_array) > 0:
        # keep the box with the highest probability
        nms_rects.append(rect_array[0])
        nms_scores.append(score_array[0])
        rect_array = rect_array[1:]
        score_array = score_array[1:]

        length = len(score_array)
        if length <= 0:
            break

        # compute IoU against the box just kept
        iou_scores = util.iou(np.array(nms_rects[-1]), rect_array)
        # drop boxes whose overlap is >= thresh
        idxs = np.where(iou_scores < thresh)[0]
        rect_array = rect_array[idxs]
        score_array = score_array[idxs]

    return nms_rects, nms_scores


if __name__ == '__main__':
    device = get_device()
    transform = get_transform()
    model = get_model(device=device)

    # create the selective search object
    gs = selectivesearch.get_selective_search()

    # test_img_path = '../imgs/000007.jpg'
    # test_xml_path = '../imgs/000007.xml'
    test_img_path = '../imgs/456.jpg'
    # test_xml_path = '../imgs/000012.xml'

    img = cv2.imread(test_img_path)
    dst = copy.deepcopy(img)

    # bndboxs = util.parse_xml(test_xml_path)
    # for bndbox in bndboxs:
    #     xmin, ymin, xmax, ymax = bndbox
    #     cv2.rectangle(dst, (xmin, ymin), (xmax, ymax), color=(0, 255, 0), thickness=1)

    # region proposals
    selectivesearch.config(gs, img, strategy='f')
    rects = selectivesearch.get_rects(gs)
    print('number of region proposals: %d' % len(rects))

    svm_thresh = 0.55

    # positive boxes and their scores, after thresholding
    score_list = list()
    positive_list = list()

    # all boxes classified as car, before thresholding
    tmp_score_list = list()
    tmp_positive_list = list()
    start = time.time()
    for rect in rects:
        xmin, ymin, xmax, ymax = rect
        rect_img = img[ymin:ymax, xmin:xmax]

        rect_transform = transform(rect_img).to(device)
        output = model(rect_transform.unsqueeze(0))[0]

        if torch.argmax(output).item() == 1:
            # predicted as car
            probs = torch.softmax(output, dim=0).cpu().numpy()

            tmp_score_list.append(probs[1])
            tmp_positive_list.append(rect)

            if probs[1] >= svm_thresh:
                score_list.append(probs[1])
                positive_list.append(rect)
                print(rect, output, probs)
    end = time.time()
    print('detect time: %d s' % (end - start))

    tmp_img2 = copy.deepcopy(dst)
    draw_box_with_text(tmp_img2, tmp_positive_list, tmp_score_list)
    cv2.imshow('1', tmp_img2)

    tmp_img = copy.deepcopy(dst)
    draw_box_with_text(tmp_img, positive_list, score_list)
    cv2.imshow('2', tmp_img)

    nms_rects, nms_scores = nms(positive_list, score_list)
    print(nms_rects)
    print(nms_scores)
    draw_box_with_text(dst, nms_rects, nms_scores)

    cv2.imshow('3', dst)
    cv2.waitKey(0)
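
Note that the detector stops after NMS; the box regressor trained in bbox_regression.py is not applied here. As a quick sanity check of nms on its own (relying only on utils.util.iou, exactly as the loop above does), a toy run:

rects = [[10, 10, 60, 60], [12, 12, 62, 62], [200, 200, 260, 260]]
scores = [0.9, 0.8, 0.7]

keep_rects, keep_scores = nms(rects, scores)
print(keep_scores)  # [0.9, 0.7]: the 0.8 box overlaps the 0.9 box with IoU ~0.85 > thresh=0.15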

finetune.py

import numpy as np
import os
import copy
import time
import torch
import torch.nn as nn
import torch.optim as optim
from visdom import Visdom
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.models as models

from utils.data.custom_finetune_dataset import CustomFinetuneDataset
from utils.data.custom_batch_sampler import CustomBatchSampler
from utils.util import check_dir
from image_handler_show import show_image


def load_data(data_root_dir):
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((227, 227)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    data_loaders = {}
    data_sizes = {}
    for name in ['train', 'val']:
        data_dir = os.path.join(data_root_dir, name)
        data_set = CustomFinetuneDataset(data_dir, transform=transform)
        # each 128-sample batch holds 32 positives and 96 negatives
        data_sampler = CustomBatchSampler(data_set.get_positive_num(), data_set.get_negative_num(), 32, 96)
        data_loader = DataLoader(data_set, batch_size=128, sampler=data_sampler, num_workers=8, drop_last=True)

        data_loaders[name] = data_loader
        data_sizes[name] = len(data_sampler)

    return data_loaders, data_sizes


def train_model(data_loaders, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=None):
    since = time.time()

    best_model_weights = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    viz = Visdom(env='loss and val of trainval')  # initialize visdom
    viz.line(Y=np.column_stack((0., 0.)), X=np.column_stack((0., 0.)), win="{} loss/acc".format('train'),
             opts=dict(title='{} loss&acc'.format('train'), xlabel='epoch', ylabel='loss/acc',
                       legend=["loss", "acc"]))
    viz.line(Y=np.column_stack((0., 0.)), X=np.column_stack((0., 0.)), win="{} loss/acc".format('val'),
             opts=dict(title='{} loss&acc'.format('val'), xlabel='epoch', ylabel='loss/acc',
                       legend=["loss", "acc"]))  # initialize the starting points
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0
            batch_i = 0
            # Iterate over data.
            for inputs, labels in data_loaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
                batch_i += 1
                print("batch", batch_i, "running_loss_adds=", running_loss)
            if phase == 'train':
                lr_scheduler.step()

            # data_sizes is the module-level dict built in __main__
            epoch_loss = running_loss / data_sizes[phase]
            epoch_acc = running_corrects.double() / data_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            viz.line(Y=np.column_stack((epoch_loss, epoch_acc)), X=np.column_stack((epoch + 1, epoch + 1)),
                     win="{} loss/acc".format(phase), update='append',
                     opts=dict(title='{} loss&acc'.format(phase), xlabel='epoch', ylabel='loss/acc',
                               legend=["loss", "acc"]))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_weights = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_weights)
    return model


def show():
    # visualize one batch; relies on the module-level data_loaders built in __main__
    data_loader = data_loaders['train']
    inputs, targets = next(iter(data_loader))
    print(inputs[0].size(), type(inputs[0]))
    trans = transforms.ToPILImage()
    print(type(trans(inputs[0])))
    print(targets)
    print(inputs.shape)
    titles = ["TRUE" if i.item() else "FALSE" for i in targets[0:60]]
    images = [np.array(trans(i)) for i in inputs[0:60]]
    show_image(images, titles=titles, num_cols=12)


if __name__ == '__main__':
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    data_loaders, data_sizes = load_data('./data/finetune_car')

    model = models.alexnet(pretrained=True)  # start from ImageNet-pretrained weights
    print(model)

    # show()  # title rendering is buggy: only the last image's title shows

    # turn AlexNet into a binary classifier by replacing the last layer
    num_features = model.classifier[6].in_features
    model.classifier[6] = nn.Linear(num_features, 2)
    # print(model)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    best_model = train_model(data_loaders, model, criterion, optimizer, lr_scheduler, device=device, num_epochs=5)
    # save the best model weights
    check_dir('./models')
    torch.save(best_model.state_dict(), 'models/alexnet_car.pth')
    print('done')

linear_svm.py

import time
import copy
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.models import alexnet
from visdom import Visdom

from utils.data.custom_classifier_dataset import CustomClassifierDataset
from utils.data.custom_hard_negative_mining_dataset import CustomHardNegativeMiningDataset
from utils.data.custom_batch_sampler import CustomBatchSampler
from utils.util import check_dir
from utils.util import save_model

batch_positive = 32
batch_negative = 96
batch_total = 128


def load_data(data_root_dir):
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((227, 227)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    data_loaders = {}
    data_sizes = {}
    remain_negative_list = list()
    for name in ['train', 'val']:
        data_dir = os.path.join(data_root_dir, name)

        data_set = CustomClassifierDataset(data_dir, transform=transform)
        if name == 'train':
            """
            Hard negative mining setup.
            The initial positive:negative ratio is 1:1. Since there are far fewer
            positives than negatives, the positive count is used as the baseline and
            an equal number of negatives is drawn at random as the initial negative set.
            """
            positive_list = data_set.get_positives()
            negative_list = data_set.get_negatives()
            init_negative_idxs = random.sample(range(len(negative_list)), len(positive_list))
            init_negative_list = [negative_list[idx] for idx in range(len(negative_list))
                                  if idx in init_negative_idxs]
            remain_negative_list = [negative_list[idx] for idx in range(len(negative_list))
                                    if idx not in init_negative_idxs]

            data_set.set_negative_list(init_negative_list)
            data_loaders['remain'] = remain_negative_list

        sampler = CustomBatchSampler(data_set.get_positive_num(), data_set.get_negative_num(),
                                     batch_positive, batch_negative)

        data_loader = DataLoader(data_set, batch_size=batch_total, sampler=sampler, num_workers=8, drop_last=True)
        data_loaders[name] = data_loader
        data_sizes[name] = len(sampler)
    return data_loaders, data_sizes


def hinge_loss(outputs, labels):
    """
    Hinge loss
    :param outputs: shape (N, num_classes)
    :param labels: shape (N)
    :return: loss value
    """
    num_labels = len(labels)
    corrects = outputs[range(num_labels), labels].unsqueeze(0).T

    # maximum margin
    margin = 1.0
    margins = outputs - corrects + margin
    loss = torch.sum(torch.max(margins, 1)[0]) / len(labels)

    # # regularization strength
    # reg = 1e-3
    # loss += reg * torch.sum(weight ** 2)

    return loss
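
# Worked example of hinge_loss on toy numbers (added for illustration):
# with outputs = [[0.2, 0.8], [0.6, 0.4]] and labels = [1, 0],
# corrects = [[0.8], [0.6]], so margins = [[0.4, 1.0], [1.0, 0.8]];
# the per-sample max is [1.0, 1.0] and the loss is 1.0.
# Since the correct-class column always equals `margin`, a sample only
# produces a nonzero gradient once some wrong class outscores the correct one.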


def add_hard_negatives(hard_negative_list, negative_list, add_negative_list):
    for item in hard_negative_list:
        if len(add_negative_list) == 0:
            # first negative to be added
            negative_list.append(item)
            add_negative_list.append(list(item['rect']))
        elif list(item['rect']) not in add_negative_list:
            # only add negatives we have not recorded yet
            negative_list.append(item)
            add_negative_list.append(list(item['rect']))


def get_hard_negatives(preds, cache_dicts):
    # false positives (predicted car on a negative-only set) are hard negatives
    fp_mask = preds == 1
    tn_mask = preds == 0

    fp_rects = cache_dicts['rect'][fp_mask].numpy()
    fp_image_ids = cache_dicts['image_id'][fp_mask].numpy()

    tn_rects = cache_dicts['rect'][tn_mask].numpy()
    tn_image_ids = cache_dicts['image_id'][tn_mask].numpy()

    hard_negative_list = [{'rect': fp_rects[idx], 'image_id': fp_image_ids[idx]} for idx in range(len(fp_rects))]
    easy_negative_list = [{'rect': tn_rects[idx], 'image_id': tn_image_ids[idx]} for idx in range(len(tn_rects))]

    return hard_negative_list, easy_negative_list


def train_model(data_loaders, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=None):
    since = time.time()

    best_model_weights = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    viz = Visdom(env='loss and val svm')  # initialize visdom
    viz.line(Y=np.column_stack((0., 0.)), X=np.column_stack((0., 0.)), win="{} loss/acc".format('train'),
             opts=dict(title='{} loss&acc'.format('train'), xlabel='epoch', ylabel='loss/acc',
                       legend=["loss", "acc"]))
    viz.line(Y=np.column_stack((0., 0.)), X=np.column_stack((0., 0.)), win="{} loss/acc".format('val'),
             opts=dict(title='{} loss&acc'.format('val'), xlabel='epoch', ylabel='loss/acc',
                       legend=["loss", "acc"]))  # initialize the starting points

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0
            batch_i = 0

            # print positive/negative sample counts
            data_set = data_loaders[phase].dataset
            print('{} - positive_num: {} - negative_num: {} - data size: {}'.format(
                phase, data_set.get_positive_num(), data_set.get_negative_num(), data_sizes[phase]))

            # Iterate over data.
            for inputs, labels, cache_dicts in data_loaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    # print(outputs.shape)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
                batch_i += 1
                print("batch", batch_i, "running_loss_adds=", running_loss)

            if phase == 'train':
                lr_scheduler.step()

            epoch_loss = running_loss / data_sizes[phase]
            epoch_acc = running_corrects.double() / data_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            viz.line(Y=np.column_stack(([epoch_loss], [epoch_acc])), X=np.column_stack(([epoch + 1], [epoch + 1])),
                     win="{} loss/acc".format(phase), update="append",
                     opts=dict(title='{} loss&acc'.format(phase), xlabel='epoch', ylabel='loss/acc',
                               legend=["loss", "acc"]))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_weights = copy.deepcopy(model.state_dict())

        # after each epoch, run the classifier over the remaining negatives
        # and mine hard negatives from its false positives
        train_dataset = data_loaders['train'].dataset
        remain_negative_list = data_loaders['remain']
        jpeg_images = train_dataset.get_jpeg_images()
        transform = train_dataset.get_transform()

        with torch.set_grad_enabled(False):
            remain_dataset = CustomHardNegativeMiningDataset(remain_negative_list, jpeg_images, transform=transform)
            remain_data_loader = DataLoader(remain_dataset, batch_size=batch_total, num_workers=8, drop_last=True)

            # current negative set of the training dataset
            negative_list = train_dataset.get_negatives()
            # negatives added in previous mining rounds
            add_negative_list = data_loaders.get('add_negative', [])

            running_corrects = 0
            # Iterate over data.
            for inputs, labels, cache_dicts in remain_data_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                outputs = model(inputs)
                # print(outputs.shape)
                _, preds = torch.max(outputs, 1)

                running_corrects += torch.sum(preds == labels.data)

                hard_negative_list, easy_negative_list = get_hard_negatives(preds.cpu().numpy(), cache_dicts)
                add_hard_negatives(hard_negative_list, negative_list, add_negative_list)

            remain_acc = running_corrects.double() / len(remain_negative_list)
            print('remain negative size: {}, acc: {:.4f}'.format(len(remain_negative_list), remain_acc))

            # rebuild the training set with the enlarged negative list
            train_dataset.set_negative_list(negative_list)
            tmp_sampler = CustomBatchSampler(train_dataset.get_positive_num(), train_dataset.get_negative_num(),
                                             batch_positive, batch_negative)
            data_loaders['train'] = DataLoader(train_dataset, batch_size=batch_total, sampler=tmp_sampler,
                                               num_workers=8, drop_last=True)
            data_loaders['add_negative'] = add_negative_list

            # update the dataset size
            data_sizes['train'] = len(tmp_sampler)

        # save after every epoch
        save_model(model, 'models/linear_svm_alexnet_car_%d.pth' % epoch)

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_weights)
    return model


if __name__ == '__main__':
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    # device = 'cpu'
    data_loaders, data_sizes = load_data('./data/classifier_car')

    # load the fine-tuned CNN
    model_path = './models/alexnet_car.pth'
    model = alexnet()
    num_classes = 2
    num_features = model.classifier[6].in_features
    model.classifier[6] = nn.Linear(num_features, num_classes)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    # freeze the feature extractor
    for param in model.parameters():
        param.requires_grad = False
    # replace the head with a fresh linear layer acting as the SVM classifier
    model.classifier[6] = nn.Linear(num_features, num_classes)
    # print(model)
    model = model.to(device)
    criterion = hinge_loss
    # the initial training set is small, so use a lower learning rate
    optimizer = optim.SGD(model.parameters(), lr=1e-4, momentum=0.9)
    # train for 25 epochs, decaying the learning rate every 4 epochs
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.1)
    best_model = train_model(data_loaders, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=device)
    # save the best model weights
    save_model(best_model, 'models/best_linear_svm_alexnet_car.pth')
    print('done')

selectivesearch.py

import sys
import cv2


def get_selective_search():
    # requires opencv-contrib-python (the ximgproc module)
    gs = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
    return gs


def config(gs, img, strategy='q'):
    gs.setBaseImage(img)

    if strategy == 's':
        gs.switchToSingleStrategy()
    elif strategy == 'f':
        gs.switchToSelectiveSearchFast()
    elif strategy == 'q':
        gs.switchToSelectiveSearchQuality()
    else:
        print(__doc__)
        sys.exit(1)


def get_rects(gs):
    # process() returns (x, y, w, h); convert to (xmin, ymin, xmax, ymax)
    rects = gs.process()
    rects[:, 2] += rects[:, 0]
    rects[:, 3] += rects[:, 1]

    return rects


def show_rect_in_img(img, rects):
    for x1, y1, x2, y2 in rects[0:1000]:
        cv2.rectangle(img, (x1, y1), (x2, y2), color=(0, 0, 255), thickness=2)

    return img


if __name__ == '__main__':
    """
    Demo of the selective search pipeline
    """
    gs = get_selective_search()  # instantiate the segmentation object
    img = cv2.imread('../imgs/456.jpg', cv2.IMREAD_COLOR)
    config(gs, img, strategy='f')  # configure it
    rects = get_rects(gs)  # extract the proposal boxes

    print("proposal box coordinates:\n", rects, "\nnumber of proposals:", len(rects))
    # demo: draw the first 1000 proposals and save the result
    get_img = show_rect_in_img(img, rects)
    cv2.imwrite('../imgs/777ss.jpg', get_img)

    print("done")

utils/

data/
create_bbox_regression_data.py
import os
import shutil
import numpy as np
import utils.util as util

# number of positive bounding boxes: 4035

if __name__ == '__main__':
    """
    Extract the annotated ground-truth boxes from voc_car/train,
    take the training positives (IoU >= 0.5) from finetune_car/train,
    and keep only those with IoU > 0.6.
    The resulting dataset is saved under bbox_regression.
    """
    voc_car_train_dir = '../../data/voc_car/train'
    # ground truth
    gt_annotation_dir = os.path.join(voc_car_train_dir, 'Annotations')
    jpeg_dir = os.path.join(voc_car_train_dir, 'JPEGImages')

    classifier_car_train_dir = '../../data/finetune_car/train'
    # positive
    positive_annotation_dir = os.path.join(classifier_car_train_dir, 'Annotations')

    dst_root_dir = '../../data/bbox_regression/'
    dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
    dst_bndbox_dir = os.path.join(dst_root_dir, 'bndboxs')
    dst_positive_dir = os.path.join(dst_root_dir, 'positive')

    util.check_dir(dst_root_dir)
    util.check_dir(dst_jpeg_dir)
    util.check_dir(dst_bndbox_dir)
    util.check_dir(dst_positive_dir)

    samples = util.parse_car_csv(voc_car_train_dir)
    res_samples = list()
    total_positive_num = 0
    for sample_name in samples:
        # load this sample's positive boxes (IoU >= 0.5)
        positive_annotation_path = os.path.join(positive_annotation_dir, sample_name + '_1.csv')
        positive_bndboxes = np.loadtxt(positive_annotation_path, dtype=int, delimiter=' ')
        # load the ground-truth boxes
        gt_annotation_path = os.path.join(gt_annotation_dir, sample_name + '.xml')
        bndboxs = util.parse_xml(gt_annotation_path)
        # keep the proposals that satisfy IoU > 0.6
        positive_list = list()
        if len(positive_bndboxes.shape) == 1 and len(positive_bndboxes) != 0:
            scores = util.iou(positive_bndboxes, bndboxs)
            if np.max(scores) > 0.6:
                positive_list.append(positive_bndboxes)
        elif len(positive_bndboxes.shape) == 2:
            for positive_bndbox in positive_bndboxes:
                scores = util.iou(positive_bndbox, bndboxs)
                if np.max(scores) > 0.6:
                    positive_list.append(positive_bndbox)
        else:
            pass

        # if any positive boxes (IoU > 0.6) remain, save the image and its annotations
        if len(positive_list) > 0:
            # save the image
            jpeg_path = os.path.join(jpeg_dir, sample_name + ".jpg")
            dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + ".jpg")
            shutil.copyfile(jpeg_path, dst_jpeg_path)
            # save the ground-truth boxes
            dst_bndbox_path = os.path.join(dst_bndbox_dir, sample_name + ".csv")
            np.savetxt(dst_bndbox_path, bndboxs, fmt='%s', delimiter=' ')
            # save the positive boxes
            dst_positive_path = os.path.join(dst_positive_dir, sample_name + ".csv")
            np.savetxt(dst_positive_path, np.array(positive_list), fmt='%s', delimiter=' ')

            total_positive_num += len(positive_list)
            res_samples.append(sample_name)
            print('save {} done'.format(sample_name))
        else:
            print('-------- {} does not qualify'.format(sample_name))

    dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
    np.savetxt(dst_csv_path, res_samples, fmt='%s', delimiter=' ')
    print('total positive num: {}'.format(total_positive_num))
    print('done')
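
Several of these scripts branch on len(arr.shape) == 1 because of how np.loadtxt collapses shapes: a one-row CSV comes back as a 1-D array of length 4, an empty file as a length-0 array, and only two or more rows give the expected (N, 4). A quick demonstration:

import io
import numpy as np

one_row = np.loadtxt(io.StringIO('10 10 50 50'), dtype=int, delimiter=' ')
two_rows = np.loadtxt(io.StringIO('10 10 50 50\n20 20 60 60'), dtype=int, delimiter=' ')
print(one_row.shape, two_rows.shape)  # (4,) (2, 4)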

create_classifier_data.py
import random
import numpy as np
import shutil
import time
import cv2
import os
import xmltodict
import selectivesearch
from utils.util import check_dir
from utils.util import parse_car_csv
from utils.util import parse_xml
from utils.util import iou
from utils.util import compute_ious


# train
# positive num: 67
# negative num: 34674
# val
# positive num: 75
# negative num: 26277

def parse_annotation_jpeg(annotation_path, jpeg_path, gs):
    """
    Collect positive and negative samples (boxes whose difficult attribute is True are ignored).
    Positives: the annotated ground-truth boxes themselves.
    Negatives: proposals with 0 < IoU <= 0.3; to further limit their number,
    a negative must be larger than 1/5 of the largest ground-truth box.
    """
    img = cv2.imread(jpeg_path)

    selectivesearch.config(gs, img, strategy='q')
    # compute region proposals
    rects = selectivesearch.get_rects(gs)
    # parse the ground-truth boxes
    bndboxs = parse_xml(annotation_path)

    # size of the largest ground-truth box
    maximum_bndbox_size = 0
    for bndbox in bndboxs:
        xmin, ymin, xmax, ymax = bndbox
        bndbox_size = (ymax - ymin) * (xmax - xmin)
        if bndbox_size > maximum_bndbox_size:
            maximum_bndbox_size = bndbox_size

    # IoU of each proposal against the ground-truth boxes
    iou_list = compute_ious(rects, bndboxs)

    negative_list = list()
    for i in range(len(iou_list)):
        xmin, ymin, xmax, ymax = rects[i]
        rect_size = (ymax - ymin) * (xmax - xmin)

        iou_score = iou_list[i]
        if 0 < iou_score <= 0.3 and rect_size > maximum_bndbox_size / 5.0:
            # negative sample
            negative_list.append(rects[i])
        else:
            pass

    return bndboxs, negative_list


if __name__ == '__main__':

    car_root_dir = '../../data/voc_car/'
    classifier_root_dir = '../../data/classifier_car/'
    check_dir(classifier_root_dir)

    gs = selectivesearch.get_selective_search()  # instantiate the selective search object

    for name in ['train', 'val']:
        src_root_dir = os.path.join(car_root_dir, name)
        src_annotation_dir = os.path.join(src_root_dir, 'Annotations')
        src_jpeg_dir = os.path.join(src_root_dir, 'JPEGImages')

        dst_root_dir = os.path.join(classifier_root_dir, name)
        dst_annotation_dir = os.path.join(dst_root_dir, 'Annotations')
        dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
        check_dir(dst_root_dir)
        check_dir(dst_annotation_dir)
        check_dir(dst_jpeg_dir)

        total_num_positive = 0
        total_num_negative = 0

        samples = parse_car_csv(src_root_dir)  # load the csv under src_root_dir
        # copy the csv file
        src_csv_path = os.path.join(src_root_dir, 'car.csv')
        dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
        shutil.copyfile(src_csv_path, dst_csv_path)

        for sample_name in samples:
            try:
                since = time.time()

                src_annotation_path = os.path.join(src_annotation_dir, sample_name + '.xml')
                src_jpeg_path = os.path.join(src_jpeg_dir, sample_name + '.jpg')
                # collect positive and negative samples
                positive_list, negative_list = parse_annotation_jpeg(src_annotation_path, src_jpeg_path, gs)
                total_num_positive += len(positive_list)
                total_num_negative += len(negative_list)

                dst_annotation_positive_path = os.path.join(dst_annotation_dir, sample_name + '_1' + '.csv')
                dst_annotation_negative_path = os.path.join(dst_annotation_dir, sample_name + '_0' + '.csv')
                dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + '.jpg')
                # save the image
                shutil.copyfile(src_jpeg_path, dst_jpeg_path)
                # save the positive and negative annotations
                np.savetxt(dst_annotation_positive_path, np.array(positive_list), fmt='%d', delimiter=' ')
                np.savetxt(dst_annotation_negative_path, np.array(negative_list), fmt='%d', delimiter=' ')

                time_elapsed = time.time() - since
                print('parse {}.jpg in {:.0f}m {:.0f}s'.format(sample_name, time_elapsed // 60, time_elapsed % 60))
            except Exception as err:
                print(err)
                continue
        print('%s positive num: %d' % (name, total_num_positive))
        print('%s negative num: %d' % (name, total_num_negative))
    print('done')

create_finetune_data.py
import time
import shutil
import numpy as np
import cv2
import os
import selectivesearch
from utils.util import check_dir
from utils.util import parse_car_csv
from utils.util import parse_xml
from utils.util import compute_ious


# train
# positive num: 7278  (counts from a dataset downsampled to about 10% of the original)
# negative num: 44706
# val
# positive num: 7951
# negative num: 36277


def parse_annotation_jpeg(annotation_path, jpeg_path, gs):
    """
    Collect positive and negative samples (boxes whose difficult attribute is True are ignored).
    Positives: proposals with IoU >= 0.5 against a ground-truth box.
    Negatives: proposals with 0 < IoU < 0.5; to further limit their number,
    a negative must be larger than 1/5 of the largest ground-truth box.
    """
    img = cv2.imread(jpeg_path)

    selectivesearch.config(gs, img, strategy='q')
    # compute region proposals
    rects = selectivesearch.get_rects(gs)
    # parse the ground-truth boxes
    bndboxs = parse_xml(annotation_path)

    # size of the largest ground-truth box
    maximum_bndbox_size = 0
    for bndbox in bndboxs:
        xmin, ymin, xmax, ymax = bndbox
        bndbox_size = (ymax - ymin) * (xmax - xmin)
        if bndbox_size > maximum_bndbox_size:
            maximum_bndbox_size = bndbox_size

    # IoU of each proposal against the ground-truth boxes
    iou_list = compute_ious(rects, bndboxs)

    positive_list = list()
    negative_list = list()
    for i in range(len(iou_list)):
        xmin, ymin, xmax, ymax = rects[i]
        rect_size = (ymax - ymin) * (xmax - xmin)

        iou_score = iou_list[i]
        if iou_score >= 0.5:
            # positive sample
            positive_list.append(rects[i])
        elif 0 < iou_score < 0.5 and rect_size > maximum_bndbox_size / 5.0:
            # negative sample
            negative_list.append(rects[i])
        else:
            pass

    return positive_list, negative_list


if __name__ == '__main__':
    car_root_dir = '../../data/voc_car/'
    finetune_root_dir = '../../data/finetune_car/'
    check_dir(finetune_root_dir)

    gs = selectivesearch.get_selective_search()
    for name in ['train', 'val']:
        src_root_dir = os.path.join(car_root_dir, name)
        src_annotation_dir = os.path.join(src_root_dir, 'Annotations')
        src_jpeg_dir = os.path.join(src_root_dir, 'JPEGImages')

        dst_root_dir = os.path.join(finetune_root_dir, name)
        dst_annotation_dir = os.path.join(dst_root_dir, 'Annotations')
        dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
        check_dir(dst_root_dir)
        check_dir(dst_annotation_dir)
        check_dir(dst_jpeg_dir)

        total_num_positive = 0
        total_num_negative = 0

        samples = parse_car_csv(src_root_dir)  # load the csv under the source root
        # copy the csv file
        src_csv_path = os.path.join(src_root_dir, 'car.csv')
        dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
        shutil.copyfile(src_csv_path, dst_csv_path)
        for sample_name in samples:
            since = time.time()

            src_annotation_path = os.path.join(src_annotation_dir, sample_name + '.xml')
            src_jpeg_path = os.path.join(src_jpeg_dir, sample_name + '.jpg')
            # collect positive and negative samples
            positive_list, negative_list = parse_annotation_jpeg(src_annotation_path, src_jpeg_path, gs)
            total_num_positive += len(positive_list)
            total_num_negative += len(negative_list)

            dst_annotation_positive_path = os.path.join(dst_annotation_dir, sample_name + '_1' + '.csv')
            dst_annotation_negative_path = os.path.join(dst_annotation_dir, sample_name + '_0' + '.csv')
            dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + '.jpg')
            # save the image
            shutil.copyfile(src_jpeg_path, dst_jpeg_path)
            # save the positive and negative annotations
            np.savetxt(dst_annotation_positive_path, np.array(positive_list), fmt='%d', delimiter=' ')
            np.savetxt(dst_annotation_negative_path, np.array(negative_list), fmt='%d', delimiter=' ')

            time_elapsed = time.time() - since
            print('parse {}.jpg in {:.0f}m {:.0f}s'.format(sample_name, time_elapsed // 60, time_elapsed % 60))
        print('%s positive num: %d' % (name, total_num_positive))
        print('%s negative num: %d' % (name, total_num_negative))
    print('done')
custom_batch_sampler.py
import numpy as np
import random
from torch.utils.data import Sampler


class CustomBatchSampler(Sampler):

    def __init__(self, num_positive, num_negative, batch_positive, batch_negative) -> None:
        """
        Sampler for a binary-classification dataset:
        every batch holds batch_positive positives and batch_negative negatives.
        @param num_positive: total number of positive samples
        @param num_negative: total number of negative samples
        @param batch_positive: positives per batch
        @param batch_negative: negatives per batch
        """
        self.num_positive = num_positive
        self.num_negative = num_negative
        self.batch_positive = batch_positive
        self.batch_negative = batch_negative

        # the dataset lays out positives first, then negatives
        length = num_positive + num_negative
        self.idx_list = list(range(length))

        self.batch = batch_negative + batch_positive
        self.num_iter = length // self.batch

    def __iter__(self):
        sampler_list = list()
        for i in range(self.num_iter):
            # draw batch_positive indices from the positive range and
            # batch_negative from the negative range, then shuffle within the batch
            tmp = np.concatenate(
                (random.sample(self.idx_list[:self.num_positive], self.batch_positive),
                 random.sample(self.idx_list[self.num_positive:], self.batch_negative))
            )
            random.shuffle(tmp)
            sampler_list.extend(tmp)
        return iter(sampler_list)

    def __len__(self) -> int:
        return self.num_iter * self.batch

    def get_num_batch(self) -> int:
        return self.num_iter
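
A quick way to verify the sampler's batch composition without any data on disk (the counts below are made up for illustration):

sampler = CustomBatchSampler(num_positive=100, num_negative=900,
                             batch_positive=32, batch_negative=96)
idxs = list(iter(sampler))

# indices below num_positive point at positive samples;
# every consecutive 128-index batch contains exactly 32 of them
first_batch = idxs[:128]
print(sum(1 for i in first_batch if i < 100))  # 32
print(len(sampler), sampler.get_num_batch())   # 896 7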


custom_bbox_regression_dataset.py
import os
import cv2
import numpy as np
from torch.utils.data import Dataset

import utils.util as util


class BBoxRegressionDataset(Dataset):

    def __init__(self, root_dir, transform=None):
        super(BBoxRegressionDataset, self).__init__()
        self.transform = transform

        samples = util.parse_car_csv(root_dir)
        jpeg_list = list()
        # each entry is {'image_id': ?, 'positive': ?, 'bndbox': ?}
        box_list = list()
        for i in range(len(samples)):
            sample_name = samples[i]

            jpeg_path = os.path.join(root_dir, 'JPEGImages', sample_name + '.jpg')
            bndbox_path = os.path.join(root_dir, 'bndboxs', sample_name + '.csv')
            positive_path = os.path.join(root_dir, 'positive', sample_name + '.csv')

            jpeg_list.append(cv2.imread(jpeg_path))
            bndboxes = np.loadtxt(bndbox_path, dtype=int, delimiter=' ')
            positives = np.loadtxt(positive_path, dtype=int, delimiter=' ')

            if len(positives.shape) == 1:
                bndbox = self.get_bndbox(bndboxes, positives)
                box_list.append({'image_id': i, 'positive': positives, 'bndbox': bndbox})
            else:
                for positive in positives:
                    bndbox = self.get_bndbox(bndboxes, positive)
                    box_list.append({'image_id': i, 'positive': positive, 'bndbox': bndbox})

        self.jpeg_list = jpeg_list
        self.box_list = box_list

    def __getitem__(self, index: int):
        assert index < len(self), 'dataset size is %d, requested index %d' % (len(self), index)

        box_dict = self.box_list[index]
        image_id = box_dict['image_id']
        positive = box_dict['positive']
        bndbox = box_dict['bndbox']

        # crop the proposal region
        jpeg_img = self.jpeg_list[image_id]
        xmin, ymin, xmax, ymax = positive
        image = jpeg_img[ymin:ymax, xmin:xmax]

        if self.transform:
            image = self.transform(image)

        # compute x/y/w/h of the proposal P and ground truth G
        p_w = xmax - xmin
        p_h = ymax - ymin
        p_x = xmin + p_w / 2
        p_y = ymin + p_h / 2

        xmin, ymin, xmax, ymax = bndbox
        g_w = xmax - xmin
        g_h = ymax - ymin
        g_x = xmin + g_w / 2
        g_y = ymin + g_h / 2

        # compute the regression targets t
        t_x = (g_x - p_x) / p_w
        t_y = (g_y - p_y) / p_h
        t_w = np.log(g_w / p_w)
        t_h = np.log(g_h / p_h)

        return image, np.array((t_x, t_y, t_w, t_h))

    def __len__(self):
        return len(self.box_list)

    def get_bndbox(self, bndboxes, positive):
        """
        Return the ground-truth box with the highest IoU against positive
        :param bndboxes: shape [N, 4] or [4]
        :param positive: shape [4]
        :return: [4]
        """
        if len(bndboxes.shape) == 1:
            # only one ground-truth box: return it directly
            return bndboxes
        else:
            scores = util.iou(positive, bndboxes)
            return bndboxes[np.argmax(scores)]
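
The targets computed in __getitem__ follow the R-CNN box-regression parameterization: t_x = (G_x - P_x)/P_w, t_y = (G_y - P_y)/P_h, t_w = log(G_w/P_w), t_h = log(G_h/P_h), where P is the proposal and G the matched ground truth. A quick numeric check on toy boxes:

import numpy as np

p = (10, 10, 50, 50)  # proposal: 40x40, center (30, 30)
g = (20, 20, 60, 60)  # ground truth: 40x40, center (40, 40)

p_w, p_h = p[2] - p[0], p[3] - p[1]
g_w, g_h = g[2] - g[0], g[3] - g[1]
t_x = ((g[0] + g_w / 2) - (p[0] + p_w / 2)) / p_w  # 0.25: shift right by a quarter width
t_y = ((g[1] + g_h / 2) - (p[1] + p_h / 2)) / p_h  # 0.25
t_w, t_h = np.log(g_w / p_w), np.log(g_h / p_h)    # 0.0, 0.0: same size
print(t_x, t_y, t_w, t_h)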


custom_classifier_dataset.py
import numpy as np
import os
import cv2
from torch.utils.data import Dataset

from utils.util import parse_car_csv


class CustomClassifierDataset(Dataset):

    def __init__(self, root_dir, transform=None):
        samples = parse_car_csv(root_dir)

        jpeg_images = list()
        positive_list = list()
        negative_list = list()
        for idx in range(len(samples)):
            sample_name = samples[idx]
            jpeg_images.append(cv2.imread(os.path.join(root_dir, 'JPEGImages', sample_name + ".jpg")))

            positive_annotation_path = os.path.join(root_dir, 'Annotations', sample_name + '_1.csv')
            positive_annotations = np.loadtxt(positive_annotation_path, dtype=int, delimiter=' ')
            # the csv may be empty or contain a single box
            if len(positive_annotations.shape) == 1:
                # a single box
                if positive_annotations.shape[0] == 4:
                    positive_dict = dict()

                    positive_dict['rect'] = positive_annotations
                    positive_dict['image_id'] = idx
                    # positive_dict['image_name'] = sample_name

                    positive_list.append(positive_dict)
            else:
                for positive_annotation in positive_annotations:
                    positive_dict = dict()

                    positive_dict['rect'] = positive_annotation
                    positive_dict['image_id'] = idx
                    # positive_dict['image_name'] = sample_name

                    positive_list.append(positive_dict)

            negative_annotation_path = os.path.join(root_dir, 'Annotations', sample_name + '_0.csv')
            negative_annotations = np.loadtxt(negative_annotation_path, dtype=int, delimiter=' ')
            # the csv may be empty or contain a single box
            if len(negative_annotations.shape) == 1:
                # a single box
                if negative_annotations.shape[0] == 4:
                    negative_dict = dict()

                    negative_dict['rect'] = negative_annotations
                    negative_dict['image_id'] = idx
                    # negative_dict['image_name'] = sample_name

                    negative_list.append(negative_dict)
            else:
                for negative_annotation in negative_annotations:
                    negative_dict = dict()

                    negative_dict['rect'] = negative_annotation
                    negative_dict['image_id'] = idx
                    # negative_dict['image_name'] = sample_name

                    negative_list.append(negative_dict)

        self.transform = transform
        self.jpeg_images = jpeg_images
        self.positive_list = positive_list
        self.negative_list = negative_list

    def __getitem__(self, index: int):
        # map the flat index to a sample: positives come first, then negatives
        if index < len(self.positive_list):
            # positive sample
            target = 1
            positive_dict = self.positive_list[index]

            xmin, ymin, xmax, ymax = positive_dict['rect']
            image_id = positive_dict['image_id']

            image = self.jpeg_images[image_id][ymin:ymax, xmin:xmax]
            cache_dict = positive_dict
        else:
            # negative sample
            target = 0
            idx = index - len(self.positive_list)
            negative_dict = self.negative_list[idx]

            xmin, ymin, xmax, ymax = negative_dict['rect']
            image_id = negative_dict['image_id']

            image = self.jpeg_images[image_id][ymin:ymax, xmin:xmax]
            cache_dict = negative_dict

        if self.transform:
            image = self.transform(image)

        return image, target, cache_dict

    def __len__(self) -> int:
        return len(self.positive_list) + len(self.negative_list)

    def get_transform(self):
        return self.transform

    def get_jpeg_images(self) -> list:
        return self.jpeg_images

    def get_positive_num(self) -> int:
        return len(self.positive_list)

    def get_negative_num(self) -> int:
        return len(self.negative_list)

    def get_positives(self) -> list:
        return self.positive_list

    def get_negatives(self) -> list:
        return self.negative_list

    # used by hard negative mining to replace the negative set
    def set_negative_list(self, negative_list):
        self.negative_list = negative_list

custom_finetune_dataset.py
import numpy as np
import os
import cv2
from torch.utils.data import Dataset

from utils.util import parse_car_csv


class CustomFinetuneDataset(Dataset):

    def __init__(self, root_dir, transform=None):
        samples = parse_car_csv(root_dir)

        jpeg_images = [cv2.imread(os.path.join(root_dir, 'JPEGImages', sample_name + ".jpg"))
                       for sample_name in samples]

        positive_annotations = [os.path.join(root_dir, 'Annotations', sample_name + '_1.csv')
                                for sample_name in samples]
        negative_annotations = [os.path.join(root_dir, 'Annotations', sample_name + '_0.csv')
                                for sample_name in samples]

        # per-image box counts
        positive_sizes = list()
        negative_sizes = list()
        # flat lists of box coordinates
        positive_rects = list()
        negative_rects = list()

        for annotation_path in positive_annotations:
            rects = np.loadtxt(annotation_path, dtype=int, delimiter=' ')
            # the file may be empty or hold a single row
            if len(rects.shape) == 1:
                # is it a single row?
                if rects.shape[0] == 4:
                    positive_rects.append(rects)
                    positive_sizes.append(1)
                else:
                    positive_sizes.append(0)
            else:
                positive_rects.extend(rects)
                positive_sizes.append(len(rects))
        for annotation_path in negative_annotations:
            rects = np.loadtxt(annotation_path, dtype=int, delimiter=' ')
            # same rules as for the positives
            if len(rects.shape) == 1:
                if rects.shape[0] == 4:
                    negative_rects.append(rects)
                    negative_sizes.append(1)
                else:
                    negative_sizes.append(0)
            else:
                negative_rects.extend(rects)
                negative_sizes.append(len(rects))

        self.transform = transform
        self.jpeg_images = jpeg_images
        self.positive_sizes = positive_sizes
        self.negative_sizes = negative_sizes
        self.positive_rects = positive_rects
        self.negative_rects = negative_rects
        self.total_positive_num = int(np.sum(positive_sizes))
        self.total_negative_num = int(np.sum(negative_sizes))

    def __getitem__(self, index: int):
        # map the flat index to its source image
        image_id = len(self.jpeg_images) - 1
        if index < self.total_positive_num:
            # positive sample
            target = 1
            xmin, ymin, xmax, ymax = self.positive_rects[index]
            # find the image this box came from
            for i in range(len(self.positive_sizes) - 1):
                if np.sum(self.positive_sizes[:i]) <= index < np.sum(self.positive_sizes[:(i + 1)]):
                    image_id = i
                    break
            image = self.jpeg_images[image_id][ymin:ymax, xmin:xmax]
        else:
            # negative sample
            target = 0
            idx = index - self.total_positive_num
            xmin, ymin, xmax, ymax = self.negative_rects[idx]
            # find the image this box came from
            for i in range(len(self.negative_sizes) - 1):
                if np.sum(self.negative_sizes[:i]) <= idx < np.sum(self.negative_sizes[:(i + 1)]):
                    image_id = i
                    break
            image = self.jpeg_images[image_id][ymin:ymax, xmin:xmax]

        if self.transform:
            image = self.transform(image)

        return image, target

    def __len__(self) -> int:
        return self.total_positive_num + self.total_negative_num

    def get_positive_num(self) -> int:
        return self.total_positive_num

    def get_negative_num(self) -> int:
        return self.total_negative_num

custom_hard_negative_mining_dataset.py
from torch.utils.data import Dataset
from utils.data.custom_classifier_dataset import CustomClassifierDataset


class CustomHardNegativeMiningDataset(Dataset):

    def __init__(self, negative_list, jpeg_images, transform=None):
        self.negative_list = negative_list
        self.jpeg_images = jpeg_images
        self.transform = transform

    def __getitem__(self, index: int):
        target = 0  # every sample in this dataset is a negative

        negative_dict = self.negative_list[index]
        xmin, ymin, xmax, ymax = negative_dict['rect']
        image_id = negative_dict['image_id']

        image = self.jpeg_images[image_id][ymin:ymax, xmin:xmax]
        if self.transform:
            image = self.transform(image)

        return image, target, negative_dict

    def __len__(self) -> int:
        return len(self.negative_list)


if __name__ == '__main__':
    root_dir = '../../data/classifier_car/train'
    data_set = CustomClassifierDataset(root_dir)

    negative_list = data_set.get_negatives()
    jpeg_images = data_set.get_jpeg_images()
    transform = data_set.get_transform()

    hard_negative_dataset = CustomHardNegativeMiningDataset(negative_list, jpeg_images, transform=transform)
    image, target, negative_dict = hard_negative_dataset[100]

    print(image.shape)
    print(target)
    print(negative_dict)

pascal_voc.py
import cv2
import numpy as np
from torchvision.datasets import VOCDetection


if __name__ == '__main__':
    """
    Download the PASCAL VOC dataset
    """
    dataset = VOCDetection('../../data', year='2007', image_set='trainval', download=True)
    print("dataset size", len(dataset))
    img, target = dataset[1000]
    img = np.array(img)

    print("detection target", target)
    print("image shape", img.shape)
    cv2.imshow('img', img)
    cv2.waitKey(0)
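
One caveat: VOCDetection returns a PIL image in RGB channel order, while cv2.imshow expects BGR, so the window above shows swapped colors. Converting first fixes it:

img_bgr = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
cv2.imshow('img', img_bgr)
cv2.waitKey(0)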

pascal_voc_car.py
import os
import shutil  # for copyfile
import random
import numpy as np
import xmltodict
from utils.util import check_dir

suffix_xml = '.xml'
suffix_jpeg = '.jpg'

car_train_path = '../../data/VOCdevkit/VOC2007/ImageSets/Main/car_train.txt'
car_val_path = '../../data/VOCdevkit/VOC2007/ImageSets/Main/car_val.txt'

voc_annotation_dir = '../../data/VOCdevkit/VOC2007/Annotations/'
voc_jpeg_dir = '../../data/VOCdevkit/VOC2007/JPEGImages/'

car_root_dir = '../../data/voc_car/'


def parse_train_val(data_path):
    """
    Collect the ids of images containing the target class,
    i.e. lines where int(res[2]) == 1
    """
    samples = []

    with open(data_path, mode='r') as file:
        lines = file.readlines()
        for line in lines:
            res = line.strip().split(' ')
            # keep positives; res[2] == 0 marks samples the annotators found hard to tell apart
            if len(res) == 3 and int(res[2]) == 1:
                samples.append(res[0])

    return np.array(samples)


def sample_train_val(samples):
    """
    Randomly subsample to shrink the dataset (keep 1/7)
    """
    for name in ['train', 'val']:
        dataset = samples[name]
        length = len(dataset)

        random_samples = random.sample(range(length), int(length / 7))
        new_dataset = dataset[random_samples]
        samples[name] = new_dataset

    return samples


def parse_car(sample_list):
    """
    Walk all annotation files and keep the samples that contain a car
    """
    car_samples = list()
    for sample_name in sample_list:
        annotation_path = os.path.join(voc_annotation_dir, sample_name + suffix_xml)
        with open(annotation_path, 'rb') as f:
            xml_dict = xmltodict.parse(f)
            # print(xml_dict)

            objects = xml_dict['annotation']['object']
            if isinstance(objects, list):
                for obj in objects:
                    obj_name = obj['name']
                    difficult = int(obj['difficult'])
                    if obj_name == 'car' and difficult != 1:
                        car_samples.append(sample_name)
            elif isinstance(objects, dict):
                obj_name = objects['name']
                difficult = int(objects['difficult'])
                if obj_name == 'car' and difficult != 1:
                    car_samples.append(sample_name)
            else:
                pass

    return car_samples


def save_car(car_samples, data_root_dir, data_annotation_dir, data_jpeg_dir):
    """
    Save the images and annotation files of the car samples
    """
    for sample_name in car_samples:
        src_annotation_path = os.path.join(voc_annotation_dir, sample_name + suffix_xml)
        dst_annotation_path = os.path.join(data_annotation_dir, sample_name + suffix_xml)
        shutil.copyfile(src_annotation_path, dst_annotation_path)

        src_jpeg_path = os.path.join(voc_jpeg_dir, sample_name + suffix_jpeg)
        dst_jpeg_path = os.path.join(data_jpeg_dir, sample_name + suffix_jpeg)
        shutil.copyfile(src_jpeg_path, dst_jpeg_path)

    csv_path = os.path.join(data_root_dir, 'car.csv')
    np.savetxt(csv_path, np.array(car_samples), fmt='%s')


if __name__ == '__main__':
    samples = {'train': parse_train_val(car_train_path), 'val': parse_train_val(car_val_path)}

    print("source total", len(samples['train']) + len(samples['val']))

    samples = sample_train_val(samples)  # random subsampling to reduce the data volume
    print("after keeping 1/7:", len(samples['train']) + len(samples['val']))
    check_dir(car_root_dir)
    for name in ['train', 'val']:
        data_root_dir = os.path.join(car_root_dir, name)
        data_annotation_dir = os.path.join(data_root_dir, 'Annotations')
        data_jpeg_dir = os.path.join(data_root_dir, 'JPEGImages')

        check_dir(data_root_dir)
        check_dir(data_annotation_dir)
        check_dir(data_jpeg_dir)
        save_car(samples[name], data_root_dir, data_annotation_dir, data_jpeg_dir)

    print('done')

2.1、 ./py/utils/data/pascal_voc.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import cv2
import numpy as np
from torchvision.datasets import VOCDetection


if __name__ == '__main__':
"""
下载PASCAL VOC数据集
"""
dataset = VOCDetection('../../data', year='2007', image_set='trainval', download=True)
print("数据集数目", len(dataset))
img, target = dataset.__getitem__(1000)
img = np.array(img)

print("目标检测数据集的目标",target)
print("图片尺寸",img.shape)
cv2.imshow('img', img)
cv2.waitKey(0)

2.2、 ./py/utils/data/pascal_voc_car.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import os
import shutil # 用copyfile这个函数
import random
import numpy as np
import xmltodict
from utils.util import check_dir

suffix_xml = '.xml'
suffix_jpeg = '.jpg'

car_train_path = '../../data/VOCdevkit/VOC2007/ImageSets/Main/car_train.txt'
car_val_path = '../../data/VOCdevkit/VOC2007/ImageSets/Main/car_val.txt'

voc_annotation_dir = '../../data/VOCdevkit/VOC2007/Annotations/'
voc_jpeg_dir = '../../data/VOCdevkit/VOC2007/JPEGImages/'

car_root_dir = '../../data/voc_car/'


def parse_train_val(data_path):
"""
提取指定类别图像,这里取的是含汽车(int(res[2]) == 1)图片的编号
"""
samples = []

with open(data_path, mode='r') as file:
lines = file.readlines()
for line in lines:
res = line.strip().split(' ')
if len(res) == 3 and int(res[2]) == 1: # 选出正样本,res[2]==0是标注者给出难分辨的样本的标签
samples.append(res[0])

return np.array(samples)


def sample_train_val(samples):
"""
随机采样样本,减少数据集个数(留下1/7)
"""
for name in ['train', 'val']:
dataset = samples[name]
length = len(dataset)

random_samples = random.sample(range(length), int(length / 7))
new_dataset = dataset[random_samples]
samples[name] = new_dataset

return samples


def parse_car(sample_list):
"""
遍历所有的标注文件,筛选包含car的样本
"""

car_samples = list()
for sample_name in sample_list:
annotation_path = os.path.join(voc_annotation_dir, sample_name + suffix_xml)
with open(annotation_path, 'rb') as f:
xml_dict = xmltodict.parse(f)
# print(xml_dict)

bndboxs = list()
objects = xml_dict['annotation']['object']
if isinstance(objects, list):
for obj in objects:
obj_name = obj['name']
difficult = int(obj['difficult'])
if 'car'.__eq__(obj_name) and difficult != 1:
car_samples.append(sample_name)
elif isinstance(objects, dict):
obj_name = objects['name']
difficult = int(objects['difficult'])
if 'car'.__eq__(obj_name) and difficult != 1:
car_samples.append(sample_name)
else:
pass

return car_samples


def save_car(car_samples, data_root_dir, data_annotation_dir, data_jpeg_dir):
"""
保存类别Car的样本图片和标注文件
"""
for sample_name in car_samples:
src_annotation_path = os.path.join(voc_annotation_dir, sample_name + suffix_xml)
dst_annotation_path = os.path.join(data_annotation_dir, sample_name + suffix_xml)
shutil.copyfile(src_annotation_path, dst_annotation_path)

src_jpeg_path = os.path.join(voc_jpeg_dir, sample_name + suffix_jpeg)
dst_jpeg_path = os.path.join(data_jpeg_dir, sample_name + suffix_jpeg)
shutil.copyfile(src_jpeg_path, dst_jpeg_path)

csv_path = os.path.join(data_root_dir, 'car.csv')
np.savetxt(csv_path, np.array(car_samples), fmt='%s')


if __name__ == '__main__':
samples = {'train': parse_train_val(car_train_path), 'val': parse_train_val(car_val_path)} # 定义一个字典

print("source total",len(samples['train'])+len(samples['val']))

samples = sample_train_val(samples) # 随机下采样,减少数据量
print("1/7倍", len(samples['train'])+len(samples['val']))
check_dir(car_root_dir)
for name in ['train', 'val']:
data_root_dir = os.path.join(car_root_dir, name)
data_annotation_dir = os.path.join(data_root_dir, 'Annotations')
data_jpeg_dir = os.path.join(data_root_dir, 'JPEGImages')

check_dir(data_root_dir)
check_dir(data_annotation_dir)
check_dir(data_jpeg_dir)
save_car(samples[name], data_root_dir, data_annotation_dir, data_jpeg_dir)

print('done')

2.3、 ./py/utils/data/create_finetune_data.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import time
import shutil
import numpy as np
import cv2
import os
import selectivesearch
from utils.util import check_dir
from utils.util import parse_car_csv
from utils.util import parse_xml
from utils.util import compute_ious


def parse_annotation_jpeg(annotation_path, jpeg_path, gs):
    """
    Collect positive and negative samples (note: ground-truth boxes whose
    difficult attribute is True are ignored)
    Positive sample: a region proposal with IoU >= 0.5 against a ground-truth box
    Negative sample: IoU greater than 0 and less than 0.5; to further limit the
    number of negatives, a proposal's area must exceed 1/5 of the largest
    ground-truth box
    """
    img = cv2.imread(jpeg_path)

    selectivesearch.config(gs, img, strategy='q')
    # compute region proposals
    rects = selectivesearch.get_rects(gs)
    # read the ground-truth bounding boxes
    bndboxs = parse_xml(annotation_path)

    # area of the largest ground-truth box
    maximum_bndbox_size = 0
    for bndbox in bndboxs:
        xmin, ymin, xmax, ymax = bndbox
        bndbox_size = (ymax - ymin) * (xmax - xmin)
        if bndbox_size > maximum_bndbox_size:
            maximum_bndbox_size = bndbox_size

    # IoU between every proposal and the ground-truth boxes
    iou_list = compute_ious(rects, bndboxs)

    positive_list = list()
    negative_list = list()
    for i in range(len(iou_list)):
        xmin, ymin, xmax, ymax = rects[i]
        rect_size = (ymax - ymin) * (xmax - xmin)

        iou_score = iou_list[i]
        if iou_score >= 0.5:
            # positive sample
            positive_list.append(rects[i])
        elif 0 < iou_score < 0.5 and rect_size > maximum_bndbox_size / 5.0:
            # negative sample
            negative_list.append(rects[i])
        else:
            pass

    return positive_list, negative_list


if __name__ == '__main__':
    car_root_dir = '../../data/voc_car/'
    finetune_root_dir = '../../data/finetune_car/'
    check_dir(finetune_root_dir)

    gs = selectivesearch.get_selective_search()
    for name in ['train', 'val']:
        src_root_dir = os.path.join(car_root_dir, name)
        src_annotation_dir = os.path.join(src_root_dir, 'Annotations')
        src_jpeg_dir = os.path.join(src_root_dir, 'JPEGImages')

        dst_root_dir = os.path.join(finetune_root_dir, name)
        dst_annotation_dir = os.path.join(dst_root_dir, 'Annotations')
        dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
        check_dir(dst_root_dir)
        check_dir(dst_annotation_dir)
        check_dir(dst_jpeg_dir)

        total_num_positive = 0
        total_num_negative = 0

        samples = parse_car_csv(src_root_dir)  # read the car.csv sample list under the root directory
        # copy the csv file
        src_csv_path = os.path.join(src_root_dir, 'car.csv')
        dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
        shutil.copyfile(src_csv_path, dst_csv_path)
        for sample_name in samples:
            since = time.time()

            src_annotation_path = os.path.join(src_annotation_dir, sample_name + '.xml')
            src_jpeg_path = os.path.join(src_jpeg_dir, sample_name + '.jpg')
            # collect positive and negative samples
            positive_list, negative_list = parse_annotation_jpeg(src_annotation_path, src_jpeg_path, gs)
            total_num_positive += len(positive_list)
            total_num_negative += len(negative_list)

            dst_annotation_positive_path = os.path.join(dst_annotation_dir, sample_name + '_1' + '.csv')
            dst_annotation_negative_path = os.path.join(dst_annotation_dir, sample_name + '_0' + '.csv')
            dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + '.jpg')
            # save the image
            shutil.copyfile(src_jpeg_path, dst_jpeg_path)
            # save the positive and negative annotations
            np.savetxt(dst_annotation_positive_path, np.array(positive_list), fmt='%d', delimiter=' ')
            np.savetxt(dst_annotation_negative_path, np.array(negative_list), fmt='%d', delimiter=' ')

            time_elapsed = time.time() - since
            print('parse {}.jpg in {:.0f}m {:.0f}s'.format(sample_name, time_elapsed // 60, time_elapsed % 60))
        print('%s positive num: %d' % (name, total_num_positive))
        print('%s negative num: %d' % (name, total_num_negative))
    print('done')
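
`get_selective_search`, `config`, and `get_rects` come from the project's `selectivesearch.py` wrapper, which is not reproduced in this post. A plausible sketch built on OpenCV's contrib module (this assumes `opencv-contrib-python` is installed, and that the wrapper converts OpenCV's `(x, y, w, h)` output to the `(xmin, ymin, xmax, ymax)` format the scripts above expect):

import cv2


def get_selective_search():
    # instantiate OpenCV's selective search segmentation object
    return cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()


def config(gs, img, strategy='q'):
    # set the input image and pick a strategy:
    # 's' = single, 'f' = fast, 'q' = quality (slower, more proposals)
    gs.setBaseImage(img)
    if strategy == 's':
        gs.switchToSingleStrategy()
    elif strategy == 'f':
        gs.switchToSelectiveSearchFast()
    else:
        gs.switchToSelectiveSearchQuality()


def get_rects(gs):
    # run selective search; OpenCV returns boxes as (x, y, w, h)
    rects = gs.process()
    # convert to (xmin, ymin, xmax, ymax)
    rects[:, 2] += rects[:, 0]
    rects[:, 3] += rects[:, 1]
    return rects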

2.4、 ./py/utils/data/create_classifier_data.py

import random
import numpy as np
import shutil
import time
import cv2
import os
import xmltodict
import selectivesearch
from utils.util import check_dir
from utils.util import parse_car_csv
from utils.util import parse_xml
from utils.util import iou
from utils.util import compute_ious


# train
# positive num: 67
# negative num: 34674
# val
# positive num: 75
# negative num: 26277

def parse_annotation_jpeg(annotation_path, jpeg_path, gs):
    """
    Collect positive and negative samples (note: ground-truth boxes whose
    difficult attribute is True are ignored)
    Positive samples: the ground-truth bounding boxes themselves
    Negative samples: IoU greater than 0 and at most 0.3; to further limit the
    number of negatives, a proposal's area must exceed 1/5 of the largest
    ground-truth box
    """
    img = cv2.imread(jpeg_path)

    selectivesearch.config(gs, img, strategy='q')
    # compute region proposals
    rects = selectivesearch.get_rects(gs)
    # read the ground-truth bounding boxes
    bndboxs = parse_xml(annotation_path)

    # area of the largest ground-truth box
    maximum_bndbox_size = 0
    for bndbox in bndboxs:
        xmin, ymin, xmax, ymax = bndbox
        bndbox_size = (ymax - ymin) * (xmax - xmin)
        if bndbox_size > maximum_bndbox_size:
            maximum_bndbox_size = bndbox_size

    # IoU between every proposal and the ground-truth boxes
    iou_list = compute_ious(rects, bndboxs)

    negative_list = list()
    for i in range(len(iou_list)):
        xmin, ymin, xmax, ymax = rects[i]
        rect_size = (ymax - ymin) * (xmax - xmin)

        iou_score = iou_list[i]
        if 0 < iou_score <= 0.3 and rect_size > maximum_bndbox_size / 5.0:
            # negative sample
            negative_list.append(rects[i])
        else:
            pass

    # the ground-truth boxes themselves serve as the positives for the SVM stage
    return bndboxs, negative_list


if __name__ == '__main__':

    car_root_dir = '../../data/voc_car/'
    classifier_root_dir = '../../data/classifier_car/'
    check_dir(classifier_root_dir)

    gs = selectivesearch.get_selective_search()  # instantiate the selective search object

    for name in ['train', 'val']:
        src_root_dir = os.path.join(car_root_dir, name)
        src_annotation_dir = os.path.join(src_root_dir, 'Annotations')
        src_jpeg_dir = os.path.join(src_root_dir, 'JPEGImages')

        dst_root_dir = os.path.join(classifier_root_dir, name)
        dst_annotation_dir = os.path.join(dst_root_dir, 'Annotations')
        dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
        check_dir(dst_root_dir)
        check_dir(dst_annotation_dir)
        check_dir(dst_jpeg_dir)

        total_num_positive = 0
        total_num_negative = 0

        samples = parse_car_csv(src_root_dir)  # read the car.csv sample list under src_root_dir
        # copy the csv file
        src_csv_path = os.path.join(src_root_dir, 'car.csv')
        dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
        shutil.copyfile(src_csv_path, dst_csv_path)

        for sample_name in samples:
            try:
                since = time.time()

                src_annotation_path = os.path.join(src_annotation_dir, sample_name + '.xml')
                src_jpeg_path = os.path.join(src_jpeg_dir, sample_name + '.jpg')
                # collect positive and negative samples
                positive_list, negative_list = parse_annotation_jpeg(src_annotation_path, src_jpeg_path, gs)
                total_num_positive += len(positive_list)
                total_num_negative += len(negative_list)

                dst_annotation_positive_path = os.path.join(dst_annotation_dir, sample_name + '_1' + '.csv')
                dst_annotation_negative_path = os.path.join(dst_annotation_dir, sample_name + '_0' + '.csv')
                dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + '.jpg')
                # save the image
                shutil.copyfile(src_jpeg_path, dst_jpeg_path)
                # save the positive and negative annotations
                np.savetxt(dst_annotation_positive_path, np.array(positive_list), fmt='%d', delimiter=' ')
                np.savetxt(dst_annotation_negative_path, np.array(negative_list), fmt='%d', delimiter=' ')

                time_elapsed = time.time() - since
                print('parse {}.jpg in {:.0f}m {:.0f}s'.format(sample_name, time_elapsed // 60, time_elapsed % 60))
            except Exception as err:
                # skip samples whose image or annotation cannot be processed
                print(err)
                continue
        print('%s positive num: %d' % (name, total_num_positive))
        print('%s negative num: %d' % (name, total_num_negative))
    print('done')
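
Both sampling scripts score every proposal against the ground-truth boxes with `compute_ious` (and the bbox-regression script below uses `iou`) from `utils/util.py`, which is not listed in this post. A minimal sketch, assuming boxes are stored as `(xmin, ymin, xmax, ymax)`, that `iou` returns one score per ground-truth box, and that `compute_ious` keeps each proposal's best score:

import numpy as np


def iou(pred_box, target_boxes):
    # pred_box: (xmin, ymin, xmax, ymax); target_boxes: N x 4 array
    target_boxes = np.asarray(target_boxes)
    xmin = np.maximum(pred_box[0], target_boxes[:, 0])
    ymin = np.maximum(pred_box[1], target_boxes[:, 1])
    xmax = np.minimum(pred_box[2], target_boxes[:, 2])
    ymax = np.minimum(pred_box[3], target_boxes[:, 3])

    # intersection area is zero when the boxes do not overlap
    intersection = np.maximum(0, xmax - xmin) * np.maximum(0, ymax - ymin)
    pred_area = (pred_box[2] - pred_box[0]) * (pred_box[3] - pred_box[1])
    target_areas = (target_boxes[:, 2] - target_boxes[:, 0]) * \
                   (target_boxes[:, 3] - target_boxes[:, 1])

    return intersection / (pred_area + target_areas - intersection)


def compute_ious(rects, bndboxs):
    # for each proposal, keep its best IoU over all ground-truth boxes
    return [np.max(iou(rect, bndboxs)) for rect in rects]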

2.5、 ./py/utils/data/create_bbox_regression_data.py

import os
import shutil
import numpy as np
import utils.util as util


if __name__ == '__main__':
    """
    Extract the ground-truth box coordinates from the voc_car/train directory
    Extract the training-set positive-sample coordinates (IoU >= 0.5) from the
    finetune_car/train directory, then keep only the boxes with IoU > 0.6
    The resulting dataset is saved under the bbox_regression directory
    """
    voc_car_train_dir = '../../data/voc_car/train'
    # ground truth
    gt_annotation_dir = os.path.join(voc_car_train_dir, 'Annotations')
    jpeg_dir = os.path.join(voc_car_train_dir, 'JPEGImages')

    classifier_car_train_dir = '../../data/finetune_car/train'
    # positive
    positive_annotation_dir = os.path.join(classifier_car_train_dir, 'Annotations')

    dst_root_dir = '../../data/bbox_regression/'
    dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
    dst_bndbox_dir = os.path.join(dst_root_dir, 'bndboxs')
    dst_positive_dir = os.path.join(dst_root_dir, 'positive')

    util.check_dir(dst_root_dir)
    util.check_dir(dst_jpeg_dir)
    util.check_dir(dst_bndbox_dir)
    util.check_dir(dst_positive_dir)

    samples = util.parse_car_csv(voc_car_train_dir)
    res_samples = list()
    total_positive_num = 0
    for sample_name in samples:
        # load the positive-sample boxes (IoU >= 0.5)
        positive_annotation_path = os.path.join(positive_annotation_dir, sample_name + '_1.csv')
        positive_bndboxes = np.loadtxt(positive_annotation_path, dtype=int, delimiter=' ')
        # load the ground-truth boxes
        gt_annotation_path = os.path.join(gt_annotation_dir, sample_name + '.xml')
        bndboxs = util.parse_xml(gt_annotation_path)
        # keep the proposals that satisfy IoU > 0.6
        positive_list = list()
        if len(positive_bndboxes.shape) == 1 and len(positive_bndboxes) != 0:
            # a single box was loaded as a 1-d array
            scores = util.iou(positive_bndboxes, bndboxs)
            if np.max(scores) > 0.6:
                positive_list.append(positive_bndboxes)
        elif len(positive_bndboxes.shape) == 2:
            for positive_bndbox in positive_bndboxes:
                scores = util.iou(positive_bndbox, bndboxs)
                if np.max(scores) > 0.6:
                    positive_list.append(positive_bndbox)
        else:
            pass

        # if any positive box (IoU > 0.6) survives, save the image and its boxes
        if len(positive_list) > 0:
            # save the image
            jpeg_path = os.path.join(jpeg_dir, sample_name + ".jpg")
            dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + ".jpg")
            shutil.copyfile(jpeg_path, dst_jpeg_path)
            # save the ground-truth boxes
            dst_bndbox_path = os.path.join(dst_bndbox_dir, sample_name + ".csv")
            np.savetxt(dst_bndbox_path, bndboxs, fmt='%s', delimiter=' ')
            # save the positive boxes
            dst_positive_path = os.path.join(dst_positive_dir, sample_name + ".csv")
            np.savetxt(dst_positive_path, np.array(positive_list), fmt='%s', delimiter=' ')

            total_positive_num += len(positive_list)
            res_samples.append(sample_name)
            print('save {} done'.format(sample_name))
        else:
            print('-------- {} does not meet the criteria'.format(sample_name))

    dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
    np.savetxt(dst_csv_path, res_samples, fmt='%s', delimiter=' ')
    print('total positive num: {}'.format(total_positive_num))
    print('done')
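
This script only filters the proposals (IoU > 0.6) and saves them next to their ground-truth boxes; the regression targets themselves are presumably computed later, inside `BBoxRegressionDataset`. For reference, the R-CNN paper defines the targets for a proposal P matched to a ground-truth box G as t_x = (G_x - P_x) / P_w, t_y = (G_y - P_y) / P_h, t_w = log(G_w / P_w), t_h = log(G_h / P_h). A small illustrative sketch of that computation (box format `(xmin, ymin, xmax, ymax)` assumed; the function names are hypothetical):

import numpy as np


def box_to_center(box):
    # convert (xmin, ymin, xmax, ymax) to (center_x, center_y, w, h)
    xmin, ymin, xmax, ymax = box
    return (xmin + xmax) / 2.0, (ymin + ymax) / 2.0, xmax - xmin, ymax - ymin


def bbox_regression_targets(proposal, gt_box):
    # R-CNN regression targets (t_x, t_y, t_w, t_h) for proposal P and ground truth G
    p_x, p_y, p_w, p_h = box_to_center(proposal)
    g_x, g_y, g_w, g_h = box_to_center(gt_box)

    t_x = (g_x - p_x) / p_w
    t_y = (g_y - p_y) / p_h
    t_w = np.log(g_w / p_w)
    t_h = np.log(g_h / p_h)
    return np.array([t_x, t_y, t_w, t_h])


# example: a proposal slightly offset from its matched ground-truth box
print(bbox_regression_targets((50, 60, 150, 160), (55, 58, 160, 158)))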