atextcrawler package

Subpackages

Submodules

atextcrawler.application module

atextcrawler application.

class atextcrawler.application.Application(config=None)

Bases: object

atextcrawler application.

The basic structure of the application is this:
  • one site crawler works solely on the site_queue: it fetches start pages of sites and stores updated site information in table sites

  • N other CrawlWorkers each do this in a loop: check out a site that is due for a crawl and crawl its resources; they fill the site_queue (see the sketch below)
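
A minimal sketch of this structure (hypothetical helper names, not the actual atextcrawler code):

    import asyncio

    async def checkout_due_site():
        ...  # placeholder: select a site that is due for a crawl

    async def crawl_resources_of(site) -> list[str]:
        ...  # placeholder: crawl the site, return newly found site URLs
        return []

    async def site_crawler(site_queue: asyncio.Queue):
        # The single site crawler: consume site_queue, fetch start pages
        # and store updated site information in table sites.
        while True:
            base_url = await site_queue.get()
            ...  # fetch the start page of base_url, upsert row in table sites
            site_queue.task_done()

    async def crawl_worker(site_queue: asyncio.Queue):
        # One of N workers: check out a due site, crawl its resources,
        # and put newly discovered site URLs on site_queue.
        while True:
            site = await checkout_due_site()
            if site is None:
                await asyncio.sleep(60)  # nothing due; retry later
                continue
            for url in await crawl_resources_of(site):
                await site_queue.put(url)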

async handle_notifications()

Handle notifications using PostgreSQL’s NOTIFY/LISTEN.

handle_shutdown_signal()

Handle shutdown signal.

listen_callback(*args)

Handle notify event from PostgreSQL.

async run()

Application lifecycle.

running = True
async shutdown()

Asynchronous shutdown.

async sleep(duration, t_slice=3)

Sleep for duration seconds while self.running.

Check self.running every t_slice seconds.
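
A minimal sketch of this sliced-sleep pattern, assuming only the running flag described above:

    import asyncio

    async def sleep(self, duration, t_slice=3):
        # Sleep in slices of at most t_slice seconds so that a cleared
        # self.running flag is noticed promptly.
        remaining = duration
        while self.running and remaining > 0:
            await asyncio.sleep(min(t_slice, remaining))
            remaining -= t_slice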

async startup()

Asynchronous startup.

async wait_for_shutdown()

Create a shutdown event (asyncio.Event) and wait for it.

The event will be set by a signal handler for SIGINT and SIGTERM signals (see Application.handle_shutdown_signal()).
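
The usual wiring for this pattern looks roughly like this (a sketch, not the verbatim implementation):

    import asyncio
    import signal

    async def wait_for_shutdown(self):
        self.shutdown_event = asyncio.Event()
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self.handle_shutdown_signal)
        await self.shutdown_event.wait()

    def handle_shutdown_signal(self):
        # Stop worker loops and release wait_for_shutdown().
        self.running = False
        self.shutdown_event.set()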

async atextcrawler.application.reset_site_locks(pool)

Remove locks left over from the last run: set crawl_active=false for all sites.

This is relevant when the application was not shut down properly (e.g. when the process was killed).
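
Given the site table documented below (table = 'site'), this plausibly reduces to a single UPDATE; a sketch (the exact SQL is an assumption):

    async def reset_site_locks(pool):
        # Clear stale locks from a previous, improperly terminated run.
        async with pool.acquire() as conn:
            await conn.execute(
                'UPDATE site SET crawl_active = false WHERE crawl_active = true'
            )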

atextcrawler.config module

Configuration loader and validator.

class atextcrawler.config.Config

Bases: object

Application configuration.

Access the full application configuration using get().

It is a dictionary with these keys:

  • ‘directory’: the configuration directory being used

  • ‘main’: the main configuration from main.yaml; the postgresql configuration may be overridden by the environment variable ATEXTCRAWLER_POSTGRESQL

config = None
classmethod get(out: Optional[io.TextIOBase] = None) Optional[dict]

Load and validate app configuration if not already done; return it.

On errors, print them to out; if out is sys.stdout, also exit with exit code 2. Otherwise just return None.
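
Typical usage, with the keys listed above:

    import sys
    from atextcrawler.config import Config

    config = Config.get(out=sys.stdout)  # exits with code 2 on errors
    if config is not None:
        config_dir = config['directory']  # configuration directory in use
        main_conf = config['main']        # contents of main.yaml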

exception atextcrawler.config.ConfigError(err)

Bases: Exception

Application configuration error.

atextcrawler.config.plugins_dir(config)

Validate plugins directory (absolute or relative path).

If it is a relative path, prepend the config_dir.

atextcrawler.config.positive_number(value)

Validate a positive number (int or float).

atextcrawler.config.postgresql_identifier(value)

Validate a PostgreSQL identifier.
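
Hedged sketches of these validators (the exact rules, e.g. the identifier pattern and the config key names, are assumptions):

    import re
    from pathlib import Path

    from atextcrawler.config import ConfigError

    def plugins_dir(config):
        # Prepend the configuration directory to a relative plugins path.
        path = Path(config['main']['plugins_dir'])  # assumed key name
        if not path.is_absolute():
            path = Path(config['directory']) / path
        if not path.is_dir():
            raise ConfigError(f'Not a directory: {path}')
        return str(path)

    def positive_number(value):
        if not isinstance(value, (int, float)) or value <= 0:
            raise ConfigError(f'Not a positive number: {value!r}')
        return value

    def postgresql_identifier(value):
        # Simplified: lowercase letters, digits, underscores,
        # not starting with a digit.
        if not isinstance(value, str) or not re.match(r'^[a-z_][a-z0-9_]*$', value):
            raise ConfigError(f'Invalid PostgreSQL identifier: {value!r}')
        return value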

atextcrawler.crawl module

Crawl a site.

class atextcrawler.crawl.CrawlWorker(app, worker_number, pool)

Bases: object

Worker fetching sites, crawling their resources and storing statistics.

async crawl_resources()

Loop over resources of the site and process them. Collect statistics.

All workers operate on distinct sites, so there is no need for locking here.

async run()

Worker loop: fetch a site, crawl its resources and store statistics.

If no site needs to be crawled, sleep for self.site_delay seconds (configured in crawl.site_delay).
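
A sketch of this loop (the site-checkout helper is an assumption):

    async def run(self):
        await self.startup()
        while self.app.running:
            self.site = await checkout_site(self.pool)  # hypothetical helper
            if self.site is None:
                # No site is due: wait before polling again.
                await self.app.sleep(self.site_delay)
                continue
            await self.crawl_resources()
            ...  # store statistics and release the site
        await self.shutdown()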

async shutdown()

Asynchronous shutdown.

async startup()

Asynchronous startup.

async atextcrawler.crawl.get_or_create_crawl(conn, site_id, is_full=True) atextcrawler.models.Crawl

Return a new crawl or an existing unfinished one.

If an existing crawl is found, return it regardless of whether it is a full crawl or not.

atextcrawler.db module

PostgreSQL connectivity.

PGPool can be used as a context manager. It takes PostgreSQL configuration parameters and provides a connection pool.

class atextcrawler.db.PGPool(postgresql_config: dict, out: Optional[io.TextIOBase] = None, check: bool = True)

Bases: object

Database connectivity: Provide a connection pool.

Can be used either as an async context manager (yielding a pool), or as a plain class: initialize it asynchronously, access its pool attribute, and call its shutdown method when done.

After startup self.pool contains a PostgreSQL connection pool (instance of asyncpg.pool.Pool).

Startup also runs schema migrations (cf. directory migrations).
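
Usage as an async context manager:

    from atextcrawler.db import PGPool

    async def main(postgresql_config: dict):
        async with PGPool(postgresql_config) as pool:
            async with pool.acquire() as conn:
                assert await conn.fetchval('SELECT 1') == 1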

async check_or_migrate(conn: asyncpg.connection.Connection) bool

Check database connectivity and run any pending schema migrations.

Return whether database connectivity is working.

async shutdown()

Close the pool.

atextcrawler.db.get_migrations() Dict[int, str]

Return migrations (number and text content of migration file).
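
A plausible sketch, assuming one numbered SQL file per migration in the migrations directory:

    from pathlib import Path

    def get_migrations() -> dict[int, str]:
        migrations = {}
        migrations_dir = Path(__file__).parent / 'migrations'
        for path in migrations_dir.glob('*.sql'):
            migrations[int(path.stem)] = path.read_text()  # e.g. '1.sql' -> 1
        return migrations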

atextcrawler.models module

Data Models.

class atextcrawler.models.Crawl(site_id: Optional[int] = None, is_full: bool = False, t_begin: datetime.datetime = datetime.datetime(2021, 11, 29, 9, 34, 10, 327905), t_end: Optional[datetime.datetime] = None, n_resources: int = 0, n_resources_new: int = 0)

Bases: atextcrawler.models.ModelBase

The crawl process of a website (begin, end, statistics, …).

async finish(conn, set_t_end)

Save the crawl. Set t_end only if set_t_end is true.

is_full: bool = False
n_resources: int = 0
n_resources_new: int = 0
site_id: Optional[int] = None
t_begin: datetime.datetime = datetime.datetime(2021, 11, 29, 9, 34, 10, 327905)
t_end: Optional[datetime.datetime] = None
table: ClassVar = 'crawl'
class atextcrawler.models.Feed(entries: dataclasses.InitVar[list] = None, site_id: Optional[int] = None, url: Optional[str] = None, etag: Optional[str] = None, modified: Optional[str] = None, t_visit: Optional[datetime.datetime] = None, t_content: Optional[datetime.datetime] = None, version: Optional[str] = None, title: Optional[str] = None, description: Optional[str] = None, fail_count: int = 0)

Bases: atextcrawler.models.MetaResource

A site’s feed (RSS, Atom, …).

debug() str

Return the instance data as a string for debug print output.

description: Optional[str] = None
entries: dataclasses.InitVar[list] = None
etag: Optional[str] = None
fail_count: int = 0
modified: Optional[str] = None
async save(conn: asyncpg.connection.Connection)

Save, trying to merge with an existing entry matching on site_id and url.
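
Merging on (site_id, url) maps naturally onto a PostgreSQL upsert; a sketch using the site_feed table documented below (the exact SQL and column set are assumptions):

    async def save_feed(conn, feed):
        feed.id_ = await conn.fetchval(
            '''
            INSERT INTO site_feed (site_id, url, etag, modified)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (site_id, url) DO UPDATE
                SET etag = EXCLUDED.etag, modified = EXCLUDED.modified
            RETURNING id
            ''',
            feed.site_id, feed.url, feed.etag, feed.modified,
        )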

site_id: Optional[int] = None
t_content: Optional[datetime.datetime] = None
t_visit: Optional[datetime.datetime] = None
table: ClassVar = 'site_feed'
title: Optional[str] = None
url: Optional[str] = None
version: Optional[str] = None
class atextcrawler.models.MetaResource

Bases: atextcrawler.models.ModelBase

Parent class for Feed, Sitemap, SitemapIndex.

Instances of these classes are not stored. Note, however, that class Feed contains feed metadata and is stored in the database.

table: ClassVar
class atextcrawler.models.ModelBase

Bases: object

Abstract base class for models.

Execute SQL to load, save, delete instances using asyncpg.

asdict()

Return the instance data as a dictionary.

async delete(conn: asyncpg.connection.Connection) None

Delete the object if it has an id_.

id_: Optional[int] = 0
async load(conn: asyncpg.connection.Connection, id_: int) Optional[Any]

Load the instance with the given id_ from the database; if loading fails, return None.

async load_from_row(row)

Load from a database row; if row is None, return None.

async save(conn: asyncpg.connection.Connection) None

Save the instance (update if self.id_ is set, else insert).
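
A sketch of this update-or-insert logic on top of asdict() (column naming simplified; not the verbatim implementation):

    async def save(self, conn) -> None:
        data = {k: v for k, v in self.asdict().items() if k != 'id_'}
        cols, vals = list(data), list(data.values())
        if self.id_:
            sets = ', '.join(f'{c} = ${i + 2}' for i, c in enumerate(cols))
            await conn.execute(
                f'UPDATE {self.table} SET {sets} WHERE id = $1',
                self.id_, *vals)
        else:
            params = ', '.join(f'${i + 1}' for i in range(len(cols)))
            self.id_ = await conn.fetchval(
                f'INSERT INTO {self.table} ({", ".join(cols)})'
                f' VALUES ({params}) RETURNING id', *vals)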

table: ClassVar
class atextcrawler.models.ResourceError(msg, status=None, headers=None)

Bases: object

Error encountered while trying to fetch a resource.

It carries the error message and optionally the HTTP status and response headers of the failed fetch.

class atextcrawler.models.ResourceRedirect(urls)

Bases: object

A resource containing a redirect.

class atextcrawler.models.Site(base_durl: dataclasses.InitVar[Durl] = None, feeds: dataclasses.InitVar[dict] = None, links_ext: dataclasses.InitVar[dict] = None, links_int: dataclasses.InitVar[dict] = None, startpage_text: dataclasses.InitVar[str] = None, canonical_url: Optional[str] = None, base_url: Optional[str] = None, base_urls: list[str] = <factory>, domains: list[str] = <factory>, ips: Optional[list[str]] = None, crawl_enabled: bool = False, crawl_active: bool = False, next_full_crawl: Optional[datetime.datetime] = None, next_feed_crawl: Optional[datetime.datetime] = None, last_update: Optional[datetime.datetime] = None, last_pub: Optional[datetime.datetime] = None, pub_dates: Optional[dict[str, str]] = None, langs: list[str] = <factory>, alt_langs: dict[str, str] = <factory>, title: Optional[str] = None, description: Optional[str] = None, keywords: list[str] = <factory>, linkbacks: dict[str, str] = <factory>, meta_info: dict = <factory>, boilerplate_texts: list[str] = <factory>)

Bases: atextcrawler.models.ModelBase

Website.

alt_langs: dict[str, str]
base_durl: dataclasses.InitVar[Durl] = None
base_url: Optional[str] = None
base_urls: list[str]
boilerplate_texts: list[str]
canonical_url: Optional[str] = None
crawl_active: bool = False
crawl_enabled: bool = False
description: Optional[str] = None
domains: list[str]
feeds: dataclasses.InitVar[dict] = None
ips: Optional[list[str]] = None
keywords: list[str]
langs: list[str]
last_pub: Optional[datetime.datetime] = None
last_update: Optional[datetime.datetime] = None
linkbacks: dict[str, str]
meta_info: dict
next_feed_crawl: Optional[datetime.datetime] = None
next_full_crawl: Optional[datetime.datetime] = None
pub_dates: Optional[dict[str, str]] = None
async save(conn, merge=True) tuple[typing.Optional[int], bool]

Store the site, optionally trying to merge it with an existing site.

Return the id of the saved instance and whether a new instance was created.

If self.id_ is not 0, replace the data of the existing site with this id. Otherwise, if merge is false, store the site as a new row; if merge is true, try to merge it with an existing matching site.

startpage_text: dataclasses.InitVar[str] = None
table: ClassVar = 'site'
title: Optional[str] = None
async update_base_url() None

Update the base_url, choosing the most relevant URL.

If canonical_url is not None, use it. Otherwise set self.base_url to the shortest URL in self.base_urls, restricted to https URLs if there is at least one.
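
The selection rule as a sketch:

    async def update_base_url(self) -> None:
        if self.canonical_url is not None:
            self.base_url = self.canonical_url
            return
        if self.base_urls:
            # Prefer https URLs if any exist, then take the shortest.
            candidates = [u for u in self.base_urls if u.startswith('https://')]
            self.base_url = min(candidates or self.base_urls, key=len)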

class atextcrawler.models.SitePath(site: dataclasses.InitVar[str] = None, site_id: Optional[int] = None, path: Optional[str] = None, filtered: bool = False, last_visit: Optional[datetime.datetime] = None, ok_count: int = 0, canonical: Optional[bool] = None, resource_id: Optional[int] = None)

Bases: atextcrawler.models.ModelBase

Path of a website. May point to a Resource.

canonical: Optional[bool] = None
filtered: bool = False
last_visit: Optional[datetime.datetime] = None
ok_count: int = 0
path: Optional[str] = None
resource_id: Optional[int] = None
async save(conn: asyncpg.connection.Connection)

Save the instance, extending the parent’s method.

site: dataclasses.InitVar[str] = None
site_id: Optional[int] = None
table: ClassVar = 'site_path'

Unlink the resource and also delete it if it has no more links.

url(site)

Return the full URL (combine the site’s base_url with our path).

class atextcrawler.models.Sitemap(urls: list = <factory>)

Bases: atextcrawler.models.MetaResource

A Sitemap meta resource.

Just a list of the resulting links, nothing more.

urls: list
class atextcrawler.models.SitemapIndex(sitemaps: list = <factory>)

Bases: atextcrawler.models.MetaResource

A SitemapIndex meta resource.

Just a list of the sitemap URLs, nothing more.

sitemaps: list
class atextcrawler.models.TextResource(init_fields: dataclasses.InitVar[dict] = None, search_fields: dataclasses.InitVar[dict] = None, simhash: Optional[int] = None, content_type: Optional[str] = None, last_change: Optional[datetime.datetime] = None, text_len: int = 0, lang: Optional[str] = None, title: Optional[str] = None, summary: Optional[str] = None)

Bases: atextcrawler.models.ModelBase

TextResource (without path).

TextResource models web resources with relevant text content. They are instantiated in modules page, document, …; their metadata are stored in table resource and the text content is stored with the search engine.

Do not confuse with SitePath: Several SitePath instances may point to a TextResource. The TextResource holds the actual content.

If we are not dealing with the startpage of a new site, the init_fields dict will usually contain the site to which the resource belongs.

content_type: Optional[str] = None
init_fields: dataclasses.InitVar[dict] = None
lang: Optional[str] = None
last_change: Optional[datetime.datetime] = None
async save(conn: asyncpg.connection.Connection)

Save the instance, extending the parent’s method.

search_fields: dataclasses.InitVar[dict] = None
simhash: Optional[int] = None
summary: Optional[str] = None
table: ClassVar = 'resource'
text_len: int = 0
title: Optional[str] = None
async update_from_resource(upd: atextcrawler.models.TextResource)

Update self with values from another resource.

async atextcrawler.models.search_same_site(site: atextcrawler.models.Site, conn: asyncpg.connection.Connection) Optional[int]

Try to find a matching site for the given site and return its id.

TODO: if the path is non-trivial, require it also for the matching site

Two sites match when they return the same content for identical paths. The base_url (scheme and/or netloc) may differ. We do not have the content for all paths of both websites, so we need to estimate: we only take into account meta information from the start pages of both sites, in particular the title, the description and information obtained from the base_urls:

We use a combination of these conditions:

  1. one of the sites has a canonical URL which matches the URL of the other site

  2. the content fields (title, description) have sufficient information

  3. the content fields match exactly

  4. the domain matches

  5. the domain matches, except for the TLD

  6. the base_urls differ in their schemes (http vs. https)

  7. the hostnames in the base_urls are identical

  8. the hostnames in the base_urls differ by a prepended ‘www.’

  9. the IPs have at least one common address

The algorithm is this (the first matching rule is final; yes means the sites match):

  • if (1) : yes

  • if (2), (3), (4) : yes

  • if (2), (3), (5), (9) : yes

  • if (6), ((7) or (8)) : yes

  • no
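
Condensed into code, with cond[1] through cond[9] standing for conditions 1-9 above (their computation omitted):

    def sites_match(cond: dict[int, bool]) -> bool:
        if cond[1]:
            return True
        if cond[2] and cond[3] and cond[4]:
            return True
        if cond[2] and cond[3] and cond[5] and cond[9]:
            return True
        if cond[6] and (cond[7] or cond[8]):
            return True
        return False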

atextcrawler.tensorflow module

Query the tensorflow_model_server’s REST API.

class atextcrawler.tensorflow.TensorFlow(app, session: aiohttp.client.ClientSession, timeout_sock_connect: Union[int, float] = 0.5, timeout_sock_read: Union[int, float] = 10)

Bases: object

Fetch an embedding vector from the tensorflow model server.

async embed(text: Union[str, list[str]]) Optional[Union[list[float], list[list[float]]]]

Query the tensorflow_model_server’s REST API for a prediction.

Take a string or a list of strings and return an embedding vector or a list of embedding vectors.

If the request fails or times out, return None.
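
A sketch of such a request against tensorflow_model_server's REST prediction endpoint (host, port and model name are assumptions; the real values come from the application configuration):

    import asyncio
    import aiohttp

    async def embed(session: aiohttp.ClientSession, text):
        single = isinstance(text, str)
        payload = {'instances': [text] if single else text}
        url = 'http://localhost:8501/v1/models/embedding:predict'  # assumed
        try:
            async with session.post(url, json=payload) as resp:
                predictions = (await resp.json())['predictions']
        except (aiohttp.ClientError, asyncio.TimeoutError):
            return None  # request failed or timed out
        return predictions[0] if single else predictions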

Module contents