types

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2022 License: MIT Imports: 8 Imported by: 12

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrDuplicateSubs = errors.New("duplicate subscriber")
	ErrNoSubs        = errors.New("subscriber does not exist")
	ErrNoData        = errors.New("no data in message")
	ErrBadParser     = errors.New("bad parser")

	DefaultSubsChSize = 10
)
View Source
var (
	// ErrReplicaStoreFull is returned when more than the intended number of replicas register with the scheduler tool
	ErrReplicaStoreFull = errors.New("replica store is full")
)

Functions

This section is empty.

Types

type AnnotatedDAG added in v0.1.1

type AnnotatedDAG struct {
	*EventDAG
	// contains filtered or unexported fields
}

func NewAnnotatedDAG added in v0.1.1

func NewAnnotatedDAG(dag *EventDAG) *AnnotatedDAG

func (*AnnotatedDAG) AnnotateNode added in v0.1.1

func (d *AnnotatedDAG) AnnotateNode(e *Event, a *Annotation)

func (*AnnotatedDAG) GetAnnotation added in v0.1.1

func (d *AnnotatedDAG) GetAnnotation(e *Event) (*Annotation, bool)

func (*AnnotatedDAG) RemoveAnnotations added in v0.1.1

func (d *AnnotatedDAG) RemoveAnnotations()

type Annotation added in v0.1.1

type Annotation struct{}

type BaseService

type BaseService struct {
	Logger *log.Logger
	// contains filtered or unexported fields
}

BaseService provides the basic nuts and bolts needed to implement a service.

func NewBaseService

func NewBaseService(name string, parentLogger *log.Logger) *BaseService

NewBaseService instantiates BaseService

func (*BaseService) Name

func (b *BaseService) Name() string

Name returns the name of the service

func (*BaseService) QuitCh

func (b *BaseService) QuitCh() <-chan struct{}

QuitCh returns the quit channel which will be closed when the service stops running

func (*BaseService) Running

func (b *BaseService) Running() bool

Running returns the flag

func (*BaseService) StartRunning

func (b *BaseService) StartRunning()

StartRunning is called to set the running flag

func (*BaseService) StopRunning

func (b *BaseService) StopRunning()

StopRunning is called to unset the running flag

type ClockValue

type ClockValue []float64

func ZeroClock added in v0.1.1

func ZeroClock(replicas int) ClockValue

func (ClockValue) Eq

func (c ClockValue) Eq(other ClockValue) bool

func (ClockValue) Lt

func (c ClockValue) Lt(other ClockValue) bool

type Clonable

type Clonable interface {
	Clone() Clonable
}

Clonable is any type which returns a copy of itself on Clone()

type Counter added in v0.1.1

type Counter struct {
	// contains filtered or unexported fields
}

Counter is a thread-safe counter.

func NewCounter added in v0.1.1

func NewCounter() *Counter

NewCounter returns a counter

func (*Counter) Incr added in v0.1.1

func (c *Counter) Incr()

Incr increments the counter

func (*Counter) SetValue added in v0.1.1

func (c *Counter) SetValue(v int)

SetValue sets the counter to the specified value

func (*Counter) Value added in v0.1.1

func (c *Counter) Value() int

Value returns the counter value

type Event

type Event struct {
	// Replica at which the event occurs
	Replica ReplicaID `json:"replica"`
	// Type of the event
	Type EventType `json:"-"`
	// TypeS is the string representation of the event
	TypeS string `json:"type"`
	// ID unique identifier assigned for every new event
	ID EventID `json:"id"`
	// Timestamp of the event
	Timestamp int64 `json:"timestamp"`
}

Event is a generic event that occurs at a replica

func NewEvent

func NewEvent(replica ReplicaID, t EventType, ts string, id EventID, time int64) *Event

func (*Event) Clone

func (e *Event) Clone() Clonable

Clone implements Clonable

func (*Event) IsMessageReceive

func (e *Event) IsMessageReceive() bool

func (*Event) IsMessageSend

func (e *Event) IsMessageSend() bool

func (*Event) IsTimeoutEnd

func (e *Event) IsTimeoutEnd() bool

func (*Event) IsTimeoutStart

func (e *Event) IsTimeoutStart() bool

func (*Event) MessageID

func (e *Event) MessageID() (string, bool)

func (*Event) Timeout

func (e *Event) Timeout() (*ReplicaTimeout, bool)

type EventDAG

type EventDAG struct {
	// contains filtered or unexported fields
}

func NewEventDag

func NewEventDag(replicaStore *ReplicaStore) *EventDAG

func (*EventDAG) AddNode

func (d *EventDAG) AddNode(e *Event, parents []*Event)

func (*EventDAG) GetLatestNode

func (d *EventDAG) GetLatestNode(e *Event) (*Event, bool)

func (*EventDAG) GetNode

func (d *EventDAG) GetNode(eid EventID) (*EventNode, bool)

func (*EventDAG) GetReceiveNode

func (d *EventDAG) GetReceiveNode(e *Event) (*Event, bool)

func (*EventDAG) GetSendNode

func (d *EventDAG) GetSendNode(e *Event) (*Event, bool)

func (*EventDAG) GetTimeoutEnd

func (d *EventDAG) GetTimeoutEnd(e *Event) (*Event, bool)

func (*EventDAG) GetTimeoutStart

func (d *EventDAG) GetTimeoutStart(e *Event) (*Event, bool)

func (*EventDAG) MarshalJSON

func (d *EventDAG) MarshalJSON() ([]byte, error)

type EventDAGExtension added in v0.1.1

type EventDAGExtension struct {
	*EventDAG
	// contains filtered or unexported fields
}

type EventID added in v0.1.1

type EventID uint64

type EventNode

type EventNode struct {
	Event      *Event     `json:"event"`
	ClockValue ClockValue `json:"-"`

	Parents  *EventNodeSet `json:"parents"`
	Children *EventNodeSet `json:"children"`
	// contains filtered or unexported fields
}

func NewEventNode

func NewEventNode(e *Event) *EventNode

func (*EventNode) AddParents

func (n *EventNode) AddParents(parents []*EventNode)

func (*EventNode) Clone

func (n *EventNode) Clone() *EventNode

func (*EventNode) GetNext

func (n *EventNode) GetNext() EventID

func (*EventNode) GetPrev

func (n *EventNode) GetPrev() EventID

func (*EventNode) IsDirty

func (n *EventNode) IsDirty() bool

func (*EventNode) Lt added in v0.1.1

func (n *EventNode) Lt(other *EventNode) bool

func (*EventNode) MarkClean

func (n *EventNode) MarkClean()

func (*EventNode) MarkDirty

func (n *EventNode) MarkDirty()

func (*EventNode) SetClock added in v0.1.1

func (n *EventNode) SetClock(cv ClockValue)

func (*EventNode) SetNext

func (n *EventNode) SetNext(next EventID)

func (*EventNode) SetPrev

func (n *EventNode) SetPrev(prev EventID)

type EventNodeSet

type EventNodeSet struct {
	// contains filtered or unexported fields
}

func NewEventNodeSet

func NewEventNodeSet() *EventNodeSet

func (*EventNodeSet) Add

func (d *EventNodeSet) Add(nid EventID)

func (*EventNodeSet) Clone

func (d *EventNodeSet) Clone() *EventNodeSet

func (*EventNodeSet) Exists

func (d *EventNodeSet) Exists(nid EventID) bool

func (*EventNodeSet) Iter

func (d *EventNodeSet) Iter() []EventID

func (*EventNodeSet) MarshalJSON

func (d *EventNodeSet) MarshalJSON() ([]byte, error)

func (*EventNodeSet) Size

func (d *EventNodeSet) Size() int

type EventQueue

type EventQueue struct {
	*BaseService
	// contains filtered or unexported fields
}

EventQueue is a data structure that stores events in a FIFO queue.

func NewEventQueue

func NewEventQueue(logger *log.Logger) *EventQueue

NewEventQueue returns an empty EventQueue

func (*EventQueue) Add

func (q *EventQueue) Add(m *Event)

Add adds an event to the queue.

func (*EventQueue) Flush

func (q *EventQueue) Flush()

Flush clears the queue of all messages

func (*EventQueue) Restart

func (q *EventQueue) Restart() error

Restart implements Service

func (*EventQueue) Start

func (q *EventQueue) Start() error

Start implements Service

func (*EventQueue) Stop

func (q *EventQueue) Stop() error

Stop implements Service

func (*EventQueue) Subscribe

func (q *EventQueue) Subscribe(label string) chan *Event

Subscribe creates and returns a channel for the subscriber with the given label

type EventType

type EventType interface {
	// Clone copies the event type
	Clone() EventType
	// Type is a unique key for that event type
	Type() string
	// String should return a string representation of the event type
	String() string
}

EventType is an abstract type for representing different types of events.

type GenericEventType

type GenericEventType struct {
	// Marshalled parameters
	Params map[string]string `json:"params"`
	// Type of event for reference
	// Eg: Commit
	T string `json:"type"`
}

GenericEventType is the event type published by a replica. It can be specific to the algorithm that is implemented.

func NewGenericEventType

func NewGenericEventType(params map[string]string, t string) *GenericEventType

NewGenericEventType instantiates GenericEventType

func (*GenericEventType) Clone

func (g *GenericEventType) Clone() EventType

Clone returns a copy of the current GenericEventType

func (*GenericEventType) String

func (g *GenericEventType) String() string

String returns a string representation of the event type

func (*GenericEventType) Type

func (g *GenericEventType) Type() string

Type returns a unique key for GenericEventType

type GlobalClock

type GlobalClock struct {
	// contains filtered or unexported fields
}

func NewGlobalClock

func NewGlobalClock(dag *EventDAG, messageStore *MessageStore) *GlobalClock

type Message

type Message struct {
	From          ReplicaID     `json:"from"`
	To            ReplicaID     `json:"to"`
	Data          []byte        `json:"data"`
	Type          string        `json:"type"`
	ID            string        `json:"id"`
	Intercept     bool          `json:"intercept"`
	ParsedMessage ParsedMessage `json:"-"`
	Repr          string        `json:"repr"`
}

Message stores a message that has been intercepted between two replicas.

func (*Message) Clone

func (m *Message) Clone() *Message

Clone to create a new Message object with the same attributes

func (*Message) Parse

func (m *Message) Parse(parser MessageParser) error

type MessageParser

type MessageParser interface {
	Parse([]byte) (ParsedMessage, error)
}

type MessageQueue

type MessageQueue struct {
	*BaseService
	// contains filtered or unexported fields
}

MessageQueue is a data structure that stores messages in a FIFO queue.

func NewMessageQueue

func NewMessageQueue(logger *log.Logger) *MessageQueue

NewMessageQueue returns an empty MessageQueue

func (*MessageQueue) Add

func (q *MessageQueue) Add(m *Message)

Add adds a message to the queue

func (*MessageQueue) Disable

func (q *MessageQueue) Disable()

Disable closes the queue and drops all incoming messages. Use with caution.

func (*MessageQueue) Enable

func (q *MessageQueue) Enable()

Enable enqueues the messages and feeds it to the subscribers if any

func (*MessageQueue) Flush

func (q *MessageQueue) Flush()

Flush clears the queue of all messages

func (*MessageQueue) Pop

func (q *MessageQueue) Pop() (*Message, bool)

func (*MessageQueue) Restart

func (q *MessageQueue) Restart() error

Restart implements Service

func (*MessageQueue) Start

func (q *MessageQueue) Start() error

Start implements Service

func (*MessageQueue) Stop

func (q *MessageQueue) Stop() error

Stop implements Service

func (*MessageQueue) Subscribe

func (q *MessageQueue) Subscribe(label string) chan *Message

Subscribe creates and returns a channel for the subscriber with the specified label.

type MessageReceiveEventType

type MessageReceiveEventType struct {
	// MessageID is the ID of the message received
	MessageID string
}

MessageReceiveEventType is the event type when a replica receives a message

func NewMessageReceiveEventType

func NewMessageReceiveEventType(messageID string) *MessageReceiveEventType

NewMessageReceiveEventType instantiates MessageReceiveEventType

func (*MessageReceiveEventType) Clone

Clone returns a copy of the current MessageReceiveEventType

func (*MessageReceiveEventType) String

func (r *MessageReceiveEventType) String() string

String returns a string representation of the event type

func (*MessageReceiveEventType) Type

func (r *MessageReceiveEventType) Type() string

Type returns a unique key for MessageReceiveEventType

type MessageSendEventType

type MessageSendEventType struct {
	// MessageID of the message that was sent
	MessageID string
}

MessageSendEventType is the event type where a message is sent from the replica

func NewMessageSendEventType

func NewMessageSendEventType(messageID string) *MessageSendEventType

NewMessageSendEventType instantiates MessageSendEventType

func (*MessageSendEventType) Clone

func (s *MessageSendEventType) Clone() EventType

Clone returns a copy of the current MessageSendEventType

func (*MessageSendEventType) String

func (s *MessageSendEventType) String() string

String returns a string representation of the event type

func (*MessageSendEventType) Type

func (s *MessageSendEventType) Type() string

Type returns a unique key for MessageSendEventType

type MessageStore

type MessageStore struct {
	// contains filtered or unexported fields
}

MessageStore to store the messages. Thread safe

func NewMessageStore

func NewMessageStore() *MessageStore

NewMessageStore creates an empty MessageStore.

func (*MessageStore) Add

func (s *MessageStore) Add(m *Message) *Message

Add adds a message to the store. It returns the old message with the same ID if one exists, or nil if not.

func (*MessageStore) Exists

func (s *MessageStore) Exists(id string) bool

Exists returns true if the message exists

func (*MessageStore) Get

func (s *MessageStore) Get(id string) (*Message, bool)

Get returns a message and bool indicating if the message exists

func (*MessageStore) Iter

func (s *MessageStore) Iter() []*Message

Iter returns a list of all the messages in the store

func (*MessageStore) Remove

func (s *MessageStore) Remove(id string) *Message

Remove deletes the message from the store and returns it if it exists. Returns nil otherwise.

func (*MessageStore) RemoveAll

func (s *MessageStore) RemoveAll()

RemoveAll empties the message store

func (*MessageStore) Size

func (s *MessageStore) Size() int

Size returns the size of the message store

type ParsedMessage

type ParsedMessage interface {
	String() string
	Clone() ParsedMessage
	Marshal() ([]byte, error)
}

type Replica

type Replica struct {
	ID    ReplicaID              `json:"id"`
	Ready bool                   `json:"ready"`
	Info  map[string]interface{} `json:"info"`
	Addr  string                 `json:"addr"`
	// contains filtered or unexported fields
}

Replica is an immutable representation of the attributes of a replica.

type ReplicaID

type ReplicaID string

ReplicaID is an identifier for the replica encoded as a string

type ReplicaLog

type ReplicaLog struct {
	// Params holds the marshalled params of the log message
	Params map[string]string `json:"params"`
	// Message is the message that was logged
	Message string `json:"message"`
	// Timestamp of the log
	Timestamp int64 `json:"timestamp"`
	// Replica which posted the log
	Replica ReplicaID `json:"replica"`
}

ReplicaLog encapsulates a log message with the necessary attributes

func (*ReplicaLog) Clone

func (l *ReplicaLog) Clone() Clonable

Clone implements Clonable

type ReplicaLogQueue

type ReplicaLogQueue struct {
	*BaseService
	// contains filtered or unexported fields
}

ReplicaLogQueue is the queue of log messages

func NewReplicaLogQueue

func NewReplicaLogQueue(logger *log.Logger) *ReplicaLogQueue

NewReplicaLogQueue instantiates ReplicaLogQueue

func (*ReplicaLogQueue) Add

func (q *ReplicaLogQueue) Add(log *ReplicaLog)

Add adds to the queue

func (*ReplicaLogQueue) Flush

func (q *ReplicaLogQueue) Flush()

Flush erases the contents of the queue

func (*ReplicaLogQueue) Start

func (q *ReplicaLogQueue) Start()

Start implements Service

func (*ReplicaLogQueue) Stop

func (q *ReplicaLogQueue) Stop()

Stop implements Service

func (*ReplicaLogQueue) Subscribe

func (q *ReplicaLogQueue) Subscribe(label string) chan *ReplicaLog

Subscribe creates and returns a channel for the subscriber

type ReplicaLogStore

type ReplicaLogStore struct {
	// contains filtered or unexported fields
}

ReplicaLogStore stores the logs as a map indexed by the replica ID

func NewReplicaLogStore

func NewReplicaLogStore() *ReplicaLogStore

NewReplicaLogStore instantiates a ReplicaLogStore

func (*ReplicaLogStore) Add

func (store *ReplicaLogStore) Add(log *ReplicaLog)

Add adds to the log store

func (*ReplicaLogStore) GetLogs

func (store *ReplicaLogStore) GetLogs(replica ReplicaID, from, to int) ([]*ReplicaLog, int)

GetLogs returns the list of logs for a replica where from <= index < to.

func (*ReplicaLogStore) Reset

func (store *ReplicaLogStore) Reset()

type ReplicaState

type ReplicaState struct {
	State     string    `json:"state"`
	Timestamp int64     `json:"timestamp"`
	Replica   ReplicaID `json:"replica"`
}

type ReplicaStateStore

type ReplicaStateStore struct {
	// contains filtered or unexported fields
}

func NewReplicaStateStore

func NewReplicaStateStore() *ReplicaStateStore

type ReplicaStore

type ReplicaStore struct {
	// contains filtered or unexported fields
}

ReplicaStore to store all replica information, thread safe

func NewReplicaStore

func NewReplicaStore(size int) *ReplicaStore

NewReplicaStore creates an empty ReplicaStore

func (*ReplicaStore) Add

func (s *ReplicaStore) Add(p *Replica)

Add adds or updates a replica to the store

func (*ReplicaStore) Cap

func (s *ReplicaStore) Cap() int

Cap returns the number of replicas intended to be used for the test.

func (*ReplicaStore) Count

func (s *ReplicaStore) Count() int

Count returns the total number of replicas

func (*ReplicaStore) Get

func (s *ReplicaStore) Get(id ReplicaID) (p *Replica, ok bool)

Get returns the replica and a bool indicating if it exists or not

func (*ReplicaStore) GetRandom added in v0.1.1

func (s *ReplicaStore) GetRandom() (*Replica, bool)

func (*ReplicaStore) Iter

func (s *ReplicaStore) Iter() []*Replica

Iter returns a list of the existing replicas

func (*ReplicaStore) NumReady

func (s *ReplicaStore) NumReady() int

NumReady returns the number of replicas with Ready attribute set to true

func (*ReplicaStore) ResetReady

func (s *ReplicaStore) ResetReady()

ResetReady sets the Ready attribute of all replicas to false

type ReplicaTimeout

type ReplicaTimeout struct {
	Replica  ReplicaID     `json:"replica"`
	Type     string        `json:"type"`
	Duration time.Duration `json:"duration"`
}

func TimeoutFromParams

func TimeoutFromParams(replica ReplicaID, params map[string]string) (*ReplicaTimeout, bool)

func (*ReplicaTimeout) Eq

func (t *ReplicaTimeout) Eq(other *ReplicaTimeout) bool

func (*ReplicaTimeout) MarshalJSON

func (t *ReplicaTimeout) MarshalJSON() ([]byte, error)

type ReportStore added in v0.1.1

type ReportStore struct {
	// contains filtered or unexported fields
}

func NewReportStore added in v0.1.1

func NewReportStore() *ReportStore

func (*ReportStore) GetLogs added in v0.1.1

func (r *ReportStore) GetLogs(keyvals map[string]string, count int, from int) []*reportLog

func (*ReportStore) Log added in v0.1.1

func (r *ReportStore) Log(keyvals map[string]string)

type RestartableService

type RestartableService interface {
	Service
	// Restart restarts the service
	Restart() error
}

RestartableService is a service which can be restarted

type Service

type Service interface {
	// Name of the service
	Name() string
	// Start to start the service
	Start() error
	// Running to indicate if the service is running
	Running() bool
	// Stop to stop the service
	Stop() error
	// QuitCh returns a channel which will be closed once the service stops running
	QuitCh() <-chan struct{}
}

Service is any entity which runs on a separate thread

type Subscriber

type Subscriber struct {
	Ch chan interface{}
}

Subscriber is a generic subscriber that maintains the state of the subscriber.

type TimeoutContext

type TimeoutContext struct {
	PendingTimeouts map[string]*timeoutWrapper
	PendingReceives map[string]*Event
	// contains filtered or unexported fields
}

func NewTimeoutContext

func NewTimeoutContext(d *EventDAG) *TimeoutContext

func (*TimeoutContext) AddEvent

func (t *TimeoutContext) AddEvent(e *Event)

func (*TimeoutContext) CanDeliverMessages

func (t *TimeoutContext) CanDeliverMessages(messages []*Message)

type TimeoutEndEventType

type TimeoutEndEventType struct {
	Timeout *ReplicaTimeout
}

func NewTimeoutEndEventType

func NewTimeoutEndEventType(timeout *ReplicaTimeout) *TimeoutEndEventType

func (*TimeoutEndEventType) Clone

func (te *TimeoutEndEventType) Clone() EventType

func (*TimeoutEndEventType) String

func (te *TimeoutEndEventType) String() string

func (*TimeoutEndEventType) Type

func (te *TimeoutEndEventType) Type() string

type TimeoutStartEventType

type TimeoutStartEventType struct {
	Timeout *ReplicaTimeout
}

func NewTimeoutStartEventType

func NewTimeoutStartEventType(timeout *ReplicaTimeout) *TimeoutStartEventType

func (*TimeoutStartEventType) Clone

func (ts *TimeoutStartEventType) Clone() EventType

func (*TimeoutStartEventType) String

func (ts *TimeoutStartEventType) String() string

func (*TimeoutStartEventType) Type

func (ts *TimeoutStartEventType) Type() string

type TimeoutStore

type TimeoutStore struct {
	*BaseService
	// contains filtered or unexported fields
}

func NewTimeoutStore

func NewTimeoutStore(logger *log.Logger) *TimeoutStore

func (*TimeoutStore) AddTimeout

func (s *TimeoutStore) AddTimeout(t *ReplicaTimeout)

func (*TimeoutStore) Reset

func (s *TimeoutStore) Reset()

func (*TimeoutStore) Start

func (s *TimeoutStore) Start() error

func (*TimeoutStore) Stop

func (s *TimeoutStore) Stop() error

func (*TimeoutStore) ToDispatch

func (s *TimeoutStore) ToDispatch() []*ReplicaTimeout

type VarSet added in v0.1.1

type VarSet struct {
	// contains filtered or unexported fields
}

VarSet is a dictionary for storing auxiliary state during the execution of the testcase. VarSet is stored in the context passed to actions and conditions.

func NewVarSet added in v0.1.1

func NewVarSet() *VarSet

NewVarSet instantiates a VarSet.

func (*VarSet) Exists added in v0.1.1

func (v *VarSet) Exists(label string) bool

Exists returns true if there is a variable of the specified key

func (*VarSet) Get added in v0.1.1

func (v *VarSet) Get(label string) (interface{}, bool)

Get returns the value stored at the specified label. The second return argument is false if the label does not exist.

func (*VarSet) GetBool added in v0.1.1

func (v *VarSet) GetBool(label string) (bool, bool)

GetBool casts the value at label (if it exists) into boolean and returns it

func (*VarSet) GetCounter added in v0.1.1

func (v *VarSet) GetCounter(label string) (*Counter, bool)

GetCounter returns the counter at the label if it exists, and (nil, false) otherwise.

func (*VarSet) GetInt added in v0.1.1

func (v *VarSet) GetInt(label string) (int, bool)

GetInt casts the value at label (if it exists) into integer and returns it

func (*VarSet) GetMessageSet added in v0.1.1

func (v *VarSet) GetMessageSet(label string) (*MessageStore, bool)

GetMessageSet returns the message set at label if one exists, and (nil, false) otherwise.

func (*VarSet) GetString added in v0.1.1

func (v *VarSet) GetString(label string) (string, bool)

GetString casts the value at label (if it exists) into string and returns it

func (*VarSet) NewMessageSet added in v0.1.1

func (v *VarSet) NewMessageSet(label string)

NewMessageSet creates a message set at the specified label

func (*VarSet) Set added in v0.1.1

func (v *VarSet) Set(label string, value interface{})

Set the value at the specified label

func (*VarSet) SetCounter added in v0.1.1

func (v *VarSet) SetCounter(label string)

SetCounter sets a counter instance at the specified label with initial value 1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL