diff --git a/bloom_filter/pom.xml b/bloom_filter/pom.xml index 781e238..2ca3555 100644 --- a/bloom_filter/pom.xml +++ b/bloom_filter/pom.xml @@ -19,6 +19,14 @@ + + + org.projectlombok + lombok + 1.18.4 + provided + + org.springframework.boot spring-boot-starter @@ -37,6 +45,40 @@ 28.1-jre + + org.jsoup + jsoup + 1.12.1 + + + + org.apache.httpcomponents + httpclient + 4.5.10 + + + org.apache.httpcomponents + httpcore + 4.4.10 + + + org.apache.httpcomponents + httpmime + 4.5.10 + + + + com.google.code.gson + gson + 2.8.5 + + + + com.alibaba + fastjson + 1.2.59 + + diff --git a/bloom_filter/src/main/java/com/jamal/bloomfilter/GuavaBloomFilterTest.java b/bloom_filter/src/main/java/com/jamal/bloomfilter/GuavaBloomFilterTest.java index d2d8fe7..89fa1c5 100644 --- a/bloom_filter/src/main/java/com/jamal/bloomfilter/GuavaBloomFilterTest.java +++ b/bloom_filter/src/main/java/com/jamal/bloomfilter/GuavaBloomFilterTest.java @@ -3,6 +3,7 @@ import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -13,29 +14,34 @@ * @author 曾小辉 **/ public class GuavaBloomFilterTest { - - private static int size = 1000000; - - private static BloomFilter bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size,0.01); + // bit 数组大小 + private static int size = 10000; + // 布隆过滤器 + private static BloomFilter bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), size, 0.03); public static void main(String[] args) { + // 先向布隆过滤器中添加 10000 个url for (int i = 0; i < size; i++) { - bloomFilter.put(i); + String url = "https://voice.hupu.com/nba/" + i; + bloomFilter.put(url); } - + // 前10000个url不会出现误判 for (int i = 0; i < size; i++) { - if (!bloomFilter.mightContain(i)) { - System.out.println("有坏人逃脱了"); + String url = "https://voice.hupu.com/nba/" + i; + if (!bloomFilter.mightContain(url)) { + System.out.println("该 url 被采集过了"); } } - List list = new ArrayList(1000); - for (int i = size + 10000; i < size + 20000; i++) { - if (bloomFilter.mightContain(i)) { - list.add(i); + List list = new ArrayList(1000); + // 再向布隆过滤器中添加 2000 个 url ,在这2000 个中就会出现误判了 + // 误判的个数为 2000 * fpp + for (int i = size; i < size + 2000; i++) { + String url = "https://voice.hupu.com/nba/" + i; + if (bloomFilter.mightContain(url)) { + list.add(url); } } - System.out.println("有误伤的数量:" + list.size()); + System.out.println("误判数量:" + list.size()); } - } diff --git a/bloom_filter/src/main/java/com/jamal/bloomfilter/Person.java b/bloom_filter/src/main/java/com/jamal/bloomfilter/Person.java new file mode 100644 index 0000000..ee13852 --- /dev/null +++ b/bloom_filter/src/main/java/com/jamal/bloomfilter/Person.java @@ -0,0 +1,23 @@ +package com.jamal.bloomfilter; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * bloom_filter + * 2019/10/21 10:55 + * + * @author 曾小辉 + **/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Person { + + private String name; + + private String age; +} diff --git a/bloom_filter/src/main/java/com/jamal/bloomfilter/SimpleBloomFilterTest.java b/bloom_filter/src/main/java/com/jamal/bloomfilter/SimpleBloomFilterTest.java index b8f59b0..c70d8d7 100644 --- a/bloom_filter/src/main/java/com/jamal/bloomfilter/SimpleBloomFilterTest.java +++ b/bloom_filter/src/main/java/com/jamal/bloomfilter/SimpleBloomFilterTest.java @@ -10,61 +10,70 @@ **/ public class SimpleBloomFilterTest { - private static final int DEFAULT_SIZE = 2 << 24 ; - private static final int [] seeds = new int [] { 7 , 11 , 13 , 31 , 37 , 61 , }; + // bit 数组的大小 + private static final int DEFAULT_SIZE = 1000; + // 用来生产三个不同的哈希函数的 + private static final int[] seeds = new int[]{7, 31, 61,}; + // bit 数组 + private BitSet bits = new BitSet(DEFAULT_SIZE); + // 存放哈希函数的数组 + private SimpleHash[] func = new SimpleHash[seeds.length]; - private BitSet bits = new BitSet(DEFAULT_SIZE); - private SimpleHash[] func = new SimpleHash[seeds.length]; - - public static void main(String[] args) { - String value = "stone2083@yahoo.cn" ; - SimpleBloomFilterTest filter = new SimpleBloomFilterTest(); - System.out.println(filter.contains(value)); - filter.add(value); - System.out.println(filter.contains(value)); - System.out.println(filter.contains("stone2083@yahoo.cm")); + public static void main(String[] args) { + SimpleBloomFilterTest filter = new SimpleBloomFilterTest(); + filter.add("https://voice.hupu.com/nba/2492440.html"); + filter.add("https://voice.hupu.com/nba/2492437.html"); + filter.add("https://voice.hupu.com/nba/2492439.html"); + System.out.println(filter.contains("https://voice.hupu.com/nba/2492440.html")); + System.out.println(filter.contains("https://voice.hupu.com/nba/249244.html")); } - - public SimpleBloomFilterTest() { - for ( int i = 0 ; i < seeds.length; i ++ ) { - func[i] = new SimpleHash(DEFAULT_SIZE, seeds[i]); + public SimpleBloomFilterTest() { + for (int i = 0; i < seeds.length; i++) { + func[i] = new SimpleHash(DEFAULT_SIZE, seeds[i]); } } - public void add(String value) { - - for (SimpleHash f : func) { - bits.set(f.hash(value), true ); + /** + * 向布隆过滤器添加元素 + * @param value + */ + public void add(String value) { + for (SimpleHash f : func) { + bits.set(f.hash(value), true); } } - public boolean contains(String value) { - if (value == null ) { - return false ; + /** + * 判断某元素是否存在布隆过滤器 + * @param value + * @return + */ + public boolean contains(String value) { + if (value == null) { + return false; } - boolean ret = true ; - for (SimpleHash f : func) { - ret = ret && bits.get(f.hash(value)); + boolean ret = true; + for (SimpleHash f : func) { + ret = ret && bits.get(f.hash(value)); } - return ret; + return ret; } - public static class SimpleHash { - - private int cap; - private int seed; - - public SimpleHash( int cap, int seed) { - this .cap = cap; - this .seed = seed; + /** + * 哈希函数 + */ + public static class SimpleHash { + private int cap; + private int seed; + public SimpleHash(int cap, int seed) { + this.cap = cap; + this.seed = seed; } - - public int hash(String value) { - int result = 0 ; - int len = value.length(); - for ( int i = 0 ; i < len; i ++ ) { - result = seed * result + value.charAt(i); + public int hash(String value) { + int result = 0; + int len = value.length(); + for (int i = 0; i < len; i++) { + result = seed * result + value.charAt(i); } - return (cap - 1 ) & result; + return (cap - 1) & result; } - } } diff --git a/bloom_filter/src/main/java/com/jamal/bloomfilter/Test.java b/bloom_filter/src/main/java/com/jamal/bloomfilter/Test.java new file mode 100644 index 0000000..f88da37 --- /dev/null +++ b/bloom_filter/src/main/java/com/jamal/bloomfilter/Test.java @@ -0,0 +1,65 @@ +package com.jamal.bloomfilter; + +import java.util.concurrent.TimeUnit; + +/** + * bloom_filter + * 2019/10/21 10:56 + * + * @author 曾小辉 + **/ +public class Test { +// +// private long count = 0; +// +// private void add10K() { +// int idx = 0; +// while (idx++ < 100000) { +// count += 1; +// } +// } +// +// public static void main(String[] args) throws InterruptedException { +// for (int i = 0;i<30;i++) { +// Test test = new Test(); +// Thread th1 = new Thread(() -> { +// test.add10K(); +// }); +// Thread th2 = new Thread(() -> { +// test.add10K(); +// }); // 启动两个线程 th1.start(); th2.start(); +// th1.start(); +// th2.start(); +// th1.join(); +// th2.join(); +// System.out.println(test.count); +// } +// } +public static void main(String[] args) throws InterruptedException{ + Thread thread1 = new Thread(){ + @Override + public void run(){ + System.out.println("start"); + boolean flag = true; + while(flag){ + ; + } + System.out.print("end"); + } + }; + thread1.setName("thread1"); + thread1.start(); + //当前线程休眠1秒 + TimeUnit.SECONDS.sleep(1); + //关闭线程thread1 +// thread1.stop(); + thread1.interrupt(); + //输出线程thread1的状态 + String s = thread1.getState() + "1------" + thread1.isInterrupted(); + System.out.println(s); + //当前线程休眠一秒 + TimeUnit.SECONDS.sleep(1); + System.out.println(thread1.getState()+"2------"+thread1.isInterrupted()); +} +} + diff --git a/bloom_filter/src/main/java/com/jamal/bloomfilter/ThreadCrawler.java b/bloom_filter/src/main/java/com/jamal/bloomfilter/ThreadCrawler.java new file mode 100644 index 0000000..1374439 --- /dev/null +++ b/bloom_filter/src/main/java/com/jamal/bloomfilter/ThreadCrawler.java @@ -0,0 +1,266 @@ +package com.jamal.bloomfilter; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; +import org.jsoup.Jsoup; +import org.jsoup.nodes.Document; +import org.jsoup.select.Elements; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Date; +import java.util.HashSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * 多线程爬虫 + */ +public class ThreadCrawler implements Runnable { + + // 采集的文章数 + private final AtomicLong pageCount = new AtomicLong(0); + + // 列表页链接正则表达式 + public static final String URL_LIST = "https://voice.hupu.com/nba/\\d"; + + + + protected Logger logger = LoggerFactory.getLogger(getClass()); + + // 待采集的队列 + LinkedBlockingQueue taskQueue; + + // 采集过的链接列表 +// HashSet visited; + + // 布隆过滤器 + private static BloomFilter bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 500,0.01); + + // 线程池 + CountableThreadPool threadPool; + + + /** + * + * @param url 起始页 + * @param threadNum 线程数 + * @throws InterruptedException + */ + public ThreadCrawler(String url, int threadNum) throws InterruptedException { + this.taskQueue = new LinkedBlockingQueue<>(); + this.threadPool = new CountableThreadPool(threadNum); +// this.visited = new HashSet<>(); + this.taskQueue.put(url); + } + + @Override + public void run() { + logger.info("Spider started!"); + while (!Thread.currentThread().isInterrupted()) { + final String request = taskQueue.poll(); + + // 如果获取 request 为空,并且当前的线程采已经没有线程在运行 + if (request == null) { + if (threadPool.getThreadAlive() == 0) { + break; + } + } else { + // 执行采集任务 + threadPool.execute(new Runnable() { + @Override + public void run() { + try { + processRequest(request); + } catch (Exception e) { + logger.error("process request " + request + " error", e); + } finally { + // 采集页面 +1 + pageCount.incrementAndGet(); + } + } + }); + } + } + threadPool.shutdown(); + logger.info("Spider closed! {} pages downloaded.", pageCount.get()); + } + + /** + * 处理采集请求 + * @param url + */ + protected void processRequest(String url) { + // 判断是否为列表页 + if (url.matches(URL_LIST)) { + processTaskQueue(url); + } else { + processPage(url); + } + } + + + /** + * 处理链接采集 + * 处理列表页,将 url 添加到队列中 + * + * @param url + */ + protected void processTaskQueue(String url) { + try { + Document doc = Jsoup.connect(url).get(); + // 详情页链接 + Elements elements = doc.select(" div.news-list > ul > li > div.list-hd > h4 > a"); + elements.stream().forEach((element -> { + String request = element.attr("href"); + // 判断该链接是否存在队列或者已采集的 set 中,不存在则添加到队列中 + if (!bloomFilter.mightContain(request) && !taskQueue.contains(request)) { + try { + taskQueue.put(request); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + })); + // 列表页链接 + Elements list_urls = doc.select("div.voice-paging > a"); + list_urls.stream().forEach((element -> { + String request = element.absUrl("href"); + // 判断是否符合要提取的列表链接要求 + if (request.matches(URL_LIST)) { + // 判断该链接是否存在队列或者已采集的 set 中,不存在则添加到队列中 + if (!bloomFilter.mightContain(request) && !taskQueue.contains(request)) { + try { + taskQueue.put(request); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + })); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 解析页面 + * + * @param url + */ + protected void processPage(String url) { + try { + Document doc = Jsoup.connect(url).get(); + String title = doc.select("body > div.hp-wrap > div.voice-main > div.artical-title > h1").first().ownText(); + + System.out.println(Thread.currentThread().getName() + " 在 " + new Date() + " 采集了虎扑新闻 " + title); + // 将采集完的 url 存入到已经采集的 set 中 +// visited.add(url); + bloomFilter.put(url); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) { + + try { + new ThreadCrawler("https://voice.hupu.com/nba/1", 5).run(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} + +/** + * 线程池,来源 webmagic + */ +class CountableThreadPool { + + private int threadNum; + + private AtomicInteger threadAlive = new AtomicInteger(); + + private ReentrantLock reentrantLock = new ReentrantLock(); + + private Condition condition = reentrantLock.newCondition(); + + public CountableThreadPool(int threadNum) { + this.threadNum = threadNum; + this.executorService = Executors.newFixedThreadPool(threadNum); + } + + public CountableThreadPool(int threadNum, ExecutorService executorService) { + this.threadNum = threadNum; + this.executorService = executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public int getThreadAlive() { + return threadAlive.get(); + } + + public int getThreadNum() { + return threadNum; + } + + private ExecutorService executorService; + + public void execute(final Runnable runnable) { + + + if (threadAlive.get() >= threadNum) { + try { + reentrantLock.lock(); + while (threadAlive.get() >= threadNum) { + try { + condition.await(); + } catch (InterruptedException e) { + } + } + } finally { + reentrantLock.unlock(); + } + } + threadAlive.incrementAndGet(); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + runnable.run(); + } finally { + try { + reentrantLock.lock(); + threadAlive.decrementAndGet(); + condition.signal(); + } finally { + reentrantLock.unlock(); + } + } + } + }); + } + + public boolean isShutdown() { + return executorService.isShutdown(); + } + + public void shutdown() { + executorService.shutdown(); + } + + +} \ No newline at end of file diff --git a/java8/src/com/jamal/Stream.java b/java8/src/com/jamal/Stream.java deleted file mode 100644 index 3582cae..0000000 --- a/java8/src/com/jamal/Stream.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.jamal; - -/** - * java8 - * 2019/10/17 16:41 - * - * @author 曾小辉 - **/ -public class stream { -} diff --git a/java8/src/com/jamal/StreamTest.java b/java8/src/com/jamal/StreamTest.java new file mode 100644 index 0000000..8029ef0 --- /dev/null +++ b/java8/src/com/jamal/StreamTest.java @@ -0,0 +1,59 @@ +package com.jamal; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toList; + +/** + * java8 + * 2019/10/17 16:41 + * + * @author 曾小辉 + **/ +public class Stream { + + public static void main(String[] args) { + List personList = new ArrayList<>(); + + personList.add(new Person(20, "张三")); + personList.add(new Person(30, "张为为")); + personList.add(new Person(25, "张无畏")); + personList.add(new Person(29, "张斯")); + personList.add(new Person(21, "张逼")); + personList.add(new Person(20, "里斯")); + personList.add(new Person(28, "科尔")); + personList.add(new Person(25, "莫雷")); + personList.add(new Person(40, "校花")); + +// personList.stream().forEach((person)-> System.out.println(person.getName())); +// personList.stream().filter((person) -> { +// System.out.println("filter:"+person.getName()); +// return person.getAge()>22; +// }) +// .map((person)->{ +// System.out.println("map:"+person.getName()); +// return person; +// }) +// .sorted(Comparator.comparing(Person::getAge)) +// .forEach(person -> System.out.println("姓名:"+person.getName()+" 年龄:"+person.getAge())); +// java.util.stream.Stream s = personList.stream(); +// s.forEach(System.out::println); +// s.forEach(System.out::println); + +// List numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4); +// numbers.stream() +// .filter(i -> i%2==0) +// .distinct() +// .forEach(System.out::println); + + List list = personList.stream() + .map(Person::getName) + .map(String::length) + .collect(toList()); + System.out.println(list); + } +}