Commit 80267d8f by Ting PAN

Use sequential sampling as the default shuffle policy

Summary:
This commit reimplements the default shuffle policy of the data reader with
sequential sampling (consistent with DALI) instead of chunk permutation (the MXNet solution).
Sequential sampling is tuned by the ``initial_fill`` argument alone, and works well on both HDD and SSD.
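For reference, the new policy is the standard shuffle-buffer technique: examples are read strictly sequentially (friendly to HDD), and each output is drawn uniformly from a bounded buffer of ``initial_fill`` pending examples. A minimal, self-contained sketch of the idea (illustrative names only, not the reader's actual API):

```python
import random

def shuffle_buffer(dataset, initial_fill=1024):
    """Yield examples in a locally shuffled order.

    Reads are strictly sequential; the randomness comes from
    drawing each output uniformly from a buffer of pending examples.
    """
    it = iter(dataset)
    buffer = []
    # Fill the initial buffer to support random sampling.
    for example in it:
        buffer.append(example)
        if len(buffer) >= initial_fill:
            break
    for example in it:
        # Emit a random buffered example, then refill sequentially.
        i = random.randrange(len(buffer))
        yield buffer[i]
        buffer[i] = example
    # Drain the remaining buffer at the end of the epoch.
    random.shuffle(buffer)
    yield from buffer
```

A larger ``initial_fill`` approaches a global shuffle at the cost of memory, while ``initial_fill=1`` degenerates to pure sequential reading.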
1 parent cca00c0d
......@@ -8,6 +8,7 @@
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""BBox ops."""
from __future__ import absolute_import
from __future__ import division
......
......@@ -8,6 +8,7 @@
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Builtin ops."""
from __future__ import absolute_import
from __future__ import division
......
......@@ -8,6 +8,7 @@
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Decoder ops."""
from __future__ import absolute_import
from __future__ import division
......
......@@ -8,6 +8,7 @@
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Generic ops."""
from __future__ import absolute_import
from __future__ import division
......
......@@ -8,6 +8,7 @@
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Image ops."""
from __future__ import absolute_import
from __future__ import division
......@@ -300,8 +301,8 @@ class RandomBBoxCrop(object):
aspect_ratio=(0.5, 2.0),
thresholds=(0.0, 0.1, 0.3, 0.5, 0.7, 0.9),
allow_no_crop=True,
ltrb=True,
num_attempts=10,
bbox_layout=None,
**kwargs
):
"""Create a ``RandomBBoxCrop`` operator.
......@@ -316,10 +317,10 @@ class RandomBBoxCrop(object):
The minimum IoU(s) to satisfy.
allow_no_crop : bool, optional, default=True
**True** to include no-cropping as an option.
ltrb : bool, optional, default=True
Indicate the bbox is ``ltrb`` or ``xywh`` format.
num_attempts : int, optional, default=10
The max number of sampling trials.
bbox_layout : str, optional
The optional bbox layout.
Returns
-------
......@@ -332,8 +333,8 @@ class RandomBBoxCrop(object):
aspect_ratio=aspect_ratio,
thresholds=thresholds,
allow_no_crop=allow_no_crop,
ltrb=ltrb,
num_attempts=num_attempts,
bbox_layout=bbox_layout,
device='cpu',
**kwargs
)
......@@ -359,7 +360,9 @@ class RandomResizedCrop(object):
def __new__(
cls,
size,
interp_type='LINEAR',
interp_type=None,
mag_filter=None,
min_filter=None,
random_area=(0.08, 1.),
random_aspect_ratio=(0.75, 1.33),
num_attempts=10,
......@@ -371,8 +374,12 @@ class RandomResizedCrop(object):
----------
size : Union[int, Sequence[int]]
The output image size.
interp_type : {'NN', 'LINEAR', 'TRIANGULAR', 'CUBIC', 'GAUSSIAN', 'LANCZOS3'}, optional
The interpolation method.
interp_type : str, optional
The interpolation for both upsampling and downsampling.
mag_filter : str, optional, default='LINEAR'
The interpolation for upsampling.
min_filter : str, optional, default='TRIANGULAR'
The interpolation for downsampling.
random_area : Sequence[float], optional, default=(0.08, 1.)
The range of scale for sampling.
random_aspect_ratio : Sequence[float], optional, default=(0.75, 1.33)
......@@ -388,9 +395,15 @@ class RandomResizedCrop(object):
"""
if isinstance(interp_type, six.string_types):
interp_type = getattr(types, 'INTERP_' + interp_type.upper())
if isinstance(mag_filter, six.string_types):
mag_filter = getattr(types, 'INTERP_' + mag_filter.upper())
if isinstance(min_filter, six.string_types):
min_filter = getattr(types, 'INTERP_' + min_filter.upper())
return ops.RandomResizedCrop(
size=size,
interp_type=interp_type,
mag_filter=mag_filter,
min_filter=min_filter,
random_area=random_area,
random_aspect_ratio=random_aspect_ratio,
num_attempts=num_attempts,
......@@ -425,6 +438,8 @@ class Resize(object):
resize_longer=None,
max_size=None,
interp_type='LINEAR',
mag_filter=None,
min_filter=None,
**kwargs
):
"""Create a ``Resize`` operator.
......@@ -441,12 +456,20 @@ class Resize(object):
Resize along the longer side.
max_size : int, optional, default=0
The limited size for ``resize_shorter``.
interp_type : {'NN', 'LINEAR', 'TRIANGULAR', 'CUBIC', 'GAUSSIAN', 'LANCZOS3'}, optional
The interpolation method.
interp_type : str, optional
The interpolation for both upsampling and downsampling.
mag_filter : str, optional, default='LINEAR'
The interpolation for upsampling.
min_filter : str, optional, default='TRIANGULAR'
The interpolation for downsampling.
"""
if isinstance(interp_type, six.string_types):
interp_type = getattr(types, 'INTERP_' + interp_type.upper())
if isinstance(mag_filter, six.string_types):
mag_filter = getattr(types, 'INTERP_' + mag_filter.upper())
if isinstance(min_filter, six.string_types):
min_filter = getattr(types, 'INTERP_' + min_filter.upper())
return ops.Resize(
resize_x=resize_x,
resize_y=resize_y,
......@@ -454,6 +477,8 @@ class Resize(object):
resize_longer=resize_longer,
max_size=max_size,
interp_type=interp_type,
mag_filter=mag_filter,
min_filter=min_filter,
device=context.get_device_type(),
**kwargs
)
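As a usage sketch (a hypothetical call, mirroring only the parameters documented above), a pipeline can now choose separate filters for enlarging and shrinking:

```python
# Hypothetical usage of the ``Resize`` wrapper shown above.
resize = Resize(
    resize_shorter=256,       # resize along the shorter side
    max_size=512,             # cap for ``resize_shorter``
    mag_filter='LINEAR',      # filter when enlarging
    min_filter='TRIANGULAR',  # filter when shrinking
)
```

Passing ``interp_type`` alone keeps the old behavior of one filter for both directions.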
......@@ -8,6 +8,7 @@
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Random ops."""
from __future__ import absolute_import
from __future__ import division
......
......@@ -8,6 +8,7 @@
# <https://opensource.org/licenses/BSD-2-Clause>
#
# ------------------------------------------------------------
"""Reader ops."""
from __future__ import absolute_import
from __future__ import division
......@@ -38,6 +39,7 @@ class KPLRecordReader(object):
```python
class MyPipeline(dali.Pipeline):
def __init__(self):
super(MyPipeline, self).__init__()
# Assume that we have the following data:
......@@ -48,11 +50,11 @@ class KPLRecordReader(object):
path='/data',
features=('image', 'label'),
pipeline=self,
# Shuffle globally within specified number of chunks
# once an epoch is finished
shuffle_after_epoch=True,
# Set **0** to split each example as a chunk
shuffle_chunks=0,
# Shuffle locally within the next ``initial_fill`` examples.
# Shuffling weakens as ``initial_fill`` decreases,
# and is disabled when ``initial_fill`` is set to **1**.
random_shuffle=True,
initial_fill=1024,
)
def iter_step(self):
......@@ -71,8 +73,8 @@ class KPLRecordReader(object):
pipeline,
shard_id=0,
num_shards=1,
shuffle_after_epoch=False,
shuffle_chunks=0,
random_shuffle=False,
initial_fill=1024,
**kwargs
):
"""Create a ``KPLRecordReader``.
......@@ -81,14 +83,18 @@ class KPLRecordReader(object):
----------
path : str
The folder of record files.
features : Sequence[str], required
The names of the features to extract.
pipeline : nvidia.dali.Pipeline, required
The pipeline to connect to.
shard_id : int, optional, default=0
The index of specific shard.
The index of the partition to read.
num_shards : int, optional, default=1
The total number of shards.
shuffle_after_epoch : bool, optional, default=False
**True** to shuffle examples once an epoch is finished.
shuffle_chunks : int, optional, default=0
The number of chunks to shuffle.
The total number of partitions over the dataset.
random_shuffle : bool, optional, default=False
Whether to shuffle the data.
initial_fill : int, optional, default=1024
The length of the sampling sequence for shuffle.
"""
self._pipe = pipeline
......@@ -99,8 +105,8 @@ class KPLRecordReader(object):
source=path,
part_idx=shard_id,
num_parts=num_shards,
shuffle=shuffle_after_epoch,
num_chunks=shuffle_chunks,
shuffle=random_shuffle,
initial_fill=initial_fill,
**kwargs
)
self._buffer = self._reader.q_out = mp.Queue(
......@@ -197,13 +203,13 @@ class TFRecordReader(object):
path : str
The folder of record files.
shard_id : int, optional, default=0
The index of specific shard.
The index of the partition to read.
num_shards : int, optional, default=1
The total number of shards.
The total number of partitions over the dataset.
random_shuffle : bool, optional, default=False
**True** to shuffle examples in a sequence.
Whether to shuffle the data.
initial_fill : int, optional, default=1024
The length of sequence for shuffle.
The length of the sampling sequence for shuffle.
Returns
-------
......
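For example, a sharded, shuffled reader could be constructed like this (a hypothetical sketch using only the parameters documented above):

```python
rank, world_size = 0, 1  # supplied by the launcher in practice

# Hypothetical construction mirroring the documented parameters.
reader = TFRecordReader(
    path='/data/train',
    shard_id=rank,          # this node reads partition ``rank``
    num_shards=world_size,  # total number of partitions
    random_shuffle=True,
    initial_fill=1024,      # length of the sampling sequence
)
```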
......@@ -14,10 +14,6 @@ before_first
############
.. automethod:: dragon.io.DataReader.before_first
next_chunk
##########
.. automethod:: dragon.io.DataReader.next_chunk
next_example
############
.. automethod:: dragon.io.DataReader.next_example
......
......@@ -18,6 +18,7 @@
#include <array>
#include <climits>
#include <cmath>
#include <cstring>
#include <ctime>
#include <functional>
#include <map>
......
......@@ -30,39 +30,39 @@ class DataReader(multiprocessing.Process):
simple_reader = DataReader(dataset=dataset, source=path)
```
Partition are available over distributed nodes:
Shuffle is supported by randomly sampling from a sequence buffer:
```python
distributed_reader = DataReader(
shuffle_reader = DataReader(
dataset=dataset,
source=path,
part_idx=rank,
num_parts=num_ranks,
shuffle=True,
# It is recommended to set a buffer size larger than
# the batch size to make the batches of a single node more diverse.
# The default value of 1024 is sufficient for most cases.
initial_fill=1024,
)
```
There are two shuffle schemes:
Partitions are available over distributed nodes:
```python
# Recommendation: SSD or dataset is tiny
example_wise_shuffle_reader = DataReader(
dataset=dataset,
source=path,
shuffle=True,
num_chunks=0, # Set to the number of examples
)
# Recommendation: HDD or dataset is huge
chunk_wise_shuffle_reader = DataReader(
distributed_reader = DataReader(
dataset=dataset,
source=path,
shuffle=True,
num_chunks=2048,
part_idx=rank,
num_parts=world_size,
)
```
"""
class PartBoundaries(object):
"""Record the boundary of current part."""
def __init__(self, start, end):
self.start, self.end = start, end
def __init__(self, **kwargs):
"""Create a ``DataReader``.
......@@ -72,14 +72,14 @@ class DataReader(multiprocessing.Process):
The dataset class to load examples.
source : str
The path of data source.
shuffle : bool, optional, default=False
Whether to shuffle the data.
num_chunks : int, optional, default=0
The number of chunks to split.
num_parts : int, optional, default=1
The number of partitions over dataset.
part_idx : int, optional, default=0
The index of current partition.
The index of the partition to read.
num_parts : int, optional, default=1
The total number of partitions over the dataset.
shuffle : bool, optional, default=False
Whether to shuffle the data.
initial_fill : int, optional, default=1024
The length of the sampling sequence for shuffle.
seed : int, optional
The random seed to use instead.
......@@ -87,65 +87,69 @@ class DataReader(multiprocessing.Process):
super(DataReader, self).__init__()
self._dataset = kwargs.get('dataset', None)
self._source = kwargs.get('source', '')
self._shuffle = kwargs.get('shuffle', False)
self._num_chunks = kwargs.get('num_chunks', 0)
self._num_parts = kwargs.get('num_parts', 1)
self._part_idx = kwargs.get('part_idx', 0)
self._num_parts = kwargs.get('num_parts', 1)
self._shuffle = kwargs.get('shuffle', False)
self._initial_fill = kwargs.get('initial_fill', 1024) if self._shuffle else 1
self._seed = kwargs.get('seed', config.config().random_seed)
self._begin, self._end = 0, 0
self._perm_size, self._perm = 1, None
self._chunk_size, self._num_examples = 1, 1
self._example_cursor, self._chunk_cursor = 0, 0
self._first, self._cursor, self._last = 0, 0, 0
self._part_size = 0
self._num_examples = 0
self._example_buffer = []
self._parts = []
self.q_out = None
self.daemon = True
def before_first(self):
"""Move the cursor before begin."""
self._example_cursor = self._begin
self._dataset.redirect(self._begin)
self._cursor = self._first
self._dataset.redirect(self._first)
def next_example(self):
"""Return the next example."""
self._example_cursor += 1
self._cursor += 1
return self._dataset.get()
def next_chunk(self):
"""Select the next chunk."""
self._chunk_cursor += 1
if self._chunk_cursor >= self._perm_size:
self.reset()
else:
chunk_idx = self._part_idx * self._perm_size + int(self._perm[self._chunk_cursor])
self._begin = chunk_idx * self._chunk_size
if self._begin >= self._num_examples:
self.next_chunk()
else:
self._end = min(self._begin + self._chunk_size, self._num_examples)
self.before_first()
def reset(self):
def reset(self, stick_to_part=False):
"""Reset the environment of dataset."""
if self._num_parts > 1 or self._shuffle:
self._chunk_cursor = -1
# Redirect to the adjacent part if available.
if not stick_to_part:
self._part_idx = (self._part_idx + 1) % self._num_parts
if self._shuffle:
self._perm = numpy.random.permutation(self._perm_size)
self.next_chunk()
else:
self._begin, self._end = 0, self._num_examples
self.before_first()
self._first = self._part_idx * self._part_size
self._last = min(self._first + self._part_size, self._num_examples)
self.before_first()
# Use the new boundaries to avoid sampling duplicates
# when the buffer size is greater than the dataset size.
counter = self._parts[-1].end
self._parts.append(DataReader.PartBoundaries(counter, counter))
def run(self):
"""Start the process."""
self._init_dataset()
# Run a persistent loop to read examples.
while True:
self.q_out.put(self.next_example())
if self._example_cursor >= self._end:
if self._num_parts > 1 or self._shuffle:
self.next_chunk()
else:
self.reset()
# Pop the depleted part if necessary
if self._parts[0].start == self._parts[0].end:
self._parts.pop(0)
offset = 0
if self._shuffle:
# Sample a random offset if shuffle is required.
offset = self._parts[0].end - self._parts[0].start
offset = int(numpy.random.uniform(high=offset))
# Choose a loaded example from the buffer.
i = self._parts[0].start % len(self._example_buffer)
j = (self._parts[0].start + offset) % len(self._example_buffer)
self.q_out.put(self._example_buffer[j])
self._example_buffer[j] = self._example_buffer[i]
# Load and push back a new example into the buffer.
k = self._parts[-1].end % len(self._example_buffer)
self._example_buffer[k] = self.next_example()
# Advance the part boundaries.
self._parts[-1].end += 1
self._parts[0].start += 1
# Reset the cursor if necessary
if self._cursor >= self._last:
self.reset()
def _init_dataset(self):
"""Initialize the dataset."""
......@@ -154,24 +158,16 @@ class DataReader(multiprocessing.Process):
# Instantiate the dataset here to avoid a fork of process.
# Fork will somehow fail if dataset is implemented in C/C++.
self._dataset = self._dataset(self._source)
self._num_examples = self._dataset.size
# Determine the chunk scheme on different settings.
def div_up(a, b):
return (a + b - 1) // b
if self._shuffle:
if self._num_chunks <= 0:
# Each chunk has at most 1 example (ExampleWise).
self._perm_size = div_up(self._num_examples, self._num_parts)
else:
# Each chunk has several examples (ChunkWise).
self._perm_size = div_up(self._num_chunks, self._num_parts)
self._chunk_size = div_up(self._num_examples, self._num_chunks)
else:
# Each chunk has the examples of whole shard (ShardWise).
self._chunk_size = div_up(self._num_examples, self._num_parts)
# Reset the layout of permutation.
self._perm = numpy.arange(self._perm_size)
self.reset()
# Determine the part specification.
self._num_examples = self._dataset.size
self._part_size = (self._num_examples + self._num_parts - 1) // self._num_parts
self._parts.append(DataReader.PartBoundaries(0, 0))
# Fill the initial buffer to support random sampling.
self.reset(stick_to_part=True)
for i in range(self._initial_fill):
self._example_buffer.append(self.next_example())
self._parts[-1].end += 1
if self._cursor >= self._last:
self.reset()
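The part specification above is a plain ceiling division; a tiny worked example:

```python
def part_size(num_examples, num_parts):
    # Ceiling division: each part holds at most this many examples.
    return (num_examples + num_parts - 1) // num_parts

assert part_size(10, 3) == 4  # parts cover [0, 4), [4, 8), [8, 10)
assert part_size(9, 3) == 3   # parts cover [0, 3), [3, 6), [6, 9)
```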
......@@ -35,8 +35,7 @@ class Registry(object):
if not self.has(name):
raise KeyError(
"`%s` is not registered in <%s>."
% (name, self._name)
)
% (name, self._name))
return self._registry[name]
def has(self, name):
......
......@@ -74,8 +74,8 @@ class DataIterator(object):
The path of data source.
shuffle : bool, optional, default=False
Whether to shuffle the data.
num_chunks : int, optional, default=0
The number of chunks to split.
initial_fill : int, optional, default=1024
The length of the sampling sequence for shuffle.
resize : int, optional, default=0
The size for the shortest edge.
padding : int, optional, default=0
......@@ -94,7 +94,7 @@ class DataIterator(object):
The range of scales to sample a crop randomly.
random_aspect_ratios : Sequence[float], optional, default=(0.75, 1.33)
The range of aspect ratios to sample a crop randomly.
augment_color : bool, optional, default=False
distort_color : bool, optional, default=False
Whether to apply color distortion.
inverse_color : bool, optional, default=False
Whether to invert the channels of color images.
......
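A hypothetical construction of the iterator, using only the parameters documented above (note the renamed ``distort_color``; the ``dataset`` argument is assumed here):

```python
# Hypothetical usage mirroring the documented parameters.
iterator = DataIterator(
    dataset=dataset,
    source='/data/train',
    shuffle=True,
    initial_fill=1024,   # replaces the old ``num_chunks`` knob
    resize=256,
    distort_color=True,  # renamed from ``augment_color``
)
```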
......@@ -56,7 +56,7 @@ class DataTransformer(multiprocessing.Process):
The range of scales to sample a crop randomly.
random_aspect_ratios : Sequence[float], optional, default=(0.75, 1.33)
The range of aspect ratios to sample a crop randomly.
augment_color : bool, optional, default=False
distort_color : bool, optional, default=False
Whether to apply color distortion.
inverse_color : bool, optional, default=False
Whether to invert the channels of color images.
......@@ -76,7 +76,7 @@ class DataTransformer(multiprocessing.Process):
self._mirror = kwargs.get('mirror', False)
self._random_scales = kwargs.get('random_scales', (0.08, 1.))
self._random_ratios = kwargs.get('random_aspect_ratios', (3. / 4., 4. / 3.))
self._augment_color = kwargs.get('augment_color', False)
self._distort_color = kwargs.get('distort_color', False)
self._inverse_color = kwargs.get('inverse_color', False)
self._phase = kwargs.get('phase', 'TRAIN')
self._seed = kwargs.get('seed', config.config().random_seed)
......@@ -84,7 +84,7 @@ class DataTransformer(multiprocessing.Process):
self.daemon = True
if cv2 is None:
raise ImportError('Failed to import package <cv2>.')
if self._augment_color and PIL is None:
if self._distort_color and PIL is None:
raise ImportError('Failed to import package <PIL>.')
def get(self, example):
......@@ -105,7 +105,7 @@ class DataTransformer(multiprocessing.Process):
"""
# Decode.
img = numpy.frombuffer(example['data'], numpy.uint8)
if example['encoded'] > 0:
if example.get('encoded', 0) > 0:
img = cv2.imdecode(img, 1)
else:
img = img.reshape(example['shape'])
......@@ -117,13 +117,11 @@ class DataTransformer(multiprocessing.Process):
pass
else:
if w < h:
ow, oh = size, size * h // w
ow, oh, im_scale = size, size * h // w, float(size) / w
else:
oh, ow = size, size * w // h
img = cv2.resize(
img, (ow, oh),
interpolation=cv2.INTER_LINEAR,
)
oh, ow, im_scale = size, size * w // h, float(size) / h
interp = cv2.INTER_AREA if im_scale < 1 else cv2.INTER_LINEAR
img = cv2.resize(img, (ow, oh), interpolation=interp)
# Padding.
if self._padding > 0:
......@@ -181,7 +179,9 @@ class DataTransformer(multiprocessing.Process):
j = (width - w) // 2
img = img[i:i + h, j:j + w, :]
new_size = (self._random_crop_size, self._random_crop_size)
img = cv2.resize(img, new_size, interpolation=cv2.INTER_LINEAR)
min_scale = self._random_crop_size / max(img.shape[:2])
interp = cv2.INTER_AREA if min_scale < 1 else cv2.INTER_LINEAR
img = cv2.resize(img, new_size, interpolation=interp)
# CutOut.
if self._cutout_size > 0:
......@@ -199,8 +199,8 @@ class DataTransformer(multiprocessing.Process):
if numpy.random.randint(0, 2) > 0:
img = img[:, ::-1, :]
# Color augmentation.
if self._augment_color:
# Color distortion.
if self._distort_color:
img = PIL.Image.fromarray(img)
transforms = [
PIL.ImageEnhance.Brightness,
......
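The scale-aware interpolation introduced in the resize paths is a common OpenCV idiom: ``INTER_AREA`` suppresses aliasing when shrinking, while ``INTER_LINEAR`` is cheaper and sharper when enlarging. A standalone sketch of the same rule:

```python
import cv2
import numpy as np

def resize_shorter_side(img, size):
    """Resize the shorter side to ``size``, keeping the aspect ratio."""
    h, w = img.shape[:2]
    if w < h:
        ow, oh, im_scale = size, size * h // w, float(size) / w
    else:
        oh, ow, im_scale = size, size * w // h, float(size) / h
    # Area filtering when shrinking, linear when enlarging.
    interp = cv2.INTER_AREA if im_scale < 1 else cv2.INTER_LINEAR
    return cv2.resize(img, (ow, oh), interpolation=interp)

img = np.zeros((480, 640, 3), dtype=np.uint8)
out = resize_shorter_side(img, 256)  # shrinking, so INTER_AREA is used
```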
......@@ -13,8 +13,6 @@
#ifndef DRAGON_UTILS_CAST_H_
#define DRAGON_UTILS_CAST_H_
#include <cstring>
#include "dragon/core/types.h"
#include "dragon/utils/device/common_cuda.h"
......
......@@ -118,7 +118,7 @@ def load_and_assign_pkl_dict(name, module, skip=False):
value_dict = six.moves.pickle.load(f)
except UnicodeDecodeError:
with open(name, 'rb') as f:
value_dict = six.moves.pickle.load(f, encoding='iso-8859-1')
value_dict = six.moves.pickle.load(f, encoding='bytes')
weight_dict = {w.name: w for w in module.weights}
return _assign_weights_from_dict(weight_dict, value_dict, skip=skip)
......
......@@ -66,7 +66,7 @@ def load(f, pickle_module=PICKLE_MODULE):
f, 'rb', lambda f: pickle_module.load(f))
except UnicodeDecodeError:
return _with_file_like(
f, 'rb', lambda f: pickle_module.load(f, encoding='iso-8859-1'))
f, 'rb', lambda f: pickle_module.load(f, encoding='bytes'))
def _save_dict(obj):
......
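Both serialization hunks switch the Python 2 compatibility fallback from ``encoding='iso-8859-1'`` to ``encoding='bytes'``, which hands back Python 2 ``str`` payloads as ``bytes`` instead of forcing a text codec onto binary data. The pattern in isolation (a sketch using the plain ``pickle`` module rather than ``six``):

```python
import pickle

def load_pickle_compat(path):
    """Load a pickle, tolerating files written by Python 2."""
    try:
        with open(path, 'rb') as f:
            return pickle.load(f)
    except UnicodeDecodeError:
        # Decode Python 2 ``str`` objects as raw bytes.
        with open(path, 'rb') as f:
            return pickle.load(f, encoding='bytes')
```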