<del id="nnjnj"></del><track id="nnjnj"></track>

<p id="nnjnj"></p>

<address id="nnjnj"></address>

    <pre id="nnjnj"><pre id="nnjnj"></pre></pre>

      <noframes id="nnjnj"><ruby id="nnjnj"><ruby id="nnjnj"></ruby></ruby>

      • 自動秒收錄
      • 軟件:1973
      • 資訊:56215|
      • 收錄網站:181187|

      IT精英團

      我用Java在幾分鐘內處理了30億條數據.

      我用Java在幾分鐘內處理了30億條數據.

      瀏覽次數:
      評論次數:
      編輯: 溫瑜
      信息來源: ITPUB
      更新日期: 2022-05-16 14:51:26
      摘要

      來源:https://c1n.cn/GM8hb目錄場景說明模擬數據場景分析讀取數據處理數據遇到的問題場景說明現有一個10G文件的數據,里面包含了18-70之間的整數,分別表示18-70歲的

      • 正文開始
      • 相關閱讀
      • 推薦作品

      來源:https://c1n.cn/GM8hb

      目錄

      場景描述

      模擬數據

      場景分析

      讀出數據

      處理數據

      遇到的問題

      場景說明

      現有10G文件的數據包含18到70之間的整數,分別代表18歲和70歲人口的統計。假設年齡范圍是均勻分布的,它分別代表系統中所有用戶的年齡。找出重復次數最多的數字。有一臺4G內存,2核CPU的電腦。請寫一個算法來實現。23,31,42,19,60,30,36,

      模擬數據

      在Java中,一個整數占用4個字節,模擬10G大約是30億個數據。10G數據以追加模式寫入硬盤。每一百萬條記錄寫一行,約4M,10G約2500行數據。packagebigdata

      import Java . io . *;

      import Java . util . random;

      /**

      *@Desc:

      * @ Author:bingbing冰冰

      * @ date :2022/5/4000419:05

      */

      publicclassGenerateData{

      privatestaticcrandomrandom=new random();

      publistaticintgeneraterandomdata(int start,intent){

      return random . nextint(end-start 1)start;

      }

      /**

      *在磁盤d上生成10G的1-1000個數據。

      */

      publicvoidgenerateData()throwsIOException {

      filefile=new file(' d : \ user . dat ');

      如果(!file.exists()){

      嘗試{

      file . create new file();

      }catch(IOExceptione){

      e . printstacktrace();

      sp;        }
              }

              int start = 18;
              int end = 70;
              long startTime = System.currentTimeMillis();
              BufferedWriter bos = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));
              for (long i = 1; i < Integer.MAX_VALUE * 1.7; i++) {
                  String data = generateRandomData(start, end) + ",";
                  bos.write(data);
                  // 每100萬條記錄成一行,100萬條數據大概4M
                  if (i % 1000000 == ) {
                      bos.write("\n");
                  }
              }
              System.out.println("寫入完成! 共花費時間:" + (System.currentTimeMillis() - startTime) / 1000 + " s");
              bos.close();
          }


          public static void main(String[] args) {
              GenerateData generateData = new GenerateData();
              try {
                  generateData.generateData();
              } catch (IOException e) {
                  e.printStackTrace();
              }

          }
      }
      上述代碼調整參數執行 2 次,湊 10 個 G 的數據在 D 盤的 User.dat 文件里。

      準備好 10G 數據后,接著寫如何處理這些數據。

      場景分析

      10G 的數據比當前擁有的運行內存大的多,不能全量加載到內存中讀取,如果采用全量加載,那么內存會直接爆掉,只能按行讀取,Java 中的 bufferedReader 的 readLine() 按行讀取文件里的內容。

      讀取數據

      首先我們寫一個方法單線程讀完這 30E 數據需要多少時間,每讀 100 行打印一次:
          private static void readData() throws IOException {

              BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
              String line;
              long start = System.currentTimeMillis();
              int count = 1;
              while ((line = br.readLine()) != null) {
                  // 按行讀取
      //            SplitData.splitLine(line);
                  if (count % 100 == ) {
                      System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - start) / 1000 + " s");
                      System.gc();
                  }
                  count++;
              }
              running = false;
              br.close();

          }
      按行讀完 10G 的數據大概 20 秒,基本每 100 行,1E 多數據花 1S,速度還挺快:

      處理數據

      | 思路一:通過單線程處理

      通過單線程處理,初始化一個 countMap,key 為年齡,value 為出現的次數,將每行讀取到的數據按照 "," 進行分割,然后獲取到的每一項進行保存到 countMap 里,如果存在,那么值 key 的 value+1。
          for (int i = start; i <= end; i++) {
                  try {
                      File subFile = new File(dir + "\" + i + ".dat");
                      if (!file.exists()) {
                          subFile.createNewFile();
                      }
                      countMap.computeIfAbsent(i + "", integer -> new AtomicInteger());
                  } catch (FileNotFoundException e) {
                      e.printStackTrace();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
      單線程讀取并統計 countMap:
           public static void splitLine(String lineData) {
                  String[] arr = lineData.split(",");
                  for (String str : arr) {
                      if (StringUtils.isEmpty(str)) {
                          continue;
                      }
                      countMap.computeIfAbsent(str, s -> new AtomicInteger()).getAndIncrement();
                  }
              }
      通過比較找出年齡數最多的年齡并打印出來:
        private static void findMostAge() {
              Integer targetValue = ;
              String targetKey = null;
              Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
              while (entrySetIterator.hasNext()) {
                  Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
                  Integer value = entry.getValue().get();
                  String key = entry.getKey();
                  if (value > targetValue) {
                      targetValue = value;
                      targetKey = key;
                  }
              }
              System.out.println("數量最多的年齡為:" + targetKey + "數量為:" + targetValue);
          }

      完整代碼:

      package bigdata;

      import org.apache.commons.lang3.StringUtils;

      import java.io.*;
      import java.util.*;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.atomic.AtomicInteger;


      /**
       * @Desc:
       * @Author: bingbing
       * @Date: 2022/5/4 0004 19:19
       * 單線程處理
       */
      public class HandleMaxRepeatProblem_v0 {

          public static final int start = 18;
          public static final int end = 70;

          public static final String dir = "D:\dataDir";

          public static final String FILE_NAME = "D:\ User.dat";


          /**
           * 統計數量
           */
          private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();


          /**
           * 開啟消費的標志
           */
          private static volatile boolean startConsumer = false;

          /**
           * 消費者運行保證
           */
          private static volatile boolean consumerRunning = true;


          /**
           * 按照 "," 分割數據,并寫入到countMap里
           */
          static class SplitData {

              public static void splitLine(String lineData) {
                  String[] arr = lineData.split(",");
                  for (String str : arr) {
                      if (StringUtils.isEmpty(str)) {
                          continue;
                      }
                      countMap.computeIfAbsent(str, s -> new AtomicInteger()).getAndIncrement();
                  }
              }


          }

          /**
           *  init map
           */

          static {
              File file = new File(dir);
              if (!file.exists()) {
                  file.mkdir();
              }


              for (int i = start; i <= end; i++) {
                  try {
                      File subFile = new File(dir + "\" + i + ".dat");
                      if (!file.exists()) {
                          subFile.createNewFile();
                      }
                      countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
                  } catch (FileNotFoundException e) {
                      e.printStackTrace();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          }

          public static void main(String[] args) {


              new Thread(() -> {
                  try {
                      readData();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }

              }).start();


          }


          private static void readData() throws IOException {

              BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
              String line;
              long start = System.currentTimeMillis();
              int count = 1;
              while ((line = br.readLine()) != null) {
                  // 按行讀取,并向map里寫入數據
                  SplitData.splitLine(line);
                  if (count % 100 == 0) {
                      System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - start) / 1000 + " s");
                      try {
                          Thread.sleep(1000L);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
                  count++;
              }
              findMostAge();

              br.close();
          }

          private static void findMostAge() {
              Integer targetValue = 0;
              String targetKey = null;
              Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
              while (entrySetIterator.hasNext()) {
                  Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
                  Integer value = entry.getValue().get();
                  String key = entry.getKey();
                  if (value > targetValue) {
                      targetValue = value;
                      targetKey = key;
                  }
              }
              System.out.println("數量最多的年齡為:" + targetKey + "數量為:" + targetValue);
          }

          private static void clearTask() {
              // 清理,同時找出出現字符最大的數
              findMostAge();
              System.exit(-1);
          }


      }

      測試結果:總共花了 3 分鐘讀取完并統計完所有數據。

      內存消耗為 2G-2.5G,CPU 利用率太低,只向上浮動了 20%-25% 之間:

      要想提高 CPU 的利用率,那么可以使用多線程去處理。下面我們使用多線程去解決這個 CPU 利用率低的問題。

      | 思路二:分治法

      使用多線程去消費讀取到的數據。采用生產者、消費者模式去消費數據,因為在讀取的時候是比較快的,單線程的數據處理能力比較差,因此思路一的性能阻塞在取數據方,又是同步的,所以導致整個鏈路的性能會變的很差。


      所謂分治法就是分而治之,也就是說將海量數據分割處理。根據 CPU 的能力初始化 n 個線程,每一個線程去消費一個隊列,這樣線程在消費的時候不會出現搶占隊列的問題。

      同時為了保證線程安全和生產者消費者模式的完整,采用阻塞隊列,Java 中提供了 LinkedBlockingQueue 就是一個阻塞隊列。

      ①初始化阻塞隊列

      使用 linkedList 創建一個阻塞隊列列表:
          private static List<LinkedBlockingQueue<String>> blockQueueLists = new LinkedList<>();
      在 static 塊里初始化阻塞隊列的數量和單個阻塞隊列的容量為 256,上面講到了 30E 數據大概 2500 行,按行塞到隊列里,20 個隊列,那么每個隊列 125 個,因此可以容量可以設計為 256 即可:
          //每個隊列容量為256
              for (int i = ; i < threadNums; i++) {
                  blockQueueLists.add(new LinkedBlockingQueue<>(256));
              }

      ②生產者

      為了實現負載的功能, 首先定義一個 count 計數器,用來記錄行數:
          private static AtomicLong count = new AtomicLong();

      按照行數來計算隊列的下標:long index=count.get()%threadNums。

      下面算法就實現了對隊列列表中的隊列進行輪詢的投放:
         static class SplitData {

              public static void splitLine(String lineData) {
      //            System.out.println(lineData.length());
                  String[] arr = lineData.split("\n");
                  for (String str : arr) {
                      if (StringUtils.isEmpty(str)) {
                          continue;
                      }
                      long index = count.get() % threadNums;
                      try {
                          // 如果滿了就阻塞
                          blockQueueLists.get((int) index).put(str);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      count.getAndIncrement();

                  }
              }

      ③消費者

      隊列線程私有化:消費方在啟動線程的時候根據 index 去獲取到指定的隊列,這樣就實現了隊列的線程私有化。

          private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
              //如果共用一個隊列,那么線程不宜過多,容易出現搶占現象
              System.out.println("開始消費...");
              for (int i = ; i < threadNums; i++) {
                  final int index = i;
                  // 每一個線程負責一個queue,這樣不會出現線程搶占隊列的情況。
                  new Thread(() -> {
                      while (consumerRunning) {
                          startConsumer = true;
                          try {
                              String str = blockQueueLists.get(index).take();
                              countNum(str);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              }


          }

      多子線程分割字符串:由于從隊列中多到的字符串非常的龐大,如果又是用單線程調用 split(",") 去分割,那么性能同樣會阻塞在這個地方。

          // 按照arr的大小,運用多線程分割字符串
          private static void countNum(String str) {
              int[] arr = new int[2];
              arr[1] = str.length() / 3;
      //        System.out.println("分割的字符串為start位置為:" + arr[0] + ",end位置為:" + arr[1]);
              for (int i = ; i < 3; i++) {
                  final String innerStr = SplitData.splitStr(str, arr);
      //            System.out.println("分割的字符串為start位置為:" + arr[] + ",end位置為:" + arr[1]);
                  new Thread(() -> {
                      String[] strArray = innerStr.split(",");
                      for (String s : strArray) {
                          countMap.computeIfAbsent(s, s1 -> new AtomicInteger()).getAndIncrement();
                      }
                  }).start();
              }
          }

      分割字符串算法:分割時從 0 開始,按照等分的原則,將字符串 n 等份,每一個線程分到一份。


      用一個 arr 數組的 arr[0] 記錄每次的分割開始位置,arr[1] 記錄每次分割的結束位置,如果遇到的開始的字符不為 ",",那么就 startIndex-1,如果結束的位置不為 ",",那么將 endIndex 向后移一位。


      如果 endIndex 超過了字符串的最大長度,那么就把最后一個字符賦值給 arr[1]。

              /**
               * 按照 x坐標 來分割 字符串,如果切到的字符不為“,”, 那么把坐標向前或者向后移動一位。
               *
               * @param line
               * @param arr  存放x1,x2坐標
               * @return
               */
              public static String splitStr(String line, int[] arr) {

                  int startIndex = arr[];
                  int endIndex = arr[1];
                  char start = line.charAt(startIndex);
                  char end = line.charAt(endIndex);
                  if ((startIndex ==  || start == ',') && end == ',') {
                      arr[] = endIndex + 1;
                      arr[1] = arr[] + line.length() / 3;
                      if (arr[1] >= line.length()) {
                          arr[1] = line.length() - 1;
                      }
                      return line.substring(startIndex, endIndex);
                  }

                  if (startIndex !=  && start != ',') {
                      startIndex = startIndex - 1;
                  }

                  if (end != ',') {
                      endIndex = endIndex + 1;
                  }

                  arr[] = startIndex;
                  arr[1] = endIndex;
                  if (arr[1] >= line.length()) {
                      arr[1] = line.length() - 1;
                  }
                  return splitStr(line, arr);
              }

      完整代碼:

      package bigdata;

      import cn.hutool.core.collection.CollectionUtil;
      import org.apache.commons.lang3.StringUtils;

      import java.io.*;
      import java.util.*;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.atomic.AtomicInteger;
      import java.util.concurrent.atomic.AtomicLong;
      import java.util.concurrent.locks.ReentrantLock;

      /**
       * @Desc:
       * @Author: bingbing
       * @Date: 2022/5/4 0004 19:19
       * 多線程處理
       */
      public class HandleMaxRepeatProblem {

          public static final int start = 18;
          public static final int end = 70;

          public static final String dir = "D:\dataDir";

          public static final String FILE_NAME = "D:\ User.dat";

          private static final int threadNums = 20;


          /**
           * key 為年齡,  value為所有的行列表,使用隊列
           */
          private static Map<Integer, Vector<String>> valueMap = new ConcurrentHashMap<>();


          /**
           * 存放數據的隊列
           */
          private static List<LinkedBlockingQueue<String>> blockQueueLists = new LinkedList<>();


          /**
           * 統計數量
           */
          private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();


          private static Map<Integer, ReentrantLock> lockMap = new ConcurrentHashMap<>();

          // 隊列負載均衡
          private static AtomicLong count = new AtomicLong();

          /**
           * 開啟消費的標志
           */
          private static volatile boolean startConsumer = false;

          /**
           * 消費者運行保證
           */
          private static volatile boolean consumerRunning = true;


          /**
           * 按照 "," 分割數據,并寫入到文件里
           */
          static class SplitData {

              public static void splitLine(String lineData) {
      //            System.out.println(lineData.length());
                  String[] arr = lineData.split("\n");
                  for (String str : arr) {
                      if (StringUtils.isEmpty(str)) {
                          continue;
                      }
                      long index = count.get() % threadNums;
                      try {
                          // 如果滿了就阻塞
                          blockQueueLists.get((int) index).put(str);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      count.getAndIncrement();

                  }
              }

              /**
               * 按照 x坐標 來分割 字符串,如果切到的字符不為“,”, 那么把坐標向前或者向后移動一位。
               *
               * @param line
               * @param arr  存放x1,x2坐標
               * @return
               */
              public static String splitStr(String line, int[] arr) {

                  int startIndex = arr[];
                  int endIndex = arr[1];
                  char start = line.charAt(startIndex);
                  char end = line.charAt(endIndex);
                  if ((startIndex ==  || start == ',') && end == ',') {
                      arr[] = endIndex + 1;
                      arr[1] = arr[] + line.length() / 3;
                      if (arr[1] >= line.length()) {
                          arr[1] = line.length() - 1;
                      }
                      return line.substring(startIndex, endIndex);
                  }

                  if (startIndex !=  && start != ',') {
                      startIndex = startIndex - 1;
                  }

                  if (end != ',') {
                      endIndex = endIndex + 1;
                  }

                  arr[] = startIndex;
                  arr[1] = endIndex;
                  if (arr[1] >= line.length()) {
                      arr[1] = line.length() - 1;
                  }
                  return splitStr(line, arr);
              }


              public static void splitLine(String lineData) {
                  String[] arr = lineData.split(",");
                  for (String str : arr) {
                      if (StringUtils.isEmpty(str)) {
                          continue;
                      }
                      int keyIndex = Integer.parseInt(str);
                      ReentrantLock lock = lockMap.computeIfAbsent(keyIndex, lockMap -> new ReentrantLock());
                      lock.lock();
                      try {
                          valueMap.get(keyIndex).add(str);
                      } finally {
                          lock.unlock();
                      }

      //                boolean wait = true;
      //                for (; ; ) {
      //                    if (!lockMap.get(Integer.parseInt(str)).isLocked()) {
      //                        wait = false;
      //                        valueMap.computeIfAbsent(Integer.parseInt(str), integer -> new Vector<>()).add(str);
      //                    }
      //                    // 當前阻塞,直到釋放鎖
      //                    if (!wait) {
      //                        break;
      //                    }
      //                }

                  }
              }

          }

          /**
           *  init map
           */

          static {
              File file = new File(dir);
              if (!file.exists()) {
                  file.mkdir();
              }

              //每個隊列容量為256
              for (int i = ; i < threadNums; i++) {
                  blockQueueLists.add(new LinkedBlockingQueue<>(256));
              }


              for (int i = start; i <= end; i++) {
                  try {
                      File subFile = new File(dir + "\" + i + ".dat");
                      if (!file.exists()) {
                          subFile.createNewFile();
                      }
                      countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
      //                lockMap.computeIfAbsent(i, lock -> new ReentrantLock());
                  } catch (FileNotFoundException e) {
                      e.printStackTrace();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          }

          public static void main(String[] args) {


              new Thread(() -> {
                  try {
                      // 讀取數據
                      readData();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }


              }).start();

              new Thread(() -> {
                  try {
                      // 開始消費
                      startConsumer();
                  } catch (FileNotFoundException e) {
                      e.printStackTrace();
                  } catch (UnsupportedEncodingException e) {
                      e.printStackTrace();
                  }
              }).start();

              new Thread(() -> {
                  // 監控
                  monitor();
              }).start();


          }


          /**
           * 每隔60s去檢查棧是否為空
           */
          private static void monitor() {
              AtomicInteger emptyNum = new AtomicInteger(0);
              while (consumerRunning) {
                  try {
                      Thread.sleep(10 * 1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  if (startConsumer) {
                      // 如果所有棧的大小都為0,那么終止進程
                      AtomicInteger emptyCount = new AtomicInteger(0);
                      for (int i = 0; i < threadNums; i++) {
                          if (blockQueueLists.get(i).size() == 0) {
                              emptyCount.getAndIncrement();
                          }
                      }
                      if (emptyCount.get() == threadNums) {
                          emptyNum.getAndIncrement();
                          // 如果連續檢查指定次數都為空,那么就停止消費
                          if (emptyNum.get() > 12) {
                              consumerRunning = false;
                              System.out.println("消費結束...");
                              try {
                                  clearTask();
                              } catch (Exception e) {
                                  System.out.println(e.getCause());
                              } finally {
                                  System.exit(-1);
                              }
                          }
                      }
                  }

              }
          }


          private static void readData() throws IOException {

              BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
              String line;
              long start = System.currentTimeMillis();
              int count = 1;
              while ((line = br.readLine()) != null) {
                  // 按行讀取,并向隊列寫入數據
                  SplitData.splitLine(line);
                  if (count % 100 == 0) {
                      System.out.println("讀取100行,總耗時間: " + (System.currentTimeMillis() - start) / 1000 + " s");
                      try {
                          Thread.sleep(1000L);
                          System.gc();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
                  count++;
              }

              br.close();
          }

          private static void clearTask() {
              // 清理,同時找出出現字符最大的數
              Integer targetValue = ;
              String targetKey = null;
              Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
              while (entrySetIterator.hasNext()) {
                  Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
                  Integer value = entry.getValue().get();
                  String key = entry.getKey();
                  if (value > targetValue) {
                      targetValue = value;
                      targetKey = key;
                  }
              }
              System.out.println("數量最多的年齡為:" + targetKey + "數量為:" + targetValue);
              System.exit(-1);
          }

          /**
           * 使用linkedBlockQueue
           *
           * @throws FileNotFoundException
           * @throws UnsupportedEncodingException
           */
          private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
              //如果共用一個隊列,那么線程不宜過多,容易出現搶占現象
              System.out.println("開始消費...");
              for (int i = 0; i < threadNums; i++) {
                  final int index = i;
                  // 每一個線程負責一個queue,這樣不會出現線程搶占隊列的情況。
                  new Thread(() -> {
                      while (consumerRunning) {
                          startConsumer = true;
                          try {
                              String str = blockQueueLists.get(index).take();
                              countNum(str);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              }


          }

          // 按照arr的大小,運用多線程分割字符串
          private static void countNum(String str) {
              int[] arr = new int[2];
              arr[1] = str.length() / 3;
      //        System.out.println("分割的字符串為start位置為:" + arr[] + ",end位置為:" + arr[1]);
              for (int i = ; i < 3; i++) {
                  final String innerStr = SplitData.splitStr(str, arr);
      //            System.out.println("分割的字符串為start位置為:" + arr[] + ",end位置為:" + arr[1]);
                  new Thread(() -> {
                      String[] strArray = innerStr.split(",");
                      for (String s : strArray) {
                          countMap.computeIfAbsent(s, s1 -> new AtomicInteger()).getAndIncrement();
                      }
                  }).start();
              }
          }


          /**
           * 后臺線程去消費map里數據寫入到各個文件里, 如果不消費,那么會將內存程爆
           */
          private static void startConsumer0() throws FileNotFoundException, UnsupportedEncodingException {
              for (int i = start; i <= end; i++) {
                  final int index = i;
                  BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(dir + "\" + i + ".dat", false), "utf-8"));
                  new Thread(() -> {
                      int miss = 0;
                      int countIndex = 0;
                      while (true) {
                          // 每隔100萬打印一次
                          int count = countMap.get(index).get();
                          if (count > 1000000 * countIndex) {
                              System.out.println(index + "歲年齡的個數為:" + countMap.get(index).get());
                              countIndex += 1;
                          }
                          if (miss > 1000) {
                              // 終止線程
                              try {
                                  Thread.currentThread().interrupt();
                                  bw.close();
                              } catch (IOException e) {

                              }
                          }
                          if (Thread.currentThread().isInterrupted()) {
                              break;
                          }


                          Vector<String> lines = valueMap.computeIfAbsent(index, vector -> new Vector<>());
                          // 寫入到文件里
                          try {

                              if (CollectionUtil.isEmpty(lines)) {
                                  miss++;
                                  Thread.sleep(1000);
                              } else {
                                  // 100個一批
                                  if (lines.size() < 1000) {
                                      Thread.sleep(1000);
                                      continue;
                                  }
                                  // 1000個的時候開始處理
                                  ReentrantLock lock = lockMap.computeIfAbsent(index, lockIndex -> new ReentrantLock());
                                  lock.lock();
                                  try {
                                      Iterator<String> iterator = lines.iterator();
                                      StringBuilder sb = new StringBuilder();
                                      while (iterator.hasNext()) {
                                          sb.append(iterator.next());
                                          countMap.get(index).addAndGet(1);
                                      }
                                      try {
                                          bw.write(sb.toString());
                                          bw.flush();
                                      } catch (IOException e) {
                                          e.printStackTrace();
                                      }
                                      // 清除掉vector
                                      valueMap.put(index, new Vector<>());
                                  } finally {
                                      lock.unlock();
                                  }

                              }
                          } catch (InterruptedException e) {

                          }
                      }
                  }).start();
              }

          }
      }

      測試結果:

      內存和 CPU 初始占用大?。?img data-galleryid="" data-s="300,640" src="http://img1-itnpc.oss-cn-hangzhou.aliyuncs.com/upload_img/202205/5yidkyudfsr.jpg" width="1028" src="http://img1-itnpc.oss-cn-hangzhou.aliyuncs.com/upload_img/202205/5yidkyudfsr.jpg">啟動后,運行時穩定在 11.7,CPU 穩定利用在 90% 以上。總耗時由 180S 縮減到 103S,效率提升 75%,得到的結果也與單線程處理的一致!

      遇到的問題

      如果在運行了的時候,發現 GC 突然罷工了,開始不工作了,有可能是 JVM 的堆中存在的垃圾太多,沒回收導致內存的突增。

      解決方法:在讀取一定數量后,可以讓主線程暫停幾秒,手動調用 GC。

      提示:本 demo 的線程創建都是手動創建的,實際開發中使用的是線程池!


      標簽:隊列 數據 線程
      碼頭工人搭建彈性搜索集群教程
      ? 上一篇 2022-05-16
      MySQL的行格式是什么?
      下一篇 ? 2022-05-16
      • 胡迪核心知識點詳解(好文章合集)
        1閱讀 0條評論 個贊
        以下文章來源于公眾號-3分鐘秒懂大數據,作者在IT中穿梭旅行在Flink實時流中,經常會通過FlinkCDC插件讀取Mysql數據,然后寫入Hudi中。所以在執行上述操作時,需要了解……
      • 前端面試必須解決網絡中的跨域問題
        0閱讀 0條評論 個贊
        什么是跨域瀏覽器有一個重要的安全策略,稱之為「同源策略」其中,源=協議+主機+端口源=協議+主機+端口源=協議+主機+端口,兩個源相同,稱之為同源,兩個源不同,稱之為跨源或跨域比如:源1源2是否同……
      • 如何在Bash腳本中使用強大的Linux測試命令
        0閱讀 0條評論 個贊
        Linuxtest命令是Shell內置命令,用來檢測某個條件是否成立。test通常和if語句一起使用,并且大部分if語句都依賴test??梢詫⒁粋€元素與另一個元素進行比較,但它更?!?/div>
      • 真正的建筑設計是什么樣子的?
        1閱讀 0條評論 個贊
        什么是架構和架構本質在軟件行業,對于什么是架構,都有很多的爭論,每個人都有自己的理解。此君說的架構和彼君理解的架構未必是一回事。因此我們在討論架構之前,我們先討論架構的概念定義,概念是人認識這個世界的……
      • 10分鐘了解云原生 值得收藏~
        0閱讀 0條評論 個贊
        文章轉載:奇妙的Linux世界我們已經進入云計算下半場,不再像上半場在糾結要不要上云,而是討論怎么上云?才能把云計算的價值發揮到淋漓盡致。如何把云計算與不同的業務場景深度結合?如何讓技術真正作用于企業……
      發表評論 共有條評論
      用戶名: 密碼:
      驗證碼: 匿名發表
      • 我用Java在幾分鐘內處理了30億條數據.
        2閱讀 0條評論 個贊
        來源:https://c1n.cn/GM8hb目錄場景說明模擬數據場景分析讀取數據處理數據遇到的問題場景說明現有一個10G文件的數據,里面包含了18-70之間的整數,分別表示18-70歲的……
      • 透徹理解數據資產、數據資源、數據管理、數據治理等概念的區別
        1閱讀 0條評論 個贊
        以下文章來源于公眾號-大魚的數據人生,作者討厭的大魚先生數據成為生產要素后,各種跟數據相關的概念就出來了,其實很多概念沒有權威定義,大家各有各的理解,這導致了理解上的歧義。數據管理、數據治理、數據資源……
      • MySQL支持哈希索引嗎?(收藏)
        1閱讀 0條評論 個贊
        經常有朋友問,MySQL的InnoDB到底支不支持哈希索引?對于InnoDB的哈希索引,確切的應該這么說:(1)InnoDB用戶無法手動創建哈希索引,這一層上說,InnoDB確實不支持哈希索引;(2)……
      • Python極簡編碼規范
        1閱讀 0條評論 個贊
        本文是閱讀《PythonCodingRule》之后總結的最為精華及簡單的編碼規范,根據每個人不同喜好有些地方會有不同的選擇,我只是做了對自己來說最簡單易行的選擇,僅供大家參考。1、重要原則a.保持……
      • 如何正確計算Kubernetes容器的CPU利用率
        1閱讀 0條評論 個贊
        本文轉自博客園,原文:https://www.cnblogs.com/apink/p/15767687.html,版權歸原作者所有。參數解釋使用Prometheus配置kubernetes環境……
      • 碼頭工人搭建彈性搜索集群教程
        1閱讀 0條評論 個贊
        寫在前面:為什么要用ElasticSearch?我們的應用經常需要添加檢索功能,開源的ElasticSearch是目前全文檢索引擎的首選。它可以快速的存儲、搜索和分析海量數據。ElasticSear……
      • 教你如何在Linux中生成復雜的密碼 并檢查密碼強度
        0閱讀 0條評論 個贊
        在本教程中,我們將討論如何生成復雜密碼并且檢查密碼強度。生成復雜的密碼強密碼應由字母、數字和符號的混合組成。第二個要求是不要使用已知單詞、出生日期或姓名,因為很容易受到字典攻擊。密碼應該包含多少個字符……
      • 卡夫卡3.0新功能全暴露 好香??!
        1閱讀 0條評論 個贊
        以下文章來源于云加社區,作者屈志平導語|kafka3.0的版本已經試推行去zk的kafka架構了,如果去掉了zk,那么在kafka新的版本當中使用什么技術來代替了zk的位置呢,接下來我們一起來一探究竟……
      • SQL優化通用公式:5個步驟和10個案例
        1閱讀 0條評論 個贊
        導讀:在應用開發的早期,數據量少,開發人員開發功能時更重視功能上的實現,隨著生產數據的增長,很多SQL語句開始暴露出性能問題,對生產的影響也越來越大,有時可能這些有問題的SQL就是整個系統性能的瓶頸?!?/div>
      • 在Linux中檢查磁盤空間的12個有用的df命令
        1閱讀 0條評論 個贊
        1.檢查文件系統磁盤空間使用情況這df命令顯示文件系統上的設備名稱、總塊數、總磁盤空間、已用磁盤空間、可用磁盤空間和掛載點信息。[root@local~]#dfFilesystem1K-bloc……
      • 操作系統宕機 如何找回我的MySQL記錄?
        0閱讀 0條評論 個贊
        以下文章來源于公眾號-數據和云,作者楊豹一、概述如果Linux操作系統宕機,啟動不了,救援模式(rescueinstalledsystem)也行不通的時候,那么該機器上的MySQL數據還能恢復嗎?……
      • 說說春云的全鏈路灰度發布方案~
        1閱讀 0條評論 個贊
        以下文章來源于公眾號-碼猿技術專欄,作者不才陳某大家好實際生產中如有需求變更,并不會直接更新線上服務,最通常的做法便是:切出線上的小部分流量進行體驗測試,經過測試后無問題則全面的上線。這樣做的好處也是……
      • 優化Docker鏡像安全性的12個技巧 建議收藏起來!
        0閱讀 0條評論 個贊
        本文介紹了12個優化Docker鏡像安全性的技巧。每個技巧都解釋了底層的攻擊載體,以及一個或多個緩解方法。這些技巧包括了避免泄露構建密鑰、以非root用戶身份運行,或如何確保使用最新的依賴……
      • 如何在Linux下擴展XFS根分區
        2閱讀 0條評論 個贊
        在某些情況下,/分區在Linux中磁盤空間不足。即使壓縮和刪除舊的日志文件也無濟于事,因此在這種情況下,我們別無選擇,只能擴展/文件系統。在本文中,我們將演示如何在Linux系統中擴展不……
      • Linux最常用的命令:解決95%以上的問題
        1閱讀 0條評論 個贊
        Linux是目前應用最廣泛的服務器操作系統,基于Unix,開源免費,由于系統的穩定性和安全性,市場占有率很高,幾乎成為程序代碼運行的最佳系統環境。linux不僅可以長時間的運行我們編寫的程序代碼,還可……
      • 詳細解釋Linux中的diff命令和例子
        1閱讀 0條評論 個贊
        文件比較在Linux中起著重要的作用,特別是對于程序員和Linux系統管理員。例如,如果您想找到兩個源代碼文件之間的差異來開發補丁,那么您需要一個文件比較工具來簡化這個過程。Linux中有幾……
      • 創建Go語言最快的排序算法
        1閱讀 0條評論 個贊
        前言說到排序算法,很多同學會想起快速排序、堆排序、冒泡排序這些耳熟能詳的算法。了解得深一些的同學,也可能看過例如Python的timsort以及C++introsort之類的排序算法?!?/div>
      • Spring Boot的表現太差了 我教你幾招輕松搞定
        0閱讀 0條評論 個贊
        文章……
      • 記得網上一個K8s Ingress訪問故障排除 最后卻不是帖子的鍋
        1閱讀 0條評論 個贊
        具體現象應用遷移至我們的PaaS平臺后會出現偶發性的502問題,錯誤見圖片:相比于程序的請求量,錯誤肯定是比較少的,但是錯誤一直在發生,會影響調用方的代碼,需要檢查下問題原因。為啥我們只看到了POST……
      • 關于數據中心最強科普 一個就給你完整了解!
        1閱讀 0條評論 個贊
        數據中心,英文縮寫叫IDC,也就是InternetDataCenter(互聯網數據中心)。之所以不太直接稱之為“DC”,主要是為了避免和直流電(DirectCurrent)混淆。而且,現在的數……
      最近發布資訊
      更多
      警花高潮嗷嗷叫
      <del id="nnjnj"></del><track id="nnjnj"></track>

      <p id="nnjnj"></p>

      <address id="nnjnj"></address>

        <pre id="nnjnj"><pre id="nnjnj"></pre></pre>

          <noframes id="nnjnj"><ruby id="nnjnj"><ruby id="nnjnj"></ruby></ruby>