简要介绍实现多线程环形缓冲的方法

2016-02-20 01:02 7 1 收藏

今天图老师小编给大家展示的是简要介绍实现多线程环形缓冲的方法,精心挑选的内容希望大家多多支持、多多分享,喜欢就赶紧get哦!

【 tulaoshi.com - Web开发 】

 

我平时比较喜欢从网上听歌,有些链接下载速度太慢了。如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。我把实现多线程下载缓冲的方法写出来,不足之处恳请批评指正。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

一、HttpReader类功能:HTTP协议从指定URL读取数据

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

/** *//*** author by http://www.bt285.cn http://www.5a520.cn*/package instream; import java.io.IOException;   import java.io.InputStream;   import java.net.HttpURLConnection;   import java.net.URL; public final class HttpReader {   public static final int MAX_RETRY = 10;   private static long content_length;   private URL url;   private HttpURLConnection httpConnection;   private InputStream in_stream;   private long cur_pos;   //用于决定seek方法中是否执行文件定位   private int connect_timeout;   private int read_timeout;public HttpReader(URL u) {   this(u, 5000, 5000);   }public HttpReader(URL u, int connect_timeout, int read_timeout) {   this.connect_timeout = connect_timeout;   this.read_timeout = read_timeout;   url = u;   if (content_length == 0) { int retry = 0; while (retry  HttpReader.MAX_RETRY) try { this.seek(0); content_length = httpConnection.getContentLength(); break; } catch (Exception e) { retry++; }   }   }public static long getContentLength() {   return content_length;   }public int read(byte[] b, int off, int len) throws IOException {   int r = in_stream.read(b, off, len);   cur_pos += r;   return r;   }public int getData(byte[] b, int off, int len) throws IOException {   int r, rema = len;   while (rema  0) { if ((r = in_stream.read(b, off, rema)) == -1) { return -1; } rema -= r; off += r; cur_pos += r;   }   return len;   }public void close() {   if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null;   }   if (in_stream != null) { try { in_stream.close(); } catch (IOException e) {} in_stream = null;   }   url = null;   }/**//*   * 抛出异常通知再试   * 响应码503可能是由某种暂时的原因引起的,例如同一IP频繁的连接请求可能遭服务器拒绝   */  public void seek(long start_pos) throws IOException {   if (start_pos == cur_pos && in_stream != null) return;   if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null;   }   if (in_stream != null) { in_stream.close(); in_stream = null;   }   httpConnection = (HttpURLConnection) url.openConnection();   httpConnection.setConnectTimeout(connect_timeout);   httpConnection.setReadTimeout(read_timeout);   String sProperty = "bytes=" + start_pos + "-";   httpConnection.setRequestProperty("Range", sProperty);   //httpConnection.setRequestProperty("Connection", "Keep-Alive");   int responseCode = httpConnection.getResponseCode();   if (responseCode  200 || responseCode = 300) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } throw new IOException("HTTP responseCode="+responseCode);   } in_stream = httpConnection.getInputStream();   cur_pos = start_pos;   } }

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

二、IWriterCallBack接口功能:实现读/写通信。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

package instream; public interface IWriterCallBack {   public boolean tryWriting(Writer w) throws InterruptedException;   public void updateBuffer(int i, int len);   public void updateWriterCount();   public void terminateWriters();   }

 

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

三、Writer类:下载线程,负责向buf[]写数据。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

/** *//*** http://www.bt285.cn http://www.5a520.cn */package instream;   import java.io.IOException;   import java.net.URL; public final class Writer implements Runnable {   private static boolean isalive = true;   private byte[] buf;   private IWriterCallBack icb;   protected int index;//buf[]内"块"索引号   protected long start_pos;   //index对应的文件位置(相对于文件首的偏移量)   protected int await_count;  //用于判断:下载速度足够就退出一个"写"线程   private HttpReader hr;public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) {   hr = new HttpReader(u);   if(HttpReader.getContentLength() == 0)  //实例化HttpReader对象都不成功 return;   icb = call_back;   buf = b;   Thread t = new Thread(this,"dt_"+i);   t.setPriority(Thread.NORM_PRIORITY + 1);   t.start();   }public void run() {   int write_bytes=0, write_pos=0, rema = 0, retry = 0;   boolean cont = true;   while (cont) { try { // 1.等待空闲块 if(retry == 0) { if (icb.tryWriting(this) == false)break; write_bytes = 0; rema = BuffRandAcceURL.UNIT_LENGTH; write_pos = index  BuffRandAcceURL.UNIT_LENGTH_BITS; }   // 2.定位 hr.seek(start_pos);   // 3.下载"一块" int w; while (rema  0 && isalive) { w = (rema  2048) ? rema : 2048; //每次读几K合适? if ((w = hr.read(buf, write_pos, w)) == -1) {cont = false;break; } rema -= w; write_pos += w; start_pos += w; write_bytes += w; }   //4.通知"读"线程 retry = 0; icb.updateBuffer(index, write_bytes); } catch (InterruptedException e) { isalive = false; icb.terminateWriters(); break; } catch (IOException e) { if(++retry == HttpReader.MAX_RETRY) { isalive = false; icb.terminateWriters(); break; } }   }   icb.updateWriterCount();   try { hr.close();   } catch (Exception e) {}   hr = null;   buf = null;   icb = null;   } }

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

四、IRandomAccess接口:

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

package instream; public interface IRandomAccess {   public int read() throws Exception;   public int read(byte b[]) throws Exception;   public int read(byte b[], int off, int len) throws Exception;   public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;   public void seek(long pos) throws Exception;   public long length();   public long getFilePointer();   public void close();   }

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

关键是如何简单有效防止死锁?以下只是我的一次尝试,请指正。

 

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/webkaifa/)

/** *//*** http://www.5a520.cn  http://www.bt285.cn*/ package instream; import java.net.URL;   import java.net.URLDecoder;   import decode.Header;   import tag.MP3Tag;   import tag.TagThread; public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {   public static final int UNIT_LENGTH_BITS = 15;//32K   public static final int UNIT_LENGTH = 1  UNIT_LENGTH_BITS;   public static final int BUF_LENGTH = UNIT_LENGTH  4;//16块   public static final int UNIT_COUNT = BUF_LENGTH  UNIT_LENGTH_BITS;   public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);   private static final int MAX_WRITER = 8;   private static long file_pointer;   private static int read_pos;   private static int fill_bytes;   private static byte[] buf;  //同时也作读写同步锁:buf.wait()/buf.notify()   private static int[] buf_bytes;   private static int buf_index;   private static int alloc_pos;   private static URL url = null;   private static boolean isalive = true;   private static int writer_count;   private static int await_count;   private long file_length;   private long frame_bytes;public BuffRandAcceURL(String sURL) throws Exception {   this(sURL,MAX_WRITER);   }public BuffRandAcceURL(String sURL, int download_threads) throws Exception {   buf = new byte[BUF_LENGTH];   buf_bytes = new int[UNIT_COUNT];   url = new URL(sURL);//创建线程以异步方式解析ID3   new TagThread(url);//打印当前文件名   try { String s = URLDecoder.decode(sURL, "GBK"); System.out.println("start " + s.substring(s.lastIndexOf("/") + 1)); s = null;   } catch (Exception e) { System.out.println("start " + sURL);   }//创建"写"线程   for(int i = 0; i  download_threads; i++) new Writer(this, url, buf, i+1);   frame_bytes = file_length = HttpReader.getContentLength();   if(file_length == 0) { Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。"; throw new Exception("retry " + HttpReader.MAX_RETRY);   }   writer_count = download_threads;//缓冲   try_cache();//跳过ID3 v2   MP3Tag mP3Tag = new MP3Tag();   int v2_size = mP3Tag.checkID3V2(buf,0);   if (v2_size  0) { frame_bytes -= v2_size; //seek(v2_size): fill_bytes -= v2_size; file_pointer = v2_size; read_pos = v2_size; read_pos &= BUF_LENGTH_MASK; int units = v2_size  UNIT_LENGTH_BITS; for(int i = 0; i  units; i++) { buf_bytes[i] = 0; this.notifyWriter(); } buf_bytes[units] -= v2_size; this.notifyWriter();   }   mP3Tag = null;   }private void try_cache() throws InterruptedException {   int cache_size = BUF_LENGTH;   if(cache_size  (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos;   cache_size -= UNIT_LENGTH;//等待填写当前正在读的那"一块"缓冲区   /**//*if(fill_bytes = cache_size && writer_count  0) {synchronized (buf) {buf.wait();}return;  }*/   //等待填满缓冲区   while (fill_bytes  cache_size) { if (writer_count == 0 || isalive == false) return; if(BUF_LENGTH  (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos - UNIT_LENGTH; System.out.printf("r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100); synchronized (buf) { buf.wait(); }   }   System.out.printf("r");   }private int try_reading(int i, int len) throws Exception {   int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);   int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   while (r  len) { if (writer_count == 0 || isalive == false) return r; try_cache(); r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   }return len;   }/**//*   * 各个"写"线程互斥等待空闲块   */  public synchronized boolean tryWriting(Writer w) throws InterruptedException {   await_count++;   while (buf_bytes[buf_index] != 0 && isalive) { this.wait();   }//下载速度足够就结束一个"写"线程   if(writer_count  1 && w.await_count = await_count && w.await_count = writer_count) return false;if(alloc_pos = file_length) return false;   w.await_count = await_count;   await_count--;   w.start_pos = alloc_pos;   w.index = buf_index;   alloc_pos += UNIT_LENGTH;   buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;   return isalive;   }public void updateBuffer(int i, int len) {   synchronized (buf) { buf_bytes[i] = len; fill_bytes += len; buf.notify();   }   }public void updateWriterCount() {   synchronized (buf) { writer_count--; buf.notify();   }   }public synchronized void notifyWriter() {   this.notifyAll();   }public void terminateWriters() {   synchronized (buf) { if (isalive) { isalive = false; Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY+ " 次后放弃,请您稍后再试。"; } buf.notify();   }notifyWriter();}public int read() throws Exception {   int iret = -1;   int i = read_pos  UNIT_LENGTH_BITS;   // 1."等待"有1字节可读   while (buf_bytes[i]  1) { try_cache(); if (writer_count == 0) return -1;   }   if(isalive == false) return -1; // 2.读取   iret = buf[read_pos] & 0xff;   fill_bytes--;   file_pointer++;   read_pos++;   read_pos &= BUF_LENGTH_MASK;   if (--buf_bytes[i] == 0) notifyWriter(); // 3.通知 return iret;   }public int read(byte b[]) throws Exception {   return read(b, 0, b.length);   } public int read(byte[] b, int off, int len) throws Exception {   if(len  UNIT_LENGTH) len = UNIT_LENGTH;   int i = read_pos  UNIT_LENGTH_BITS;// 1."等待"有足够内容可读   if(try_reading(i, len)  len || isalive == false) return -1; // 2.读取   int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH   if (tail_len  len) { System.arraycopy(buf, read_pos, b, off, tail_len); System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);   } elseSystem.arraycopy(buf, read_pos, b, off, len); fill_bytes -= len;   file_pointer += len;   read_pos += len;   read_pos &= BUF_LENGTH_MASK;   buf_bytes[i] -= len;   if (buf_bytes[i]  0) { int ni = read_pos  UNIT_LENGTH_BITS; buf_bytes[ni] += buf_bytes[i]; buf_bytes[i] = 0; notifyWriter();   } else if (buf_bytes[i] == 0) notifyWriter();return len;   }/**//*   * 从src_off位置复制,不移动文件"指针"   */  public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {   int rpos = read_pos + src_off;   if(try_reading(rpos  UNIT_LENGTH_BITS, len)  len || isalive == false) return -1;   int tail_len = BUF_LENGTH - rpos;   if (tail_len  len) { System.arraycopy(buf, rpos, b, dst_off, tail_len); System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);   } elseSystem.arraycopy(buf, rpos, b, dst_off, len);   // 不发信号 return len;   }public long length() {   return file_length;   }public long getFilePointer() {   return file_pointer;   } public void close() {   //   }//   public void seek(long pos) throws Exception {   //   }  }

来源:http://www.tulaoshi.com/n/20160220/1633110.html

延伸阅读
下面我将对这两个问题和大家一起探讨一下。相信大家对生产者消费者问题并不生疏。在读书的时候我们采用系统体提供的p,v解决,这是对同一临界区资源同时进行读写需要的保护措施,本工程使用缓冲队列,故不需要对临界区进行加锁 。马上我会实现双缓存的版本。在此版本中我会实现对临界区的加减锁。 读取的数据要存储到相应的数...
标签: Delphi
  { 这里的多线程同步查询演示程序仅包括一个工程文件和一个单元文件 } { 窗体中放置的组件有: } { 两个Session组件 } { 两个Database组件 } { 两个Query组件 } { 两个DataSource组件 } { 两个DBGrid组件 } { 一个Button组件 } { 除非特别说明,否则上述各组件的属性都取默认值(见各组件注释...
什么是进程? 当一个程序开始运行时,它就是一个进程,进程包括运行中的程序和程序所使用到的内存和系统资源。而一个进程又是由多个线程所组成的。 什么是线程? 线程是程序中的一个执行流,每个线程都有自己的专有寄存器(栈指针、程序计数器等),但代码区是共享的,即不同的线程可以执行同样的函数。 什么是多线程? 多线程是指程序中包含多...
标签: Delphi
  优秀的数据库应用应当充分考虑数据库访问的速度问题。通常可以通过优化数据库、优化 查询语句、分页查询等途径收到明显的效果。即使是这样,也不可避免地会在查询时闪现一个带有 SQL符号的沙漏,即鼠标变成了查询等待。最可怜的是用户,他(她)在此时只能无奈地等待。遇到急性子的,干脆在此时尝试 Windows中的其它应用程序,...
注意:在2000年9月我们修改了这篇文章和它的例子以适用于一个更新版本的SwingWorker类。SwingWorker类的这个版本修正了一些微妙的线程bug。 Swing API的设计目标是强大、灵活和易用。特别地,我们希望能让程序员们方便地建立新的Swing组件,不论是从头开始还是通过扩展我们所提供的一些组件。 出于这个目的,我们不要求Swing组件...

经验教程

310

收藏

47
微博分享 QQ分享 QQ空间 手机页面 收藏网站 回到头部