stream

package module
v0.0.0-...-7a02d84 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2023 License: MIT Imports: 4 Imported by: 0

README ¶

go-stream

GoCI Go Report Card codecov GoDoc GitHub license GitHub issues GitHub stars

Stream collections for Go, inspired by Java 8 Streams.

Built on channels and Go 1.18+ generics.

✨ xuender/go-stream is a Java 8 Streams style Go library based on Go 1.18+ Generics.

🚀 Install

To install the library and command line program:

go get -u github.com/xuender/go-stream@latest

💡 Usage

You can import stream using:

import "github.com/xuender/go-stream"
NewBase

Create a new BaseStream.

package main

import (
  "fmt"

  "github.com/xuender/go-stream"
)

func main() {
  base := stream.NewBase(stream.Range2Channel(5)).
    Peek(func(num int) { fmt.Println("peek1:", num) }).
    Filter(func(num int) bool { return num > 2 }).
    Peek(func(num int) { fmt.Println("peek2:", num) })

  fmt.Println(base.Count())
}

Output:

peek1: 0
peek1: 1
peek1: 2
peek1: 3
peek1: 4
peek2: 3
peek2: 4
2

[play]

Parallel

Convert a BaseStream to a ParallelStream.

package main

import (
  "fmt"
  "math/rand"
  "time"

  "github.com/xuender/go-stream"
)

func main() {
  stream.NewBase(stream.Range2Channel(1000)).
    Parallel(100).
    Filter(func(num int) bool { return num%7 == 0 }).
    ForEach(func(num int) {
      dur := time.Duration(rand.Intn(1000)) * time.Millisecond

      time.Sleep(dur)
      fmt.Printf("%d\t%s\n", num, dur)
    })
}

Output:

623     2ms
497     2ms
273     15ms
252     26ms
616     33ms
756     10ms
91      47ms
7       59ms
21      59ms
602     59ms
350     78ms
28      81ms
...

[play]

Map

Map integers to strings.

package main

import (
  "fmt"

  "github.com/xuender/go-stream"
)

func main() {
  base := stream.Map(
    stream.Range2Channel(100),
    func(num int) string { return fmt.Sprintf("[%d]", num) },
  ).Limit(3)

  for i := range base.C {
    fmt.Println(i)
  }
}

Output:

[0]
[1]
[2]

[play]

FlatMap
package main

import (
  "fmt"

  "github.com/xuender/go-stream"
)

func main() {
  stream.FlatMap(
    stream.Slice2Channel([]int{0, 0}, []int{1, 2}, []int{2, 4}),
    func(num int) string { return fmt.Sprintf("[%d]", num) },
  ).ForEach(func(str string) {
    fmt.Println(str)
  })
}

Output:

[0]
[0]
[1]
[2]
[2]
[4]

[play]

Sorted

Sort an OrderedStream.

package main

import (
  "fmt"

  "github.com/xuender/go-stream"
)

func main() {
  stream.NewOrdered(stream.Slice2Channel(3, 2, 7, 1)).
    Sorted().
    ForEach(func(num int) {
      fmt.Println(num)
    })
}

Output:

1
2
3
7

[play]

🛩 Functions

Function Type State
AnyMatch Terminal operations, short-circuiting √
AllMatch Terminal operations, short-circuiting √
Count Terminal operations √
Filter Intermediate operations, Parallel √
FindFirst Terminal operations, short-circuiting √
ForEach Terminal operations, Parallel √
Limit Intermediate operations √
NoneMatch Terminal operations, short-circuiting √
Parallel Intermediate operations √
Peek Intermediate operations, Parallel √
Skip Intermediate operations √
Sort Intermediate operations √
Distinct Intermediate operations, Comparable √
Max Terminal operations, Ordered √
Min Terminal operations, Ordered √
Reduce Terminal operations, Ordered √
Sorted Intermediate operations, Ordered √
Sequential Intermediate operations, Parallel √
Map Function √
FlatMap Function √

📝 License

© ender, 2023~time.Now

MIT LICENSE

Documentation ¶

Overview ¶

Package stream implements Java 8-style streams for Go, built on channels and Go 1.18 generics.

Index ¶

Examples ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

func Copy ¶

func Copy[T any](source *BaseStream[T]) chan T

func Count ¶

func Count[T any](input <-chan T) int

func Distinct ¶

func Distinct[C comparable](input <-chan C) chan C

Distinct returns a channel consisting of the distinct elements of input channel.

func Distribute ¶

func Distribute[T any](input <-chan T, output1, output2 chan<- T)

func Filter ¶

func Filter[T any](input <-chan T, action FilterAction[T]) chan T

Filter returns a channel consisting of the elements of input channel that match the given action.

Example ¶
chi := stream.Filter(
	stream.Range2Channel(10),
	func(num int) bool { return num > 5 },
)

for i := range chi {
	fmt.Println(i)
}
Output:

6
7
8
9

func FilterParallel ¶

func FilterParallel[T any](input <-chan T, size int, action FilterAction[T]) chan T

FilterParallel returns a channel consisting of the elements of input channel that match the given action, parallel.

func Limit ¶

func Limit[T any](input <-chan T, maxSize int) chan T

Limit returns a channel consisting of the elements of input channel, truncated to be no longer than maxSize in length.

func Peek ¶

func Peek[T any](input <-chan T, action Action[T]) chan T

Peek returns a channel consisting of the elements of input channel, additionally performing the provided action on each element as elements are consumed from the resulting stream.

func PeekParallel ¶

func PeekParallel[T any](input <-chan T, size int, action Action[T]) chan T

func Range2Channel ¶

func Range2Channel(length int) chan int
Example ¶
for i := range stream.Range2Channel(3) {
	fmt.Println(i)
}

for i := range stream.Range2Channel(-3) {
	fmt.Println(i)
}
Output:

0
1
2
0
-1
-2

func RangeFrom2Channel ¶

func RangeFrom2Channel(start, length int) chan int
Example ¶
for i := range stream.RangeFrom2Channel(3, 2) {
	fmt.Println(i)
}

for i := range stream.RangeFrom2Channel(8, -3) {
	fmt.Println(i)
}
Output:

3
4
8
7
6

func RangeWithSteps2Channel ¶

func RangeWithSteps2Channel(start, end, step int) chan int
Example ¶
for i := range stream.RangeWithSteps2Channel(0, 4, 3) {
	fmt.Println(i)
}

for i := range stream.RangeWithSteps2Channel(9, 5, -3) {
	fmt.Println(i)
}
Output:

0
3
9
6

func Reduce ¶

func Reduce[O constraints.Ordered](input <-chan O) chan O

func Skip ¶

func Skip[T any](input <-chan T, size int) chan T

func Slice2Channel ¶

func Slice2Channel[T any](elems ...T) chan T
Example ¶
for i := range stream.Slice2Channel(1, 2, 3) {
	fmt.Println(i)
}
Output:

1
2
3

func Sort ¶

func Sort[T any](input <-chan T, action LessAction[T]) chan T

func Sorted ¶

func Sorted[O constraints.Ordered](input <-chan O) chan O

Types ¶

type Action ¶

type Action[T any] func(T)

type BaseStream ¶

type BaseStream[T any] struct {
	C chan T
}

func FlatMap ¶

func FlatMap[I, O any](input chan []I, action MapAction[I, O]) *BaseStream[O]
Example ¶
stream.FlatMap(
	stream.Slice2Channel([]int{0, 0}, []int{1, 2}, []int{2, 4}),
	func(num int) string { return fmt.Sprintf("[%d]", num) },
).ForEach(func(str string) {
	fmt.Println(str)
})
Output:

[0]
[0]
[1]
[2]
[2]
[4]

func Map ¶

func Map[I, O any](input chan I, action MapAction[I, O]) *BaseStream[O]
Example ¶
base1 := stream.NewBase(stream.Range2Channel(10)).
	Filter(func(num int) bool { return num > 5 })
base2 := stream.Map(
	base1.C,
	func(num int) string { return fmt.Sprintf("[%d]", num) },
)

for num := range base2.C {
	fmt.Println(num)
}
Output:

[6]
[7]
[8]
[9]

func NewBase ¶

func NewBase[T any](input chan T) *BaseStream[T]
Example ¶
base := stream.NewBase(stream.Range2Channel(10)).
	Filter(func(num int) bool { return num > 5 })

fmt.Println(base.Count())
Output:

4

func (*BaseStream[T]) AllMatch ¶

func (p *BaseStream[T]) AllMatch(action FilterAction[T]) bool
Example ¶
base1 := stream.NewBase(stream.Range2Channel(10))
base2 := stream.NewBase(stream.Copy(base1))

fmt.Println(base1.AllMatch(func(num int) bool { return num > 8 }))
fmt.Println(base2.AllMatch(func(num int) bool { return num >= 0 }))
Output:

false
true

func (*BaseStream[T]) AnyMatch ¶

func (p *BaseStream[T]) AnyMatch(action FilterAction[T]) bool
Example ¶
base1 := stream.NewBase(stream.Range2Channel(10))
base2 := stream.NewBase(stream.Copy(base1))

fmt.Println(base1.AnyMatch(func(num int) bool { return num > 100 }))
fmt.Println(base2.AnyMatch(func(num int) bool { return num > 1 }))
Output:

false
true

func (*BaseStream[T]) Count ¶

func (p *BaseStream[T]) Count() int

func (*BaseStream[T]) Filter ¶

func (p *BaseStream[T]) Filter(action FilterAction[T]) *BaseStream[T]

Filter returns a stream consisting of the elements of this stream that match the given action.

Example ¶
stream.NewBase(stream.Range2Channel(10)).
	Filter(func(num int) bool { return num > 5 }).
	ForEach(func(num int) {
		fmt.Println(num)
	})
Output:

6
7
8
9

func (*BaseStream[T]) FindFirst ¶

func (p *BaseStream[T]) FindFirst() T
Example ¶
base := stream.NewBase(stream.Range2Channel(10)).
	Filter(func(num int) bool { return num > 5 })

fmt.Println(base.FindFirst())
Output:

6

func (*BaseStream[T]) ForEach ¶

func (p *BaseStream[T]) ForEach(action Action[T])
Example ¶
stream.NewBase(stream.Range2Channel(10)).
	ForEach(func(num int) { fmt.Println(num) })
Output:

0
1
2
3
4
5
6
7
8
9

func (*BaseStream[T]) Limit ¶

func (p *BaseStream[T]) Limit(maxSize int) *BaseStream[T]

Limit returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.

Example ¶
stream.NewBase(stream.Range2Channel(10)).
	Limit(3).
	ForEach(func(num int) {
		fmt.Println(num)
	})
Output:

0
1
2

func (*BaseStream[T]) NoneMatch ¶

func (p *BaseStream[T]) NoneMatch(action FilterAction[T]) bool
Example ¶
base1 := stream.NewBase(stream.Range2Channel(10))
base2 := stream.NewBase(stream.Copy(base1))

fmt.Println(base1.NoneMatch(func(num int) bool { return num > 100 }))
fmt.Println(base2.NoneMatch(func(num int) bool { return num > 1 }))
Output:

true
false

func (*BaseStream[T]) Parallel ¶

func (p *BaseStream[T]) Parallel(size int) *ParallelStream[T]

func (*BaseStream[T]) Peek ¶

func (p *BaseStream[T]) Peek(action Action[T]) *BaseStream[T]

Peek returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.

Example ¶
count1 := 0
count2 := 0
base := stream.NewBase(stream.Range2Channel(3)).
	Peek(func(num int) { count1++ }).
	Filter(func(num int) bool { return num%2 == 0 }).
	Peek(func(num int) { count2++ })

fmt.Println(base.Count())
fmt.Println(count1)
fmt.Println(count2)
Output:

2
3
2

func (*BaseStream[T]) Skip ¶

func (p *BaseStream[T]) Skip(size int) *BaseStream[T]
Example ¶
stream.NewBase(stream.Range2Channel(6)).
	Skip(4).
	ForEach(func(num int) {
		fmt.Println(num)
	})
Output:

4
5

func (*BaseStream[T]) Sort ¶

func (p *BaseStream[T]) Sort(action LessAction[T]) *BaseStream[T]
Example ¶
stream.NewBase(stream.Slice2Channel(3, 2, 7, 1)).
	Sort(func(num1, num2 int) bool { return num2 < num1 }).
	ForEach(func(num int) {
		fmt.Println(num)
	})
Output:

7
3
2
1

type ComparableStream ¶

type ComparableStream[C comparable] struct {
	BaseStream[C]
}

func FlatMapComparable ¶

func FlatMapComparable[I any, O comparable](input chan []I, action MapAction[I, O]) *ComparableStream[O]
Example ¶
stream.FlatMapOrdered(
	stream.Slice2Channel([]int{1, 2}, []int{2, 4}, []int{3, 6}),
	func(num int) string { return fmt.Sprintf("[%d]", num) },
).
	Sorted().
	ForEach(func(str string) {
		fmt.Println(str)
	})
Output:

[1]
[2]
[2]
[3]
[4]
[6]

func MapComparable ¶

func MapComparable[I any, O comparable](input chan I, action MapAction[I, O]) *ComparableStream[O]
Example ¶
com := stream.MapComparable(
	stream.Slice2Channel(1, 1, 1, 2, 3, 3, 4),
	func(num int) string { return fmt.Sprintf("[%d]", num) },
).Distinct()

for i := range com.C {
	fmt.Println(i)
}
Output:

[1]
[2]
[3]
[4]

func NewComparable ¶

func NewComparable[C comparable](input chan C) *ComparableStream[C]

func (*ComparableStream[C]) Distinct ¶

func (p *ComparableStream[C]) Distinct() *ComparableStream[C]

Distinct returns a stream consisting of the distinct elements of this stream.

Example ¶
stream.NewComparable(stream.Slice2Channel(1, 1, 2, 3, 3, 4)).
	Distinct().
	ForEach(func(num int) {
		fmt.Println(num)
	})
Output:

1
2
3
4

type FilterAction ¶

type FilterAction[T any] func(T) bool

type LessAction ¶

type LessAction[T any] func(T, T) bool

type MapAction ¶

type MapAction[I, O any] func(I) O

type OrderedStream ¶

type OrderedStream[O constraints.Ordered] struct {
	ComparableStream[O]
}

func FlatMapOrdered ¶

func FlatMapOrdered[I any, O constraints.Ordered](input chan []I, action MapAction[I, O]) *OrderedStream[O]
Example ¶
stream.FlatMapComparable(stream.Slice2Channel([]int{1, 2}, []int{1, 2}, []int{2, 4},
	[]int{3, 6}, []int{3, 6}, []int{4, 8}),
	func(num int) string { return fmt.Sprintf("[%d]", num) },
).
	Distinct().
	ForEach(func(str string) {
		fmt.Println(str)
	})
Output:

[1]
[2]
[4]
[3]
[6]
[8]

func MapOrdered ¶

func MapOrdered[I any, O constraints.Ordered](input chan I, action MapAction[I, O]) *OrderedStream[O]
Example ¶
ordered := stream.MapOrdered(
	stream.Range2Channel(10),
	func(num int) string { return fmt.Sprintf("[%d]", num) },
)

fmt.Println(ordered.Max())
Output:

[9]

func NewOrdered ¶

func NewOrdered[O constraints.Ordered](input chan O) *OrderedStream[O]

func (*OrderedStream[O]) Max ¶

func (p *OrderedStream[O]) Max() O

func (*OrderedStream[O]) Min ¶

func (p *OrderedStream[O]) Min() O
Example ¶
ordered := stream.MapOrdered(stream.Range2Channel(10), func(num int) string { return fmt.Sprintf("[%d]", num) })

fmt.Println(ordered.Min())
Output:

[0]

func (*OrderedStream[O]) Reduce ¶

func (p *OrderedStream[O]) Reduce() *OrderedStream[O]
Example ¶
stream.NewOrdered(stream.Slice2Channel(3, 2, 7, 1)).
	Reduce().
	ForEach(func(num int) {
		fmt.Println(num)
	})
Output:

7
3
2
1

func (*OrderedStream[O]) Sorted ¶

func (p *OrderedStream[O]) Sorted() *OrderedStream[O]
Example ¶
stream.NewOrdered(stream.Slice2Channel(3, 2, 7, 1)).
	Sorted().ForEach(func(num int) {
	fmt.Println(num)
})
Output:

1
2
3
7

type ParallelStream ¶

type ParallelStream[T any] struct {
	BaseStream[T]
	Size int
}

func NewParallel ¶

func NewParallel[T any](input chan T, size int) *ParallelStream[T]

func (*ParallelStream[T]) Filter ¶

func (p *ParallelStream[T]) Filter(action FilterAction[T]) *ParallelStream[T]

Filter returns a stream consisting of the elements of this stream that match the given action, parallel.

Example ¶
stream.NewBase(stream.Range2Channel(10)).
	Parallel(3).
	Filter(func(num int) bool { return num > 5 }).
	ForEach(func(_ int) {
		fmt.Println("x")
	})
Output:

x
x
x
x

func (*ParallelStream[T]) ForEach ¶

func (p *ParallelStream[T]) ForEach(action Action[T])
Example ¶
stream.NewParallel(stream.Range2Channel(3), 3).
	ForEach(func(num int) {
		time.Sleep(time.Duration((3-num)*100) * time.Millisecond)
		fmt.Println(num)
	})
Output:

2
1
0

func (*ParallelStream[T]) Peek ¶

func (p *ParallelStream[T]) Peek(action Action[T]) *ParallelStream[T]
Example ¶
count := stream.NewParallel(stream.Range2Channel(3), 3).
	Peek(func(num int) {
		time.Sleep(time.Duration((3-num)*100) * time.Millisecond)
		fmt.Println(num)
	}).Count()

fmt.Println(count)
Output:

2
1
0
3

func (*ParallelStream[T]) Sequential ¶

func (p *ParallelStream[T]) Sequential() *BaseStream[T]
Example ¶
strea := stream.NewParallel(stream.Range2Channel(10), 10).
	Sequential().
	Filter(func(num int) bool { return num > 5 })

fmt.Println(strea.FindFirst())
Output:

6

Directories ¶

Path Synopsis
_examples
base command
count command
filter command
flat_map command
map command
match command
parallel command
peek command
sorted command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL