From d977668ba38f86fb2caba0d943775b70828adc08 Mon Sep 17 00:00:00 2001 From: yourok <8yourok8@mail.ru> Date: Sun, 13 Oct 2019 22:28:14 +0300 Subject: [PATCH] back to old cache --- src/server/torr/storage/memcache/Buffer.go | 57 +++++------ src/server/torr/storage/memcache/Cache.go | 108 +++++++++++--------- src/server/torr/storage/memcache/Piece.go | 4 - src/server/torr/storage/memcache/Storage.go | 7 +- 4 files changed, 85 insertions(+), 91 deletions(-) diff --git a/src/server/torr/storage/memcache/Buffer.go b/src/server/torr/storage/memcache/Buffer.go index 20c15c3..9b8eeff 100644 --- a/src/server/torr/storage/memcache/Buffer.go +++ b/src/server/torr/storage/memcache/Buffer.go @@ -1,9 +1,9 @@ package memcache import ( + "fmt" "sync" - "log" "server/utils" ) @@ -33,11 +33,11 @@ func (b *BufferPool) mkBuffs() { return } b.buffs = make(map[int]*buffer, b.frees) - log.Println("Create", b.frees, "buffers") + fmt.Println("Create", b.frees, "buffers") for i := 0; i < b.frees; i++ { buf := buffer{ -1, - make([]byte, b.size, b.size), + make([]byte, b.size), false, } b.buffs[i] = &buf @@ -59,18 +59,8 @@ func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) { return } } - log.Println("Create slow buffer") - - buf := buffer{ - p.Id, - make([]byte, b.size, b.size), - true, - } - b.frees++ - b.buffs[b.frees] = &buf - buff = buf.buf - index = b.frees - return + fmt.Println("Create slow buffer") + return make([]byte, b.size), -1 } func (b *BufferPool) ReleaseBuffer(index int) { @@ -85,26 +75,27 @@ func (b *BufferPool) ReleaseBuffer(index int) { buff.used = false buff.pieceId = -1 b.frees++ + //fmt.Println("Release buffer:", index, b.frees) } else { utils.FreeOSMem() } } -//func (b *BufferPool) Used() map[int]struct{} { -// b.mu.Lock() -// defer b.mu.Unlock() -// if len(b.buffs) == 0 { -// b.mkBuffs() -// } -// used := make(map[int]struct{}) -// for _, b := range b.buffs { -// if b.used { -// used[b.pieceId] = struct{}{} -// } -// } -// return used -//} -// -//func (b *BufferPool) Len() int { -// return b.frees -//} +func (b *BufferPool) Used() map[int]struct{} { + if len(b.buffs) == 0 { + b.mu.Lock() + b.mkBuffs() + b.mu.Unlock() + } + used := make(map[int]struct{}) + for _, b := range b.buffs { + if b.used { + used[b.pieceId] = struct{}{} + } + } + return used +} + +func (b *BufferPool) Len() int { + return b.frees +} diff --git a/src/server/torr/storage/memcache/Cache.go b/src/server/torr/storage/memcache/Cache.go index 0cf7e0d..36e084c 100644 --- a/src/server/torr/storage/memcache/Cache.go +++ b/src/server/torr/storage/memcache/Cache.go @@ -1,15 +1,16 @@ package memcache import ( + "fmt" + "sort" "sync" + "server/torr/reader" "server/torr/storage/state" "server/utils" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" - "log" - "server/torr/reader" ) type Cache struct { @@ -32,30 +33,30 @@ type Cache struct { pieces map[int]*Piece bufferPull *BufferPool - usedPieces map[int]struct{} + + prcLoaded int readers map[*reader.Reader]struct{} } func NewCache(capacity int64, storage *Storage) *Cache { ret := &Cache{ - capacity: capacity, - filled: 0, - pieces: make(map[int]*Piece), - readers: make(map[*reader.Reader]struct{}), - usedPieces: make(map[int]struct{}), - s: storage, + capacity: capacity, + filled: 0, + pieces: make(map[int]*Piece), + s: storage, + readers: make(map[*reader.Reader]struct{}), } return ret } func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { - log.Println("Create cache for:", info.Name) + fmt.Println("Create cache for:", info.Name) //Min capacity of 2 pieces length - caps := info.PieceLength * 2 - if c.capacity < caps { - c.capacity = caps + cap := info.PieceLength * 2 + if c.capacity < cap { + c.capacity = cap } c.pieceLength = info.PieceLength c.pieceCount = info.NumPieces() @@ -75,10 +76,7 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { c.muPiece.Lock() - defer func() { - c.muPiece.Unlock() - go utils.FreeOSMemGC(c.capacity) - }() + defer c.muPiece.Unlock() if val, ok := c.pieces[m.Index()]; ok { return val } @@ -87,7 +85,7 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { func (c *Cache) Close() error { c.isRemove = false - log.Println("Close cache for:", c.hash) + fmt.Println("Close cache for:", c.hash) if _, ok := c.s.caches[c.hash]; ok { delete(c.s.caches, c.hash) } @@ -135,50 +133,64 @@ func (c *Cache) cleanPieces() { defer func() { c.isRemove = false }() c.muRemove.Unlock() - bufPieces := c.getBufferedPieces() - - if len(bufPieces) > 0 && c.filled >= c.capacity { - c.muReader.Lock() - for reader := range c.readers { - beg, end := c.getReaderPieces(reader) - for id := range bufPieces { - if id >= beg && id <= end { - delete(bufPieces, id) - } - } + remPieces := c.getRemPieces() + if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) { + remCount := int((c.filled - c.capacity) / c.pieceLength) + if remCount < 1 { + remCount = 1 } - c.muReader.Unlock() - if len(bufPieces) > 0 { - for _, p := range bufPieces { - p.Release() - } - bufPieces = nil - go utils.FreeOSMemGC(c.capacity) + if remCount > len(remPieces) { + remCount = len(remPieces) + } + + remPieces = remPieces[:remCount] + + for _, p := range remPieces { + c.removePiece(p) } } } -func (c *Cache) getBufferedPieces() map[int]*Piece { - pieces := make(map[int]*Piece) +func (c *Cache) getRemPieces() []*Piece { + pieces := make([]*Piece, 0) fill := int64(0) - used := c.usedPieces + loading := 0 + used := c.bufferPull.Used() for u := range used { - piece := c.pieces[u] - if piece.Size > 0 { - if piece.Id > 0 { - pieces[piece.Id] = piece + v := c.pieces[u] + if v.Size > 0 { + if v.Id > 0 { + pieces = append(pieces, v) + } + fill += v.Size + if !v.complete { + loading++ } - fill += piece.Size } } c.filled = fill + sort.Slice(pieces, func(i, j int) bool { + return pieces[i].accessed < pieces[j].accessed + }) + c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff) return pieces } func (c *Cache) removePiece(piece *Piece) { + c.muPiece.Lock() + defer c.muPiece.Unlock() piece.Release() - return + + if c.prcLoaded >= 95 { + utils.FreeOSMemGC(c.capacity) + } else { + utils.FreeOSMem() + } +} + +func prc(val, of int) int { + return int(float64(val) * 100.0 / float64(of)) } func (c *Cache) AddReader(r *reader.Reader) { @@ -199,9 +211,3 @@ func (c *Cache) ReadersLen() int { } return len(c.readers) } - -func (c *Cache) getReaderPieces(reader *reader.Reader) (begin, end int) { - end = int((reader.Offset() + reader.Readahead()) / c.pieceLength) - begin = int((reader.Offset() - c.capacity + reader.Readahead()) / c.pieceLength) - return -} diff --git a/src/server/torr/storage/memcache/Piece.go b/src/server/torr/storage/memcache/Piece.go index 5df5867..f21f75b 100644 --- a/src/server/torr/storage/memcache/Piece.go +++ b/src/server/torr/storage/memcache/Piece.go @@ -39,7 +39,6 @@ func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) { if p.buffer == nil { return 0, errors.New("Can't get buffer write") } - p.cache.usedPieces[p.Id] = struct{}{} } n = copy(p.buffer[off:], b[:]) p.Size += int64(n) @@ -102,9 +101,6 @@ func (p *Piece) Release() { } p.Size = 0 p.complete = false - if _, ok := p.cache.usedPieces[p.Id]; ok { - delete(p.cache.usedPieces, p.Id) - } } func (p *Piece) Stat() state.ItemState { diff --git a/src/server/torr/storage/memcache/Storage.go b/src/server/torr/storage/memcache/Storage.go index 83a9064..2a54fff 100644 --- a/src/server/torr/storage/memcache/Storage.go +++ b/src/server/torr/storage/memcache/Storage.go @@ -3,14 +3,15 @@ package memcache import ( "sync" + "server/torr/storage" "server/torr/storage/state" "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/storage" + storage2 "github.com/anacrolix/torrent/storage" ) type Storage struct { - storage.TorrentImpl + storage.Storage caches map[metainfo.Hash]*Cache capacity int64 @@ -24,7 +25,7 @@ func NewStorage(capacity int64) *Storage { return stor } -func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { +func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) { s.mu.Lock() defer s.mu.Unlock() ch := NewCache(s.capacity, s)