Documentation
¶
Index ¶
- Variables
- type AnnotatedDAG
- type Annotation
- type BaseService
- type ClockValue
- type Clonable
- type Counter
- type Event
- type EventDAG
- func (d *EventDAG) AddNode(e *Event, parents []*Event)
- func (d *EventDAG) GetLatestNode(e *Event) (*Event, bool)
- func (d *EventDAG) GetNode(eid EventID) (*EventNode, bool)
- func (d *EventDAG) GetReceiveNode(e *Event) (*Event, bool)
- func (d *EventDAG) GetSendNode(e *Event) (*Event, bool)
- func (d *EventDAG) GetTimeoutEnd(e *Event) (*Event, bool)
- func (d *EventDAG) GetTimeoutStart(e *Event) (*Event, bool)
- func (d *EventDAG) MarshalJSON() ([]byte, error)
- type EventDAGExtension
- type EventID
- type EventNode
- func (n *EventNode) AddParents(parents []*EventNode)
- func (n *EventNode) Clone() *EventNode
- func (n *EventNode) GetNext() EventID
- func (n *EventNode) GetPrev() EventID
- func (n *EventNode) IsDirty() bool
- func (n *EventNode) Lt(other *EventNode) bool
- func (n *EventNode) MarkClean()
- func (n *EventNode) MarkDirty()
- func (n *EventNode) SetClock(cv ClockValue)
- func (n *EventNode) SetNext(next EventID)
- func (n *EventNode) SetPrev(prev EventID)
- type EventNodeSet
- type EventQueue
- type EventType
- type GenericEventType
- type GlobalClock
- type Message
- type MessageParser
- type MessageQueue
- func (q *MessageQueue) Add(m *Message)
- func (q *MessageQueue) Disable()
- func (q *MessageQueue) Enable()
- func (q *MessageQueue) Flush()
- func (q *MessageQueue) Pop() (*Message, bool)
- func (q *MessageQueue) Restart() error
- func (q *MessageQueue) Start() error
- func (q *MessageQueue) Stop() error
- func (q *MessageQueue) Subscribe(label string) chan *Message
- type MessageReceiveEventType
- type MessageSendEventType
- type MessageStore
- func (s *MessageStore) Add(m *Message) *Message
- func (s *MessageStore) Exists(id string) bool
- func (s *MessageStore) Get(id string) (*Message, bool)
- func (s *MessageStore) Iter() []*Message
- func (s *MessageStore) Remove(id string) *Message
- func (s *MessageStore) RemoveAll()
- func (s *MessageStore) Size() int
- type ParsedMessage
- type Replica
- type ReplicaID
- type ReplicaLog
- type ReplicaLogQueue
- type ReplicaLogStore
- type ReplicaState
- type ReplicaStateStore
- type ReplicaStore
- func (s *ReplicaStore) Add(p *Replica)
- func (s *ReplicaStore) Cap() int
- func (s *ReplicaStore) Count() int
- func (s *ReplicaStore) Get(id ReplicaID) (p *Replica, ok bool)
- func (s *ReplicaStore) GetRandom() (*Replica, bool)
- func (s *ReplicaStore) Iter() []*Replica
- func (s *ReplicaStore) NumReady() int
- func (s *ReplicaStore) ResetReady()
- type ReplicaTimeout
- type ReportStore
- type RestartableService
- type Service
- type Subscriber
- type TimeoutContext
- type TimeoutEndEventType
- type TimeoutStartEventType
- type TimeoutStore
- type VarSet
- func (v *VarSet) Exists(label string) bool
- func (v *VarSet) Get(label string) (interface{}, bool)
- func (v *VarSet) GetBool(label string) (bool, bool)
- func (v *VarSet) GetCounter(label string) (*Counter, bool)
- func (v *VarSet) GetInt(label string) (int, bool)
- func (v *VarSet) GetMessageSet(label string) (*MessageStore, bool)
- func (v *VarSet) GetString(label string) (string, bool)
- func (v *VarSet) NewMessageSet(label string)
- func (v *VarSet) Set(label string, value interface{})
- func (v *VarSet) SetCounter(label string)
Constants ¶
This section is empty.
Variables ¶
var ( ErrDuplicateSubs = errors.New("duplicate subscriber") ErrNoSubs = errors.New("subscriber does not exist") ErrNoData = errors.New("no data in message") ErrBadParser = errors.New("bad parser") DefaultSubsChSize = 10 )
var ( // ErrReplicaStoreFull is returned when more than the intended number of replicas register with the scheduler tool ErrReplicaStoreFull = errors.New("replica store is full") )
Functions ¶
This section is empty.
Types ¶
type AnnotatedDAG ¶ added in v0.1.1
type AnnotatedDAG struct {
*EventDAG
// contains filtered or unexported fields
}
func NewAnnotatedDAG ¶ added in v0.1.1
func NewAnnotatedDAG(dag *EventDAG) *AnnotatedDAG
func (*AnnotatedDAG) AnnotateNode ¶ added in v0.1.1
func (d *AnnotatedDAG) AnnotateNode(e *Event, a *Annotation)
func (*AnnotatedDAG) GetAnnotation ¶ added in v0.1.1
func (d *AnnotatedDAG) GetAnnotation(e *Event) (*Annotation, bool)
func (*AnnotatedDAG) RemoveAnnotations ¶ added in v0.1.1
func (d *AnnotatedDAG) RemoveAnnotations()
type Annotation ¶ added in v0.1.1
type Annotation struct{}
type BaseService ¶
BaseService provides the basic nuts and bolts needed to implement a service
func NewBaseService ¶
func NewBaseService(name string, parentLogger *log.Logger) *BaseService
NewBaseService instantiates BaseService
func (*BaseService) QuitCh ¶
func (b *BaseService) QuitCh() <-chan struct{}
QuitCh returns the quit channel which will be closed when the service stops running
func (*BaseService) StartRunning ¶
func (b *BaseService) StartRunning()
StartRunning is called to set the running flag
func (*BaseService) StopRunning ¶
func (b *BaseService) StopRunning()
StopRunning is called to unset the running flag
type ClockValue ¶
type ClockValue []float64
func ZeroClock ¶ added in v0.1.1
func ZeroClock(replicas int) ClockValue
func (ClockValue) Eq ¶
func (c ClockValue) Eq(other ClockValue) bool
func (ClockValue) Lt ¶
func (c ClockValue) Lt(other ClockValue) bool
type Clonable ¶
type Clonable interface {
Clone() Clonable
}
Clonable is any type which returns a copy of itself on Clone()
type Counter ¶ added in v0.1.1
type Counter struct {
// contains filtered or unexported fields
}
Counter is a thread-safe counter
type Event ¶
type Event struct {
// Replica at which the event occurs
Replica ReplicaID `json:"replica"`
// Type of the event
Type EventType `json:"-"`
// TypeS is the string representation of the event
TypeS string `json:"type"`
// ID unique identifier assigned for every new event
ID EventID `json:"id"`
// Timestamp of the event
Timestamp int64 `json:"timestamp"`
}
Event is a generic event that occurs at a replica
func (*Event) IsMessageReceive ¶
func (*Event) IsMessageSend ¶
func (*Event) IsTimeoutEnd ¶
func (*Event) IsTimeoutStart ¶
func (*Event) Timeout ¶
func (e *Event) Timeout() (*ReplicaTimeout, bool)
type EventDAG ¶
type EventDAG struct {
// contains filtered or unexported fields
}
func NewEventDag ¶
func NewEventDag(replicaStore *ReplicaStore) *EventDAG
func (*EventDAG) MarshalJSON ¶
type EventDAGExtension ¶ added in v0.1.1
type EventDAGExtension struct {
*EventDAG
// contains filtered or unexported fields
}
type EventNode ¶
type EventNode struct {
Event *Event `json:"event"`
ClockValue ClockValue `json:"-"`
Parents *EventNodeSet `json:"parents"`
Children *EventNodeSet `json:"children"`
// contains filtered or unexported fields
}
func NewEventNode ¶
func (*EventNode) AddParents ¶
func (*EventNode) SetClock ¶ added in v0.1.1
func (n *EventNode) SetClock(cv ClockValue)
type EventNodeSet ¶
type EventNodeSet struct {
// contains filtered or unexported fields
}
func NewEventNodeSet ¶
func NewEventNodeSet() *EventNodeSet
func (*EventNodeSet) Add ¶
func (d *EventNodeSet) Add(nid EventID)
func (*EventNodeSet) Clone ¶
func (d *EventNodeSet) Clone() *EventNodeSet
func (*EventNodeSet) Exists ¶
func (d *EventNodeSet) Exists(nid EventID) bool
func (*EventNodeSet) Iter ¶
func (d *EventNodeSet) Iter() []EventID
func (*EventNodeSet) MarshalJSON ¶
func (d *EventNodeSet) MarshalJSON() ([]byte, error)
func (*EventNodeSet) Size ¶
func (d *EventNodeSet) Size() int
type EventQueue ¶
type EventQueue struct {
*BaseService
// contains filtered or unexported fields
}
EventQueue is a data structure that stores events in a FIFO queue
func NewEventQueue ¶
func NewEventQueue(logger *log.Logger) *EventQueue
NewEventQueue returns an empty EventQueue
func (*EventQueue) Subscribe ¶
func (q *EventQueue) Subscribe(label string) chan *Event
Subscribe creates and returns a channel for the subscriber with the given label
type EventType ¶
type EventType interface {
// Clone copies the event type
Clone() EventType
// Type is a unique key for that event type
Type() string
// String should return a string representation of the event type
String() string
}
EventType is an abstract type for representing different types of events
type GenericEventType ¶
type GenericEventType struct {
// Marshalled parameters
Params map[string]string `json:"params"`
// Type of event for reference
// Eg: Commit
T string `json:"type"`
}
GenericEventType is the event type published by a replica. It can be specific to the algorithm that is implemented
func NewGenericEventType ¶
func NewGenericEventType(params map[string]string, t string) *GenericEventType
NewGenericEventType instantiates GenericEventType
func (*GenericEventType) Clone ¶
func (g *GenericEventType) Clone() EventType
Clone returns a copy of the current GenericEventType
func (*GenericEventType) String ¶
func (g *GenericEventType) String() string
String returns a string representation of the event type
func (*GenericEventType) Type ¶
func (g *GenericEventType) Type() string
Type returns a unique key for GenericEventType
type GlobalClock ¶
type GlobalClock struct {
// contains filtered or unexported fields
}
func NewGlobalClock ¶
func NewGlobalClock(dag *EventDAG, messageStore *MessageStore) *GlobalClock
type Message ¶
type Message struct {
From ReplicaID `json:"from"`
To ReplicaID `json:"to"`
Data []byte `json:"data"`
Type string `json:"type"`
ID string `json:"id"`
Intercept bool `json:"intercept"`
ParsedMessage ParsedMessage `json:"-"`
Repr string `json:"repr"`
}
Message stores a message that has been intercepted between two replicas
func (*Message) Parse ¶
func (m *Message) Parse(parser MessageParser) error
type MessageParser ¶
type MessageParser interface {
Parse([]byte) (ParsedMessage, error)
}
type MessageQueue ¶
type MessageQueue struct {
*BaseService
// contains filtered or unexported fields
}
MessageQueue is a data structure that stores the messages in a FIFO queue
func NewMessageQueue ¶
func NewMessageQueue(logger *log.Logger) *MessageQueue
NewMessageQueue returns an empty MessageQueue
func (*MessageQueue) Disable ¶
func (q *MessageQueue) Disable()
Disable closes the queue and drops all incoming messages. Use with caution
func (*MessageQueue) Enable ¶
func (q *MessageQueue) Enable()
Enable enqueues the messages and feeds it to the subscribers if any
func (*MessageQueue) Pop ¶
func (q *MessageQueue) Pop() (*Message, bool)
func (*MessageQueue) Subscribe ¶
func (q *MessageQueue) Subscribe(label string) chan *Message
Subscribe creates and returns a channel for the subscriber with the specified label
type MessageReceiveEventType ¶
type MessageReceiveEventType struct {
// MessageID is the ID of the message received
MessageID string
}
MessageReceiveEventType is the event type when a replica receives a message
func NewMessageReceiveEventType ¶
func NewMessageReceiveEventType(messageID string) *MessageReceiveEventType
NewMessageReceiveEventType instantiates MessageReceiveEventType
func (*MessageReceiveEventType) Clone ¶
func (r *MessageReceiveEventType) Clone() EventType
Clone returns a copy of the current MessageReceiveEventType
func (*MessageReceiveEventType) String ¶
func (r *MessageReceiveEventType) String() string
String returns a string representation of the event type
func (*MessageReceiveEventType) Type ¶
func (r *MessageReceiveEventType) Type() string
Type returns a unique key for MessageReceiveEventType
type MessageSendEventType ¶
type MessageSendEventType struct {
// MessageID of the message that was sent
MessageID string
}
MessageSendEventType is the event type where a message is sent from the replica
func NewMessageSendEventType ¶
func NewMessageSendEventType(messageID string) *MessageSendEventType
NewMessageSendEventType instantiates MessageSendEventType
func (*MessageSendEventType) Clone ¶
func (s *MessageSendEventType) Clone() EventType
Clone returns a copy of the current MessageSendEventType
func (*MessageSendEventType) String ¶
func (s *MessageSendEventType) String() string
String returns a string representation of the event type
func (*MessageSendEventType) Type ¶
func (s *MessageSendEventType) Type() string
Type returns a unique key for MessageSendEventType
type MessageStore ¶
type MessageStore struct {
// contains filtered or unexported fields
}
MessageStore stores the messages. It is thread safe
func NewMessageStore ¶
func NewMessageStore() *MessageStore
NewMessageStore creates an empty MessageStore
func (*MessageStore) Add ¶
func (s *MessageStore) Add(m *Message) *Message
Add adds a message to the store. Returns any old message with the same ID if it exists or nil if not
func (*MessageStore) Exists ¶
func (s *MessageStore) Exists(id string) bool
Exists returns true if the message exists
func (*MessageStore) Get ¶
func (s *MessageStore) Get(id string) (*Message, bool)
Get returns a message and bool indicating if the message exists
func (*MessageStore) Iter ¶
func (s *MessageStore) Iter() []*Message
Iter returns a list of all the messages in the store
func (*MessageStore) Remove ¶
func (s *MessageStore) Remove(id string) *Message
Remove deletes the message from the store and returns it if it exists. Returns nil otherwise
func (*MessageStore) RemoveAll ¶
func (s *MessageStore) RemoveAll()
RemoveAll empties the message store
func (*MessageStore) Size ¶
func (s *MessageStore) Size() int
Size returns the size of the message store
type ParsedMessage ¶
type ParsedMessage interface {
String() string
Clone() ParsedMessage
Marshal() ([]byte, error)
}
type Replica ¶
type Replica struct {
ID ReplicaID `json:"id"`
Ready bool `json:"ready"`
Info map[string]interface{} `json:"info"`
Addr string `json:"addr"`
// contains filtered or unexported fields
}
Replica is an immutable representation of the attributes of a replica
type ReplicaID ¶
type ReplicaID string
ReplicaID is an identifier for the replica encoded as a string
type ReplicaLog ¶
type ReplicaLog struct {
// Params holds the marshalled params of the log message
Params map[string]string `json:"params"`
// Message is the message that was logged
Message string `json:"message"`
// Timestamp of the log
Timestamp int64 `json:"timestamp"`
// Replica which posted the log
Replica ReplicaID `json:"replica"`
}
ReplicaLog encapsulates a log message with the necessary attributes
type ReplicaLogQueue ¶
type ReplicaLogQueue struct {
*BaseService
// contains filtered or unexported fields
}
ReplicaLogQueue is the queue of log messages
func NewReplicaLogQueue ¶
func NewReplicaLogQueue(logger *log.Logger) *ReplicaLogQueue
NewReplicaLogQueue instantiates ReplicaLogQueue
func (*ReplicaLogQueue) Flush ¶
func (q *ReplicaLogQueue) Flush()
Flush erases the contents of the queue
func (*ReplicaLogQueue) Subscribe ¶
func (q *ReplicaLogQueue) Subscribe(label string) chan *ReplicaLog
Subscribe creates and returns a channel for the subscriber
type ReplicaLogStore ¶
type ReplicaLogStore struct {
// contains filtered or unexported fields
}
ReplicaLogStore stores the logs as a map indexed by the replica ID
func NewReplicaLogStore ¶
func NewReplicaLogStore() *ReplicaLogStore
NewReplicaLogStore instantiates a ReplicaLogStore
func (*ReplicaLogStore) Add ¶
func (store *ReplicaLogStore) Add(log *ReplicaLog)
Add adds to the log store
func (*ReplicaLogStore) GetLogs ¶
func (store *ReplicaLogStore) GetLogs(replica ReplicaID, from, to int) ([]*ReplicaLog, int)
GetLogs returns the list of logs for a replica where from <= index < to
func (*ReplicaLogStore) Reset ¶
func (store *ReplicaLogStore) Reset()
type ReplicaState ¶
type ReplicaStateStore ¶
type ReplicaStateStore struct {
// contains filtered or unexported fields
}
func NewReplicaStateStore ¶
func NewReplicaStateStore() *ReplicaStateStore
type ReplicaStore ¶
type ReplicaStore struct {
// contains filtered or unexported fields
}
ReplicaStore stores all replica information. It is thread safe
func NewReplicaStore ¶
func NewReplicaStore(size int) *ReplicaStore
NewReplicaStore creates an empty ReplicaStore
func (*ReplicaStore) Add ¶
func (s *ReplicaStore) Add(p *Replica)
Add adds or updates a replica to the store
func (*ReplicaStore) Cap ¶
func (s *ReplicaStore) Cap() int
Cap returns the set of replicas used for the test
func (*ReplicaStore) Count ¶
func (s *ReplicaStore) Count() int
Count returns the total number of replicas
func (*ReplicaStore) Get ¶
func (s *ReplicaStore) Get(id ReplicaID) (p *Replica, ok bool)
Get returns the replica and a bool indicating if it exists or not
func (*ReplicaStore) GetRandom ¶ added in v0.1.1
func (s *ReplicaStore) GetRandom() (*Replica, bool)
func (*ReplicaStore) Iter ¶
func (s *ReplicaStore) Iter() []*Replica
Iter returns a list of the existing replicas
func (*ReplicaStore) NumReady ¶
func (s *ReplicaStore) NumReady() int
NumReady returns the number of replicas with Ready attribute set to true
func (*ReplicaStore) ResetReady ¶
func (s *ReplicaStore) ResetReady()
ResetReady sets the Ready attribute of all replicas to false
type ReplicaTimeout ¶
type ReplicaTimeout struct {
Replica ReplicaID `json:"replica"`
Type string `json:"type"`
Duration time.Duration `json:"duration"`
}
func TimeoutFromParams ¶
func TimeoutFromParams(replica ReplicaID, params map[string]string) (*ReplicaTimeout, bool)
func (*ReplicaTimeout) Eq ¶
func (t *ReplicaTimeout) Eq(other *ReplicaTimeout) bool
func (*ReplicaTimeout) MarshalJSON ¶
func (t *ReplicaTimeout) MarshalJSON() ([]byte, error)
type ReportStore ¶ added in v0.1.1
type ReportStore struct {
// contains filtered or unexported fields
}
func NewReportStore ¶ added in v0.1.1
func NewReportStore() *ReportStore
func (*ReportStore) GetLogs ¶ added in v0.1.1
func (r *ReportStore) GetLogs(keyvals map[string]string, count int, from int) []*reportLog
func (*ReportStore) Log ¶ added in v0.1.1
func (r *ReportStore) Log(keyvals map[string]string)
type RestartableService ¶
RestartableService is a service which can be restarted
type Service ¶
type Service interface {
// Name of the service
Name() string
// Start to start the service
Start() error
// Running to indicate if the service is running
Running() bool
// Stop to stop the service
Stop() error
// QuitCh returns a channel which will be closed once the service stops running
QuitCh() <-chan struct{}
}
Service is any entity which runs on a separate thread
type Subscriber ¶
type Subscriber struct {
Ch chan interface{}
}
Subscriber is a generic subscriber used to maintain the state of the subscriber
type TimeoutContext ¶
type TimeoutContext struct {
PendingTimeouts map[string]*timeoutWrapper
PendingReceives map[string]*Event
// contains filtered or unexported fields
}
func NewTimeoutContext ¶
func NewTimeoutContext(d *EventDAG) *TimeoutContext
func (*TimeoutContext) AddEvent ¶
func (t *TimeoutContext) AddEvent(e *Event)
func (*TimeoutContext) CanDeliverMessages ¶
func (t *TimeoutContext) CanDeliverMessages(messages []*Message)
type TimeoutEndEventType ¶
type TimeoutEndEventType struct {
Timeout *ReplicaTimeout
}
func NewTimeoutEndEventType ¶
func NewTimeoutEndEventType(timeout *ReplicaTimeout) *TimeoutEndEventType
func (*TimeoutEndEventType) Clone ¶
func (te *TimeoutEndEventType) Clone() EventType
func (*TimeoutEndEventType) String ¶
func (te *TimeoutEndEventType) String() string
func (*TimeoutEndEventType) Type ¶
func (te *TimeoutEndEventType) Type() string
type TimeoutStartEventType ¶
type TimeoutStartEventType struct {
Timeout *ReplicaTimeout
}
func NewTimeoutStartEventType ¶
func NewTimeoutStartEventType(timeout *ReplicaTimeout) *TimeoutStartEventType
func (*TimeoutStartEventType) Clone ¶
func (ts *TimeoutStartEventType) Clone() EventType
func (*TimeoutStartEventType) String ¶
func (ts *TimeoutStartEventType) String() string
func (*TimeoutStartEventType) Type ¶
func (ts *TimeoutStartEventType) Type() string
type TimeoutStore ¶
type TimeoutStore struct {
*BaseService
// contains filtered or unexported fields
}
func NewTimeoutStore ¶
func NewTimeoutStore(logger *log.Logger) *TimeoutStore
func (*TimeoutStore) AddTimeout ¶
func (s *TimeoutStore) AddTimeout(t *ReplicaTimeout)
func (*TimeoutStore) Reset ¶
func (s *TimeoutStore) Reset()
func (*TimeoutStore) Start ¶
func (s *TimeoutStore) Start() error
func (*TimeoutStore) Stop ¶
func (s *TimeoutStore) Stop() error
func (*TimeoutStore) ToDispatch ¶
func (s *TimeoutStore) ToDispatch() []*ReplicaTimeout
type VarSet ¶ added in v0.1.1
type VarSet struct {
// contains filtered or unexported fields
}
VarSet is a dictionary for storing auxiliary state during the execution of the testcase. VarSet is stored in the context passed to actions and conditions
func (*VarSet) Exists ¶ added in v0.1.1
Exists returns true if there is a variable of the specified key
func (*VarSet) Get ¶ added in v0.1.1
Get returns the value stored at the specified label. The second return argument is false if the label does not exist
func (*VarSet) GetBool ¶ added in v0.1.1
GetBool casts the value at label (if it exists) into boolean and returns it
func (*VarSet) GetCounter ¶ added in v0.1.1
GetCounter returns the counter at the label if it exists, (nil, false) otherwise
func (*VarSet) GetInt ¶ added in v0.1.1
GetInt casts the value at label (if it exists) into integer and returns it
func (*VarSet) GetMessageSet ¶ added in v0.1.1
func (v *VarSet) GetMessageSet(label string) (*MessageStore, bool)
GetMessageSet returns the message set at label if one exists, (nil, false) otherwise
func (*VarSet) GetString ¶ added in v0.1.1
GetString casts the value at label (if it exists) into string and returns it
func (*VarSet) NewMessageSet ¶ added in v0.1.1
NewMessageSet creates a message set at the specified label
func (*VarSet) SetCounter ¶ added in v0.1.1
SetCounter sets a counter instance at the specified label with initial value 1