mirror of
https://github.com/MetaCubeX/mihomo.git
synced 2025-12-20 00:50:06 +08:00
Some checks failed
Test / test (1.20, macos-13) (push) Waiting to run
Test / test (1.20, macos-latest) (push) Waiting to run
Test / test (1.20, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.20, windows-latest) (push) Waiting to run
Test / test (1.21, macos-13) (push) Waiting to run
Test / test (1.21, macos-latest) (push) Waiting to run
Test / test (1.21, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.21, windows-latest) (push) Waiting to run
Test / test (1.22, macos-13) (push) Waiting to run
Test / test (1.22, macos-latest) (push) Waiting to run
Test / test (1.22, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.22, windows-latest) (push) Waiting to run
Test / test (1.23, macos-13) (push) Waiting to run
Test / test (1.23, macos-latest) (push) Waiting to run
Test / test (1.23, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.23, windows-latest) (push) Waiting to run
Test / test (1.24, macos-13) (push) Waiting to run
Test / test (1.24, macos-latest) (push) Waiting to run
Test / test (1.24, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.24, windows-latest) (push) Waiting to run
Test / test (1.20, ubuntu-latest) (push) Failing after 1s
Test / test (1.21, ubuntu-latest) (push) Failing after 1s
Test / test (1.22, ubuntu-latest) (push) Failing after 1s
Test / test (1.23, ubuntu-latest) (push) Failing after 1s
Test / test (1.24, ubuntu-latest) (push) Failing after 1s
Trigger CMFA Update / trigger-CMFA-update (push) Failing after 1s
69 lines
1.2 KiB
Go
69 lines
1.2 KiB
Go
package observable
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
type Observable[T any] struct {
|
|
iterable Iterable[T]
|
|
listener map[Subscription[T]]*Subscriber[T]
|
|
mux sync.Mutex
|
|
done bool
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
func (o *Observable[T]) process() {
|
|
for item := range o.iterable {
|
|
o.mux.Lock()
|
|
for _, sub := range o.listener {
|
|
sub.Emit(item)
|
|
}
|
|
o.mux.Unlock()
|
|
}
|
|
o.close()
|
|
}
|
|
|
|
func (o *Observable[T]) close() {
|
|
o.mux.Lock()
|
|
defer o.mux.Unlock()
|
|
|
|
o.done = true
|
|
for _, sub := range o.listener {
|
|
sub.Close()
|
|
}
|
|
close(o.stopCh)
|
|
}
|
|
|
|
func (o *Observable[T]) Subscribe() (Subscription[T], error) {
|
|
o.mux.Lock()
|
|
defer o.mux.Unlock()
|
|
if o.done {
|
|
return nil, errors.New("observable is closed")
|
|
}
|
|
subscriber := newSubscriber[T]()
|
|
o.listener[subscriber.Out()] = subscriber
|
|
return subscriber.Out(), nil
|
|
}
|
|
|
|
func (o *Observable[T]) UnSubscribe(sub Subscription[T]) {
|
|
o.mux.Lock()
|
|
defer o.mux.Unlock()
|
|
subscriber, exist := o.listener[sub]
|
|
if !exist {
|
|
return
|
|
}
|
|
delete(o.listener, sub)
|
|
subscriber.Close()
|
|
}
|
|
|
|
func NewObservable[T any](iter Iterable[T]) *Observable[T] {
|
|
observable := &Observable[T]{
|
|
iterable: iter,
|
|
listener: map[Subscription[T]]*Subscriber[T]{},
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
go observable.process()
|
|
return observable
|
|
}
|