From 2372ffc74cdc0aa9eb78a15a7ecddc86a76532d5 Mon Sep 17 00:00:00 2001 From: yourok <8yourok8@mail.ru> Date: Fri, 17 May 2019 10:52:25 +0300 Subject: [PATCH] memcache v2 --- src/server/torr/storage/memcacheV2/Buffer.go | 94 +++++++++ src/server/torr/storage/memcacheV2/Cache.go | 196 ++++++++++++++++++ src/server/torr/storage/memcacheV2/Piece.go | 118 +++++++++++ src/server/torr/storage/memcacheV2/Reader.go | 27 +++ src/server/torr/storage/memcacheV2/Storage.go | 66 ++++++ 5 files changed, 501 insertions(+) create mode 100644 src/server/torr/storage/memcacheV2/Buffer.go create mode 100644 src/server/torr/storage/memcacheV2/Cache.go create mode 100644 src/server/torr/storage/memcacheV2/Piece.go create mode 100644 src/server/torr/storage/memcacheV2/Reader.go create mode 100644 src/server/torr/storage/memcacheV2/Storage.go diff --git a/src/server/torr/storage/memcacheV2/Buffer.go b/src/server/torr/storage/memcacheV2/Buffer.go new file mode 100644 index 0000000..4c60d2d --- /dev/null +++ b/src/server/torr/storage/memcacheV2/Buffer.go @@ -0,0 +1,94 @@ +package memcacheV2 + +import ( + "fmt" + "sync" + + "server/utils" +) + +type buffer struct { + pieceId int + buf []byte + used bool +} + +type BufferPool struct { + buffs map[int]*buffer + bufferLength int64 + bufferCount int + mu sync.Mutex +} + +func NewBufferPool(bufferLength int64) *BufferPool { + bp := new(BufferPool) + bp.bufferLength = bufferLength + bp.buffs = make(map[int]*buffer) + return bp +} + +func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) { + b.mu.Lock() + defer b.mu.Unlock() + for id, buf := range b.buffs { + if !buf.used { + fmt.Println("Get buffer:", id) + buf.used = true + buf.pieceId = p.Id + buff = buf.buf + index = id + return + } + } + + fmt.Println("Create buffer:", b.bufferCount) + buf := new(buffer) + buf.buf = make([]byte, b.bufferLength) + buf.used = true + buf.pieceId = p.Id + b.buffs[b.bufferCount] = buf + index = b.bufferCount + buff = buf.buf + b.bufferCount++ + return +} + +func (b *BufferPool) ReleaseBuffer(index int) { + if index == -1 { + utils.FreeOSMem() + return + } + b.mu.Lock() + defer b.mu.Unlock() + if buff, ok := b.buffs[index]; ok { + fmt.Println("Release buffer:", index) + buff.used = false + buff.pieceId = -1 + } else { + utils.FreeOSMem() + } +} + +func (b *BufferPool) Used() map[int]struct{} { + b.mu.Lock() + defer b.mu.Unlock() + used := make(map[int]struct{}) + for _, b := range b.buffs { + if b.used { + used[b.pieceId] = struct{}{} + } + } + return used +} + +func (b *BufferPool) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + count := 0 + for _, b := range b.buffs { + if b.used { + count++ + } + } + return count +} diff --git a/src/server/torr/storage/memcacheV2/Cache.go b/src/server/torr/storage/memcacheV2/Cache.go new file mode 100644 index 0000000..dba605e --- /dev/null +++ b/src/server/torr/storage/memcacheV2/Cache.go @@ -0,0 +1,196 @@ +package memcacheV2 + +import ( + "fmt" + "sort" + "sync" + + "server/torr/storage/state" + "server/utils" + + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" +) + +type Cache struct { + storage.TorrentImpl + + capacity int64 + filled int64 + hash metainfo.Hash + + pieceLength int64 + pieceCount int + piecesBuff int + + muPiece sync.Mutex + muRemove sync.Mutex + isRemove bool + + pieces map[int]*Piece + bufferPull *BufferPool + + prcLoaded int + position int +} + +func NewCache(capacity int64) *Cache { + ret := &Cache{ + capacity: capacity, + filled: 0, + pieces: make(map[int]*Piece), + } + + return ret +} + +func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { + fmt.Println("Create cache for:", info.Name) + //Min capacity of 2 pieces length + cap := info.PieceLength * 2 + if c.capacity < cap { + c.capacity = cap + } + c.pieceLength = info.PieceLength + c.pieceCount = info.NumPieces() + c.piecesBuff = int(c.capacity / c.pieceLength) + c.hash = hash + c.bufferPull = NewBufferPool(c.pieceLength) + + for i := 0; i < c.pieceCount; i++ { + c.pieces[i] = &Piece{ + Id: i, + Length: info.Piece(i).Length(), + Hash: info.Piece(i).Hash().HexString(), + cache: c, + } + } +} + +func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { + c.muPiece.Lock() + defer c.muPiece.Unlock() + if val, ok := c.pieces[m.Index()]; ok { + return val + } + return nil +} + +func (c *Cache) Close() error { + c.isRemove = false + fmt.Println("Close cache for:", c.hash) + c.pieces = nil + c.bufferPull = nil + utils.FreeOSMemGC() + return nil +} + +func (c *Cache) GetState() state.CacheState { + cState := state.CacheState{} + cState.Capacity = c.capacity + cState.PiecesLength = c.pieceLength + cState.PiecesCount = c.pieceCount + cState.Hash = c.hash.HexString() + + stats := make(map[int]state.ItemState, 0) + c.muPiece.Lock() + var fill int64 = 0 + for _, value := range c.pieces { + stat := value.Stat() + if stat.BufferSize > 0 { + fill += stat.BufferSize + stats[stat.Id] = stat + } + } + c.filled = fill + c.muPiece.Unlock() + cState.Filled = c.filled + cState.Pieces = stats + return cState +} + +func (c *Cache) setPos(pos int) { + c.position = (c.position + pos) / 2 + //fmt.Println("Read:", c.position) +} + +func (c *Cache) cleanPieces() { + if c.isRemove { + return + } + c.muRemove.Lock() + if c.isRemove { + c.muRemove.Unlock() + return + } + c.isRemove = true + defer func() { c.isRemove = false }() + c.muRemove.Unlock() + + remPieces := c.getRemPieces() + if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) { + remCount := int((c.filled - c.capacity) / c.pieceLength) + if remCount < 1 { + remCount = 1 + } + if remCount > len(remPieces) { + remCount = len(remPieces) + } + + remPieces = remPieces[:remCount] + + for _, p := range remPieces { + c.removePiece(p) + } + } +} + +func (c *Cache) getRemPieces() []*Piece { + pieces := make([]*Piece, 0) + fill := int64(0) + loading := 0 + used := c.bufferPull.Used() + + fpices := c.piecesBuff - int(utils.GetReadahead()/c.pieceLength) + low := c.position - fpices + 1 + high := c.position + c.piecesBuff - fpices + 3 + + for u := range used { + v := c.pieces[u] + if v.Size > 0 { + if v.Id > 0 && (v.Id < low || v.Id > high) { + pieces = append(pieces, v) + } + fill += v.Size + if !v.complete { + loading++ + } + } + } + c.filled = fill + sort.Slice(pieces, func(i, j int) bool { + return pieces[i].accessed < pieces[j].accessed + }) + + c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff) + return pieces +} + +func (c *Cache) removePiece(piece *Piece) { + c.muPiece.Lock() + defer c.muPiece.Unlock() + piece.Release() + + //st := fmt.Sprintf("%v%% %v\t%s\t%s", c.prcLoaded, piece.Id, piece.accessed.Format("15:04:05.000"), piece.Hash) + if c.prcLoaded >= 95 { + //fmt.Println("Clean memory GC:", st) + utils.FreeOSMemGC() + } else { + //fmt.Println("Clean memory:", st) + utils.FreeOSMem() + } +} + +func prc(val, of int) int { + return int(float64(val) * 100.0 / float64(of)) +} diff --git a/src/server/torr/storage/memcacheV2/Piece.go b/src/server/torr/storage/memcacheV2/Piece.go new file mode 100644 index 0000000..334ce4f --- /dev/null +++ b/src/server/torr/storage/memcacheV2/Piece.go @@ -0,0 +1,118 @@ +package memcacheV2 + +import ( + "errors" + "io" + "sync" + "time" + + "server/torr/storage/state" + + "github.com/anacrolix/torrent/storage" +) + +type Piece struct { + storage.PieceImpl + + Id int + Hash string + Length int64 + Size int64 + + complete bool + accessed int64 + buffer []byte + bufIndex int + + mu sync.RWMutex + cache *Cache +} + +func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.buffer == nil { + go p.cache.cleanPieces() + p.buffer, p.bufIndex = p.cache.bufferPull.GetBuffer(p) + if p.buffer == nil { + return 0, errors.New("Can't get buffer write") + } + } + n = copy(p.buffer[off:], b[:]) + p.Size += int64(n) + p.accessed = time.Now().Unix() + 2000 + return +} + +func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { + p.mu.RLock() + defer p.mu.RUnlock() + size := len(b) + if size+int(off) > len(p.buffer) { + size = len(p.buffer) - int(off) + if size < 0 { + size = 0 + } + } + if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size { + return 0, io.ErrUnexpectedEOF + } + n = copy(b, p.buffer[int(off) : int(off)+size][:]) + + if int64(len(b))+off >= p.Size { + go p.cache.cleanPieces() + time.Sleep(time.Millisecond * 2000) + } + + if p.complete { + p.accessed = time.Now().Unix() + p.cache.setPos(p.Id) + } + + return n, nil +} + +func (p *Piece) MarkComplete() error { + if len(p.buffer) == 0 { + return errors.New("piece is not complete") + } + p.complete = true + return nil +} + +func (p *Piece) MarkNotComplete() error { + p.complete = false + p.accessed = 0 + return nil +} + +func (p *Piece) Completion() storage.Completion { + return storage.Completion{ + Complete: p.complete, + Ok: true, + } +} + +func (p *Piece) Release() { + p.mu.Lock() + defer p.mu.Unlock() + if p.buffer != nil { + p.buffer = nil + p.cache.bufferPull.ReleaseBuffer(p.bufIndex) + p.bufIndex = -1 + } + p.Size = 0 + p.complete = false +} + +func (p *Piece) Stat() state.ItemState { + itm := state.ItemState{ + Id: p.Id, + Hash: p.Hash, + Accessed: p.accessed, + Completed: p.complete, + BufferSize: p.Size, + } + return itm +} diff --git a/src/server/torr/storage/memcacheV2/Reader.go b/src/server/torr/storage/memcacheV2/Reader.go new file mode 100644 index 0000000..0798119 --- /dev/null +++ b/src/server/torr/storage/memcacheV2/Reader.go @@ -0,0 +1,27 @@ +package memcacheV2 + +import "github.com/anacrolix/torrent" + +type Reader struct { + torrent.Reader + + pos int64 +} + +func NewReader(file torrent.File) *Reader { + r := new(Reader) + r.Reader = file.NewReader() + return r +} + +func (r *Reader) Read(p []byte) (n int, err error) { + n, err = r.Read(p) + r.pos += int64(n) + return +} + +func (r *Reader) Seek(offset int64, whence int) (ret int64, err error) { + ret, err = r.Reader.Seek(offset, whence) + r.pos = ret + return +} diff --git a/src/server/torr/storage/memcacheV2/Storage.go b/src/server/torr/storage/memcacheV2/Storage.go new file mode 100644 index 0000000..8d96fa0 --- /dev/null +++ b/src/server/torr/storage/memcacheV2/Storage.go @@ -0,0 +1,66 @@ +package memcacheV2 + +import ( + "sync" + + "server/torr/storage" + "server/torr/storage/state" + + "github.com/anacrolix/torrent/metainfo" + storage2 "github.com/anacrolix/torrent/storage" +) + +type Storage struct { + storage.Storage + + caches map[metainfo.Hash]*Cache + capacity int64 + mu sync.Mutex +} + +func NewStorage(capacity int64) storage.Storage { + stor := new(Storage) + stor.capacity = capacity + stor.caches = make(map[metainfo.Hash]*Cache) + return stor +} + +func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) { + s.mu.Lock() + defer s.mu.Unlock() + ch := NewCache(s.capacity) + ch.Init(info, infoHash) + s.caches[infoHash] = ch + return ch, nil +} + +func (s *Storage) GetStats(hash metainfo.Hash) *state.CacheState { + s.mu.Lock() + defer s.mu.Unlock() + if c, ok := s.caches[hash]; ok { + st := c.GetState() + return &st + } + return nil +} + +func (s *Storage) CloseHash(hash metainfo.Hash) { + if s.caches == nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + if ch, ok := s.caches[hash]; ok { + ch.Close() + delete(s.caches, hash) + } +} + +func (s *Storage) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + for _, ch := range s.caches { + ch.Close() + } + return nil +}