| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- """
- Everything related to parsing links from input sources.
- For a list of supported services, see the README.md.
- For examples of supported import formats see tests/.
- Link: {
- 'url': 'https://example.com/example/?abc=123&xyc=345#lmnop',
- 'timestamp': '1544212312.4234',
- 'title': 'Example.com Page Title',
- 'tags': 'abc,def',
- 'sources': [
- 'output/sources/ril_export.html',
- 'output/sources/getpocket.com-1523422111.txt',
- 'output/sources/stdin-234234112312.txt'
- ]
- }
- """
- import re
- import json
- from typing import Tuple, List, IO, Iterable
- from datetime import datetime
- import xml.etree.ElementTree as etree
- from core.config import TIMEOUT
- from core.util import (
- htmldecode,
- str_between,
- URL_REGEX,
- check_url_parsing_invariants,
- TimedProgress,
- Link,
- enforce_types,
- )
- @enforce_types
- def parse_links(source_file: str) -> Tuple[List[Link], str]:
- """parse a list of URLs with their metadata from an
- RSS feed, bookmarks export, or text file
- """
- check_url_parsing_invariants()
- PARSERS = (
- # Specialized parsers
- ('Pocket HTML', parse_pocket_html_export),
- ('Pinboard RSS', parse_pinboard_rss_export),
- ('Shaarli RSS', parse_shaarli_rss_export),
- ('Medium RSS', parse_medium_rss_export),
-
- # General parsers
- ('Netscape HTML', parse_netscape_html_export),
- ('Generic RSS', parse_rss_export),
- ('Generic JSON', parse_json_export),
- # Fallback parser
- ('Plain Text', parse_plain_text_export),
- )
- timer = TimedProgress(TIMEOUT * 4)
- with open(source_file, 'r', encoding='utf-8') as file:
- for parser_name, parser_func in PARSERS:
- try:
- links = list(parser_func(file))
- if links:
- timer.end()
- return links, parser_name
- except Exception as err: # noqa
- # Parsers are tried one by one down the list, and the first one
- # that succeeds is used. To see why a certain parser was not used
- # due to error or format incompatibility, uncomment this line:
- # print('[!] Parser {} failed: {} {}'.format(parser_name, err.__class__.__name__, err))
- pass
- timer.end()
- return [], 'Failed to parse'
- ### Import Parser Functions
- @enforce_types
- def parse_pocket_html_export(html_file: IO[str]) -> Iterable[Link]:
- """Parse Pocket-format bookmarks export files (produced by getpocket.com/export/)"""
- html_file.seek(0)
- pattern = re.compile("^\\s*<li><a href=\"(.+)\" time_added=\"(\\d+)\" tags=\"(.*)\">(.+)</a></li>", re.UNICODE)
- for line in html_file:
- # example line
- # <li><a href="http://example.com/ time_added="1478739709" tags="tag1,tag2">example title</a></li>
- match = pattern.search(line)
- if match:
- url = match.group(1).replace('http://www.readability.com/read?url=', '') # remove old readability prefixes to get original url
- time = datetime.fromtimestamp(float(match.group(2)))
- tags = match.group(3)
- title = match.group(4).replace(' — Readability', '').replace('http://www.readability.com/read?url=', '')
-
- yield Link(
- url=htmldecode(url),
- timestamp=str(time.timestamp()),
- title=htmldecode(title) or None,
- tags=tags or '',
- sources=[html_file.name],
- )
- @enforce_types
- def parse_json_export(json_file: IO[str]) -> Iterable[Link]:
- """Parse JSON-format bookmarks export files (produced by pinboard.in/export/, or wallabag)"""
- json_file.seek(0)
- links = json.load(json_file)
- json_date = lambda s: datetime.strptime(s, '%Y-%m-%dT%H:%M:%S%z')
- for link in links:
- # example line
- # {"href":"http:\/\/www.reddit.com\/r\/example","description":"title here","extended":"","meta":"18a973f09c9cc0608c116967b64e0419","hash":"910293f019c2f4bb1a749fb937ba58e3","time":"2014-06-14T15:51:42Z","shared":"no","toread":"no","tags":"reddit android"}]
- if link:
- # Parse URL
- url = link.get('href') or link.get('url') or link.get('URL')
- if not url:
- raise Exception('JSON must contain URL in each entry [{"url": "http://...", ...}, ...]')
- # Parse the timestamp
- ts_str = str(datetime.now().timestamp())
- if link.get('timestamp'):
- # chrome/ff histories use a very precise timestamp
- ts_str = str(link['timestamp'] / 10000000)
- elif link.get('time'):
- ts_str = str(json_date(link['time'].split(',', 1)[0]).timestamp())
- elif link.get('created_at'):
- ts_str = str(json_date(link['created_at']).timestamp())
- elif link.get('created'):
- ts_str = str(json_date(link['created']).timestamp())
- elif link.get('date'):
- ts_str = str(json_date(link['date']).timestamp())
- elif link.get('bookmarked'):
- ts_str = str(json_date(link['bookmarked']).timestamp())
- elif link.get('saved'):
- ts_str = str(json_date(link['saved']).timestamp())
-
- # Parse the title
- title = None
- if link.get('title'):
- title = link['title'].strip()
- elif link.get('description'):
- title = link['description'].replace(' — Readability', '').strip()
- elif link.get('name'):
- title = link['name'].strip()
- yield Link(
- url=htmldecode(url),
- timestamp=ts_str,
- title=htmldecode(title) or None,
- tags=htmldecode(link.get('tags')) or '',
- sources=[json_file.name],
- )
- @enforce_types
- def parse_rss_export(rss_file: IO[str]) -> Iterable[Link]:
- """Parse RSS XML-format files into links"""
- rss_file.seek(0)
- items = rss_file.read().split('<item>')
- items = items[1:] if items else []
- for item in items:
- # example item:
- # <item>
- # <title><![CDATA[How JavaScript works: inside the V8 engine]]></title>
- # <category>Unread</category>
- # <link>https://blog.sessionstack.com/how-javascript-works-inside</link>
- # <guid>https://blog.sessionstack.com/how-javascript-works-inside</guid>
- # <pubDate>Mon, 21 Aug 2017 14:21:58 -0500</pubDate>
- # </item>
- trailing_removed = item.split('</item>', 1)[0]
- leading_removed = trailing_removed.split('<item>', 1)[-1].strip()
- rows = leading_removed.split('\n')
- def get_row(key):
- return [r for r in rows if r.strip().startswith('<{}>'.format(key))][0]
- url = str_between(get_row('link'), '<link>', '</link>')
- ts_str = str_between(get_row('pubDate'), '<pubDate>', '</pubDate>')
- time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %z")
- title = str_between(get_row('title'), '<![CDATA[', ']]').strip()
- yield Link(
- url=htmldecode(url),
- timestamp=str(time.timestamp()),
- title=htmldecode(title) or None,
- tags=None,
- sources=[rss_file.name],
- )
- @enforce_types
- def parse_shaarli_rss_export(rss_file: IO[str]) -> Iterable[Link]:
- """Parse Shaarli-specific RSS XML-format files into links"""
- rss_file.seek(0)
- entries = rss_file.read().split('<entry>')[1:]
- for entry in entries:
- # example entry:
- # <entry>
- # <title>Aktuelle Trojaner-Welle: Emotet lauert in gefälschten Rechnungsmails | heise online</title>
- # <link href="https://www.heise.de/security/meldung/Aktuelle-Trojaner-Welle-Emotet-lauert-in-gefaelschten-Rechnungsmails-4291268.html" />
- # <id>https://demo.shaarli.org/?cEV4vw</id>
- # <published>2019-01-30T06:06:01+00:00</published>
- # <updated>2019-01-30T06:06:01+00:00</updated>
- # <content type="html" xml:lang="en"><![CDATA[<div class="markdown"><p>— <a href="https://demo.shaarli.org/?cEV4vw">Permalink</a></p></div>]]></content>
- # </entry>
- trailing_removed = entry.split('</entry>', 1)[0]
- leading_removed = trailing_removed.strip()
- rows = leading_removed.split('\n')
- def get_row(key):
- return [r.strip() for r in rows if r.strip().startswith('<{}'.format(key))][0]
- title = str_between(get_row('title'), '<title>', '</title>').strip()
- url = str_between(get_row('link'), '<link href="', '" />')
- ts_str = str_between(get_row('published'), '<published>', '</published>')
- time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
- yield Link(
- url=htmldecode(url),
- timestamp=str(time.timestamp()),
- title=htmldecode(title) or None,
- tags=None,
- sources=[rss_file.name],
- )
- @enforce_types
- def parse_netscape_html_export(html_file: IO[str]) -> Iterable[Link]:
- """Parse netscape-format bookmarks export files (produced by all browsers)"""
- html_file.seek(0)
- pattern = re.compile("<a href=\"(.+?)\" add_date=\"(\\d+)\"[^>]*>(.+)</a>", re.UNICODE | re.IGNORECASE)
- for line in html_file:
- # example line
- # <DT><A HREF="https://example.com/?q=1+2" ADD_DATE="1497562974" LAST_MODIFIED="1497562974" ICON_URI="https://example.com/favicon.ico" ICON="data:image/png;base64,...">example bookmark title</A>
-
- match = pattern.search(line)
- if match:
- url = match.group(1)
- time = datetime.fromtimestamp(float(match.group(2)))
- title = match.group(3).strip()
- yield Link(
- url=htmldecode(url),
- timestamp=str(time.timestamp()),
- title=htmldecode(title) or None,
- tags=None,
- sources=[html_file.name],
- )
- @enforce_types
- def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
- """Parse Pinboard RSS feed files into links"""
- rss_file.seek(0)
- root = etree.parse(rss_file).getroot()
- items = root.findall("{http://purl.org/rss/1.0/}item")
- for item in items:
- find = lambda p: item.find(p).text.strip() if item.find(p) else None # type: ignore
- url = find("{http://purl.org/rss/1.0/}link")
- tags = find("{http://purl.org/dc/elements/1.1/}subject")
- title = find("{http://purl.org/rss/1.0/}title")
- ts_str = find("{http://purl.org/dc/elements/1.1/}date")
-
- # Pinboard includes a colon in its date stamp timezone offsets, which
- # Python can't parse. Remove it:
- if ts_str and ts_str[-3:-2] == ":":
- ts_str = ts_str[:-3]+ts_str[-2:]
- if ts_str:
- time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
- else:
- time = datetime.now()
- yield Link(
- url=htmldecode(url),
- timestamp=str(time.timestamp()),
- title=htmldecode(title) or None,
- tags=htmldecode(tags) or None,
- sources=[rss_file.name],
- )
- @enforce_types
- def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
- """Parse Medium RSS feed files into links"""
- rss_file.seek(0)
- root = etree.parse(rss_file).getroot()
- items = root.find("channel").findall("item") # type: ignore
- for item in items:
- url = item.find("link").text # type: ignore
- title = item.find("title").text.strip() # type: ignore
- ts_str = item.find("pubDate").text # type: ignore
- time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %Z") # type: ignore
-
- yield Link(
- url=htmldecode(url),
- timestamp=str(time.timestamp()),
- title=htmldecode(title) or None,
- tags=None,
- sources=[rss_file.name],
- )
- @enforce_types
- def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
- """Parse raw links from each line in a text file"""
- text_file.seek(0)
- for line in text_file.readlines():
- urls = re.findall(URL_REGEX, line) if line.strip() else ()
- for url in urls: # type: ignore
- yield Link(
- url=htmldecode(url),
- timestamp=str(datetime.now().timestamp()),
- title=None,
- tags=None,
- sources=[text_file.name],
- )
|