Documentation
¶
Index ¶
- Variables
- type BaseService
- type ClockValue
- type Clonable
- type Event
- func (e *Event) Clone() Clonable
- func (e *Event) IsGeneric() bool
- func (e *Event) IsMessageReceive() bool
- func (e *Event) IsMessageSend() bool
- func (e *Event) IsTimeoutEnd() bool
- func (e *Event) IsTimeoutStart() bool
- func (e *Event) MessageID() (MessageID, bool)
- func (e *Event) Timeout() (*ReplicaTimeout, bool)
- type EventDAG
- type EventID
- type EventNode
- func (n *EventNode) AddParents(parents []*EventNode)
- func (n *EventNode) Clone() *EventNode
- func (n *EventNode) GetNext() EventID
- func (n *EventNode) GetPrev() EventID
- func (n *EventNode) Lt(other *EventNode) bool
- func (n *EventNode) SetClock(cv ClockValue)
- func (n *EventNode) SetNext(next EventID)
- func (n *EventNode) SetPrev(prev EventID)
- type EventNodeSet
- type EventType
- type GenericEventType
- type GlobalClock
- type List
- type Map
- func (s *Map[T, V]) Add(key T, val V)
- func (s *Map[T, V]) Exists(key T) bool
- func (s *Map[T, V]) Get(key T) (V, bool)
- func (s *Map[T, V]) IterValues() []V
- func (s *Map[T, V]) RandomValue() (V, bool)
- func (s *Map[T, V]) RandomValueWithSource(src rand.Source) (V, bool)
- func (s *Map[T, V]) Remove(key T)
- func (s *Map[T, V]) RemoveAll()
- func (s *Map[T, V]) Size() int
- func (s *Map[T, V]) ToMap() map[T]V
- type Message
- type MessageID
- type MessageParser
- type MessageReceiveEventType
- type MessageSendEventType
- type ParsedMessage
- type Queue
- type Replica
- type ReplicaID
- type ReplicaLog
- type ReplicaLogQueue
- type ReplicaLogStore
- type ReplicaState
- type ReplicaStateStore
- type ReplicaStore
- func (s *ReplicaStore) Add(p *Replica)
- func (s *ReplicaStore) Cap() int
- func (s *ReplicaStore) Count() int
- func (s *ReplicaStore) Get(id ReplicaID) (p *Replica, ok bool)
- func (s *ReplicaStore) GetRandom() (*Replica, bool)
- func (s *ReplicaStore) Iter() []*Replica
- func (s *ReplicaStore) NumReady() int
- func (s *ReplicaStore) ResetReady()
- type ReplicaTimeout
- type ReportLogs
- type RestartableService
- type Service
- type TimeoutEndEventType
- type TimeoutStartEventType
Constants ¶
This section is empty.
Variables ¶
var ( ErrDuplicateSubs = errors.New("duplicate subscriber") ErrNoSubs = errors.New("subscriber does not exist") ErrNoData = errors.New("no data in message") ErrBadParser = errors.New("bad parser") DefaultSubsChSize = 10 )
var ( // ErrReplicaStoreFull is returned when more than the intended number of replicas register with the scheduler tool ErrReplicaStoreFull = errors.New("replica store is full") )
Functions ¶
This section is empty.
Types ¶
type BaseService ¶
BaseService provides the basic nuts an bolts needed to implement a service
func NewBaseService ¶
func NewBaseService(name string, parentLogger *log.Logger) *BaseService
NewBaseService instantiates BaseService
func (*BaseService) QuitCh ¶
func (b *BaseService) QuitCh() <-chan struct{}
QuitCh returns the quit channel which will be closed when the service stops running
func (*BaseService) SetLogger ¶ added in v0.1.3
func (b *BaseService) SetLogger(logger *log.Logger)
func (*BaseService) StartRunning ¶
func (b *BaseService) StartRunning()
StartRunning is called to set the running flag
func (*BaseService) StopRunning ¶
func (b *BaseService) StopRunning()
StopRunning is called to unset the running flag
type ClockValue ¶
type ClockValue []float64
func ZeroClock ¶ added in v0.1.1
func ZeroClock(replicas int) ClockValue
func (ClockValue) Eq ¶
func (c ClockValue) Eq(other ClockValue) bool
func (ClockValue) Lt ¶
func (c ClockValue) Lt(other ClockValue) bool
type Clonable ¶
type Clonable interface {
Clone() Clonable
}
Clonable is any type which returns a copy of itself on Clone()
type Event ¶
type Event struct {
// Replica at which the event occurs
Replica ReplicaID `json:"replica"`
// Type of the event
Type EventType `json:"-"`
// TypeS is the string representation of the event
TypeS string `json:"type"`
// ID unique identifier assigned for every new event
ID EventID `json:"id"`
// Timestamp of the event
Timestamp int64 `json:"timestamp"`
}
Event is a generic event that occurs at a replica
func (*Event) IsMessageReceive ¶
func (*Event) IsMessageSend ¶
func (*Event) IsTimeoutEnd ¶
func (*Event) IsTimeoutStart ¶
func (*Event) Timeout ¶
func (e *Event) Timeout() (*ReplicaTimeout, bool)
type EventDAG ¶
type EventDAG struct {
// contains filtered or unexported fields
}
func NewEventDag ¶
func NewEventDag(replicaStore *ReplicaStore) *EventDAG
func (*EventDAG) MarshalJSON ¶
type EventNode ¶
type EventNode struct {
Event *Event `json:"event"`
ClockValue ClockValue `json:"-"`
Parents *EventNodeSet `json:"parents"`
Children *EventNodeSet `json:"children"`
// contains filtered or unexported fields
}
func NewEventNode ¶
func (*EventNode) AddParents ¶
func (*EventNode) SetClock ¶ added in v0.1.1
func (n *EventNode) SetClock(cv ClockValue)
type EventNodeSet ¶
type EventNodeSet struct {
// contains filtered or unexported fields
}
func NewEventNodeSet ¶
func NewEventNodeSet() *EventNodeSet
func (*EventNodeSet) Add ¶
func (d *EventNodeSet) Add(nid EventID)
func (*EventNodeSet) Clone ¶
func (d *EventNodeSet) Clone() *EventNodeSet
func (*EventNodeSet) Exists ¶
func (d *EventNodeSet) Exists(nid EventID) bool
func (*EventNodeSet) Iter ¶
func (d *EventNodeSet) Iter() []EventID
func (*EventNodeSet) MarshalJSON ¶
func (d *EventNodeSet) MarshalJSON() ([]byte, error)
func (*EventNodeSet) Size ¶
func (d *EventNodeSet) Size() int
type EventType ¶
type EventType interface {
// Clone copies the event type
Clone() EventType
// Type is a unique key for that event type
Type() string
// String should return a string representation of the event type
String() string
}
EventType abstract type for representing different types of events
type GenericEventType ¶
type GenericEventType struct {
// Marshalled parameters
Params map[string]string `json:"params"`
// Type of event for reference
// Eg: Commit
T string `json:"type"`
}
GenericEventType is the event type published by a replica It can be specific to the algorithm that is implemented
func NewGenericEventType ¶
func NewGenericEventType(params map[string]string, t string) *GenericEventType
NewGenericEventType instantiates GenericEventType
func (*GenericEventType) Clone ¶
func (g *GenericEventType) Clone() EventType
Clone returns a copy of the current GenericEventType
func (*GenericEventType) String ¶
func (g *GenericEventType) String() string
String returns a string representation of the event type
func (*GenericEventType) Type ¶
func (g *GenericEventType) Type() string
Type returns a unique key for GenericEventType
type GlobalClock ¶
type GlobalClock struct {
// contains filtered or unexported fields
}
func NewGlobalClock ¶
func NewGlobalClock(dag *EventDAG, messageStore *Map[string, *Message]) *GlobalClock
type List ¶ added in v0.1.5
type List[V any] struct { // contains filtered or unexported fields }
func NewEmptyList ¶ added in v0.1.5
type Map ¶ added in v0.1.3
type Map[T constraints.Ordered, V any] struct { // contains filtered or unexported fields }
func (*Map[T, V]) IterValues ¶ added in v0.1.3
func (s *Map[T, V]) IterValues() []V
func (*Map[T, V]) RandomValue ¶ added in v0.1.4
func (*Map[T, V]) RandomValueWithSource ¶ added in v0.1.4
type Message ¶
type Message struct {
From ReplicaID `json:"from"`
To ReplicaID `json:"to"`
Data []byte `json:"data"`
Type string `json:"type"`
ID MessageID `json:"id"`
Intercept bool `json:"intercept"`
ParsedMessage ParsedMessage `json:"-"`
Repr string `json:"repr"`
}
Message stores a message that has been intercepted between two replicas
func (*Message) Parse ¶
func (m *Message) Parse(parser MessageParser) error
type MessageParser ¶
type MessageParser interface {
Parse([]byte) (ParsedMessage, error)
}
type MessageReceiveEventType ¶
type MessageReceiveEventType struct {
// MessageID is the ID of the message received
MessageID MessageID
}
MessageReceiveEventType is the event type when a replica receives a message
func NewMessageReceiveEventType ¶
func NewMessageReceiveEventType(messageID string) *MessageReceiveEventType
NewMessageReceiveEventType instantiates MessageReceiveEventType
func (*MessageReceiveEventType) Clone ¶
func (r *MessageReceiveEventType) Clone() EventType
Clone returns a copy of the current MessageReceiveEventType
func (*MessageReceiveEventType) String ¶
func (r *MessageReceiveEventType) String() string
String returns a string representation of the event type
func (*MessageReceiveEventType) Type ¶
func (r *MessageReceiveEventType) Type() string
Type returns a unique key for MessageReceiveEventType
type MessageSendEventType ¶
type MessageSendEventType struct {
// MessageID of the message that was sent
MessageID MessageID
}
MessageSendEventType is the event type where a message is sent from the replica
func NewMessageSendEventType ¶
func NewMessageSendEventType(messageID string) *MessageSendEventType
NewMessageSendEventType instantiates MessageSendEventType
func (*MessageSendEventType) Clone ¶
func (s *MessageSendEventType) Clone() EventType
Clone returns a copy of the current MessageSendEventType
func (*MessageSendEventType) String ¶
func (s *MessageSendEventType) String() string
String returns a string representation of the event type
func (*MessageSendEventType) Type ¶
func (s *MessageSendEventType) Type() string
Type returns a unique key for MessageSendEventType
type ParsedMessage ¶
type ParsedMessage interface {
String() string
Clone() ParsedMessage
Marshal() ([]byte, error)
}
type Queue ¶ added in v0.1.3
type Queue[V Clonable] struct { *BaseService // contains filtered or unexported fields }
func (*Queue[V]) Flush ¶ added in v0.1.3
func (q *Queue[V]) Flush()
Flush clears the queue of all messages
type Replica ¶
type Replica struct {
ID ReplicaID `json:"id"`
Ready bool `json:"ready"`
Info map[string]interface{} `json:"info"`
Addr string `json:"addr"`
// contains filtered or unexported fields
}
Replica immutable representation of the attributes of a replica
type ReplicaID ¶
type ReplicaID string
ReplicaID is an identifier for the replica encoded as a string
type ReplicaLog ¶
type ReplicaLog struct {
// Params is a marhsalled params of the log message
Params map[string]string `json:"params"`
// Message is the message that was logged
Message string `json:"message"`
// Timestamp of the log
Timestamp int64 `json:"timestamp"`
// Replica which posted the log
Replica ReplicaID `json:"replica"`
}
ReplicaLog encapsulates a log message with the necessary attributes
type ReplicaLogQueue ¶
type ReplicaLogQueue struct {
*BaseService
// contains filtered or unexported fields
}
ReplicaLogQueue is the queue of log messages
func NewReplicaLogQueue ¶
func NewReplicaLogQueue(logger *log.Logger) *ReplicaLogQueue
NewReplicaLogQueue instantiates ReplicaLogQueue
func (*ReplicaLogQueue) Flush ¶
func (q *ReplicaLogQueue) Flush()
Flush erases the contents of the queue
func (*ReplicaLogQueue) Subscribe ¶
func (q *ReplicaLogQueue) Subscribe(label string) chan *ReplicaLog
Subscribe creates and returns a channel for the subscriber
type ReplicaLogStore ¶
type ReplicaLogStore struct {
// contains filtered or unexported fields
}
ReplicaLogStore stores the logs as a map indexed by the replica ID
func NewReplicaLogStore ¶
func NewReplicaLogStore() *ReplicaLogStore
NewReplicaLogStore instantiates a ReplicaLogStore
func (*ReplicaLogStore) Add ¶
func (store *ReplicaLogStore) Add(log *ReplicaLog)
Add adds to the log store
func (*ReplicaLogStore) GetLogs ¶
func (store *ReplicaLogStore) GetLogs(replica ReplicaID, from, to int) ([]*ReplicaLog, int)
GetLogs returns the list of logs for a replica where from <=index<to
func (*ReplicaLogStore) Reset ¶
func (store *ReplicaLogStore) Reset()
type ReplicaState ¶
type ReplicaStateStore ¶
type ReplicaStateStore struct {
// contains filtered or unexported fields
}
func NewReplicaStateStore ¶
func NewReplicaStateStore() *ReplicaStateStore
type ReplicaStore ¶
type ReplicaStore struct {
// contains filtered or unexported fields
}
ReplicaStore to store all replica information, thread safe
func NewReplicaStore ¶
func NewReplicaStore(size int) *ReplicaStore
NewReplicaStore creates an empty ReplicaStore
func (*ReplicaStore) Add ¶
func (s *ReplicaStore) Add(p *Replica)
Add adds or updates a replica to the store
func (*ReplicaStore) Cap ¶
func (s *ReplicaStore) Cap() int
Cap returns the set of replicas used for the test
func (*ReplicaStore) Count ¶
func (s *ReplicaStore) Count() int
Count returns the total number of replicas
func (*ReplicaStore) Get ¶
func (s *ReplicaStore) Get(id ReplicaID) (p *Replica, ok bool)
Get returns the replica and a bool indicating if it exists or not
func (*ReplicaStore) GetRandom ¶ added in v0.1.1
func (s *ReplicaStore) GetRandom() (*Replica, bool)
func (*ReplicaStore) Iter ¶
func (s *ReplicaStore) Iter() []*Replica
Iter returns a list of the existing replicas
func (*ReplicaStore) NumReady ¶
func (s *ReplicaStore) NumReady() int
NumReady returns the number of replicas with Ready attribute set to true
func (*ReplicaStore) ResetReady ¶
func (s *ReplicaStore) ResetReady()
ResetReady sets the Ready attribute of all replicas to false
type ReplicaTimeout ¶
type ReplicaTimeout struct {
Replica ReplicaID `json:"replica"`
Type string `json:"type"`
Duration time.Duration `json:"duration"`
}
func TimeoutFromParams ¶
func TimeoutFromParams(replica ReplicaID, params map[string]string) (*ReplicaTimeout, bool)
func (*ReplicaTimeout) Eq ¶
func (t *ReplicaTimeout) Eq(other *ReplicaTimeout) bool
func (*ReplicaTimeout) Key ¶ added in v0.1.3
func (t *ReplicaTimeout) Key() string
func (*ReplicaTimeout) MarshalJSON ¶
func (t *ReplicaTimeout) MarshalJSON() ([]byte, error)
type ReportLogs ¶ added in v0.1.3
type ReportLogs struct {
// contains filtered or unexported fields
}
func NewReportLogs ¶ added in v0.1.3
func NewReportLogs() *ReportLogs
func (*ReportLogs) GetLogs ¶ added in v0.1.3
func (r *ReportLogs) GetLogs(keyvals map[string]string, count int, from int) []*reportLog
func (*ReportLogs) Log ¶ added in v0.1.3
func (r *ReportLogs) Log(keyvals map[string]string)
type RestartableService ¶
RestartableService is a service which can be restarted
type Service ¶
type Service interface {
// Name of the service
Name() string
// Start to start the service
Start() error
// Running to indicate if the service is running
Running() bool
// Stop to stop the service
Stop() error
// Quit returns a channel which will be closed once the service stops running
QuitCh() <-chan struct{}
// SetLogger initializes the logger for the service
SetLogger(*log.Logger)
}
Service is any entity which runs on a separate thread
type TimeoutEndEventType ¶
type TimeoutEndEventType struct {
Timeout *ReplicaTimeout
}
func NewTimeoutEndEventType ¶
func NewTimeoutEndEventType(timeout *ReplicaTimeout) *TimeoutEndEventType
func (*TimeoutEndEventType) Clone ¶
func (te *TimeoutEndEventType) Clone() EventType
func (*TimeoutEndEventType) String ¶
func (te *TimeoutEndEventType) String() string
func (*TimeoutEndEventType) Type ¶
func (te *TimeoutEndEventType) Type() string
type TimeoutStartEventType ¶
type TimeoutStartEventType struct {
Timeout *ReplicaTimeout
}
func NewTimeoutStartEventType ¶
func NewTimeoutStartEventType(timeout *ReplicaTimeout) *TimeoutStartEventType
func (*TimeoutStartEventType) Clone ¶
func (ts *TimeoutStartEventType) Clone() EventType
func (*TimeoutStartEventType) String ¶
func (ts *TimeoutStartEventType) String() string
func (*TimeoutStartEventType) Type ¶
func (ts *TimeoutStartEventType) Type() string