Documentation
¶
Index ¶
- Constants
- Variables
- func CreateFile(path string) error
- func ErrorCodeToString(e int32) string
- func ForwardCreateReq(serverName string, req *pb.CreateChunkReq, peer string) error
- func GetAddr(host string, port uint32) string
- func IsClose[T any](ch <-chan T) bool
- func LoadChunk(path string, used uint32, start uint32, end uint32) ([]byte, error)
- func NewAppendDataResp(errorCode int32) *pb.AppendDataResp
- func NewCreateChunkResp(errorCode int32) *pb.CreateChunkResp
- func NewDeleteChunkResp(errorCode int32) *pb.DeleteChunkResp
- func NewForwardCreateResp(errorCode int32) *pb.ForwardCreateResp
- func NewGetVersionResp(errorCode int32, version *uint32, fileData []byte) *pb.GetVersionResp
- func NewPeerConn(address string) (*grpc.ClientConn, error)
- func NewReadResp(fileData []byte, errorCode int32, version *uint32) *pb.ReadResp
- func NewReadVersionResp(errorCode int32, version *uint32) *pb.ReadVersionResp
- func NewReplicateReq(req *pb.ReplicateReq, peer string) error
- func NewReplicateResp(errorCode int32, uuid string) *pb.ReplicateResp
- func NewStatus(errorCode int32) *pb.Status
- func OverWriteChunk(chunkMeta *ChunkMetaData, content []byte) error
- func ReplicateRespToAppendResp(replicateResp *pb.ReplicateResp) *pb.AppendDataResp
- func Sum[T Number](slice []T) T
- func WriteFile(chunkMeta *ChunkMetaData, content []byte) error
- type ChunkMetaData
- type ChunkServer
- func (s *ChunkServer) AppendData(ctx context.Context, appendReq *pb.AppendDataReq) (*pb.AppendDataResp, error)
- func (s *ChunkServer) AssignNewPrimary(ctx context.Context, req *pb.AssignNewPrimaryReq) (res *pb.AssignNewPrimaryResp, err error)
- func (s *ChunkServer) ChangeToPrimary(ctx context.Context, req *pb.ChangeToPrimaryReq) (res *pb.ChangeToPrimaryResp, err error)
- func (s *ChunkServer) CreateChunk(ctx context.Context, createChunkReq *pb.CreateChunkReq) (*pb.CreateChunkResp, error)
- func (s *ChunkServer) DeleteChunk(ctx context.Context, deleteReq *pb.DeleteChunkReq) (*pb.DeleteChunkResp, error)
- func (s *ChunkServer) ForwardCreate(ctx context.Context, forwardCreateReq *pb.ForwardCreateReq) (*pb.ForwardCreateResp, error)
- func (s *ChunkServer) GetVersion(ctx context.Context, req *pb.GetVersionReq) (res *pb.GetVersionResp, err error)
- func (s *ChunkServer) Read(ctx context.Context, readReq *pb.ReadReq) (*pb.ReadResp, error)
- func (s *ChunkServer) ReadVersion(ctx context.Context, readVersion *pb.ReadVersionReq) (*pb.ReadVersionResp, error)
- func (s *ChunkServer) Replicate(ctx context.Context, replicateReq *pb.ReplicateReq) (*pb.ReplicateResp, error)
- func (s *ChunkServer) SendGetVersion(chunkHandle string)
- func (s *ChunkServer) SendHeartBeat()
- func (s *ChunkServer) SendRegister() error
- func (s *ChunkServer) UpdateBackup(ctx context.Context, req *pb.UpdateBackupReq) (*pb.UpdateBackupResp, error)
- type DebugInfo
- type GetVersionTimer
- type HeartBeatTimer
- type Number
- type RespMetaData
Constants ¶
const ( Primary = iota Secondary )
const ( OK int32 = iota ERROR_NOT_PRIMARY ERROR_NOT_SECONDARY ERROR_READ_FAILED ERROR_CHUNK_ALREADY_EXISTS ERROR_CREATE_CHUNK_FAILED ERROR_APPEND_FAILED ERROR_REPLICATE_FAILED // ERROR_APPEND_NOT_EXISTS represents the chunk to be appended does not exist on local filesystem ERROR_APPEND_NOT_EXISTS ERROR_REPLICATE_NOT_EXISTS ERROR_SHOULD_NOT_HAPPEN ERROR_CHUNK_NOT_EXISTS ERROR_VERSIONS_DO_NOT_MATCH )
Variables ¶
var ClientOpts []grpc.DialOption = []grpc.DialOption{ grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(64*1024*1024+300), grpc.MaxCallSendMsgSize(64*1024*1024+300), ), grpc.WithInsecure(), }
Functions ¶
func CreateFile ¶
CreateFile creates an empty file on disk and creates all intermediate directories in path
func ErrorCodeToString ¶
func ForwardCreateReq ¶
func ForwardCreateReq(serverName string, req *pb.CreateChunkReq, peer string) error
ForwardCreateReq establishes a grpc.ClientConn with peer and forwards the pb.CreateChunkReq
func LoadChunk ¶
LoadChunk reads a file at the specified path with an offset start and ends the read at end if end equals to 0, LoadChunk reads and returns the whole data starting from start, otherwise it reads and returns (end - start) bytes
func NewAppendDataResp ¶
func NewAppendDataResp(errorCode int32) *pb.AppendDataResp
func NewCreateChunkResp ¶
func NewCreateChunkResp(errorCode int32) *pb.CreateChunkResp
func NewDeleteChunkResp ¶
func NewDeleteChunkResp(errorCode int32) *pb.DeleteChunkResp
func NewForwardCreateResp ¶
func NewForwardCreateResp(errorCode int32) *pb.ForwardCreateResp
func NewGetVersionResp ¶
func NewGetVersionResp(errorCode int32, version *uint32, fileData []byte) *pb.GetVersionResp
func NewPeerConn ¶
func NewPeerConn(address string) (*grpc.ClientConn, error)
NewPeerConn establishes and returns a grpc.ClientConn to the specified address
func NewReadResp ¶
NewReadResp returns a pointer to pb.ReadResp that represents the result of a read with pb.Status
func NewReadVersionResp ¶
func NewReadVersionResp(errorCode int32, version *uint32) *pb.ReadVersionResp
func NewReplicateReq ¶
func NewReplicateReq(req *pb.ReplicateReq, peer string) error
func NewReplicateResp ¶
func NewReplicateResp(errorCode int32, uuid string) *pb.ReplicateResp
func OverWriteChunk ¶
func OverWriteChunk(chunkMeta *ChunkMetaData, content []byte) error
OverWriteChunk overwrites existing file on disk with content
func ReplicateRespToAppendResp ¶
func ReplicateRespToAppendResp(replicateResp *pb.ReplicateResp) *pb.AppendDataResp
func WriteFile ¶
func WriteFile(chunkMeta *ChunkMetaData, content []byte) error
WriteFile opens chunk file in append mode and appends content to the file
Types ¶
type ChunkMetaData ¶
type ChunkMetaData struct {
// file location in local file system
ChunkLocation string
// role of current chunkserver for this chunk
Role uint32
// IP address of primary chunk server for this chunk
PrimaryChunkServer string
PeerAddress []string
// Already used size in bytes
Used uint32
// Add version number
Version uint32
MetaDataLock sync.Mutex
GetVersionChannel chan string
}
type ChunkServer ¶
type ChunkServer struct {
pb.UnimplementedChunkServerServer
// a mapping from ChunkHandle(string) to ChunkMetaData
Chunks map[string]*ChunkMetaData
// a mapping from client token to client last sequence number
ClientLastResp map[string]RespMetaData
// globally unique server name
ServerName string
// base directory to store chunk files
BasePath string
HostName string
Port uint32
MasterIP string
MasterPort uint32
Debug bool
DebugChan chan DebugInfo
}
func (*ChunkServer) AppendData ¶
func (s *ChunkServer) AppendData(ctx context.Context, appendReq *pb.AppendDataReq) (*pb.AppendDataResp, error)
func (*ChunkServer) AssignNewPrimary ¶
func (s *ChunkServer) AssignNewPrimary(ctx context.Context, req *pb.AssignNewPrimaryReq) (res *pb.AssignNewPrimaryResp, err error)
AssignNewPrimary role to certain chunkhandle change the primary.
func (*ChunkServer) ChangeToPrimary ¶
func (s *ChunkServer) ChangeToPrimary(ctx context.Context, req *pb.ChangeToPrimaryReq) (res *pb.ChangeToPrimaryResp, err error)
ChangeToPrimary will receive by backup chunk server, to notice it be the new Primary with certain chunk handle.
func (*ChunkServer) CreateChunk ¶
func (s *ChunkServer) CreateChunk(ctx context.Context, createChunkReq *pb.CreateChunkReq) (*pb.CreateChunkResp, error)
CreateChunk creates file on local filesystem that represents a chunk per Master Server's request
func (*ChunkServer) DeleteChunk ¶
func (s *ChunkServer) DeleteChunk(ctx context.Context, deleteReq *pb.DeleteChunkReq) (*pb.DeleteChunkResp, error)
DeleteChunk deletes the chunk metadata that corresponds with a string chunk handle on the primary chunk server as well as on backup servers
func (*ChunkServer) ForwardCreate ¶
func (s *ChunkServer) ForwardCreate(ctx context.Context, forwardCreateReq *pb.ForwardCreateReq) (*pb.ForwardCreateResp, error)
ForwardCreate create new chunk as backup
func (*ChunkServer) GetVersion ¶
func (s *ChunkServer) GetVersion(ctx context.Context, req *pb.GetVersionReq) (res *pb.GetVersionResp, err error)
func (*ChunkServer) ReadVersion ¶
func (s *ChunkServer) ReadVersion(ctx context.Context, readVersion *pb.ReadVersionReq) (*pb.ReadVersionResp, error)
ReadVersion returns the version of a chunk that corresponds with a string chunk handle and returns error if the chunk is not recorded by the ChunkServer
func (*ChunkServer) Replicate ¶
func (s *ChunkServer) Replicate(ctx context.Context, replicateReq *pb.ReplicateReq) (*pb.ReplicateResp, error)
func (*ChunkServer) SendGetVersion ¶
func (s *ChunkServer) SendGetVersion(chunkHandle string)
func (*ChunkServer) SendHeartBeat ¶
func (s *ChunkServer) SendHeartBeat()
func (*ChunkServer) SendRegister ¶
func (s *ChunkServer) SendRegister() error
func (*ChunkServer) UpdateBackup ¶
func (s *ChunkServer) UpdateBackup(ctx context.Context, req *pb.UpdateBackupReq) (*pb.UpdateBackupResp, error)
type GetVersionTimer ¶
type GetVersionTimer struct {
Srv *ChunkServer
ChunkHandle string
Timeout int
Quit <-chan string
}
func (*GetVersionTimer) Trigger ¶
func (t *GetVersionTimer) Trigger()
type HeartBeatTimer ¶
type HeartBeatTimer struct {
Srv *ChunkServer
// Timeout in millisecond
Timeout int
}
func (*HeartBeatTimer) Trigger ¶
func (t *HeartBeatTimer) Trigger()
type Number ¶
type Number interface {
constraints.Integer | constraints.Float
}
type RespMetaData ¶
type RespMetaData struct {
// client last seq
LastID string
// last response to client append request
AppendResp *pb.AppendDataResp
// error
Err error
}