From 770fea51fb28a3e184b6ab9c5989d9ab8ca31c25 Mon Sep 17 00:00:00 2001 From: yourok <8yourok8@mail.ru> Date: Fri, 17 May 2019 10:53:59 +0300 Subject: [PATCH] refactor --- src/server/torr/storage/memcache/Buffer.go | 69 +++++++++------------ src/server/torr/storage/memcache/Cache.go | 25 +++++--- src/server/torr/storage/memcache/Piece.go | 16 +++-- src/server/torr/storage/memcache/Storage.go | 2 +- 4 files changed, 58 insertions(+), 54 deletions(-) diff --git a/src/server/torr/storage/memcache/Buffer.go b/src/server/torr/storage/memcache/Buffer.go index 9b8eeff..b636ba9 100644 --- a/src/server/torr/storage/memcache/Buffer.go +++ b/src/server/torr/storage/memcache/Buffer.go @@ -14,53 +14,43 @@ type buffer struct { } type BufferPool struct { - buffs map[int]*buffer - frees int - size int64 - mu sync.Mutex + buffs map[int]*buffer + bufferLength int64 + bufferCount int + mu sync.Mutex } -func NewBufferPool(bufferLength int64, capacity int64) *BufferPool { +func NewBufferPool(bufferLength int64) *BufferPool { bp := new(BufferPool) - buffsSize := int(capacity/bufferLength) + 3 - bp.frees = buffsSize - bp.size = bufferLength + bp.bufferLength = bufferLength + bp.buffs = make(map[int]*buffer) return bp } -func (b *BufferPool) mkBuffs() { - if b.buffs != nil { - return - } - b.buffs = make(map[int]*buffer, b.frees) - fmt.Println("Create", b.frees, "buffers") - for i := 0; i < b.frees; i++ { - buf := buffer{ - -1, - make([]byte, b.size), - false, - } - b.buffs[i] = &buf - } -} - func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) { b.mu.Lock() defer b.mu.Unlock() - b.mkBuffs() for id, buf := range b.buffs { if !buf.used { + fmt.Println("Get buffer:", id) buf.used = true buf.pieceId = p.Id buff = buf.buf index = id - b.frees-- - //fmt.Printf("Get buffer: %v %v %v %p\n", id, p.Id, b.frees, buff) return } } - fmt.Println("Create slow buffer") - return make([]byte, b.size), -1 + + fmt.Println("Create buffer:", b.bufferCount) + buf := new(buffer) + buf.buf = make([]byte, b.bufferLength) + buf.used = true + buf.pieceId = p.Id + b.buffs[b.bufferCount] = buf + index = b.bufferCount + buff = buf.buf + b.bufferCount++ + return } func (b *BufferPool) ReleaseBuffer(index int) { @@ -70,23 +60,18 @@ func (b *BufferPool) ReleaseBuffer(index int) { } b.mu.Lock() defer b.mu.Unlock() - b.mkBuffs() if buff, ok := b.buffs[index]; ok { + fmt.Println("Release buffer:", index) buff.used = false buff.pieceId = -1 - b.frees++ - //fmt.Println("Release buffer:", index, b.frees) } else { utils.FreeOSMem() } } func (b *BufferPool) Used() map[int]struct{} { - if len(b.buffs) == 0 { - b.mu.Lock() - b.mkBuffs() - b.mu.Unlock() - } + b.mu.Lock() + defer b.mu.Unlock() used := make(map[int]struct{}) for _, b := range b.buffs { if b.used { @@ -97,5 +82,13 @@ func (b *BufferPool) Used() map[int]struct{} { } func (b *BufferPool) Len() int { - return b.frees + b.mu.Lock() + defer b.mu.Unlock() + count := 0 + for _, b := range b.buffs { + if b.used { + count++ + } + } + return count } diff --git a/src/server/torr/storage/memcache/Cache.go b/src/server/torr/storage/memcache/Cache.go index ea56f16..db7be18 100644 --- a/src/server/torr/storage/memcache/Cache.go +++ b/src/server/torr/storage/memcache/Cache.go @@ -15,8 +15,6 @@ import ( type Cache struct { storage.TorrentImpl - s *Storage - capacity int64 filled int64 hash metainfo.Hash @@ -33,14 +31,14 @@ type Cache struct { bufferPull *BufferPool prcLoaded int + position int } -func NewCache(capacity int64, storage *Storage) *Cache { +func NewCache(capacity int64) *Cache { ret := &Cache{ capacity: capacity, filled: 0, pieces: make(map[int]*Piece), - s: storage, } return ret @@ -57,7 +55,7 @@ func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) { c.pieceCount = info.NumPieces() c.piecesBuff = int(c.capacity / c.pieceLength) c.hash = hash - c.bufferPull = NewBufferPool(c.pieceLength, c.capacity) + c.bufferPull = NewBufferPool(c.pieceLength) for i := 0; i < c.pieceCount; i++ { c.pieces[i] = &Piece{ @@ -81,9 +79,6 @@ func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl { func (c *Cache) Close() error { c.isRemove = false fmt.Println("Close cache for:", c.hash) - if _, ok := c.s.caches[c.hash]; ok { - delete(c.s.caches, c.hash) - } c.pieces = nil c.bufferPull = nil utils.FreeOSMemGC() @@ -114,6 +109,11 @@ func (c *Cache) GetState() state.CacheState { return cState } +func (c *Cache) setPos(pos int) { + c.position = (c.position + pos) / 2 + //fmt.Println("Read:", c.position) +} + func (c *Cache) cleanPieces() { if c.isRemove { return @@ -150,10 +150,15 @@ func (c *Cache) getRemPieces() []*Piece { fill := int64(0) loading := 0 used := c.bufferPull.Used() + + fpices := c.piecesBuff - int(utils.GetReadahead()/c.pieceLength) + low := c.position - fpices + 1 + high := c.position + c.piecesBuff - fpices + 3 + for u := range used { v := c.pieces[u] if v.Size > 0 { - if v.Id > 0 { + if v.Id > 0 && (v.Id < low || v.Id > high) { pieces = append(pieces, v) } fill += v.Size @@ -164,7 +169,7 @@ func (c *Cache) getRemPieces() []*Piece { } c.filled = fill sort.Slice(pieces, func(i, j int) bool { - return pieces[i].accessed.Before(pieces[j].accessed) + return pieces[i].accessed < pieces[j].accessed }) c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff) diff --git a/src/server/torr/storage/memcache/Piece.go b/src/server/torr/storage/memcache/Piece.go index 6f4a7a9..a03c67a 100644 --- a/src/server/torr/storage/memcache/Piece.go +++ b/src/server/torr/storage/memcache/Piece.go @@ -21,7 +21,7 @@ type Piece struct { complete bool readed bool - accessed time.Time + accessed int64 buffer []byte bufIndex int @@ -42,14 +42,13 @@ func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) { } n = copy(p.buffer[off:], b[:]) p.Size += int64(n) - p.accessed = time.Now() + p.accessed = time.Now().Unix() + 2000 return } func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { p.mu.RLock() defer p.mu.RUnlock() - size := len(b) if size+int(off) > len(p.buffer) { size = len(p.buffer) - int(off) @@ -61,13 +60,19 @@ func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) { return 0, io.ErrUnexpectedEOF } n = copy(b, p.buffer[int(off) : int(off)+size][:]) - p.accessed = time.Now() + if int(off)+size >= len(p.buffer) { p.readed = true } if int64(len(b))+off >= p.Size { go p.cache.cleanPieces() } + + if p.complete { + p.accessed = time.Now().Unix() + p.cache.setPos(p.Id) + } + return n, nil } @@ -81,12 +86,13 @@ func (p *Piece) MarkComplete() error { func (p *Piece) MarkNotComplete() error { p.complete = false + p.accessed = 0 return nil } func (p *Piece) Completion() storage.Completion { return storage.Completion{ - Complete: p.complete && len(p.buffer) > 0, + Complete: p.complete, Ok: true, } } diff --git a/src/server/torr/storage/memcache/Storage.go b/src/server/torr/storage/memcache/Storage.go index 4772765..aea901a 100644 --- a/src/server/torr/storage/memcache/Storage.go +++ b/src/server/torr/storage/memcache/Storage.go @@ -28,7 +28,7 @@ func NewStorage(capacity int64) storage.Storage { func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) { s.mu.Lock() defer s.mu.Unlock() - ch := NewCache(s.capacity, s) + ch := NewCache(s.capacity) ch.Init(info, infoHash) s.caches[infoHash] = ch return ch, nil