| 1 | //! Parallel iterator types for [results][std::result] | 
| 2 | //! | 
|---|
| 3 | //! You will rarely need to interact with this module directly unless you need | 
|---|
| 4 | //! to name one of the iterator types. | 
|---|
| 5 | //! | 
|---|
| 6 | //! [std::result]: https://doc.rust-lang.org/stable/std/result/ | 
|---|
| 7 |  | 
|---|
| 8 | use crate::iter::plumbing::*; | 
|---|
| 9 | use crate::iter::*; | 
|---|
| 10 | use std::sync::Mutex; | 
|---|
| 11 |  | 
|---|
| 12 | use crate::option; | 
|---|
| 13 |  | 
|---|
| 14 | /// Parallel iterator over a result | 
|---|
| 15 | #[ derive(Debug, Clone)] | 
|---|
| 16 | pub struct IntoIter<T: Send> { | 
|---|
| 17 | inner: option::IntoIter<T>, | 
|---|
| 18 | } | 
|---|
| 19 |  | 
|---|
| 20 | impl<T: Send, E> IntoParallelIterator for Result<T, E> { | 
|---|
| 21 | type Item = T; | 
|---|
| 22 | type Iter = IntoIter<T>; | 
|---|
| 23 |  | 
|---|
| 24 | fn into_par_iter(self) -> Self::Iter { | 
|---|
| 25 | IntoIter { | 
|---|
| 26 | inner: self.ok().into_par_iter(), | 
|---|
| 27 | } | 
|---|
| 28 | } | 
|---|
| 29 | } | 
|---|
| 30 |  | 
|---|
// NOTE(review): `delegate_indexed_iterator!` is defined elsewhere in the
// crate. Presumably it implements the (indexed) parallel iterator traits for
// `IntoIter<T>`, with item type `T`, by forwarding to the `inner` field —
// confirm against the macro definition.
delegate_indexed_iterator! {
    IntoIter<T> => T,
    impl<T: Send>
}
|---|
| 35 |  | 
|---|
| 36 | /// Parallel iterator over an immutable reference to a result | 
|---|
| 37 | #[ derive(Debug)] | 
|---|
| 38 | pub struct Iter<'a, T: Sync> { | 
|---|
| 39 | inner: option::IntoIter<&'a T>, | 
|---|
| 40 | } | 
|---|
| 41 |  | 
|---|
| 42 | impl<'a, T: Sync> Clone for Iter<'a, T> { | 
|---|
| 43 | fn clone(&self) -> Self { | 
|---|
| 44 | Iter { | 
|---|
| 45 | inner: self.inner.clone(), | 
|---|
| 46 | } | 
|---|
| 47 | } | 
|---|
| 48 | } | 
|---|
| 49 |  | 
|---|
| 50 | impl<'a, T: Sync, E> IntoParallelIterator for &'a Result<T, E> { | 
|---|
| 51 | type Item = &'a T; | 
|---|
| 52 | type Iter = Iter<'a, T>; | 
|---|
| 53 |  | 
|---|
| 54 | fn into_par_iter(self) -> Self::Iter { | 
|---|
| 55 | Iter { | 
|---|
| 56 | inner: self.as_ref().ok().into_par_iter(), | 
|---|
| 57 | } | 
|---|
| 58 | } | 
|---|
| 59 | } | 
|---|
| 60 |  | 
|---|
// NOTE(review): `delegate_indexed_iterator!` is defined elsewhere in the
// crate. Presumably it implements the (indexed) parallel iterator traits for
// `Iter<'a, T>`, with item type `&'a T`, by forwarding to the `inner` field —
// confirm against the macro definition.
delegate_indexed_iterator! {
    Iter<'a, T> => &'a T,
    impl<'a, T: Sync + 'a>
}
|---|
| 65 |  | 
|---|
| 66 | /// Parallel iterator over a mutable reference to a result | 
|---|
| 67 | #[ derive(Debug)] | 
|---|
| 68 | pub struct IterMut<'a, T: Send> { | 
|---|
| 69 | inner: option::IntoIter<&'a mut T>, | 
|---|
| 70 | } | 
|---|
| 71 |  | 
|---|
| 72 | impl<'a, T: Send, E> IntoParallelIterator for &'a mut Result<T, E> { | 
|---|
| 73 | type Item = &'a mut T; | 
|---|
| 74 | type Iter = IterMut<'a, T>; | 
|---|
| 75 |  | 
|---|
| 76 | fn into_par_iter(self) -> Self::Iter { | 
|---|
| 77 | IterMut { | 
|---|
| 78 | inner: self.as_mut().ok().into_par_iter(), | 
|---|
| 79 | } | 
|---|
| 80 | } | 
|---|
| 81 | } | 
|---|
| 82 |  | 
|---|
// NOTE(review): `delegate_indexed_iterator!` is defined elsewhere in the
// crate. Presumably it implements the (indexed) parallel iterator traits for
// `IterMut<'a, T>`, with item type `&'a mut T`, by forwarding to the `inner`
// field — confirm against the macro definition.
delegate_indexed_iterator! {
    IterMut<'a, T> => &'a mut T,
    impl<'a, T: Send + 'a>
}
|---|
| 87 |  | 
|---|
| 88 | /// Collect an arbitrary `Result`-wrapped collection. | 
|---|
| 89 | /// | 
|---|
| 90 | /// If any item is `Err`, then all previous `Ok` items collected are | 
|---|
| 91 | /// discarded, and it returns that error.  If there are multiple errors, the | 
|---|
| 92 | /// one returned is not deterministic. | 
|---|
| 93 | impl<C, T, E> FromParallelIterator<Result<T, E>> for Result<C, E> | 
|---|
| 94 | where | 
|---|
| 95 | C: FromParallelIterator<T>, | 
|---|
| 96 | T: Send, | 
|---|
| 97 | E: Send, | 
|---|
| 98 | { | 
|---|
| 99 | fn from_par_iter<I>(par_iter: I) -> Self | 
|---|
| 100 | where | 
|---|
| 101 | I: IntoParallelIterator<Item = Result<T, E>>, | 
|---|
| 102 | { | 
|---|
| 103 | fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ { | 
|---|
| 104 | move |item| match item { | 
|---|
| 105 | Ok(item) => Some(item), | 
|---|
| 106 | Err(error) => { | 
|---|
| 107 | // We don't need a blocking `lock()`, as anybody | 
|---|
| 108 | // else holding the lock will also be writing | 
|---|
| 109 | // `Some(error)`, and then ours is irrelevant. | 
|---|
| 110 | if let Ok(mut guard) = saved.try_lock() { | 
|---|
| 111 | if guard.is_none() { | 
|---|
| 112 | *guard = Some(error); | 
|---|
| 113 | } | 
|---|
| 114 | } | 
|---|
| 115 | None | 
|---|
| 116 | } | 
|---|
| 117 | } | 
|---|
| 118 | } | 
|---|
| 119 |  | 
|---|
| 120 | let saved_error = Mutex::new(None); | 
|---|
| 121 | let collection = par_iter | 
|---|
| 122 | .into_par_iter() | 
|---|
| 123 | .map(ok(&saved_error)) | 
|---|
| 124 | .while_some() | 
|---|
| 125 | .collect(); | 
|---|
| 126 |  | 
|---|
| 127 | match saved_error.into_inner().unwrap() { | 
|---|
| 128 | Some(error) => Err(error), | 
|---|
| 129 | None => Ok(collection), | 
|---|
| 130 | } | 
|---|
| 131 | } | 
|---|
| 132 | } | 
|---|
| 133 |  | 
|---|