| 1 | //! Traits and functions used to implement parallel iteration. These are |
| 2 | //! low-level details -- users of parallel iterators should not need to |
| 3 | //! interact with them directly. See [the `plumbing` README][r] for a general overview. |
| 4 | //! |
| 5 | //! [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md |
| 6 | |
| 7 | use crate::join_context; |
| 8 | |
| 9 | use super::IndexedParallelIterator; |
| 10 | |
| 11 | use std::cmp; |
| 12 | use std::usize; |
| 13 | |
| 14 | /// The `ProducerCallback` trait is a kind of generic closure, |
| 15 | /// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in |
| 16 | /// the plumbing README][r] for more details. |
| 17 | /// |
| 18 | /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback |
| 19 | /// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html |
| 20 | pub trait ProducerCallback<T> { |
| 21 | /// The type of value returned by this callback. Analogous to |
| 22 | /// [`Output` from the `FnOnce` trait][Output]. |
| 23 | /// |
| 24 | /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output |
| 25 | type Output; |
| 26 | |
| 27 | /// Invokes the callback with the given producer as argument. The |
| 28 | /// key point of this trait is that this method is generic over |
| 29 | /// `P`, and hence implementors must be defined for any producer. |
| 30 | fn callback<P>(self, producer: P) -> Self::Output |
| 31 | where |
| 32 | P: Producer<Item = T>; |
| 33 | } |
| 34 | |
| 35 | /// A `Producer` is effectively a "splittable `IntoIterator`". That |
| 36 | /// is, a producer is a value which can be converted into an iterator |
| 37 | /// at any time: at that point, it simply produces items on demand, |
| 38 | /// like any iterator. But what makes a `Producer` special is that, |
| 39 | /// *before* we convert to an iterator, we can also **split** it at a |
| 40 | /// particular point using the `split_at` method. This will yield up |
| 41 | /// two producers, one producing the items before that point, and one |
| 42 | /// producing the items after that point (these two producers can then |
| 43 | /// independently be split further, or be converted into iterators). |
| 44 | /// In Rayon, this splitting is used to divide between threads. |
| 45 | /// See [the `plumbing` README][r] for further details. |
| 46 | /// |
| 47 | /// Note that each producer will always produce a fixed number of |
| 48 | /// items N. However, this number N is not queryable through the API; |
| 49 | /// the consumer is expected to track it. |
| 50 | /// |
| 51 | /// NB. You might expect `Producer` to extend the `IntoIterator` |
| 52 | /// trait. However, [rust-lang/rust#20671][20671] prevents us from |
| 53 | /// declaring the DoubleEndedIterator and ExactSizeIterator |
| 54 | /// constraints on a required IntoIterator trait, so we inline |
| 55 | /// IntoIterator here until that issue is fixed. |
| 56 | /// |
| 57 | /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md |
| 58 | /// [20671]: https://github.com/rust-lang/rust/issues/20671 |
| 59 | pub trait Producer: Send + Sized { |
| 60 | /// The type of item that will be produced by this producer once |
| 61 | /// it is converted into an iterator. |
| 62 | type Item; |
| 63 | |
| 64 | /// The type of iterator we will become. |
| 65 | type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator; |
| 66 | |
| 67 | /// Convert `self` into an iterator; at this point, no more parallel splits |
| 68 | /// are possible. |
| 69 | fn into_iter(self) -> Self::IntoIter; |
| 70 | |
| 71 | /// The minimum number of items that we will process |
| 72 | /// sequentially. Defaults to 1, which means that we will split |
| 73 | /// all the way down to a single item. This can be raised higher |
| 74 | /// using the [`with_min_len`] method, which will force us to |
| 75 | /// create sequential tasks at a larger granularity. Note that |
| 76 | /// Rayon automatically normally attempts to adjust the size of |
| 77 | /// parallel splits to reduce overhead, so this should not be |
| 78 | /// needed. |
| 79 | /// |
| 80 | /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len |
| 81 | fn min_len(&self) -> usize { |
| 82 | 1 |
| 83 | } |
| 84 | |
| 85 | /// The maximum number of items that we will process |
| 86 | /// sequentially. Defaults to MAX, which means that we can choose |
| 87 | /// not to split at all. This can be lowered using the |
| 88 | /// [`with_max_len`] method, which will force us to create more |
| 89 | /// parallel tasks. Note that Rayon automatically normally |
| 90 | /// attempts to adjust the size of parallel splits to reduce |
| 91 | /// overhead, so this should not be needed. |
| 92 | /// |
| 93 | /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len |
| 94 | fn max_len(&self) -> usize { |
| 95 | usize::MAX |
| 96 | } |
| 97 | |
| 98 | /// Split into two producers; one produces items `0..index`, the |
| 99 | /// other `index..N`. Index must be less than or equal to `N`. |
| 100 | fn split_at(self, index: usize) -> (Self, Self); |
| 101 | |
| 102 | /// Iterate the producer, feeding each element to `folder`, and |
| 103 | /// stop when the folder is full (or all elements have been consumed). |
| 104 | /// |
| 105 | /// The provided implementation is sufficient for most iterables. |
| 106 | fn fold_with<F>(self, folder: F) -> F |
| 107 | where |
| 108 | F: Folder<Self::Item>, |
| 109 | { |
| 110 | folder.consume_iter(self.into_iter()) |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | /// A consumer is effectively a [generalized "fold" operation][fold], |
| 115 | /// and in fact each consumer will eventually be converted into a |
| 116 | /// [`Folder`]. What makes a consumer special is that, like a |
| 117 | /// [`Producer`], it can be **split** into multiple consumers using |
| 118 | /// the `split_at` method. When a consumer is split, it produces two |
| 119 | /// consumers, as well as a **reducer**. The two consumers can be fed |
| 120 | /// items independently, and when they are done the reducer is used to |
| 121 | /// combine their two results into one. See [the `plumbing` |
| 122 | /// README][r] for further details. |
| 123 | /// |
| 124 | /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md |
| 125 | /// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold |
| 126 | /// [`Folder`]: trait.Folder.html |
| 127 | /// [`Producer`]: trait.Producer.html |
| 128 | pub trait Consumer<Item>: Send + Sized { |
| 129 | /// The type of folder that this consumer can be converted into. |
| 130 | type Folder: Folder<Item, Result = Self::Result>; |
| 131 | |
| 132 | /// The type of reducer that is produced if this consumer is split. |
| 133 | type Reducer: Reducer<Self::Result>; |
| 134 | |
| 135 | /// The type of result that this consumer will ultimately produce. |
| 136 | type Result: Send; |
| 137 | |
| 138 | /// Divide the consumer into two consumers, one processing items |
| 139 | /// `0..index` and one processing items from `index..`. Also |
| 140 | /// produces a reducer that can be used to reduce the results at |
| 141 | /// the end. |
| 142 | fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); |
| 143 | |
| 144 | /// Convert the consumer into a folder that can consume items |
| 145 | /// sequentially, eventually producing a final result. |
| 146 | fn into_folder(self) -> Self::Folder; |
| 147 | |
| 148 | /// Hint whether this `Consumer` would like to stop processing |
| 149 | /// further items, e.g. if a search has been completed. |
| 150 | fn full(&self) -> bool; |
| 151 | } |
| 152 | |
/// The `Folder` trait captures [the standard fold operation][fold].
/// Items are fed in through `consume`, as many as needed; once every
/// item has been consumed, `complete` converts the folder into its
/// final value.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
    /// The result ultimately produced by the folder.
    type Result;

    /// Consumes the next item and returns the updated sequential state.
    fn consume(self, item: Item) -> Self;

    /// Consumes items from `iter` until the folder is full, returning
    /// the updated sequential state.
    ///
    /// This method is **optional**. The default feeds the items to
    /// `consume` one at a time and stops as soon as `full` reports
    /// `true` after an item has been consumed. Note that `full` is
    /// only checked *after* each item, so the first item is always
    /// consumed.
    ///
    /// The main reason to override it is to provide a more
    /// specialized, efficient implementation.
    fn consume_iter<I>(self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        // `Err` is used purely as the early-exit signal of `try_fold`;
        // both variants carry the folder we want to hand back.
        let outcome = iter.into_iter().try_fold(self, |folder, item| {
            let folder = folder.consume(item);
            if folder.full() {
                Err(folder)
            } else {
                Ok(folder)
            }
        });
        match outcome {
            Ok(folder) | Err(folder) => folder,
        }
    }

    /// Finishes consuming items and produces the final result.
    fn complete(self) -> Self::Result;

    /// Hints whether this `Folder` would like to stop processing
    /// further items, e.g. because a search has already completed.
    fn full(&self) -> bool;
}
| 194 | |
/// A reducer is the last stage of a `Consumer`. Once a consumer has
/// been split into two parts and each part has been fully processed,
/// two results remain; the reducer combines those two results into
/// one. See [the `plumbing` README][r] for further details.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
pub trait Reducer<Result> {
    /// Merges the two final results `left` and `right` into a single
    /// result; this is executed after a split.
    fn reduce(self, left: Result, right: Result) -> Result;
}
| 207 | |
| 208 | /// A stateless consumer can be freely copied. These consumers can be |
| 209 | /// used like regular consumers, but they also support a |
| 210 | /// `split_off_left` method that does not take an index to split, but |
| 211 | /// simply splits at some arbitrary point (`for_each`, for example, |
| 212 | /// produces an unindexed consumer). |
| 213 | pub trait UnindexedConsumer<I>: Consumer<I> { |
| 214 | /// Splits off a "left" consumer and returns it. The `self` |
| 215 | /// consumer should then be used to consume the "right" portion of |
| 216 | /// the data. (The ordering matters for methods like find_first -- |
| 217 | /// values produced by the returned value are given precedence |
| 218 | /// over values produced by `self`.) Once the left and right |
| 219 | /// halves have been fully consumed, you should reduce the results |
| 220 | /// with the result of `to_reducer`. |
| 221 | fn split_off_left(&self) -> Self; |
| 222 | |
| 223 | /// Creates a reducer that can be used to combine the results from |
| 224 | /// a split consumer. |
| 225 | fn to_reducer(&self) -> Self::Reducer; |
| 226 | } |
| 227 | |
| 228 | /// A variant on `Producer` which does not know its exact length or |
| 229 | /// cannot represent it in a `usize`. These producers act like |
| 230 | /// ordinary producers except that they cannot be told to split at a |
| 231 | /// particular point. Instead, you just ask them to split 'somewhere'. |
| 232 | /// |
| 233 | /// (In principle, `Producer` could extend this trait; however, it |
| 234 | /// does not because to do so would require producers to carry their |
| 235 | /// own length with them.) |
| 236 | pub trait UnindexedProducer: Send + Sized { |
| 237 | /// The type of item returned by this producer. |
| 238 | type Item; |
| 239 | |
| 240 | /// Split midway into a new producer if possible, otherwise return `None`. |
| 241 | fn split(self) -> (Self, Option<Self>); |
| 242 | |
| 243 | /// Iterate the producer, feeding each element to `folder`, and |
| 244 | /// stop when the folder is full (or all elements have been consumed). |
| 245 | fn fold_with<F>(self, folder: F) -> F |
| 246 | where |
| 247 | F: Folder<Self::Item>; |
| 248 | } |
| 249 | |
/// Policy object that decides whether work should be split into
/// smaller items.
///
/// The policy is thief-splitting: an adaptive scheme that begins by
/// splitting into enough jobs for every worker thread, and resets
/// itself whenever a job is actually stolen into a different thread.
#[derive(Copy, Clone)]
struct Splitter {
    /// Approximately how many more times we would like to split this
    /// job. Every split halves the job, so the effective number of
    /// pieces ends up being `next_power_of_two()`.
    splits: usize,
}
| 262 | |
| 263 | impl Splitter { |
| 264 | #[inline ] |
| 265 | fn new() -> Splitter { |
| 266 | Splitter { |
| 267 | splits: crate::current_num_threads(), |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | #[inline ] |
| 272 | fn try_split(&mut self, stolen: bool) -> bool { |
| 273 | let Splitter { splits } = *self; |
| 274 | |
| 275 | if stolen { |
| 276 | // This job was stolen! Reset the number of desired splits to the |
| 277 | // thread count, if that's more than we had remaining anyway. |
| 278 | self.splits = cmp::max(crate::current_num_threads(), self.splits / 2); |
| 279 | true |
| 280 | } else if splits > 0 { |
| 281 | // We have splits remaining, make it so. |
| 282 | self.splits /= 2; |
| 283 | true |
| 284 | } else { |
| 285 | // Not stolen, and no more splits -- we're done! |
| 286 | false |
| 287 | } |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | /// The length splitter is built on thief-splitting, but additionally takes |
| 292 | /// into account the remaining length of the iterator. |
| 293 | #[derive (Clone, Copy)] |
| 294 | struct LengthSplitter { |
| 295 | inner: Splitter, |
| 296 | |
| 297 | /// The smallest we're willing to divide into. Usually this is just 1, |
| 298 | /// but you can choose a larger working size with `with_min_len()`. |
| 299 | min: usize, |
| 300 | } |
| 301 | |
| 302 | impl LengthSplitter { |
| 303 | /// Creates a new splitter based on lengths. |
| 304 | /// |
| 305 | /// The `min` is a hard lower bound. We'll never split below that, but |
| 306 | /// of course an iterator might start out smaller already. |
| 307 | /// |
| 308 | /// The `max` is an upper bound on the working size, used to determine |
| 309 | /// the minimum number of times we need to split to get under that limit. |
| 310 | /// The adaptive algorithm may very well split even further, but never |
| 311 | /// smaller than the `min`. |
| 312 | #[inline ] |
| 313 | fn new(min: usize, max: usize, len: usize) -> LengthSplitter { |
| 314 | let mut splitter = LengthSplitter { |
| 315 | inner: Splitter::new(), |
| 316 | min: cmp::max(min, 1), |
| 317 | }; |
| 318 | |
| 319 | // Divide the given length by the max working length to get the minimum |
| 320 | // number of splits we need to get under that max. This rounds down, |
| 321 | // but the splitter actually gives `next_power_of_two()` pieces anyway. |
| 322 | // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces. |
| 323 | let min_splits = len / cmp::max(max, 1); |
| 324 | |
| 325 | // Only update the value if it's not splitting enough already. |
| 326 | if min_splits > splitter.inner.splits { |
| 327 | splitter.inner.splits = min_splits; |
| 328 | } |
| 329 | |
| 330 | splitter |
| 331 | } |
| 332 | |
| 333 | #[inline ] |
| 334 | fn try_split(&mut self, len: usize, stolen: bool) -> bool { |
| 335 | // If splitting wouldn't make us too small, try the inner splitter. |
| 336 | len / 2 >= self.min && self.inner.try_split(stolen) |
| 337 | } |
| 338 | } |
| 339 | |
| 340 | /// This helper function is used to "connect" a parallel iterator to a |
| 341 | /// consumer. It will convert the `par_iter` into a producer P and |
| 342 | /// then pull items from P and feed them to `consumer`, splitting and |
| 343 | /// creating parallel threads as needed. |
| 344 | /// |
| 345 | /// This is useful when you are implementing your own parallel |
| 346 | /// iterators: it is often used as the definition of the |
| 347 | /// [`drive_unindexed`] or [`drive`] methods. |
| 348 | /// |
| 349 | /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed |
| 350 | /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive |
| 351 | pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result |
| 352 | where |
| 353 | I: IndexedParallelIterator, |
| 354 | C: Consumer<I::Item>, |
| 355 | { |
| 356 | let len: usize = par_iter.len(); |
| 357 | return par_iter.with_producer(Callback { len, consumer }); |
| 358 | |
| 359 | struct Callback<C> { |
| 360 | len: usize, |
| 361 | consumer: C, |
| 362 | } |
| 363 | |
| 364 | impl<C, I> ProducerCallback<I> for Callback<C> |
| 365 | where |
| 366 | C: Consumer<I>, |
| 367 | { |
| 368 | type Output = C::Result; |
| 369 | fn callback<P>(self, producer: P) -> C::Result |
| 370 | where |
| 371 | P: Producer<Item = I>, |
| 372 | { |
| 373 | bridge_producer_consumer(self.len, producer, self.consumer) |
| 374 | } |
| 375 | } |
| 376 | } |
| 377 | |
| 378 | /// This helper function is used to "connect" a producer and a |
| 379 | /// consumer. You may prefer to call [`bridge`], which wraps this |
| 380 | /// function. This function will draw items from `producer` and feed |
| 381 | /// them to `consumer`, splitting and creating parallel tasks when |
| 382 | /// needed. |
| 383 | /// |
| 384 | /// This is useful when you are implementing your own parallel |
| 385 | /// iterators: it is often used as the definition of the |
| 386 | /// [`drive_unindexed`] or [`drive`] methods. |
| 387 | /// |
| 388 | /// [`bridge`]: fn.bridge.html |
| 389 | /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed |
| 390 | /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive |
| 391 | pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result |
| 392 | where |
| 393 | P: Producer, |
| 394 | C: Consumer<P::Item>, |
| 395 | { |
| 396 | let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len); |
| 397 | return helper(len, false, splitter, producer, consumer); |
| 398 | |
| 399 | fn helper<P, C>( |
| 400 | len: usize, |
| 401 | migrated: bool, |
| 402 | mut splitter: LengthSplitter, |
| 403 | producer: P, |
| 404 | consumer: C, |
| 405 | ) -> C::Result |
| 406 | where |
| 407 | P: Producer, |
| 408 | C: Consumer<P::Item>, |
| 409 | { |
| 410 | if consumer.full() { |
| 411 | consumer.into_folder().complete() |
| 412 | } else if splitter.try_split(len, migrated) { |
| 413 | let mid = len / 2; |
| 414 | let (left_producer, right_producer) = producer.split_at(mid); |
| 415 | let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); |
| 416 | let (left_result, right_result) = join_context( |
| 417 | |context| { |
| 418 | helper( |
| 419 | mid, |
| 420 | context.migrated(), |
| 421 | splitter, |
| 422 | left_producer, |
| 423 | left_consumer, |
| 424 | ) |
| 425 | }, |
| 426 | |context| { |
| 427 | helper( |
| 428 | len - mid, |
| 429 | context.migrated(), |
| 430 | splitter, |
| 431 | right_producer, |
| 432 | right_consumer, |
| 433 | ) |
| 434 | }, |
| 435 | ); |
| 436 | reducer.reduce(left_result, right_result) |
| 437 | } else { |
| 438 | producer.fold_with(consumer.into_folder()).complete() |
| 439 | } |
| 440 | } |
| 441 | } |
| 442 | |
| 443 | /// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer. |
| 444 | /// |
| 445 | /// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html |
| 446 | pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result |
| 447 | where |
| 448 | P: UnindexedProducer, |
| 449 | C: UnindexedConsumer<P::Item>, |
| 450 | { |
| 451 | let splitter: Splitter = Splitter::new(); |
| 452 | bridge_unindexed_producer_consumer(migrated:false, splitter, producer, consumer) |
| 453 | } |
| 454 | |
| 455 | fn bridge_unindexed_producer_consumer<P, C>( |
| 456 | migrated: bool, |
| 457 | mut splitter: Splitter, |
| 458 | producer: P, |
| 459 | consumer: C, |
| 460 | ) -> C::Result |
| 461 | where |
| 462 | P: UnindexedProducer, |
| 463 | C: UnindexedConsumer<P::Item>, |
| 464 | { |
| 465 | if consumer.full() { |
| 466 | consumer.into_folder().complete() |
| 467 | } else if splitter.try_split(stolen:migrated) { |
| 468 | match producer.split() { |
| 469 | (left_producer: P, Some(right_producer: P)) => { |
| 470 | let (reducer: ::Item>>::Reducer, left_consumer: C, right_consumer: C) = |
| 471 | (consumer.to_reducer(), consumer.split_off_left(), consumer); |
| 472 | let bridge: fn bridge_unindexed_producer_consumer<…, …>(…) -> … = bridge_unindexed_producer_consumer; |
| 473 | let (left_result: ::Item>>::Result, right_result: ::Item>>::Result) = join_context( |
| 474 | |context| bridge(context.migrated(), splitter, left_producer, left_consumer), |
| 475 | |context: FnContext| bridge(context.migrated(), splitter, right_producer, right_consumer), |
| 476 | ); |
| 477 | reducer.reduce(left_result, right_result) |
| 478 | } |
| 479 | (producer: P, None) => producer.fold_with(consumer.into_folder()).complete(), |
| 480 | } |
| 481 | } else { |
| 482 | producer.fold_with(consumer.into_folder()).complete() |
| 483 | } |
| 484 | } |
| 485 | |