Documentation
¶
Index ¶
- Constants
- Variables
- func MakeOutboundMessage(in *clientpb.InboundMessage, bodyFunc func(out *clientpb.OutboundMessage)) *clientpb.OutboundMessage
- func NewClientSession(ctx context.Context, node *Node, t Transport, marshaler Marshaler) (*ClientSession, ClientCloseFunc, error)
- type Broker
- type BrokerEventHandler
- type ClientCloseFunc
- type ClientDesc
- type ClientSession
- func (c *ClientSession) Authenticated() bool
- func (c *ClientSession) Channels() []string
- func (c *ClientSession) ClientID() string
- func (c *ClientSession) ClientInfo() *ClientDesc
- func (c *ClientSession) Close(disconnect Disconnect) error
- func (c *ClientSession) HandleMessage(ctx context.Context, in *clientpb.InboundMessage) error
- func (c *ClientSession) LastSurveyRequestID() string
- func (c *ClientSession) ResetActivity()
- func (c *ClientSession) Send(ctx context.Context, msg *clientpb.OutboundMessage) error
- func (c *ClientSession) SessionID() string
- func (c *ClientSession) UserID() string
- type Disconnect
- type EncodingType
- type HeartbeatConfig
- type HeartbeatManager
- type HistoryFilter
- type HistoryOptions
- type Hub
- type JSONMarshaler
- type MarshalTypeError
- type Marshaler
- type Node
- func (n *Node) AddClient(c *ClientSession)
- func (n *Node) AddProxy(p proxy.Proxy, channelPattern, methodPattern string) error
- func (n *Node) AddSubscription(ctx context.Context, ch string, sub Subscriber) error
- func (n *Node) AddSurveyResponse(ctx context.Context, sessionID string, requestID string, payload []byte, ...)
- func (n *Node) Broker() Broker
- func (n *Node) FindProxy(channel, method string) proxy.Proxy
- func (n *Node) GetHeartbeatIdleTimeout() time.Duration
- func (n *Node) GetRPCTimeout() time.Duration
- func (n *Node) HandleJoin(ch string, info *ClientDesc) error
- func (n *Node) HandleLeave(ch string, info *ClientDesc) error
- func (n *Node) HandlePublication(ch string, pub *Publication) error
- func (n *Node) Hub() *Hub
- func (n *Node) ProxyRPC(ctx context.Context, channel, method string, req *proxy.RPCProxyRequest) (*proxy.RPCProxyResponse, error)
- func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) error
- func (n *Node) RemoveSubscription(ch string, c *ClientSession) error
- func (n *Node) Run() error
- func (n *Node) SetBroker(broker Broker)
- func (n *Node) SetupProxy(cfgs []*proxy.ProxyConfig) error
- func (n *Node) Survey(ctx context.Context, channel string, payload []byte, timeout time.Duration) ([]*SurveyResult, error)
- type ProtobufMarshaler
- type Publication
- type PublishOption
- type PublishOptions
- type StreamPosition
- type Subscriber
- type Survey
- func (s *Survey) AddResponse(sessionID string, payload []byte, err error)
- func (s *Survey) Channel() string
- func (s *Survey) Close()
- func (s *Survey) ID() string
- func (s *Survey) Payload() []byte
- func (s *Survey) Results() []*SurveyResult
- func (s *Survey) Timeout() time.Duration
- func (s *Survey) Wait(ctx context.Context) []*SurveyResult
- type SurveyResult
- type Transport
- type UnmarshalTypeError
Constants ¶
const (
SystemMethodAuthenticate = "$authenticate"
)
Variables ¶
var ( // DisconnectInvalidToken issued when client came with invalid token. DisconnectInvalidToken = Disconnect{ Code: 3500, Reason: "invalid token", } // DisconnectBadRequest issued when client uses malformed protocol frames. DisconnectBadRequest = Disconnect{ Code: 3501, Reason: "bad request", } // DisconnectStale issued to close connection that did not become // authenticated in configured interval after dialing. DisconnectStale = Disconnect{ Code: 3502, Reason: "stale", } // DisconnectForceNoReconnect issued when server disconnects connection // and asks it to not reconnect again. DisconnectForceNoReconnect = Disconnect{ Code: 3503, Reason: "force disconnect", } // DisconnectConnectionLimit can be issued when client connection exceeds a // configured connection limit (per user ID or due to other rule). DisconnectConnectionLimit = Disconnect{ Code: 3504, Reason: "connection limit", } // DisconnectChannelLimit can be issued when client connection exceeds a // configured channel limit. DisconnectChannelLimit = Disconnect{ Code: 3505, Reason: "channel limit", } // DisconnectInappropriateProtocol can be issued when client connection format can not // handle incoming data. For example, this happens when JSON-based clients receive // binary data in a channel. This is usually an indicator of programmer error, JSON // clients can not handle binary. DisconnectInappropriateProtocol = Disconnect{ Code: 3506, Reason: "inappropriate protocol", } // DisconnectPermissionDenied may be issued when client attempts accessing a server without // enough permissions. DisconnectPermissionDenied = Disconnect{ Code: 3507, Reason: "permission denied", } // DisconnectNotAvailable may be issued when ErrorNotAvailable does not fit message type, for example // we issue DisconnectNotAvailable when client sends asynchronous message without MessageHandler set // on server side. DisconnectNotAvailable = Disconnect{ Code: 3508, Reason: "not available", } // DisconnectTooManyErrors may be issued when client generates too many errors. DisconnectTooManyErrors = Disconnect{ Code: 3509, Reason: "too many errors", } // DisconnectIdleTimeout may be issued when client connection is idle for too long. DisconnectIdleTimeout = Disconnect{ Code: 3511, Reason: "idle timeout", } )
The codes below are built-in terminal codes.
var DisconnectConnectionClosed = Disconnect{
Code: 3000,
Reason: "connection closed",
}
DisconnectConnectionClosed is a special Disconnect object used when client connection was closed without any advice from a server side. This can be a clean disconnect, or temporary disconnect of the client due to internet connection loss. Server can not distinguish the actual reason of disconnect.
var Marshalers = []Marshaler{ JSONMarshaler{}, ProtobufMarshaler{}, }
Marshalers is a list of available marshalers.
var ProtoJSONMarshaler = &protoJSONMarshaler{ Marshaler: protojson.MarshalOptions{ UseProtoNames: true, EmitUnpopulated: false, }, Unmarshaler: protojson.UnmarshalOptions{ DiscardUnknown: true, }, }
ProtoJSONMarshaler is a JSON marshaler that uses protobuf JSON encoding.
Functions ¶
func MakeOutboundMessage ¶
func MakeOutboundMessage(in *clientpb.InboundMessage, bodyFunc func(out *clientpb.OutboundMessage)) *clientpb.OutboundMessage
func NewClientSession ¶
func NewClientSession(ctx context.Context, node *Node, t Transport, marshaler Marshaler) (*ClientSession, ClientCloseFunc, error)
Types ¶
type Broker ¶
type Broker interface {
// RegisterEventHandler called once on start when Broker already set to Node. At
// this moment node is ready to process broker events.
RegisterEventHandler(BrokerEventHandler) error
// Subscribe node on channel to listen all messages coming from channel.
Subscribe(ch string) error
// Unsubscribe node from channel to stop listening messages from it.
Unsubscribe(ch string) error
// Publish allows sending data into channel. Data should be
// delivered to all clients subscribed to this channel at moment on any
// Centrifuge node (with at most once delivery guarantee).
//
// Broker can optionally maintain publication history inside channel according
// to PublishOptions provided. See History method for rules that should be implemented
// for accessing publications from history stream.
//
// Saving message to a history stream and publish to PUB/SUB should be an atomic
// operation per channel. If this is not true – then publication to one channel
// must be serialized on the caller side, i.e. publish requests must be issued one
// after another. Otherwise, the order of publications and stable behaviour of
// subscribers with positioning/recovery enabled can't be guaranteed.
//
// StreamPosition returned here describes stream epoch and offset assigned to
// the publication. For channels without history this StreamPosition should be
// zero value.
// Second bool value returned here means whether Publish was suppressed due to
// the use of PublishOptions.IdempotencyKey. In this case StreamPosition is
// returned from the cache maintained by Broker.
Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, bool, error)
// PublishJoin publishes Join Push message into channel.
PublishJoin(ch string, info *ClientDesc) error
// PublishLeave publishes Leave Push message into channel.
PublishLeave(ch string, info *ClientDesc) error
// History used to extract Publications from history stream.
// Publications returned according to HistoryFilter which allows to set several
// filtering options. StreamPosition returned describes current history stream
// top offset and epoch.
History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error)
// RemoveHistory removes history from channel. This is in general not
// needed as history expires automatically (based on history_lifetime)
// but sometimes can be useful for application logic.
RemoveHistory(ch string) error
}
func NewMemoryBroker ¶
func NewMemoryBroker() Broker
type BrokerEventHandler ¶
type BrokerEventHandler interface {
// HandlePublication to handle received Publications.
HandlePublication(ch string, pub *Publication) error
// HandleJoin to handle received Join messages.
HandleJoin(ch string, info *ClientDesc) error
// HandleLeave to handle received Leave messages.
HandleLeave(ch string, info *ClientDesc) error
}
BrokerEventHandler can handle messages received from PUB/SUB system.
type ClientCloseFunc ¶
type ClientCloseFunc func() error
type ClientDesc ¶
type ClientSession ¶
type ClientSession struct {
// contains filtered or unexported fields
}
func (*ClientSession) Authenticated ¶
func (c *ClientSession) Authenticated() bool
func (*ClientSession) Channels ¶
func (c *ClientSession) Channels() []string
func (*ClientSession) ClientID ¶
func (c *ClientSession) ClientID() string
func (*ClientSession) ClientInfo ¶
func (c *ClientSession) ClientInfo() *ClientDesc
func (*ClientSession) Close ¶
func (c *ClientSession) Close(disconnect Disconnect) error
Close closes the client session with a disconnect reason. This is an exported method for use by the server-side API.
func (*ClientSession) HandleMessage ¶
func (c *ClientSession) HandleMessage(ctx context.Context, in *clientpb.InboundMessage) error
func (*ClientSession) LastSurveyRequestID ¶
func (c *ClientSession) LastSurveyRequestID() string
LastSurveyRequestID returns the last received survey request ID. This is useful for testing purposes.
func (*ClientSession) ResetActivity ¶
func (c *ClientSession) ResetActivity()
ResetActivity resets the last activity timestamp to now.
func (*ClientSession) Send ¶
func (c *ClientSession) Send(ctx context.Context, msg *clientpb.OutboundMessage) error
func (*ClientSession) SessionID ¶
func (c *ClientSession) SessionID() string
func (*ClientSession) UserID ¶
func (c *ClientSession) UserID() string
type Disconnect ¶
type Disconnect struct {
// Code is a disconnect code.
Code uint32 `json:"code,omitempty"`
// Reason is a short description of disconnect code for humans.
Reason string `json:"reason"`
}
func (Disconnect) Error ¶
func (d Disconnect) Error() string
Error to use Disconnect as a callback handler error to signal Centrifuge that client must be disconnected with corresponding Code and Reason.
type EncodingType ¶
type EncodingType int
const ( EncodingTypeJSON EncodingType = 1 EncodingTypeProtobuf EncodingType = 2 )
type HeartbeatConfig ¶
HeartbeatConfig contains parsed heartbeat configuration durations.
type HeartbeatManager ¶
type HeartbeatManager struct {
// contains filtered or unexported fields
}
HeartbeatManager manages client heartbeat monitoring.
func NewHeartbeatManager ¶
func NewHeartbeatManager(cfg HeartbeatConfig) *HeartbeatManager
NewHeartbeatManager creates a new HeartbeatManager with the given config.
func (*HeartbeatManager) Config ¶
func (hm *HeartbeatManager) Config() HeartbeatConfig
Config returns the heartbeat configuration.
func (*HeartbeatManager) Start ¶
func (hm *HeartbeatManager) Start(ctx context.Context, client *ClientSession)
Start starts the heartbeat goroutine for a client session.
type HistoryFilter ¶
type HistoryFilter struct {
// Since used to extract publications from stream since provided StreamPosition.
Since *StreamPosition
// Limit number of publications to return.
// -1 means no limit - i.e. return all publications currently in stream.
// 0 means that caller only interested in current stream top position so
// Broker should not return any publications.
Limit int
// Reverse direction.
Reverse bool
}
HistoryFilter allows filtering history according to fields set.
type HistoryOptions ¶
type HistoryOptions struct {
// Filter for history publications.
Filter HistoryFilter
// MetaTTL allows overriding default (set in Config.HistoryMetaTTL) history
// meta information expiration time.
MetaTTL time.Duration
}
HistoryOptions define some fields to alter History method behaviour.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
func (*Hub) GetSubscribers ¶
func (h *Hub) GetSubscribers(ch string) []*ClientSession
GetSubscribers returns a copy of all subscribers for a given channel.
func (*Hub) LookupSession ¶
func (h *Hub) LookupSession(sessionID string) *ClientSession
LookupSession returns a client session by session ID. Returns nil if session not found.
func (*Hub) NumSubscribers ¶
NumSubscribers returns number of current subscribers for a given channel.
func (*Hub) RemoveSession ¶
RemoveSession removes a session from the sessions map and connShards.
type JSONMarshaler ¶
type JSONMarshaler struct{}
JSONMarshaler implements JSON marshaling for protocol messages.
func (JSONMarshaler) Name ¶
func (JSONMarshaler) Name() string
func (JSONMarshaler) UseBytes ¶
func (JSONMarshaler) UseBytes() bool
type MarshalTypeError ¶
type MarshalTypeError struct {
Type any
}
MarshalTypeError is returned when Marshal receives an unexpected type.
func (*MarshalTypeError) Error ¶
func (e *MarshalTypeError) Error() string
type Marshaler ¶
type Marshaler interface {
// Marshal converts a message to bytes.
Marshal(msg any) ([]byte, error)
// Unmarshal converts bytes to a message.
Unmarshal(data []byte, msg any) error
// Name returns the marshaler name.
Name() string
// UseBytes returns true if this marshaler uses binary encoding.
UseBytes() bool
}
Marshaler defines the interface for marshaling protocol messages.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
func (*Node) AddClient ¶
func (n *Node) AddClient(c *ClientSession)
AddClient adds a client session to the node's hub.
func (*Node) AddSubscription ¶
AddSubscription adds a subscription for a client to a channel. This is an exported method for use by the server-side API.
func (*Node) AddSurveyResponse ¶
func (n *Node) AddSurveyResponse(ctx context.Context, sessionID string, requestID string, payload []byte, err error)
AddSurveyResponse adds a client response to the appropriate survey.
func (*Node) FindProxy ¶
FindProxy finds a proxy for the given channel and method. Returns nil if no matching proxy is found.
func (*Node) GetHeartbeatIdleTimeout ¶
GetHeartbeatIdleTimeout returns the configured heartbeat idle timeout. Returns 0 if heartbeat manager is not configured.
func (*Node) GetRPCTimeout ¶
GetRPCTimeout returns the configured RPC timeout.
func (*Node) HandleJoin ¶
func (n *Node) HandleJoin(ch string, info *ClientDesc) error
func (*Node) HandleLeave ¶
func (n *Node) HandleLeave(ch string, info *ClientDesc) error
func (*Node) HandlePublication ¶
func (n *Node) HandlePublication(ch string, pub *Publication) error
func (*Node) ProxyRPC ¶
func (n *Node) ProxyRPC(ctx context.Context, channel, method string, req *proxy.RPCProxyRequest) (*proxy.RPCProxyResponse, error)
ProxyRPC proxies an RPC request to the configured backend.
func (*Node) Publish ¶
func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) error
func (*Node) RemoveSubscription ¶
func (n *Node) RemoveSubscription(ch string, c *ClientSession) error
RemoveSubscription removes a subscription for a client from a channel. This is an exported method for use by the server-side API.
func (*Node) SetupProxy ¶
func (n *Node) SetupProxy(cfgs []*proxy.ProxyConfig) error
SetupProxy configures the proxy router with the given proxy configurations.
type ProtobufMarshaler ¶
type ProtobufMarshaler struct{}
ProtobufMarshaler implements protobuf marshaling for protocol messages.
func (ProtobufMarshaler) Name ¶
func (ProtobufMarshaler) Name() string
func (ProtobufMarshaler) UseBytes ¶
func (ProtobufMarshaler) UseBytes() bool
type Publication ¶
type PublishOption ¶
type PublishOption func(*PublishOptions)
func WithAsBytes ¶
func WithAsBytes(asBytes bool) PublishOption
func WithClientDesc ¶
func WithClientDesc(info *ClientDesc) PublishOption
WithClientDesc adds ClientDesc to Publication.
func WithEventType ¶
func WithEventType(eventType string) PublishOption
func WithIsText ¶
func WithIsText(isText bool) PublishOption
type PublishOptions ¶
type PublishOptions struct {
ClientDesc *ClientDesc
AsBytes bool
IsText bool
EventType string
}
type StreamPosition ¶
type StreamPosition struct {
// Offset defines publication incremental offset inside a stream.
Offset uint64
// Epoch allows handling situations when storage
// lost stream entirely for some reason (expired or lost after restart) and we
// want to track this fact to prevent successful recovery from another stream.
// I.e. for example we have a stream [1, 2, 3], then it's lost and new stream
// contains [1, 2, 3, 4], client that recovers from position 3 will only receive
// publication 4 missing 1, 2, 3 from new stream. With epoch, we can tell client
// that correct recovery is not possible.
Epoch string
}
type Subscriber ¶
type Subscriber struct {
Client *ClientSession
Ephemeral bool
}
Subscriber represents a client that can subscribe to channels.
func NewSubscriber ¶
func NewSubscriber(client *ClientSession, ephemeral bool) Subscriber
NewSubscriber creates a new Subscriber.
type Survey ¶
type Survey struct {
// contains filtered or unexported fields
}
Survey manages the lifecycle of a survey request and response collection.
func (*Survey) AddResponse ¶
AddResponse adds a client response to the survey.
func (*Survey) Results ¶
func (s *Survey) Results() []*SurveyResult
Results returns a copy of the current collected results.
type SurveyResult ¶
SurveyResult represents a response from a client to a survey request.
type UnmarshalTypeError ¶
type UnmarshalTypeError struct {
Type any
}
UnmarshalTypeError is returned when Unmarshal receives an unexpected type.
func (*UnmarshalTypeError) Error ¶
func (e *UnmarshalTypeError) Error() string