Java 5.0 多线程编程实践

2016-02-19 21:41 11 1 收藏

今天图老师小编给大家介绍下Java 5.0 多线程编程实践,平时喜欢Java 5.0 多线程编程实践的朋友赶紧收藏起来吧!记得点赞哦~

【 tulaoshi.com - 编程语言 】

  Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

  简介

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/bianchengyuyan/)

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

  1. 建立监听端口。

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。

  这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器模型将如下:

  1. 建立监听端口,创建线程池。

  2. 发现有新连接,使用线程池来执行服务任务。

  3. 服务完毕,释放线程到线程池。

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。

  初始化

  初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方法来建立线程池。

  ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

  使用ServerSocket对象来初始化监听端口。

  private static final int PORT = 19527;
  serverListenSocket = new ServerSocket(PORT);
  serverListenSocket.setReuseAddress(true);
  serverListenSocket.setReuseAddress(true);

  服务新连接

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。

  while(true){
  Socket socket = serverListenSocket.accept();
  pool.execute(new ServiceThread(socket));
  }

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。

  服务任务

  服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:

  private static ReentrantLock lock = new ReentrantLock ();
  private static int count = 0;
  private int getCount(){
  int ret = 0;
  try{
  lock.lock();
  ret = count;
  }finally{
  lock.unlock();
  }
  return ret;
  }
  private void increaseCount(){
  try{
  lock.lock();
  ++count;
  }finally{
  lock.unlock();
  }
  }

  服务线程在开始给客户端打印一个欢迎信息,

  increaseCount();
  int curCount = getCount();
  helloString = "hello, id = " + curCount+" ";
  dos = new DataOutputStream(connectedSocket.getOutputStream());
  dos.write(helloString.getBytes());

(本文来源于图老师网站,更多请访问http://www.tulaoshi.com/bianchengyuyan/)

服务器端的完整实现

  服务器端的完整实现代码如下:

  package com.andrew;

  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.io.Serializable;
  import java.net.ServerSocket;
  import java.net.Socket;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.RejectedExecutionHandler;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantLock;

  public class Server {
  private static int produceTaskSleepTime = 100;
  private static int consumeTaskSleepTime = 1200;
  private static int produceTaskMaxNumber = 100;
  private static final int CORE_POOL_SIZE = 2;
  private static final int MAX_POOL_SIZE = 100;
  private static final int KEEPALIVE_TIME = 3;
  private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
  private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
  private static final String HOST = "127.0.0.1";
  private static final int PORT = 19527;
  private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
  //private ThreadPoolExecutor serverThreadPool = null;
  private ExecutorService pool = null;
  private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
  private ServerSocket serverListenSocket = null;
  private int times = 5;
  public void start() {
  // You can also init thread pool in this way.
  /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
  MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
  rejectedExecutionHandler);*/
  pool = Executors.newFixedThreadPool(10);
  try {
   serverListenSocket = new ServerSocket(PORT);
   serverListenSocket.setReuseAddress(true);

   System.out.println("I'm listening");
   while (times--  0) {
    Socket socket = serverListenSocket.accept();
    String welcomeString = "hello";
    //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
    pool.execute(new ServiceThread(socket));
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  cleanup();
  }

  public void cleanup() {
  if (null != serverListenSocket) {
   try {
    serverListenSocket.close();
   } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
  //serverThreadPool.shutdown();
  pool.shutdown();
  }

  public static void main(String args[]) {
  Server server = new Server();
  server.start();
  }
  }

  class ServiceThread implements Runnable, Serializable {
  private static final long serialVersionUID = 0;
  private Socket connectedSocket = null;
  private String helloString = null;
  private static int count = 0;
  private static ReentrantLock lock = new ReentrantLock();

  ServiceThread(Socket socket) {
  connectedSocket = socket;
  }

  public void run() {
  increaseCount();
  int curCount = getCount();
  helloString = "hello, id = " + curCount + "";

  ExecutorService executor = Executors.newSingleThreadExecutor();
  Future future = executor.submit(new TimeConsumingTask());

  DataOutputStream dos = null;
  try {
   dos = new DataOutputStream(connectedSocket.getOutputStream());
   dos.write(helloString.getBytes());
   try {
    dos.write("let's do soemthing other.".getBytes());
    String result = future.get();
    dos.write(result.getBytes());
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (ExecutionException e) {
    e.printStackTrace();
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } finally {
   if (null != connectedSocket) {
    try {
     connectedSocket.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   if (null != dos) {
    try {
     dos.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   executor.shutdown();
  }
  }

  private int getCount() {
  int ret = 0;
  try {
   lock.lock();
   ret = count;
  } finally {
   lock.unlock();
  }
  return ret;
  }

  private void increaseCount() {
  try {
   lock.lock();
   ++count;
  } finally {
   lock.unlock();
  }
  }
  }

  class TimeConsumingTask implements Callable {
  public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
  }

  }

来源:http://www.tulaoshi.com/n/20160219/1626879.html

延伸阅读
标签: Java JAVA基础
不提倡使用的方法是为支持向后兼容性而保留的那些方法,它们在以后的版本中可能出现,也可能不出现。Java 多线程支持在版本 1.1 和版本 1.2 中做了重大修订,stop()、suspend() 和 resume() 函数已不提倡使用。这些函数在 JVM 中可能引入微妙的错误。虽然函数名可能听起来很诱人,但请抵制诱惑不要使用它们。 调试线程化...
线程组 线程是被个别创建的,但可以将它们归类到线程组中,以便于调试和监视。只能在创建线程的同时将它与一个线程组相关联。在使用大量线程的程序中,使用线程组组织线程可能很有帮助。可以将它们看作是计算机上的目录和文件结构。 !-- frame contents -- !-- /frame contents -- 线程间发信 ...
import java.io.*;//多线程编程public class MultiThread{public static void main(String args[]){System.out.println("我是主线程!");//下面创建线程实例thread1ThreadUseExtends thread1=new ThreadUseExtends();//创建thread2时以实现了Runnable接口的THhreadUseRunnable类实例为参数Thread thread2=new Thread(new ThreadU...
一 Java 语言的
为什么会排队等待? 下面的这个简单的 Java 程序完成四项不相关的任务。这样的程序有单个控制线程,控制在这四个任务之间线性地移动。 !-- frame contents -- !-- /frame contents -- 此外,因为所需的资源 — 打印机、磁盘、数据库和显示屏 -- 由于硬件和软件的限制都有内在的潜伏时间,所以每项任务都包含明显的等待时间。因...

经验教程

578

收藏

42
微博分享 QQ分享 QQ空间 手机页面 收藏网站 回到头部