Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion numpy/_core/src/common/npy_pycompat.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
} \
}
#else
#define NPY_BEGIN_CRITICAL_SECTION_SEQUENCE_FAST(original) {
#define NPY_BEGIN_CRITICAL_SECTION_SEQUENCE_FAST(original) { do { (void)(original); } while (0)
#define NPY_END_CRITICAL_SECTION_SEQUENCE_FAST() }
#endif

Expand Down
22 changes: 17 additions & 5 deletions numpy/_core/src/multiarray/array_coercion.c
Original file line number Diff line number Diff line change
Expand Up @@ -1159,24 +1159,30 @@ PyArray_DiscoverDTypeAndShape_Recursive(
return -1;
}

int ret = -1;

NPY_BEGIN_CRITICAL_SECTION_SEQUENCE_FAST(obj);

npy_intp size = PySequence_Fast_GET_SIZE(seq);
PyObject **objects = PySequence_Fast_ITEMS(seq);

if (update_shape(curr_dims, &max_dims,
out_shape, 1, &size, NPY_TRUE, flags) < 0) {
/* But do update, if this is a ragged case */
*flags |= FOUND_RAGGED_ARRAY;
return max_dims;
ret = max_dims;
goto finish;
}
if (size == 0) {
/* If the sequence is empty, this must be the last dimension */
*flags |= MAX_DIMS_WAS_REACHED;
return curr_dims + 1;
ret = curr_dims + 1;
goto finish;
}

/* Allow keyboard interrupts. See gh issue 18117. */
if (PyErr_CheckSignals() < 0) {
return -1;
goto finish;
}

/*
Expand All @@ -1196,10 +1202,16 @@ PyArray_DiscoverDTypeAndShape_Recursive(
flags, copy);

if (max_dims < 0) {
return -1;
goto finish;
}
}
return max_dims;
ret = max_dims;

finish:;

NPY_END_CRITICAL_SECTION_SEQUENCE_FAST();

return ret;
}


Expand Down
73 changes: 47 additions & 26 deletions numpy/_core/src/multiarray/ctors.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,33 +493,49 @@ NPY_NO_EXPORT int
PyArray_AssignFromCache_Recursive(
PyArrayObject *self, const int ndim, coercion_cache_obj **cache)
{
int ret = -1;
/* Consume first cache element by extracting information and freeing it */
PyObject *obj = (*cache)->arr_or_sequence;
Py_INCREF(obj);
npy_bool sequence = (*cache)->sequence;
npy_bool is_sequence = (*cache)->sequence;
/*
If it is a sequence, this object is the argument to PySequence_Fast, e.g.
the iterable that the user wants to coerce into an array
*/
PyObject *orig_seq = (*cache)->converted_obj;
/* Owned reference to an item in the sequence */
PyObject *item_pyvalue = NULL;
int depth = (*cache)->depth;
*cache = npy_unlink_coercion_cache(*cache);

/* The element is either a sequence, or an array */
if (!sequence) {
/* The element is either a sequence or an array */
if (!is_sequence) {
/* Straightforward array assignment */
assert(PyArray_Check(obj));
if (PyArray_CopyInto(self, (PyArrayObject *)obj) < 0) {
goto fail;
goto finish;
}
}
else {
assert(depth != ndim);
npy_intp length = PySequence_Length(obj);
if (length != PyArray_DIMS(self)[0]) {
PyErr_SetString(PyExc_RuntimeError,
"Inconsistent object during array creation? "
"Content of sequences changed (length inconsistent).");
goto fail;
}

for (npy_intp i = 0; i < length; i++) {
PyObject *value = PySequence_Fast_GET_ITEM(obj, i);
npy_intp orig_length = PyArray_DIMS(self)[0];
int err = 1;
NPY_BEGIN_CRITICAL_SECTION_SEQUENCE_FAST(orig_seq);
for (npy_intp i = 0; i < orig_length; i++) {
// this macro takes *the argument* of PySequence_Fast, which is orig_seq;
// not the object returned by PySequence_Fast, which is a proxy object
// with its own per-object PyMutex lock.
// We want to lock the list object exposed to users, not the proxy.
npy_intp length = PySequence_Fast_GET_SIZE(obj);
if (length != orig_length) {
PyErr_SetString(PyExc_RuntimeError,
"Inconsistent object during array creation? "
"Content of sequences changed (length inconsistent).");
goto finish_critical_section;
}
else {
Py_XSETREF(item_pyvalue, Py_NewRef(PySequence_Fast_GET_ITEM(obj, i)));
}

if (ndim == depth + 1) {
/*
Expand All @@ -532,34 +548,42 @@ PyArray_AssignFromCache_Recursive(
*/
char *item;
item = (PyArray_BYTES(self) + i * PyArray_STRIDES(self)[0]);
if (PyArray_Pack(PyArray_DESCR(self), item, value) < 0) {
goto fail;
if (PyArray_Pack(PyArray_DESCR(self), item, item_pyvalue) < 0) {
goto finish_critical_section;
}
/* If this was an array(-like) we still need to unlink it: */
if (*cache != NULL && (*cache)->converted_obj == value) {
if (*cache != NULL && (*cache)->converted_obj == item_pyvalue) {
*cache = npy_unlink_coercion_cache(*cache);
}
}
else {
PyArrayObject *view;
view = (PyArrayObject *)array_item_asarray(self, i);
if (view == NULL) {
goto fail;
goto finish_critical_section;
}
if (PyArray_AssignFromCache_Recursive(view, ndim, cache) < 0) {
Py_DECREF(view);
goto fail;
goto finish_critical_section;
}
Py_DECREF(view);
}
}
err = 0;
finish_critical_section:;

NPY_END_CRITICAL_SECTION_SEQUENCE_FAST();
if (err) {
goto finish;
}

}
Py_DECREF(obj);
return 0;
ret = 0;

fail:
finish:;
Py_XDECREF(item_pyvalue);
Py_DECREF(obj);
return -1;
return ret;
}


Expand Down Expand Up @@ -1571,8 +1595,6 @@ PyArray_FromAny_int(PyObject *op, PyArray_Descr *in_descr,
copy = 1;
}

Py_BEGIN_CRITICAL_SECTION(op);

ndim = PyArray_DiscoverDTypeAndShape(
op, max_depth, dims, &cache, in_DType, in_descr, &dtype,
copy, &was_copied_by__array__);
Expand Down Expand Up @@ -1741,7 +1763,6 @@ PyArray_FromAny_int(PyObject *op, PyArray_Descr *in_descr,
cleanup:;

Py_XDECREF(dtype);
Py_END_CRITICAL_SECTION();
return (PyObject *)ret;
}

Expand Down
4 changes: 2 additions & 2 deletions numpy/_core/src/multiarray/iterators.c
Original file line number Diff line number Diff line change
Expand Up @@ -1354,15 +1354,15 @@ arraymultiter_new(PyTypeObject *NPY_UNUSED(subtype), PyObject *args,
if (fast_seq == NULL) {
return NULL;
}
NPY_BEGIN_CRITICAL_SECTION_SEQUENCE_FAST(args)
NPY_BEGIN_CRITICAL_SECTION_SEQUENCE_FAST(args);
n = PySequence_Fast_GET_SIZE(fast_seq);
if (n > NPY_MAXARGS) {
ret = multiiter_wrong_number_of_args();
} else {
ret = multiiter_new_impl(n, PySequence_Fast_ITEMS(fast_seq));
}
Py_DECREF(fast_seq);
NPY_END_CRITICAL_SECTION_SEQUENCE_FAST()
NPY_END_CRITICAL_SECTION_SEQUENCE_FAST();
return ret;
}

Expand Down
58 changes: 41 additions & 17 deletions numpy/_core/tests/test_multithreading.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,46 +308,70 @@ def func(index):

# These are all implemented using PySequence_Fast, which needs locking to be safe
def np_broadcast(arrs):
for i in range(100):
for i in range(50):
np.broadcast(arrs)

def create_array(arrs):
for i in range(100):
for i in range(50):
np.array(arrs)

def create_nditer(arrs):
for i in range(1000):
for i in range(50):
np.nditer(arrs)

@pytest.mark.parametrize("kernel", (np_broadcast, create_array, create_nditer))
def test_arg_locking(kernel):
# should complete without failing or generating an error about an array size
# changing

b = threading.Barrier(5)
@pytest.mark.parametrize(
"kernel, outcome",
(
(np_broadcast, "error"),
(create_array, "error"),
(create_nditer, "success"),
),
)
def test_arg_locking(kernel, outcome):
# should complete without triggering races but may error

done = 0
arrs = []
arrs = [np.array([1, 2, 3]) for _ in range(1000)]

def read_arrs():
def read_arrs(b):
nonlocal done
b.wait()
try:
kernel(arrs)
finally:
done += 1

def mutate_list():
def contract_and_expand_list(b):
b.wait()
while done < 4:
if len(arrs) > 10:
arrs.pop(0)
elif len(arrs) <= 10:
arrs.extend([np.array([1, 2, 3]) for _ in range(1000)])

arrs = [np.array([1, 2, 3]) for _ in range(1000)]

tasks = [threading.Thread(target=read_arrs) for _ in range(4)]
tasks.append(threading.Thread(target=mutate_list))
def replace_list_items(b):
b.wait()
rng = np.random.RandomState()
rng.seed(0x4d3d3d3)
while done < 4:
data = rng.randint(0, 1000, size=4)
arrs[data[0]] = data[1:]

[t.start() for t in tasks]
[t.join() for t in tasks]
for mutation_func in (replace_list_items, contract_and_expand_list):
b = threading.Barrier(5)
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as tpe:
tasks = [tpe.submit(read_arrs, b) for _ in range(4)]
tasks.append(tpe.submit(mutation_func, b))
for t in tasks:
t.result()
except RuntimeError as e:
if outcome == "success":
raise
assert "Inconsistent object during array creation?" in str(e)
msg = "replace_list_items should not raise errors"
assert mutation_func is contract_and_expand_list, msg
finally:
if len(tasks) < 5:
b.abort()
Loading