rx

package module
v0.0.0-...-b7717d3 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 4, 2024 License: MIT Imports: 4 Imported by: 0

README

RX

Documentation

Overview

Package rx provides utility functions for channels, inspired by the RxJS operators: https://rxjs.dev/guide/operators

Index

Constants

This section is empty.

Variables

var (
	// ErrNotFound is an error type that indicates the result is not found in the channel
	ErrNotFound = errors.New("result not found")
)

Functions

func BufferCount

func BufferCount[T any](source <-chan Result[T], n int, option ...Option) <-chan Result[[]T]

BufferCount buffers the source channel values until the buffer size is reached, then emits the buffer and starts a new buffer
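
The buffering behavior described above can be sketched without importing the package. This is only an illustration, not the package's implementation: `result`, `of`, and `bufferCount` are hypothetical stand-ins, and the sketch assumes that `n` is positive, that a partial buffer left over when the source closes is still emitted, and that an error ends the stream.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the package's Result[T].
type result[T any] struct {
	val T
	err error
}

// of returns a pre-filled, already closed channel holding the given values.
func of[T any](vv ...T) <-chan result[T] {
	ch := make(chan result[T], len(vv))
	for _, v := range vv {
		ch <- result[T]{val: v}
	}
	close(ch)
	return ch
}

// bufferCount groups values from source into slices of length n.
// Assumptions: n > 0, a trailing partial buffer is emitted on close,
// and an error is forwarded and ends the stream.
func bufferCount[T any](source <-chan result[T], n int) <-chan result[[]T] {
	out := make(chan result[[]T])
	go func() {
		defer close(out)
		buf := make([]T, 0, n)
		for r := range source {
			if r.err != nil {
				out <- result[[]T]{err: r.err}
				return
			}
			buf = append(buf, r.val)
			if len(buf) == n {
				out <- result[[]T]{val: buf}
				buf = make([]T, 0, n) // start a new buffer
			}
		}
		if len(buf) > 0 {
			out <- result[[]T]{val: buf}
		}
	}()
	return out
}

func main() {
	for r := range bufferCount(of(1, 2, 3, 4, 5), 2) {
		fmt.Println(r.val) // [1 2], then [3 4], then [5]
	}
}
```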

func CatchError

func CatchError[T any](
	source <-chan Result[T],
	selector func(err error) <-chan Result[T],
	options ...Option,
) <-chan Result[T]

CatchError catches errors from the source channel. If errors are emitted, it will call the selector function to get a new channel to observe

func Concat

func Concat[T any](sources []<-chan Result[T], options ...Option) <-chan Result[T]

Concat creates a new channel that will concatenate the results of the given sources
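
As a rough illustration of concatenation, the sketch below drains each source fully before moving on to the next one. `result`, `of`, and `concat` are hypothetical stand-ins written for this example; the package's own error handling and options are not modeled.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the package's Result[T].
type result[T any] struct {
	val T
	err error
}

// of returns a pre-filled, already closed channel holding the given values.
func of[T any](vv ...T) <-chan result[T] {
	ch := make(chan result[T], len(vv))
	for _, v := range vv {
		ch <- result[T]{val: v}
	}
	close(ch)
	return ch
}

// concat forwards everything from the first source, then the second, and so on.
func concat[T any](sources []<-chan result[T]) <-chan result[T] {
	out := make(chan result[T])
	go func() {
		defer close(out)
		for _, src := range sources {
			for r := range src {
				out <- r
			}
		}
	}()
	return out
}

func main() {
	for r := range concat([]<-chan result[int]{of(1, 2), of(3, 4)}) {
		fmt.Println(r.val) // 1, 2, 3, 4 in that order
	}
}
```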

func ConcatAll

func ConcatAll[T any](sources <-chan Result[<-chan Result[T]], options ...Option) <-chan Result[T]

ConcatAll merges all emitted channels into one by concatenating their emissions

func ConcatMap

func ConcatMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], options ...Option) <-chan Result[R]

ConcatMap transforms each value from the source channel into another channel using the provided function, then flattens those channels with `ConcatAll`

func Count

func Count[T any](source <-chan Result[T], options ...Option) <-chan Result[int]

Count emits the number of values from the source channel when the source channel is closed

func CountBy

func CountBy[T any](source <-chan Result[T], pred func(value T, index int) (bool, error), options ...Option) <-chan Result[int]

CountBy emits the number of values from the source channel that satisfy the predicate function when the source channel is closed

func Empty

func Empty[T any](options ...Option) <-chan Result[T]

Empty creates a channel that is closed immediately

func Filter

func Filter[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), options ...Option) <-chan Result[T]

Filter emits values from the source channel that pass the predicate function
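
The predicate receives both the value and its index and may fail. A minimal sketch of that shape follows; `result`, `of`, and `filter` are hypothetical stand-ins, and the choice to forward a predicate error and then stop is an assumption rather than documented behavior.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the package's Result[T].
type result[T any] struct {
	val T
	err error
}

// of returns a pre-filled, already closed channel holding the given values.
func of[T any](vv ...T) <-chan result[T] {
	ch := make(chan result[T], len(vv))
	for _, v := range vv {
		ch <- result[T]{val: v}
	}
	close(ch)
	return ch
}

// filter forwards only the values for which predicate returns true.
// Assumption: a source error or a predicate error is emitted and ends the stream.
func filter[T any](source <-chan result[T], predicate func(value T, index int) (bool, error)) <-chan result[T] {
	out := make(chan result[T])
	go func() {
		defer close(out)
		index := 0
		for r := range source {
			if r.err != nil {
				out <- r
				return
			}
			keep, err := predicate(r.val, index)
			index++
			if err != nil {
				out <- result[T]{err: err}
				return
			}
			if keep {
				out <- r
			}
		}
	}()
	return out
}

func main() {
	isEven := func(v int, _ int) (bool, error) { return v%2 == 0, nil }
	for r := range filter(of(1, 2, 3, 4, 5, 6), isEven) {
		fmt.Println(r.val) // 2, 4, 6
	}
}
```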

func First

func First[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), options ...Option) <-chan Result[T]

First immediately emits the first value from the source channel that passes the predicate function. If no value passes the predicate function, it emits an `ErrNotFound`. If any error occurs during the process, it emits that error
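
The three outcomes (match, no match, error) can be sketched as below. `result`, `of`, `first`, and the local `errNotFound` are hypothetical stand-ins that mirror the documented names; the real package may differ in details such as buffering.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound mirrors the role of the package's ErrNotFound in this sketch.
var errNotFound = errors.New("result not found")

// result is a hypothetical stand-in for the package's Result[T].
type result[T any] struct {
	val T
	err error
}

// of returns a pre-filled, already closed channel holding the given values.
func of[T any](vv ...T) <-chan result[T] {
	ch := make(chan result[T], len(vv))
	for _, v := range vv {
		ch <- result[T]{val: v}
	}
	close(ch)
	return ch
}

// first emits the first matching value, an error if one occurs,
// or errNotFound if the source closes without a match.
func first[T any](source <-chan result[T], predicate func(value T, index int) (bool, error)) <-chan result[T] {
	out := make(chan result[T], 1) // buffered so the goroutine never blocks
	go func() {
		defer close(out)
		index := 0
		for r := range source {
			if r.err != nil {
				out <- r
				return
			}
			ok, err := predicate(r.val, index)
			index++
			if err != nil {
				out <- result[T]{err: err}
				return
			}
			if ok {
				out <- r
				return
			}
		}
		out <- result[T]{err: errNotFound}
	}()
	return out
}

func main() {
	greaterThanTwo := func(v int, _ int) (bool, error) { return v > 2, nil }
	found := <-first(of(1, 2, 3, 4), greaterThanTwo)
	fmt.Println(found.val) // 3
	missing := <-first(of(1, 2), greaterThanTwo)
	fmt.Println(errors.Is(missing.err, errNotFound)) // true
}
```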

func FirstOrElse

func FirstOrElse[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), defaultValue T, options ...Option) <-chan Result[T]

FirstOrElse is the same as `First`, but it emits `defaultValue` when no value passes the predicate function

func FlatMap

func FlatMap[T any, R any](
	source <-chan Result[T],
	mapper func(value T, index int) <-chan Result[R],
	joinFn func(sources <-chan Result[<-chan Result[R]], options ...Option) <-chan Result[R],
	options ...Option,
) <-chan Result[R]

FlatMap transforms the values from the source channel to another channel using the provided mapper and joins them using the provided join function

func From

func From[T any](vv []T, options ...Option) <-chan Result[T]

From creates a channel that emits the values from the provided slice

func FromChannel

func FromChannel[T any](ch <-chan T, options ...Option) <-chan Result[T]

FromChannel creates a Result channel that emits the values from the provided channel. Default buffer size is the same as the provided channel.

func Generate

func Generate[T any, R any](seed T, condition func(v T) bool, iterate func(v T) T, resultSelector func(v T) R, options ...Option) <-chan Result[R]

Generate creates a channel that emits values generated by the provided seed, condition, iterate, and resultSelector functions
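
The four arguments map naturally onto the parts of a `for` loop: initial state, loop condition, step, and a projection of the state. A minimal sketch under that reading follows; `result` and `generate` are hypothetical stand-ins, not the package's code.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the package's Result[T].
type result[T any] struct {
	val T
	err error
}

// generate emits resultSelector(v) for v = seed, iterate(seed), ...
// for as long as condition(v) holds, then closes the channel.
func generate[T any, R any](seed T, condition func(v T) bool, iterate func(v T) T, resultSelector func(v T) R) <-chan result[R] {
	out := make(chan result[R])
	go func() {
		defer close(out)
		for v := seed; condition(v); v = iterate(v) {
			out <- result[R]{val: resultSelector(v)}
		}
	}()
	return out
}

func main() {
	squares := generate(
		1,
		func(v int) bool { return v <= 4 },
		func(v int) int { return v + 1 },
		func(v int) int { return v * v },
	)
	for r := range squares {
		fmt.Println(r.val) // 1, 4, 9, 16
	}
}
```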

func GroupBy

func GroupBy[T any, K comparable](source <-chan Result[T], keySelector func(v T, index int) (K, error), options ...Option) <-chan Result[GroupedResults[T, K]]

GroupBy groups the values from the source channel based on the keySelector function

func Interval

func Interval(d time.Duration, options ...Option) <-chan Result[int]

Interval creates a channel that emits an integer at a fixed interval

func Last

func Last[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), options ...Option) <-chan Result[T]

Last emits the last value from the source channel that passes the predicate function when the source channel is closed. If no value passes the predicate function, it emits an `ErrNotFound`. If any error occurs during the process, it emits that error

func LastOrElse

func LastOrElse[T any](source <-chan Result[T], predicate func(value T, index int) (bool, error), defaultValue T, options ...Option) <-chan Result[T]

LastOrElse is the same as `Last`, but it emits `defaultValue` when no value passes the predicate function

func Map

func Map[T any, R any](source <-chan Result[T], mapper func(value T, index int) (R, error), options ...Option) <-chan Result[R]

Map transforms the values from the source channel using the mapper function

func Max

func Max[T constraints.Ordered](source <-chan Result[T], options ...Option) <-chan Result[T]

Max emits the maximum value from the source channel when the source channel is closed. If the source channel is empty, it will lead to an `ErrNotFound`

func Merge

func Merge[T any](sources []<-chan Result[T], options ...Option) <-chan Result[T]

Merge creates a new channel that will merge the results of the given sources.

If any of the sources emits an error, the returned channel immediately emits the error and closes.

func MergeAll

func MergeAll[T any](sources <-chan Result[<-chan Result[T]], options ...Option) <-chan Result[T]

MergeAll merges all emitted channels into one by merging their emissions

func MergeMap

func MergeMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], options ...Option) <-chan Result[R]

MergeMap transforms each value from the source channel into another channel using the provided mapper, then flattens those channels with `MergeAll`

func Min

func Min[T constraints.Ordered](source <-chan Result[T], options ...Option) <-chan Result[T]

Min emits the minimum value from the source channel when the source channel is closed. If the source channel is empty, it will lead to an `ErrNotFound`

func Observable

func Observable[T any](observe func(observer Observer[T]), options ...Option) <-chan Result[T]

Observable creates a Result channel with the given observer function

func Observe

func Observe[T any](results <-chan Result[T], observer Observer[T]) error

Observe subscribes to the results channel and calls the observer's handler functions

func Of

func Of[T any](vv ...T) <-chan Result[T]

Of creates a channel that emits the provided values. Please note that this function does not accept any Option, so the channel is always unbuffered. If you want to set an Option, use the From function instead

func Range

func Range(start int, count int, options ...Option) <-chan Result[int]

Range creates a channel that emits a range of integers

func Reduce

func Reduce[T any, R any](source <-chan Result[T], reducer func(acc R, cur T, index int) (R, error), seed R, options ...Option) <-chan Result[R]

Reduce reduces the values from the source channel into a single value using the reducer function and emits it when the source channel is closed. If the source channel is empty, it will lead to an `ErrNotFound`

func Scan

func Scan[T any, R any](source <-chan Result[T], reducer func(acc R, cur T, index int) (R, error), seed R, options ...Option) <-chan Result[R]

Scan transforms the values from the source channel using the provided accumulator function (or reducer function). Like Reduce, but emits values every time the source channel emits a value.
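
The difference from Reduce is that every intermediate accumulator is emitted. A minimal running-total sketch follows; `result`, `of`, and `scan` are hypothetical stand-ins, and stopping on the first error is an assumption.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the package's Result[T].
type result[T any] struct {
	val T
	err error
}

// of returns a pre-filled, already closed channel holding the given values.
func of[T any](vv ...T) <-chan result[T] {
	ch := make(chan result[T], len(vv))
	for _, v := range vv {
		ch <- result[T]{val: v}
	}
	close(ch)
	return ch
}

// scan emits the accumulator after each source value.
// Assumption: a source error or a reducer error is emitted and ends the stream.
func scan[T any, R any](source <-chan result[T], reducer func(acc R, cur T, index int) (R, error), seed R) <-chan result[R] {
	out := make(chan result[R])
	go func() {
		defer close(out)
		acc := seed
		index := 0
		for r := range source {
			if r.err != nil {
				out <- result[R]{err: r.err}
				return
			}
			next, err := reducer(acc, r.val, index)
			index++
			if err != nil {
				out <- result[R]{err: err}
				return
			}
			acc = next
			out <- result[R]{val: acc}
		}
	}()
	return out
}

func main() {
	add := func(acc int, cur int, _ int) (int, error) { return acc + cur, nil }
	for r := range scan(of(1, 2, 3, 4), add, 0) {
		fmt.Println(r.val) // 1, 3, 6, 10
	}
}
```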

func SwitchAll

func SwitchAll[T any](sources <-chan Result[<-chan Result[T]], options ...Option) <-chan Result[T]

SwitchAll flattens all emitted channels into one. Whenever a new channel is emitted, it immediately switches to that channel and forwards its emissions.

func SwitchMap

func SwitchMap[T any, R any](source <-chan Result[T], mapper func(value T, index int) <-chan Result[R], options ...Option) <-chan Result[R]

SwitchMap transforms each value from the source channel into another channel using the provided function, then flattens those channels with `SwitchAll`

func Take

func Take[T any](source <-chan Result[T], n int, options ...Option) <-chan Result[T]

Take takes the first n values from the source channel and emits them to the output channel. If the source channel emits fewer than n values, the output channel will close after emitting all values
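
Both cases (more than n values, fewer than n values) are covered by the sketch below. `result`, `of`, and `take` are hypothetical stand-ins; forwarding an error and then stopping is an assumption.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the package's Result[T].
type result[T any] struct {
	val T
	err error
}

// of returns a pre-filled, already closed channel holding the given values.
func of[T any](vv ...T) <-chan result[T] {
	ch := make(chan result[T], len(vv))
	for _, v := range vv {
		ch <- result[T]{val: v}
	}
	close(ch)
	return ch
}

// take forwards at most n items, then closes the output.
func take[T any](source <-chan result[T], n int) <-chan result[T] {
	out := make(chan result[T])
	go func() {
		defer close(out)
		if n <= 0 {
			return
		}
		taken := 0
		for r := range source {
			out <- r
			taken++
			if r.err != nil || taken == n {
				return
			}
		}
	}()
	return out
}

func main() {
	for r := range take(of(1, 2, 3, 4, 5), 3) {
		fmt.Println(r.val) // 1, 2, 3
	}
	for r := range take(of(1, 2), 5) {
		fmt.Println(r.val) // 1, 2 (the source ran out first)
	}
}
```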

func TakeLast

func TakeLast[T any](source <-chan Result[T], n int, options ...Option) <-chan Result[T]

TakeLast takes the last n values from the source channel and emits them to the output when the source channel is closed

func TakeWhile

func TakeWhile[T any](source <-chan Result[T], pred func(value T, index int) (bool, error), options ...Option) <-chan Result[T]

TakeWhile takes values from the source channel while the predicate function returns true

func Tap

func Tap[T any](source <-chan Result[T], observer func(value T, index int), options ...Option) <-chan Result[T]

Tap is used to perform side effects for each value emitted by the source channel. The output channel will emit the same values as the source channel. Please note that any error that occurs in the `observer` function will not be handled

func ThrowError

func ThrowError[T any](err error, options ...Option) <-chan Result[T]

ThrowError creates a channel that emits an error result

func Timer

func Timer(d time.Duration, options ...Option) <-chan Result[int]

Timer creates a channel that, after the specified duration, emits the number 0 and then immediately closes

func ToSlice

func ToSlice[T any](source <-chan Result[T], options ...Option) <-chan Result[[]T]

ToSlice waits until the source channel is closed and then emits a single slice containing all the values emitted by the source channel. If the source channel emits an error, the output channel will not emit any value and will be closed immediately.

Types

type GroupedResults

type GroupedResults[T any, K comparable] struct {
	Results <-chan Result[T]
	Key     K
}

GroupedResults represents a return value of the `GroupBy` function

type Observer

type Observer[T any] struct {
	Next func(T)
	Err  func(error)
	Done func()
}

Observer represents the observer with `Next`, `Err`, and `Done` handler functions

type Option

type Option func(*config)

Option represents an option for the channel utility

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the buffer size of the channel
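
The `Option func(*config)` signature is the functional options pattern: each option is a closure that mutates a private configuration struct. The sketch below shows how such an option could feed into channel creation. The `bufferSize` field name and the lowercase `config`, `option`, `withBufferSize`, and `from` are hypothetical, since the package's `config` is unexported and not shown; a non-negative size is assumed.

```go
package main

import "fmt"

// config holds channel settings; bufferSize is a hypothetical field name.
type config struct {
	bufferSize int
}

// option mutates a config, mirroring the documented Option type.
type option func(*config)

// withBufferSize returns an option that sets the channel buffer size.
func withBufferSize(size int) option {
	return func(c *config) { c.bufferSize = size }
}

// from emits the slice values on a channel configured by the options.
func from[T any](vv []T, options ...option) <-chan T {
	c := config{} // default: unbuffered
	for _, apply := range options {
		apply(&c)
	}
	out := make(chan T, c.bufferSize)
	go func() {
		defer close(out)
		for _, v := range vv {
			out <- v
		}
	}()
	return out
}

func main() {
	unbuffered := from([]int{1, 2, 3})
	buffered := from([]int{1, 2, 3}, withBufferSize(2))
	fmt.Println(cap(unbuffered), cap(buffered)) // 0 2

	// Drain both channels so the producer goroutines can finish.
	for range unbuffered {
	}
	for range buffered {
	}
}
```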

type Result

type Result[T any] struct {
	// contains filtered or unexported fields
}

Result represents a value emitted by an Observable

func Err

func Err[T any](err error) Result[T]

Err returns a Result that carries the given error, for use when the value is invalid

func Ok

func Ok[T any](value T) Result[T]

Ok returns a Result that carries the given valid value

func (Result[T]) Error

func (r Result[T]) Error() error

Error returns error

func (Result[T]) Get

func (r Result[T]) Get() (T, error)

Get returns value or error

func (Result[T]) IsError

func (r Result[T]) IsError() bool

IsError returns true if there is an error, otherwise false

func (Result[T]) Value

func (r Result[T]) Value() T

Value returns the value if there is no error; otherwise it returns the zero value

Directories

Path: Synopsis
creation/empty (command): Package main is the entry point of the program
creation/from (command): Package main is the entry point of the program
creation/from_channel (command): Package main is the entry point of the program
creation/generate (command): Package main is the entry point of the program
creation/interval (command): Package main is the entry point of the program
creation/of (command): Package main is the entry point of the program
creation/range (command): Package main is the entry point of the program
creation/throw_error (command): Package main is the entry point of the program
creation/timer (command): Package main is the entry point of the program
filtering/filter (command): Package main is the entry point of the program
filtering/take (command): Package main is the entry point of the program
filtering/take_last (command): Package main is the entry point of the program
filtering/take_while (command): Package main is the entry point of the program
join/merge_all (command): Package main is the entry point of the program
join/switch_all (command): Package main is the entry point of the program
join_creation/merge (command): Package main is the entry point of the program
math/count (command): Package main is the entry point of the program
math/max (command): Package main is the entry point of the program
math/min (command): Package main is the entry point of the program
math/reduce (command): Package main is the entry point of the program
transformation/buffer_count (command): Package main is the entry point of the program
transformation/group_by (command): Package main is the entry point of the program
transformation/map (command): Package main is the entry point of the program
transformation/merge_map (command): Package main is the entry point of the program
transformation/scan (command): Package main is the entry point of the program
transformation/switch_map (command): Package main is the entry point of the program
utility/tap (command): Package main is the entry point of the program
utility/to_slice (command): Package main is the entry point of the program
