YeenDeer softness blog (programming and electronics)

Ellie the Yeen is a soft YeenDeer that mweeoops and does programming

View on GitHub About Bots Projects Tags

Command line poster for Twitter, Mastodon and Bsky

I decided to make a command line poster for Twitter, Mastodon and Bsky. It does not support posting images yet, but it does support CWs (content warnings). I hastily created it by converting my Discord bot, so the code might be a bit messy, but it works great. It has some unusual error handling that I implemented to cope with smaller faults when fetching link previews. The code is below; feel free to use it for whatever you want.

#!/usr/bin/python3
import traceback
import datetime
import typing
import types
import json
import sys
import os
import re
import io

import httpx


def getmast():
    """Return the shared Mastodon client, building it on the first call.

    The client is stored in the module-level name ``mast`` so later calls
    return the same object instead of constructing a new one.
    """
    global mast

    # A previous call already stored the client: hand it back unchanged.
    if "mast" in globals():
        return mast

    # Imported here rather than at module level, so the package is only
    # loaded when a Mastodon client is actually requested.
    import mastodon

    mast = mastodon.Mastodon(
        api_base_url="https://mastodon.social",  # Replace with your instance
        access_token=...,  # Replace with Mastodon token
        mastodon_version="1.4.3",
    )
    return mast


def gettwit():
    """Return the shared ``(api1, api2)`` pair of tweepy clients.

    ``api1`` is a ``tweepy.API`` instance (used by the caller for media
    upload and metadata requests) and ``api2`` is a ``tweepy.Client``
    instance (used by the caller to create the tweet).  The pair is built on
    the first call and cached in the module-level name ``twit``.
    """
    # The cache lives in the module-level name ``twit``.  Declaring it global
    # is what makes the lookup below see the value stored by an earlier call;
    # without it ``twit`` is a local and the clients are rebuilt every time.
    global twit

    try:
        return twit
    except NameError:
        pass

    # Imported here rather than at module level, so the package is only
    # loaded when a Twitter client is actually requested.
    import tweepy

    bearer_token = ...  # Replace this with twitter bearer_token
    consumer_key = ...  # Replace this with twitter consumer_key
    consumer_secret = ...  # Replace this with twitter consumer_secret

    access_token = ...  # Replace this with twitter access_token
    access_token_secret = ...  # Replace this with twitter access_token_secret

    api1 = tweepy.API(
        tweepy.OAuth1UserHandler(
            consumer_key=consumer_key,
            consumer_secret=consumer_secret,
            access_token=access_token,
            access_token_secret=access_token_secret,
        ),
        wait_on_rate_limit=False,
    )
    api2 = tweepy.Client(
        bearer_token=bearer_token,
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        access_token=access_token,
        access_token_secret=access_token_secret,
        wait_on_rate_limit=False,
    )

    twit = api1, api2
    return twit


def _save_bsky_session(client, sessionfile):
    """Write the client's current session credentials to *sessionfile* as JSON."""
    odata = dict(
        did=client.me.did,
        handle=client.me.handle,
        access_jwt=client._access_jwt,
        refresh_jwt=client._refresh_jwt,
    )
    with open(sessionfile, "w") as f:
        print(json.dumps(odata, indent=4), file=f)


def getbsky():
    """Return the shared Bluesky client, creating or refreshing it as needed.

    The client is cached in the module-level name ``bsky``.  Session tokens
    are stored in ``<username>.json`` next to this script, so a later run can
    restore the session from that file instead of logging in with the
    password.  Whenever the session is refreshed (or a fresh login happens)
    the file is rewritten with the new tokens.
    """
    global bsky

    username = ...  # Replace this with bsky username (something.bsky.social)
    password = ...  # Replace this with bsky password (Maybe app password)

    fdir = os.path.split(__file__)[0]
    sessionfile = os.path.join(fdir, f"{username}.json")

    # Only the cache lookup is guarded by NameError, so that a NameError
    # raised while refreshing is not mistaken for "no cached client".
    try:
        cached = bsky
    except NameError:
        cached = None

    if cached is not None:
        if cached._should_refresh_session():
            cached._refresh_and_set_session()
            # Persist the tokens of the cached client itself.
            _save_bsky_session(cached, sessionfile)
        return cached

    # Imported here rather than at module level, so the package is only
    # loaded when a new Bluesky client has to be built.
    import atproto

    client = atproto.Client()
    try:
        # Restore the stored session without contacting the login endpoint.
        with open(sessionfile) as f:
            odata = json.load(f)
        ses = types.SimpleNamespace(
            did=odata["did"],
            handle=odata["handle"],
            access_jwt=odata["access_jwt"],
            refresh_jwt=odata["refresh_jwt"],
        )
        client._set_session(ses)
        client.me = types.SimpleNamespace(
            did=odata["did"],
            handle=odata["handle"],
        )
    except Exception:  # No idea what this throws but it should be tested at some point
        traceback.print_exc()
        print("Could not find session, Logging in")
        client.login(username, password)
        _save_bsky_session(client, sessionfile)

    if client._should_refresh_session():
        client._refresh_and_set_session()
        _save_bsky_session(client, sessionfile)

    bsky = client
    return bsky


def makewrap(func, desc):
    """Wrap ``func`` so that everything it returns or yields is printed.

    The returned callable takes one argument, passes it to ``func`` and
    prints each resulting value on its own line.  The call runs inside an
    ``ErrorWrap`` context.  ``desc`` is accepted but not used.
    """

    def onwrap(msg):
        with ErrorWrap():
            # maybegenerator turns a plain value, a generator or None into
            # one uniform stream of values to print.
            for item in maybegenerator(func(msg)):
                print(item)

    return onwrap


# General functions


class Message(Exception):
    """Exception whose arguments are shown to the user as plain text.

    ``ErrorWrap`` prints the space-joined arguments of a ``Message`` instead
    of a traceback.
    """


class ErrorWrap:
    """Context manager that prints and suppresses exceptions from its body.

    A ``Message`` is printed as its arguments joined by spaces.  Any other
    ``Exception`` is printed as a full traceback, preceded by an
    ``Error in <appname> while <message>`` line when ``message`` was given.
    Exceptions that are not ``Exception`` subclasses (for example
    ``KeyboardInterrupt``) are not handled and keep propagating.

    Args:
        message: Optional description of the task being attempted.
        appname: Program name used in the error header.
    """

    def __init__(
        self,
        message: typing.Optional[str] = None,
        appname: str = "poster",
    ) -> None:
        self.message = message
        # __exit__ reads this attribute; it must always be set, otherwise
        # building the error header itself raises AttributeError.
        self.appname = appname

    def __enter__(self) -> None:  # TODO: Add subtask describer
        pass

    def __exit__(
        self,
        t: typing.Optional[typing.Type[BaseException]],
        e: typing.Optional[BaseException],
        b: typing.Optional[types.TracebackType],
    ) -> bool:
        """Print the exception, if any; return True when it was suppressed."""
        if not isinstance(e, Exception):
            # No exception, or a BaseException such as KeyboardInterrupt.
            return False
        if isinstance(e, Message):
            text = " ".join(str(a) for a in e.args)
        else:
            header = (
                f"Error in {self.appname} while {self.message}\n"
                if self.message
                else ""
            )
            # Format the exception passed in explicitly rather than relying
            # on the interpreter's "currently handled exception" state.
            text = header + "".join(traceback.format_exception(t, e, b))
        print(text)
        return True


def makeerrorwrap():
    """Return a one-argument factory that builds an ``ErrorWrap``.

    The factory's argument is passed to ``ErrorWrap`` as its message.
    """
    return lambda msg: ErrorWrap(msg)


def maybegenerator(
    value: typing.Union[types.GeneratorType, typing.Any]
) -> typing.Iterable:
    """Yield the items of ``value`` whether it is a generator, a value or None.

    A generator is delegated to, ``None`` produces nothing, and any other
    object is yielded once as-is.
    """
    if value is None:
        return
    if not isinstance(value, types.GeneratorType):
        yield value
        return
    yield from value


# Shared HTTP client, used by onmsg to query the link-preview service and to
# download preview images.  It follows redirects, uses a timeout value of 10
# and sends a desktop Firefox User-Agent header.
httx = httpx.Client(
    timeout=10,
    follow_redirects=True,
    headers={
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0"
    },
)

# Commands handled by the Mastodon branch of onmsg; variants containing "c"
# take the content warning from the first line of the text.
mastcmds = ("p", "pp", "pc", "ppc")

# Commands handled by the Twitter/Bluesky branch of onmsg:
# "bp" = Bluesky only, "tp" = Twitter only, "tbp" = both.
othercmds = ("bp", "tp", "tbp")

# Every known command, in the order listed in the usage message.
allcmds = (*mastcmds, *othercmds)


def onmsg(cont):
    """Run one poster command and yield the URL of each post created.

    ``cont`` is the argument list: ``cont[0]`` is the command and the
    remaining items are joined with single spaces to form the post text.

    Commands:
        p, pp     Post to Mastodon; ``pp`` posts publicly, ``p`` unlisted.
        pc, ppc   Same, but the first line of the text is the content
                  warning and the rest is the post body.
        bp        Post to Bluesky only.
        tp        Post to Twitter only.
        tbp       Post to both Twitter and Bluesky.

    For ``p``/``pp``, text starting with ``CW:`` or ``CW `` is also split
    into a content warning (first line) and body (rest).

    Yields:
        A usage line when ``cont`` is empty, ``"Unknown command"`` for an
        unrecognised command, otherwise one URL per successful post.

    Raises:
        Message: when the text is empty, or a ``c`` command has no second
            line.  Failures while posting to Twitter or Bluesky are caught
            by ``ErrorWrap`` and printed instead of being raised, so one
            platform failing does not stop the other.
    """
    cmd = cont[0] if cont else ""
    args = cont[1:]

    if not cont:
        yield f"All commands: {' '.join(allcmds)}"
        return

    if cmd in mastcmds:
        text = " ".join(args).strip()
        cw = None
        if "c" in cmd:
            s = text.split("\n", 1)
            if len(s) < 2:
                raise Message(
                    "Requires 2 rows where first is cw and others are message or an attachment"
                )
            # maxsplit=1 and the length check above guarantee two parts.
            cw, text = s
        elif text.startswith(("CW:", "CW ")):
            cw, *t = map(str.strip, text[3:].split("\n", 1))
            text = t[0] if t else ""
        if not text:
            raise Message("Requires args to post")

        mast = getmast()

        # Image posting is not supported yet; these stay empty for now.
        mediaids = []
        sensitive = False

        # Command names carry no prefix character, so match "pp" directly
        # (covers both "pp" and "ppc").
        visibility = "public" if cmd.startswith("pp") else "unlisted"
        yield mast.status_post(
            status=text,
            media_ids=mediaids,
            sensitive=sensitive,
            spoiler_text=cw,
            visibility=visibility,
        )["url"]
    elif cmd in othercmds:
        dotwitter = True
        dobluesky = True
        if cmd == "bp":
            dotwitter = False
        elif cmd == "tp":
            dobluesky = False

        text = " ".join(args).strip()

        if not text:
            raise Message("Requires args or attachments to post")

        # Image posting is not supported yet; these stay empty for now.
        pics = []
        sensitive = False

        catch = makeerrorwrap()

        if dotwitter:
            with catch("posting on twitter"):
                if len(text) > 280:
                    raise Message(
                        f"Text length is {len(text)} (max 280) and will therefore not be posted on twitter"
                    )

                t1, t2 = gettwit()

                mediaids = []
                for pic in pics:
                    med = t1.media_upload("file.png", file=io.BytesIO(pic["data"]))
                    data = {
                        "alt_text": {
                            "text": (
                                pic["alt"]
                                or "Alt text seems to be missing and is likely a bug"
                            )[:1000]
                        },
                        "media_id": str(med.media_id),
                    }

                    if sensitive:
                        data["sensitive_media_warning"] = [
                            # "other",
                            # "graphic_violence",
                            "adult_content"
                        ]

                    t1.request(
                        "POST",
                        "media/metadata/create",
                        json_payload=data,
                        upload_api=True,
                    )
                    mediaids.append(med.media_id)

                post = t2.create_tweet(
                    text=text,
                    media_ids=mediaids or None,
                )

                yield f"https://fxtwitter.com/i/status/{post.data['id']}"

        if dobluesky:
            with catch("posting on bluesky"):
                if len(text) > 300:
                    raise Message(
                        f"Text length is {len(text)} (max 300) and will therefore not be posted on bluesky"
                    )

                bsky = getbsky()

                embed = None
                if pics:
                    images = []
                    for pic in pics:
                        upload = bsky.com.atproto.repo.upload_blob(
                            pic["data"], timeout=60
                        )
                        images.append(
                            {
                                "$type": "app.bsky.embed.images#image",
                                "alt": pic["alt"],
                                "image": upload.blob,
                            }
                        )
                    embed = {"$type": "app.bsky.embed.images", "images": images}

                # Links are located in the UTF-8 encoded text because the
                # facet index below is expressed as a byte range.
                facets = []
                firstlink = None
                for match in re.finditer(
                    rb'https?://[^\t\n\ "\'<>\\\^\{\|\}]+', text.encode("utf-8")
                ):
                    link = match.group().decode("utf-8")
                    if not firstlink:
                        firstlink = link
                    span = match.span()
                    facets.append(
                        {
                            "$type": "app.bsky.richtext.facet",
                            "features": [
                                {
                                    "$type": "app.bsky.richtext.facet#link",
                                    "uri": link,
                                }
                            ],
                            "index": {
                                "$type": "app.bsky.richtext.facet#byteSlice",
                                "byteStart": span[0],
                                "byteEnd": span[1],
                            },
                        }
                    )

                # Without images, try to attach a preview card for the first
                # link.  Failures here are printed and the post still goes out.
                if embed is None and firstlink:
                    with catch(f"getting URL preview for {firstlink!a}"):
                        resp = httx.get(
                            "https://cardyb.bsky.app/v1/extract",
                            params=dict(url=firstlink),
                        )
                        resp.raise_for_status()
                        jdat = resp.json()
                        if jdat.get("error"):
                            raise ValueError(
                                f"Error from previewer: {jdat.get('error')!a}"
                            )
                        embe = {"$type": "app.bsky.embed.external#external"}
                        embe["uri"] = firstlink
                        embe["description"] = jdat.get("description", "No description")
                        embe["title"] = jdat.get("title", "No title")
                        with catch(f"handling preview picture {jdat.get('image')!a}"):
                            if imgurl := jdat.get("image"):
                                ires = httx.get(imgurl)
                                ires.raise_for_status()
                                upload = bsky.com.atproto.repo.upload_blob(
                                    ires.content, timeout=10
                                )
                                embe["thumb"] = upload.blob
                            # NOTE(review): this assignment sits inside the
                            # thumbnail guard, so a failed thumbnail download
                            # also drops the whole preview card - confirm
                            # that this is intended.
                            embed = {
                                "$type": "app.bsky.embed.external",
                                "external": embe,
                            }

                labels = None
                if sensitive:
                    labels = {
                        "$type": "com.atproto.label.defs#selfLabels",
                        "values": [{"val": "porn"}],  # sexual, nudity, porn
                    }

                post = bsky.com.atproto.repo.create_record(
                    {
                        "repo": bsky.me.did,
                        "collection": "app.bsky.feed.post",
                        "record": {
                            "$type": "app.bsky.feed.post",
                            # Current UTC time with millisecond precision
                            # and a literal "Z" suffix.
                            "created_at": datetime.datetime.now(
                                tz=datetime.timezone.utc
                            )
                            .replace(tzinfo=None)
                            .isoformat(timespec="milliseconds")
                            + "Z",
                            "facets": facets,
                            "text": text,
                            "embed": embed,
                            "labels": labels,
                            "langs": ["en"],
                        },
                    },
                    timeout=60,
                )

                # The last path segment of the record URI identifies the post.
                uri = post.uri.rsplit("/", 2)[-1]
                yield f"https://bsky.app/profile/{bsky.me.did}/post/{uri}"
    else:
        yield "Unknown command"


if __name__ == "__main__":
    # Everything after the script name is the command followed by its text.
    run = makewrap(onmsg, "posting")
    run(sys.argv[1:])

*Mweeoops*

By ellietheyeen 14 December 2023 Permalink Tags: bsky mastodon python shell twitter


Instance: