Documentation
¶
Overview ¶
Package rx provide utility functions for channel inspired by RX: https://rxjs.dev/guide/operators
Index ¶
- Variables
- func BufferCount[T any](source <-chan Result[T], n int, option ...Option) <-chan Result[[]T]
- func CatchError[T any](source <-chan Result[T], selector func(err error) <-chan Result[T], ...) <-chan Result[T]
- func Concat[T any](sources []<-chan Result[T], options ...Option) <-chan Result[T]
- func ConcatAll[T any](sources <-chan Result[<-chan Result[T]], options ...Option) <-chan Result[T]
- func ConcatMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], ...) <-chan Result[R]
- func Count[T any](source <-chan Result[T], options ...Option) <-chan Result[int]
- func CountBy[T any](source <-chan Result[T], pred func(value T, index int) (bool, error), ...) <-chan Result[int]
- func Empty[T any](options ...Option) <-chan Result[T]
- func Filter[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), ...) <-chan Result[T]
- func First[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), ...) <-chan Result[T]
- func FirstOrElse[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), ...) <-chan Result[T]
- func FlatMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], ...) <-chan Result[R]
- func From[T any](vv []T, options ...Option) <-chan Result[T]
- func FromChannel[T any](ch <-chan T, options ...Option) <-chan Result[T]
- func Generate[T any, R any](seed T, condition func(v T) bool, iterate func(v T) T, ...) <-chan Result[R]
- func GroupBy[T any, K comparable](source <-chan Result[T], keySelector func(v T, index int) (K, error), ...) <-chan Result[GroupedResults[T, K]]
- func Interval(d time.Duration, options ...Option) <-chan Result[int]
- func Last[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), ...) <-chan Result[T]
- func LastOrElse[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), ...) <-chan Result[T]
- func Map[T any, R any](source <-chan Result[T], mapper func(value T, index int) (R, error), ...) <-chan Result[R]
- func Max[T constraints.Ordered](source <-chan Result[T], options ...Option) <-chan Result[T]
- func Merge[T any](sources []<-chan Result[T], options ...Option) <-chan Result[T]
- func MergeAll[T any](sources <-chan Result[<-chan Result[T]], options ...Option) <-chan Result[T]
- func MergeMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], ...) <-chan Result[R]
- func Min[T constraints.Ordered](source <-chan Result[T], options ...Option) <-chan Result[T]
- func Observable[T any](observe func(observer Observer[T]), options ...Option) <-chan Result[T]
- func Observe[T any](results <-chan Result[T], observer Observer[T]) error
- func Of[T any](vv ...T) <-chan Result[T]
- func Range(start int, count int, options ...Option) <-chan Result[int]
- func Reduce[T any, R any](source <-chan Result[T], reducer func(acc R, cur T, index int) (R, error), ...) <-chan Result[R]
- func Scan[T any, R any](source <-chan Result[T], reducer func(acc R, cur T, index int) (R, error), ...) <-chan Result[R]
- func SwitchAll[T any](sources <-chan Result[<-chan Result[T]], options ...Option) <-chan Result[T]
- func SwitchMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], ...) <-chan Result[R]
- func Take[T any](source <-chan Result[T], n int, options ...Option) <-chan Result[T]
- func TakeLast[T any](source <-chan Result[T], n int, options ...Option) <-chan Result[T]
- func TakeWhile[T any](source <-chan Result[T], pred func(value T, index int) (bool, error), ...) <-chan Result[T]
- func Tap[T any](source <-chan Result[T], observer func(value T, index int), options ...Option) <-chan Result[T]
- func ThrowError[T any](err error, options ...Option) <-chan Result[T]
- func Timer(d time.Duration, options ...Option) <-chan Result[int]
- func ToSlice[T any](source <-chan Result[T], options ...Option) <-chan Result[[]T]
- type GroupedResults
- type Observer
- type Option
- type Result
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotFound is an error type that indicates the result is not found in the channel ErrNotFound = errors.New("result not found") )
Functions ¶
func BufferCount ¶
BufferCount buffers the source channel values until the buffer size is reached, then emits the buffer and starts a new buffer
func CatchError ¶
func CatchError[T any]( source <-chan Result[T], selector func(err error) <-chan Result[T], options ...Option, ) <-chan Result[T]
CatchError catches errors from the source channel. If errors are emitted, it will call the selector function to get a new channel to observe
func ConcatMap ¶
func ConcatMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], options ...Option) <-chan Result[R]
ConcatMap transforms the values from the source channel to another channel using the provided function and `ConcatAll` them
func Count ¶
Count emits the number of values from the source channel when the source channel closed
func CountBy ¶
func CountBy[T any](source <-chan Result[T], pred func(value T, index int) (bool, error), options ...Option) <-chan Result[int]
CountBy emits the number of values from the source channel that satisfy the predicate function when the source channel closed
func Filter ¶
func Filter[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), options ...Option) <-chan Result[T]
Filter emits values from the source channel that pass the predicate function
func First ¶
func First[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), options ...Option) <-chan Result[T]
First emits the first value immediately from the source channel that pass the predicate function. If there are no values that pass the predicate function, it will emit an `ErrNotFound`. If there are any errors during the process, it will emit the error
func FirstOrElse ¶
func FirstOrElse[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), defaultValue T, options ...Option) <-chan Result[T]
FirstOrElse same as `First`, but it will emit the `defaultValue` when there is no value that pass the predicate function
func FlatMap ¶
func FlatMap[T any, R any]( source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], joinFn func(sources <-chan Result[<-chan Result[R]], options ...Option) <-chan Result[R], options ...Option, ) <-chan Result[R]
FlatMap transforms the values from the source channel to another channel using the provided mapper and joins them using the provided join function
func FromChannel ¶
FromChannel creates a Result channel that emits the values from the provided channel. Default buffer size is the same as the provided channel.
func Generate ¶
func Generate[T any, R any](seed T, condition func(v T) bool, iterate func(v T) T, resultSelector func(v T) R, options ...Option) <-chan Result[R]
Generate creates a channel that emits values generated by the provided seed, condition, iterate, and resultSelector functions
func GroupBy ¶
func GroupBy[T any, K comparable](source <-chan Result[T], keySelector func(v T, index int) (K, error), options ...Option) <-chan Result[GroupedResults[T, K]]
GroupBy groups the values from the source channel based on the keySelector function
func Last ¶
func Last[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), options ...Option) <-chan Result[T]
Last emits the last value from the source channel that pass the predicate function when the source channel closed. If there are no values that pass the predicate function, it will emit an `ErrNotFound`. If there are any errors during the process, it will emit the error
func LastOrElse ¶
func LastOrElse[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), defaultValue T, options ...Option) <-chan Result[T]
LastOrElse same as `Last`, but it will emit the `defaultValue` when there is no value that pass the predicate function
func Map ¶
func Map[T any, R any](source <-chan Result[T], mapper func(value T, index int) (R, error), options ...Option) <-chan Result[R]
Map transforms the values from the source channel using the mapper function
func Max ¶
func Max[T constraints.Ordered](source <-chan Result[T], options ...Option) <-chan Result[T]
Max emits the maximum value from the source channel when the source channel closed If the source channel is empty, it will lead ro an `ErrNotFound`
func Merge ¶
Merge creates a new channel that will merge the results of the given sources.
If any of the sources emits an error, the returned channel will be immediately emits the error and close.
func MergeMap ¶
func MergeMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], options ...Option) <-chan Result[R]
MergeMap transforms the values from the source channel to another channel using the provided mapper and `MergeAll` them
func Min ¶
func Min[T constraints.Ordered](source <-chan Result[T], options ...Option) <-chan Result[T]
Min emits the minimum value from the source channel when the source channel closed If the source channel is empty, it will lead ro an `ErrNotFound`
func Observable ¶
Observable creates an Result channel with the given observer function
func Of ¶
Of creates a channel that emits the provided values. Please note that this function is not receive Option, so the channel is always no buffer. If you want to set some Option, please use From function instead
func Reduce ¶
func Reduce[T any, R any](source <-chan Result[T], reducer func(acc R, cur T, index int) (R, error), seed R, options ...Option) <-chan Result[R]
Reduce reduces the values from the source channel into a single value that emit when the source channel closed using the reducer function If the source channel is empty, it will lead ro an `ErrNotFound`
func Scan ¶
func Scan[T any, R any](source <-chan Result[T], reducer func(acc R, cur T, index int) (R, error), seed R, options ...Option) <-chan Result[R]
Scan transforms the values from the source channel using the provided accumulator function (or reducer function). Like Reduce, but emits values every time the source channel emits a value.
func SwitchAll ¶
SwitchAll merges all emitting channels into one by merging their emission. It will immediately switch to the current emitting channel.
func SwitchMap ¶
func SwitchMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], options ...Option) <-chan Result[R]
SwitchMap transforms the values from the source channel to another channel using the provided function and `SwitchAll` them
func Take ¶
Take takes the first n values from the source channel and emits them to the output rx. If the source channel emits less than n values, the output channel will close after emitting all values
func TakeLast ¶
TakeLast takes the last n values from the source channel and emits them to the output when source channel closed
func TakeWhile ¶
func TakeWhile[T any](source <-chan Result[T], pred func(value T, index int) (bool, error), options ...Option) <-chan Result[T]
TakeWhile takes values from the source channel while the predicate function returns false
func Tap ¶
func Tap[T any](source <-chan Result[T], observer func(value T, index int), options ...Option) <-chan Result[T]
Tap used to perform side effects for each value emitted by the source rx. The output channel will emit the same values as the source rx. Please note that any error that occurs in the `observer` function will not be handled
func ThrowError ¶
ThrowError creates a channel that emits an error result
func Timer ¶
Timer creates a channel that emits number 0 and immediately close the channel after the specified duration
Types ¶
type GroupedResults ¶
type GroupedResults[T any, K comparable] struct { Results <-chan Result[T] Key K }
GroupedResults represents a return value of `GroupBy` function
type Option ¶
type Option func(*config)
Option represents an option for the channel utility
func WithBufferSize ¶
WithBufferSize sets the buffer size of the channel
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/empty
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/from
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/from_channel
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/generate
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/interval
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/of
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/range
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/throw_error
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
creation/timer
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
filtering/filter
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
filtering/take
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
filtering/take_last
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
filtering/take_while
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
join/merge_all
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
join/switch_all
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
join_creation/merge
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
math/count
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
math/max
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
math/min
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
math/reduce
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
transformation/buffer_count
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
transformation/group_by
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
transformation/map
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
transformation/merge_map
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
transformation/scan
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
transformation/switch_map
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
utility/tap
command
Package main is the entry point of the program
|
Package main is the entry point of the program |
|
utility/to_slice
command
Package main is the entry point of the program
|
Package main is the entry point of the program |